Messaging
Spring Framework为与消息传递系统的集成提供了广泛的支持,从简化使用JMS API JmsTemplate
到完整的基础架构,以异步接收消息。Spring AMQP为高级消息队列协议提供了类似的功能集。Spring Boot还为RabbitTemplate
RabbitMQ 提供了自动配置选项。Spring WebSocket本身包含对STOMP消息传递的支持,Spring Boot通过启动器和少量自动配置支持它。Spring Boot也支持Apache Kafka。
JMS
该javax.jms.ConnectionFactory
接口提供了一种创建javax.jms.Connection
与JMS代理交互的标准方法 。尽管Spring需要 ConnectionFactory
使用JMS,但您通常不需要直接使用它,而是可以依赖于更高级别的消息传递抽象。(有关详细信息,请参阅Spring Framework参考文档的 相关部分。)Spring Boot还会自动配置发送和接收消息所需的基础结构。
ActiveMQ支持
当ActiveMQ在类路径上可用时,Spring Boot也可以配置a ConnectionFactory
。如果代理存在,则会自动启动并配置嵌入式代理(前提是未通过配置指定代理URL)。
如果使用
spring-boot-starter-activemq
,则提供连接或嵌入ActiveMQ实例的必要依赖项,以及与JMS集成的Spring基础结构。
ActiveMQ配置由外部配置属性控制 spring.activemq.*
。例如,您可以在以下部分声明以下部分 application.properties
:
1 | spring.activemq.broker-url=tcp://192.168.1.210:9876 |
默认情况下,使用合理设置CachingConnectionFactory
包装本机ConnectionFactory
,您可以通过以下方式控制外部配置属性 spring.jms.*
:
1 | spring.jms.cache.session-cache-size=5 |
如果您更愿意使用本机池,则可以通过向相应位置添加依赖关系 org.messaginghub:pooled-jms
并进行相应配置来实现JmsPoolConnectionFactory
,如以下示例所示:
1 | spring.activemq.pool.enabled=true |
有关
ActiveMQProperties
更多支持的选项,请参阅 。您还可以注册任意数量的bean,以实现ActiveMQConnectionFactoryCustomizer
更高级的自定义。
默认情况下,ActiveMQ会创建一个目标(如果它尚不存在),以便根据提供的名称解析目标。
Artemis Support
Spring Boot可以ConnectionFactory
在检测到Artemis在类路径上可用时 自动配置。如果代理存在,则会自动启动并配置嵌入式代理(除非已明确设置mode属性)。支持的模式是embedded
(明确表示需要嵌入式代理,如果代理在类路径上不可用则发生错误)和native
(使用netty
传输协议连接到代理)。配置后者后,Spring Boot会ConnectionFactory
使用默认设置配置 连接到本地计算机上运行的代理的连接。
如果使用
spring-boot-starter-artemis
,则提供连接到现有Artemis实例的必要依赖项,以及与JMS集成的Spring基础结构。添加org.apache.activemq:artemis-jms-server
到您的应用程序可让您使用嵌入模式。
Artemis配置由外部配置属性控制 spring.artemis.*
。例如,您可以在以下部分声明以下部分 application.properties
:
1 | spring.artemis.mode=native |
嵌入代理时,您可以选择是否要启用持久性并列出应该可用的目标。这些可以指定为以逗号分隔的列表,使用默认选项创建它们,或者您可以分别为高级队列和主题配置定义类型的bean org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration
或org.apache.activemq.artemis.jms.server.config.TopicConfiguration
。
默认情况下,使用合理设置CachingConnectionFactory
包装本机ConnectionFactory
,您可以通过以下方式控制外部配置属性 spring.jms.*
:
1 | spring.jms.cache.session-cache-size=5 |
如果您更愿意使用本机池,则可以通过向相应位置添加依赖关系 org.messaginghub:pooled-jms
并进行相应配置来实现JmsPoolConnectionFactory
,如以下示例所示:
1 | spring.artemis.pool.enabled=true |
有关ArtemisProperties
更多支持选项,请参阅
不涉及JNDI查找,并使用name
Artemis配置中的属性或通过配置提供的名称来解析目标名称。
使用JNDI ConnectionFactory
如果您在应用程序服务器中运行应用程序,Spring Boot会尝试ConnectionFactory
使用JNDI 查找JMS 。默认情况下,将检查java:/JmsXA
和java:/XAConnectionFactory
位置。spring.jms.jndi-name
如果需要指定备用位置,可以使用该属性,如以下示例所示:
1 | spring.jms.jndi-name=java:/MyConnectionFactory |
发送消息
Spring JmsTemplate
是自动配置的,你可以直接将它自动装入自己的bean中,如下例所示:
1 | import org.springframework.beans.factory.annotation.Autowired; |
JmsMessagingTemplate
可以以类似的方式注射。如果定义了一个DestinationResolver
或一个MessageConverter
bean,它将自动关联到自动配置JmsTemplate
。
接收消息
当存在JMS基础结构时,可以使用任何bean来注释@JmsListener
以创建侦听器端点。如果JmsListenerContainerFactory
未定义,则自动配置默认值。如果定义了一个DestinationResolver
或一个 MessageConverter
bean,它将自动关联到默认工厂。
默认情况下,默认工厂是事务性的。如果您在JtaTransactionManager
存在a的基础结构中运行 ,则默认情况下它与侦听器容器相关联。如果不是,sessionTransacted
则启用该标志。在后一种情况下,您可以通过添加@Transactional
侦听器方法(或其委托)将本地数据存储事务与传入消息的处理相关联。这确保了在本地事务完成后确认传入消息。这还包括发送已在同一JMS会话上执行的响应消息。
以下组件在someQueue
目标上创建侦听器端点:
1 |
|
有关 更多详细信息,请参阅Javadoc
@EnableJms
。
如果您需要创建更多JmsListenerContainerFactory
实例,或者如果要覆盖默认实例,则Spring Boot提供了一个DefaultJmsListenerContainerFactoryConfigurer
可用于初始化a DefaultJmsListenerContainerFactory
的设置,其设置与自动配置的设置相同。
例如,以下示例公开了另一个使用特定工厂的工厂 MessageConverter
:
1 |
|
然后您可以在任何@JmsListener
注释方法中使用工厂,如下所示:
1 |
|
AMQP
高级消息队列协议(AMQP)是面向消息的中间件的平台中立的线级协议。Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发。Spring Boot提供了几种通过RabbitMQ使用AMQP的便利,包括spring-boot-starter-amqp
“Starter”。
RabbitMQ支持
RabbitMQ是一个基于AMQP协议的轻量级,可靠,可扩展且可移植的消息代理。Spring用于RabbitMQ
通过AMQP协议进行通信。
RabbitMQ配置由外部配置属性控制 spring.rabbitmq.*
。例如,您可以在以下部分声明以下部分 application.properties
:
1 | spring.rabbitmq.host=localhost |
如果ConnectionNameStrategy
上下文中存在bean,则它将自动用于命名由自动配置创建的连接ConnectionFactory
。有关RabbitProperties
更多支持的选项,请参阅 。
有关详细信息,请参阅 了解RabbitQQ使用的协议AMQP。
发送消息
Spring的AmqpTemplate
和AmqpAdmin
被自动配置,您可以直接自动装配它们变成自己的豆类,如下面的例子:
1 | import org.springframework.amqp.core.AmqpAdmin; |
RabbitMessagingTemplate
可以以类似的方式注射。如果MessageConverter
定义了bean,它将自动关联到自动配置AmqpTemplate
。
如有必要,任何org.springframework.amqp.core.Queue
定义为bean的都会自动用于在RabbitMQ实例上声明相应的队列。
要重试操作,可以启用重试AmqpTemplate
(例如,在代理连接丢失的情况下):
1 | spring.rabbitmq.template.retry.enabled=true |
默认情况下禁用重试。您还可以RetryTemplate
通过声明RabbitRetryTemplateCustomizer
bean来以编程方式自定义。
接收消息
当Rabbit基础结构存在时,可以使用任何bean来注释 @RabbitListener
以创建侦听器端点。如果RabbitListenerContainerFactory
未定义,SimpleRabbitListenerContainerFactory
则会自动配置默认值,您可以使用该spring.rabbitmq.listener.type
属性切换到直接容器 。如果 定义了a MessageConverter
或MessageRecoverer
bean,它将自动与默认工厂关联。
以下示例组件在someQueue
队列上创建侦听器端点:
1 |
|
有关更多详细信息,请参阅Javadoc
@EnableRabbit
。
如果您需要创建更多RabbitListenerContainerFactory
实例,或者如果要覆盖默认实例,Spring Boot提供了一个SimpleRabbitListenerContainerFactoryConfigurer
和一个 DirectRabbitListenerContainerFactoryConfigurer
可用于初始化aSimpleRabbitListenerContainerFactory
和a DirectRabbitListenerContainerFactory
的设置,其设置与自动配置使用的工厂相同。
您选择的容器类型无关紧要。这两个bean通过自动配置公开。
例如,以下配置类公开了另一个使用特定的工厂MessageConverter
:
1 |
|
然后您可以在任何@RabbitListener
注释方法中使用工厂,如下所示:
1 |
|
您可以启用重试来处理侦听器抛出异常的情况。默认RejectAndDontRequeueRecoverer
使用,但您可以定义MessageRecoverer
自己的。当重试耗尽时,如果代理配置了这样做,则拒绝该消息并将其丢弃或路由到死信交换。默认情况下,禁用重试。您还可以RetryTemplate
通过声明RabbitRetryTemplateCustomizer
bean来以编程方式自定义。
重要:
默认情况下,如果禁用重试并且侦听器抛出异常,则会无限期地重试传递。您可以通过两种方式修改此行为:将
defaultRequeueRejected
属性设置为false
以便尝试零重新传递,或者抛出一个AmqpRejectAndDontRequeueException
信号来表示应该拒绝该消息。后者是启用重试并且达到最大传递尝试次数时使用的机制。
Apache Kafka支持
通过提供spring-kafka
项目的自动配置来支持Apache Kafka。
Kafka配置由外部配置属性控制 spring.kafka.*
。例如,您可以在以下部分声明以下部分 application.properties
:
1 | spring.kafka.bootstrap-servers=localhost:9092 |
要在启动时创建主题,请添加类型的bean
NewTopic
。如果主题已存在,则忽略该bean。
有关更多支持选项,请参阅KafkaProperties
发送消息
Spring KafkaTemplate
是自动配置的,您可以直接在自己的bean中自动装配它,如下例所示:
1 |
|
如果
spring.kafka.producer.transaction-id-prefix
定义了属性,KafkaTransactionManager
则会自动配置a。此外,如果RecordMessageConverter
定义了bean,它将自动与自动配置相关联KafkaTemplate
。
接收消息
当存在Apache Kafka基础结构时,可以使用任何bean来注释 @KafkaListener
以创建侦听器端点。如果KafkaListenerContainerFactory
未定义,则使用定义的键自动配置默认值 spring.kafka.listener.*
。
以下组件在someTopic
主题上创建侦听器端点:
1 |
|
如果KafkaTransactionManager
定义了bean,它将自动关联到容器工厂。类似地,如果一个RecordMessageConverter
,ErrorHandler
或AfterRollbackProcessor
豆被定义,它被自动关联为出厂默认。
ChainedKafkaTransactionManager
必须标记自定义,@Primary
因为它通常引用自动配置的KafkaTransactionManager
bean。
Kafka Streams
Spring for Apache Kafka提供了一个工厂bean来创建一个StreamsBuilder
对象并管理其流的生命周期。Spring Boot会自动配置所需的KafkaStreamsConfiguration
bean,只要kafka-streams
在类路径上,并通过@EnableKafkaStreams
注释启用Kafka Streams 。
启用Kafka Streams意味着必须设置应用程序ID和引导服务器。前者可以使用配置spring.kafka.streams.application-id
,spring.application.name
如果没有设置则默认为默认 值。后者可以全局设置或专门为流而重写。
使用专用属性可以使用其他几个属性; 可以使用spring.kafka.streams.properties
命名空间设置其他任意Kafka属性。有关更多信息,另请参见第34.3.4节“其他Kafka属性”。
要使用工厂bean,只需连接StreamsBuilder
到您的bean,@Bean
如下例所示:
1 |
|
默认情况下,由其StreamBuilder
创建的对象管理的流将自动启动。您可以使用该spring.kafka.streams.auto-startup
属性自定义此行为 。
附加Kafka属性
自动配置支持的属性显示在 附录A,常见应用程序属性中。请注意,在大多数情况下,这些属性(连字符或camelCase)直接映射到Apache Kafka点状属性。有关详细信息,请参阅Apache Kafka文档。
这些属性中的前几个适用于所有组件(生产者,使用者,管理员和流),但如果您希望使用不同的值,则可以在组件级别指定。Apache Kafka指定重要性为HIGH,MEDIUM或LOW的属性。Spring Boot自动配置支持所有HIGH重要性属性,一些选定的MEDIUM和LOW属性,以及任何没有默认值的属性。
只有Kafka支持的属性的一部分可以直接通过 KafkaProperties
该类获得。如果您希望使用不直接支持的其他属性配置生产者或使用者,请使用以下属性:
1 | spring.kafka.properties.prop.one=first |
这将常见的prop.one
Kafka属性设置为first
(适用于生产者,使用者和管理员),prop.two
admin属性second
,prop.three
使用者属性third
,prop.four
生产者属性fourth
和 prop.five
streams属性fifth
。
您还可以JsonDeserializer
按如下方式配置Spring Kafka :
1 | spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer |
同样,您可以禁用JsonSerializer
在标头中发送类型信息的默认行为:
1 | spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer |
重要:
以这种方式设置的属性将覆盖Spring Boot明确支持的任何配置项。