消息
1. JMS
这javax.jms.ConnectionFactory接口提供了一种创建javax.jms.Connection用于与 JMS 代理交互。
虽然 Spring 需要一个ConnectionFactory要使用 JMS,您通常不需要自己直接使用它,而是可以依赖更高级别的消息传递抽象。
(有关详细信息,请参阅 Spring Framework 参考文档的相关部分。
Spring Boot 还会自动配置发送和接收消息所需的基础设施。
1.1. ActiveMQ“经典”支持
当 ActiveMQ“Classic”在类路径上可用时,Spring Boot 还可以配置ConnectionFactory.
如果代理存在,则会自动启动和配置嵌入式代理(前提是未通过配置指定代理 URL,并且未在配置中禁用嵌入式代理)。
如果您使用spring-boot-starter-activemq,提供了连接或嵌入 ActiveMQ“经典”实例所需的依赖项,以及与 JMS 集成的 Spring 基础设施。 |
ActiveMQ“经典”配置由spring.activemq.*.
默认情况下,ActiveMQ“经典”自动配置为使用 VM 传输,该传输会启动嵌入在同一 JVM 实例中的代理。
您可以通过配置spring.activemq.in-memory属性,如以下示例所示:
spring.activemq.in-memory=false
spring:
activemq:
in-memory: false
如果配置代理 URL,则嵌入代理也将被禁用,如以下示例所示:
spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret
spring:
activemq:
broker-url: "tcp://192.168.1.210:9876"
user: "admin"
password: "secret"
如果您想完全控制嵌入式代理,请参阅 ActiveMQ “Classic” 文档以获取更多信息。
默认情况下,一个CachingConnectionFactory包装原生ConnectionFactory使用合理的设置,您可以通过spring.jms.*:
spring.jms.cache.session-cache-size=5
spring:
jms:
cache:
session-cache-size: 5
如果您更愿意使用本机池,可以通过向org.messaginghub:pooled-jms并配置JmsPoolConnectionFactory因此,如以下示例所示:
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
spring:
activemq:
pool:
enabled: true
max-connections: 50
看ActiveMQProperties以获取更多支持的选项。
您还可以注册任意数量的 bean,以实现ActiveMQConnectionFactoryCustomizer用于更高级的自定义。 |
默认情况下,ActiveMQ“经典”会创建一个目标(如果该目标尚不存在),以便根据其提供的名称解析目标。
1.2. ActiveMQ Artemis 支持
Spring Boot 可以自动配置ConnectionFactory当它检测到 ActiveMQ Artemis 在类路径上可用时。
如果代理存在,则会自动启动和配置嵌入式代理(除非已显式设置 mode 属性)。
支持的模式包括embedded(明确需要嵌入式代理,并且如果代理在类路径上不可用,则应发生错误)和native(要使用netty传输协议)。
配置后者时,Spring Boot 会配置一个ConnectionFactory连接到具有默认设置的本地计算机上运行的代理。
如果您使用spring-boot-starter-artemis,提供了连接到现有 ActiveMQ Artemis 实例所需的依赖项,以及与 JMS 集成的 Spring 基础设施。
添加org.apache.activemq:artemis-jms-server允许您使用嵌入式模式。 |
ActiveMQ Artemis 配置由spring.artemis.*.
例如,您可以在application.properties:
spring.artemis.mode=native
spring.artemis.broker-url=tcp://192.168.1.210:9876
spring.artemis.user=admin
spring.artemis.password=secret
spring:
artemis:
mode: native
broker-url: "tcp://192.168.1.210:9876"
user: "admin"
password: "secret"
嵌入代理时,您可以选择是否要启用持久性并列出应提供的目标。
这些可以指定为逗号分隔的列表以使用默认选项创建它们,或者您可以定义类型为org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration或org.apache.activemq.artemis.jms.server.config.TopicConfiguration,分别用于高级队列和主题配置。
默认情况下,一个CachingConnectionFactory包装原生ConnectionFactory使用合理的设置,您可以通过spring.jms.*:
spring.jms.cache.session-cache-size=5
spring:
jms:
cache:
session-cache-size: 5
如果您更愿意使用本机池,可以通过向org.messaginghub:pooled-jms并配置JmsPoolConnectionFactory因此,如以下示例所示:
spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
spring:
artemis:
pool:
enabled: true
max-connections: 50
看ArtemisProperties以获取更多支持的选项。
不涉及 JNDI 查找,并且使用其中一种name属性或通过配置提供的名称。
1.3. 使用 JNDI ConnectionFactory
如果您在应用程序服务器中运行应用程序,Spring Boot 会尝试查找 JMSConnectionFactory通过使用 JNDI。
默认情况下,java:/JmsXA和java:/XAConnectionFactory位置被检查。
您可以使用spring.jms.jndi-name属性,如果需要指定替代位置,如以下示例所示:
spring.jms.jndi-name=java:/MyConnectionFactory
spring:
jms:
jndi-name: "java:/MyConnectionFactory"
1.4. 发送消息
Spring的JmsTemplate是自动配置的,您可以将其直接自动连接到您自己的 bean,如以下示例所示:
@Component
public class MyBean {
private final JmsTemplate jmsTemplate;
public MyBean(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
}
@Component
class MyBean(private val jmsTemplate: JmsTemplate) {
}
JmsMessagingTemplate可以以类似的方式注射。
如果DestinationResolver或MessageConverterbean 时,它会自动与自动配置的JmsTemplate. |
1.5. 接收消息
当 JMS 基础设施存在时,可以使用@JmsListener以创建侦听器端点。
如果没有JmsListenerContainerFactory,则会自动配置默认值。
如果DestinationResolver一个MessageConverter或javax.jms.ExceptionListenerbean 被定义,它们会自动与默认工厂相关联。
默认情况下,默认工厂是事务性的。
如果您在JtaTransactionManager存在,则默认情况下它与侦听器容器相关联。
如果没有,则sessionTransacted标志已启用。
在后一种情况下,可以通过添加@Transactional在您的侦听器方法(或其委托)上。
这可确保在本地事务完成后确认传入消息。
这还包括发送已在同一 JMS 会话上执行的响应消息。
以下组件在someQueue目的地:
@Component
public class MyBean {
@JmsListener(destination = "someQueue")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@JmsListener(destination = "someQueue")
fun processMessage(content: String?) {
// ...
}
}
看Javadoc 的@EnableJms了解更多详情。 |
如果您需要创建更多JmsListenerContainerFactory实例,或者如果你想覆盖默认值,Spring Boot 提供了一个DefaultJmsListenerContainerFactoryConfigurer可用于初始化DefaultJmsListenerContainerFactory使用与自动配置的设置相同的设置。
例如,以下示例公开了另一个使用特定MessageConverter:
@Configuration(proxyBeanMethods = false)
public class MyJmsConfiguration {
@Bean
public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
@Configuration(proxyBeanMethods = false)
class MyJmsConfiguration {
@Bean
fun myFactory(configurer: DefaultJmsListenerContainerFactoryConfigurer): DefaultJmsListenerContainerFactory {
val factory = DefaultJmsListenerContainerFactory()
val connectionFactory = getCustomConnectionFactory()
configurer.configure(factory, connectionFactory)
factory.setMessageConverter(MyMessageConverter())
return factory
}
fun getCustomConnectionFactory() : ConnectionFactory? {
return ...
}
}
然后您可以在任何@JmsListener-annotated 方法如下所示:
@Component
public class MyBean {
@JmsListener(destination = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@JmsListener(destination = "someQueue", containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}
2. AMQP
高级消息队列协议 (AMQP) 是一种平台中立的线级协议,用于面向消息的中间件。
Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。
Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了多种便利,包括spring-boot-starter-amqp“入门”。
2.1. RabbitMQ 支持
RabbitMQ 是一个基于 AMQP 协议的轻量级、可靠、可扩展且可移植的消息代理。
弹簧用途RabbitMQ通过 AMQP 协议进行通信。
RabbitMQ 配置由spring.rabbitmq.*.
例如,您可以在application.properties:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
rabbitmq:
host: "localhost"
port: 5672
username: "admin"
password: "secret"
或者,您可以使用addresses属性:
spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
rabbitmq:
addresses: "amqp://admin:secret@localhost"
以这种方式指定地址时,host和port属性被忽略。
如果地址使用amqps协议,SSL 支持会自动启用。 |
看RabbitProperties了解更多受支持的基于属性的配置选项。
配置 RabbitMQ 的较低级别详细信息ConnectionFactorySpring AMQP 使用的,定义一个ConnectionFactoryCustomizer豆。
如果ConnectionNameStrategybean 存在于上下文中,它将自动用于命名由自动配置的CachingConnectionFactory.
| 有关更多详细信息,请参阅了解 AMQP,RabbitMQ 使用的协议。 |
2.2. 发送消息
Spring的AmqpTemplate和AmqpAdmin自动配置,您可以将它们直接自动连接到您自己的 bean,如以下示例所示:
@Component
public class MyBean {
private final AmqpAdmin amqpAdmin;
private final AmqpTemplate amqpTemplate;
public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}
}
@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {
}
RabbitMessagingTemplate可以以类似的方式注射。
如果MessageConverterbean 时,它会自动与自动配置的AmqpTemplate. |
如有必要,任何org.springframework.amqp.core.Queue定义为 bean 自动用于在 RabbitMQ 实例上声明相应的队列。
要重试作,您可以在AmqpTemplate(例如,如果代理连接丢失):
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
rabbitmq:
template:
retry:
enabled: true
initial-interval: "2s"
默认情况下,重试处于禁用状态。
您还可以自定义RetryTemplate通过声明RabbitRetryTemplateCustomizer豆。
如果您需要创建更多RabbitTemplate实例,或者如果你想覆盖默认值,Spring Boot 提供了一个RabbitTemplateConfigurerbean,您可以使用它来初始化RabbitTemplate与自动配置使用的工厂设置相同。
2.3. 向流发送消息
要向特定流发送消息,请指定流的名称,如以下示例所示:
spring.rabbitmq.stream.name=my-stream
spring:
rabbitmq:
stream:
name: "my-stream"
如果MessageConverter,StreamMessageConverter或ProducerCustomizerbean 时,它会自动与自动配置的RabbitStreamTemplate.
如果您需要创建更多RabbitStreamTemplate实例,或者如果你想覆盖默认值,Spring Boot 提供了一个RabbitStreamTemplateConfigurerbean,您可以使用它来初始化RabbitStreamTemplate与自动配置使用的工厂设置相同。
2.4. 接收消息
当 Rabbit 基础设施存在时,任何 bean 都可以使用@RabbitListener以创建侦听器端点。
如果没有RabbitListenerContainerFactory已定义,默认值为SimpleRabbitListenerContainerFactory自动配置,您可以使用spring.rabbitmq.listener.type财产。
如果MessageConverter或MessageRecovererbean 时,它会自动与默认工厂相关联。
以下示例组件在someQueue队列:
@Component
public class MyBean {
@RabbitListener(queues = "someQueue")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"])
fun processMessage(content: String?) {
// ...
}
}
看Javadoc 的@EnableRabbit了解更多详情。 |
如果您需要创建更多RabbitListenerContainerFactory实例,或者如果你想覆盖默认值,Spring Boot 提供了一个SimpleRabbitListenerContainerFactoryConfigurer和DirectRabbitListenerContainerFactoryConfigurer可用于初始化SimpleRabbitListenerContainerFactory和DirectRabbitListenerContainerFactory与自动配置使用的工厂设置相同。
| 您选择哪种容器类型并不重要。 这两个 bean 由自动配置公开。 |
例如,以下配置类公开另一个使用特定MessageConverter:
@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {
@Bean
fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
val factory = SimpleRabbitListenerContainerFactory()
val connectionFactory = getCustomConnectionFactory()
configurer.configure(factory, connectionFactory)
factory.setMessageConverter(MyMessageConverter())
return factory
}
fun getCustomConnectionFactory() : ConnectionFactory? {
return ...
}
}
然后您可以在任何@RabbitListener-annotated 方法,如下所示:
@Component
public class MyBean {
@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}
您可以启用重试来处理侦听器引发异常的情况。
默认情况下,RejectAndDontRequeueRecoverer,但您可以定义一个MessageRecoverer你自己的。
当重试用尽时,消息将被拒绝,并丢弃或路由到死信交换(如果代理配置为这样做)。
默认情况下,重试处于禁用状态。
您还可以自定义RetryTemplate通过声明RabbitRetryTemplateCustomizer豆。
默认情况下,如果禁用重试并且侦听器抛出异常,则将无限期重试传递。
您可以通过两种方式修改此行为:将defaultRequeueRejected属性设置为false以便尝试零重新传递或抛出AmqpRejectAndDontRequeueException以表示应拒绝消息。
后者是启用重试并达到最大传递尝试次数时使用的机制。 |
3. Apache Kafka 支持
Apache Kafka 通过提供自动配置spring-kafka项目。
Kafka 配置由spring.kafka.*.
例如,您可以在application.properties:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring:
kafka:
bootstrap-servers: "localhost:9092"
consumer:
group-id: "myGroup"
要在启动时创建主题,请添加类型为NewTopic.
如果主题已经存在,则忽略 bean。 |
看KafkaProperties以获取更多支持的选项。
3.1. 发送消息
Spring的KafkaTemplate自动配置,您可以直接在自己的 bean 中自动连接它,如以下示例所示:
@Component
public class MyBean {
private final KafkaTemplate<String, String> kafkaTemplate;
public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
}
@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {
}
如果属性spring.kafka.producer.transaction-id-prefix定义时,一个KafkaTransactionManager自动配置。
此外,如果RecordMessageConverterbean 时,它会自动与自动配置的KafkaTemplate. |
3.2. 接收消息
当 Apache Kafka 基础设施存在时,任何 bean 都可以使用@KafkaListener以创建侦听器端点。
如果没有KafkaListenerContainerFactory,则默认的自动配置为spring.kafka.listener.*.
以下组件在someTopic主题:
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@KafkaListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
如果KafkaTransactionManagerbean 时,它会自动关联到容器工厂。
同样,如果RecordFilterStrategy,CommonErrorHandler,AfterRollbackProcessor或ConsumerAwareRebalanceListenerbean 时,它会自动与默认工厂相关联。
根据侦听器类型,一个RecordMessageConverter或BatchMessageConverterbean 与默认工厂相关联。
如果只有一个RecordMessageConverterbean 存在于批处理侦听器中,它被包装在BatchMessageConverter.
一个定制ChainedKafkaTransactionManager必须标记@Primary因为它通常引用自动配置的KafkaTransactionManager豆。 |
3.3. Kafka 流
Spring for Apache Kafka 提供了一个工厂 bean 来创建一个StreamsBuilder对象并管理其流的生命周期。
Spring Boot 自动配置所需的KafkaStreamsConfigurationbean 只要kafka-streams位于类路径上,并且 Kafka Streams 由@EnableKafkaStreams注解。
启用 Kafka Streams 意味着必须设置应用程序 ID 和引导服务器。
前者可以使用spring.kafka.streams.application-id,默认为spring.application.name如果未设置。
后者可以全局设置,也可以仅针对流专门覆盖。
使用专用属性可以使用多个其他属性;其他任意 Kafka 属性可以使用spring.kafka.streams.propertiesNamespace。
另请参阅其他 Kafka 属性以获取更多信息。
要使用工厂豆,请电线StreamsBuilder进入您的@Bean如以下示例所示:
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
return stream;
}
private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
return new KeyValue<>(key, value.toUpperCase());
}
}
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {
@Bean
fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
val stream = streamsBuilder.stream<Int, String>("ks1In")
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), JsonSerde()))
return stream
}
private fun uppercaseValue(key: Int, value: String): KeyValue<Int?, String?> {
return KeyValue(key, value.uppercase())
}
}
默认情况下,由StreamBuilder对象会自动启动。
您可以使用spring.kafka.streams.auto-startup财产。
3.4. 其他 Kafka 属性
自动配置支持的属性显示在附录的“集成属性”部分。 请注意,在大多数情况下,这些属性(连字符或驼峰命名法)直接映射到 Apache Kafka 虚线属性。 有关详细信息,请参阅 Apache Kafka 文档。
不包含客户端类型的属性 (producer,consumer,admin或streams) 被认为是通用的,适用于所有客户。
如果需要,可以针对一个或多个客户端类型覆盖大多数这些常见属性。
Apache Kafka 指定重要性为 HIGH、MEDIUM 或 LOW 的属性。 Spring Boot 自动配置支持所有 HIGH 重要性属性、一些选定的 MEDIUM 和 LOW 属性以及任何没有默认值的属性。
只有 Kafka 支持的属性子集可直接通过KafkaProperties类。
如果要使用不直接支持的其他属性配置单个客户端类型,请使用以下属性:
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
spring:
kafka:
properties:
"[prop.one]": "first"
admin:
properties:
"[prop.two]": "second"
consumer:
properties:
"[prop.three]": "third"
producer:
properties:
"[prop.four]": "fourth"
streams:
properties:
"[prop.five]": "fifth"
这设置了共同的prop.oneKafka 属性设置为first(适用于生产者、使用者、管理员和流),则prop.twoadmin 属性设置为second这prop.threeconsumer 属性设置为third这prop.fourproducer 属性设置为fourth和prop.fivestreams 属性设置为fifth.
您还可以配置 Spring KafkaJsonDeserializer如下:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
spring:
kafka:
consumer:
value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
properties:
"[spring.json.value.default.type]": "com.example.Invoice"
"[spring.json.trusted.packages]": "com.example.main,com.example.another"
同样,您可以禁用JsonSerializer在标头中发送类型信息的默认行为:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
spring:
kafka:
producer:
value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
properties:
"[spring.json.add.type.headers]": false
| 以这种方式设置的属性会覆盖 Spring Boot 显式支持的任何配置项。 |
3.5. 使用嵌入式 Kafka 进行测试
Spring for Apache Kafka 提供了一种使用嵌入式 Apache Kafka 代理测试项目的便捷方法。
要使用此功能,请使用@EmbeddedKafka从spring-kafka-test模块。
有关更多信息,请参阅 Spring for Apache Kafka 参考手册。
要使 Spring Boot 自动配置与上述嵌入式 Apache Kafka 代理一起使用,您需要重新映射嵌入式代理地址的系统属性(由EmbeddedKafkaBroker)添加到 Apache Kafka 的 Spring Boot 配置属性中。
有几种方法可以做到这一点:
-
提供系统属性以将嵌入式代理地址映射到
spring.kafka.bootstrap-servers在测试类中:
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
init {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
}
-
在
@EmbeddedKafka注解:
@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
-
在配置属性中使用占位符:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring:
kafka:
bootstrap-servers: "${spring.embedded.kafka.brokers}"
4. RS袜子
RSocket 是一种用于字节流传输的二进制协议。 它通过通过单个连接传递异步消息来实现对称交互模型。
这spring-messagingSpring Framework 的模块在客户端和服务器端都为 RSocket 请求者和响应者提供支持。
有关更多详细信息,请参阅 Spring Framework 参考的 RSocket 部分,包括 RSocket 协议的概述。
4.1. RSocket 策略自动配置
Spring Boot 自动配置RSocketStrategies提供编码和解码 RSocket 有效负载所需的所有基础设施的 bean。
默认情况下,自动配置将尝试配置以下内容(按顺序):
-
CBOR 编解码器与 Jackson
-
带有 Jackson 的 JSON 编解码器
这spring-boot-starter-rsocketstarter 提供了这两个依赖项。
请参阅 Jackson 支持部分,了解有关自定义可能性的更多信息。
开发人员可以自定义RSocketStrategies组件,通过创建实现RSocketStrategiesCustomizer接口。
请注意,他们的@Order很重要,因为它决定了编解码器的顺序。
4.2. RSocket 服务器自动配置
Spring Boot 提供 RSocket 服务器自动配置。
所需的依赖项由spring-boot-starter-rsocket.
Spring Boot 允许从 WebFlux 服务器通过 WebSocket 公开 RSocket,或建立独立的 RSocket 服务器。 这取决于应用程序的类型及其配置。
对于 WebFlux 应用程序(类型为WebApplicationType.REACTIVE),仅当以下属性匹配时,RSocket 服务器才会插入 Web 服务器:
spring.rsocket.server.mapping-path=/rsocket
spring.rsocket.server.transport=websocket
spring:
rsocket:
server:
mapping-path: "/rsocket"
transport: "websocket"
| 仅 Reactor Netty 支持将 RSocket 插入 Web 服务器,因为 RSocket 本身是使用该库构建的。 |
或者,RSocket TCP 或 websocket 服务器作为独立的嵌入式服务器启动。除了依赖项要求之外,唯一需要的配置是为该服务器定义一个端口:
spring.rsocket.server.port=9898
spring:
rsocket:
server:
port: 9898
4.3. Spring Messaging RSocket 支持
Spring Boot 将为 RSocket 自动配置 Spring Messaging 基础设施。
这意味着 Spring Boot 将创建一个RSocketMessageHandlerbean 的 bean,它将处理对应用程序的 RSocket 请求。
4.4. 使用 RSocketRequester 调用 RSocket 服务
一旦RSocket在服务器和客户端之间建立通道,任何一方都可以向另一方发送或接收请求。
作为服务器,您可以被注入RSocketRequesterRSocket 的任何处理程序方法上的实例@Controller.
作为客户端,您需要先配置并建立 RSocket 连接。
Spring Boot 自动配置RSocketRequester.Builder对于具有预期编解码器的此类情况,并应用任何RSocketConnectorConfigurer豆。
这RSocketRequester.Builderinstance 是一个原型 bean,这意味着每个注入点都会为您提供一个新实例。
这是故意这样做的,因为此构建器是有状态的,您不应使用同一实例创建具有不同设置的请求者。
以下代码显示了一个典型的示例:
@Service
public class MyService {
private final RSocketRequester rsocketRequester;
public MyService(RSocketRequester.Builder rsocketRequesterBuilder) {
this.rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898);
}
public Mono<User> someRSocketCall(String name) {
return this.rsocketRequester.route("user").data(name).retrieveMono(User.class);
}
}
@Service
class MyService(rsocketRequesterBuilder: RSocketRequester.Builder) {
private val rsocketRequester: RSocketRequester
init {
rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898)
}
fun someRSocketCall(name: String): Mono<User> {
return rsocketRequester.route("user").data(name).retrieveMono(
User::class.java
)
}
}
5. 弹簧集成
Spring Boot 为使用 Spring Integration 提供了多种便利,包括spring-boot-starter-integration“入门”。
Spring Integration 提供了对消息传递和其他传输(如 HTTP、TCP 等)的抽象。
如果 Spring Integration 在您的类路径上可用,则通过@EnableIntegration注解。
Spring Integration轮询逻辑依赖于。在自动配置的TaskScheduler.
默认值PollerMetadata(每秒轮询无限数量的消息)可以使用spring.integration.poller.*配置属性。
Spring Boot 还配置了一些由其他 Spring Integration 模块的存在触发的功能。
如果spring-integration-jmx也在类路径上,消息处理统计信息通过 JMX 发布。
如果spring-integration-jdbc可用,则可以在启动时创建默认数据库模式,如下行所示:
spring.integration.jdbc.initialize-schema=always
spring:
integration:
jdbc:
initialize-schema: "always"
如果spring-integration-rsocket可用,开发人员可以使用"spring.rsocket.server.*"属性并让它使用IntegrationRSocketEndpoint或RSocketOutboundGateway组件来处理传入的 RSocket 消息。
该基础设施可以处理 Spring Integration RSocket 通道适配器和@MessageMapping处理程序(给定"spring.integration.rsocket.server.message-mapping-enabled"已配置)。
Spring Boot 还可以自动配置ClientRSocketConnector使用配置属性:
# Connecting to a RSocket server over TCP
spring.integration.rsocket.client.host=example.org
spring.integration.rsocket.client.port=9898
# Connecting to a RSocket server over TCP
spring:
integration:
rsocket:
client:
host: "example.org"
port: 9898
# Connecting to a RSocket Server over WebSocket
spring.integration.rsocket.client.uri=ws://example.org
# Connecting to a RSocket Server over WebSocket
spring:
integration:
rsocket:
client:
uri: "ws://example.org"
6. Web套接字
Spring Boot 为嵌入式 Tomcat、Jetty 和 Undertow 提供 WebSockets 自动配置。 如果将 war 文件部署到独立容器,Spring Boot 会假定容器负责配置其 WebSocket 支持。
Spring Framework 为 MVC Web 应用程序提供了丰富的 WebSocket 支持,可以通过spring-boot-starter-websocket模块。
WebSocket 支持也可用于响应式 Web 应用程序,并且需要将 WebSocket API 包含在spring-boot-starter-webflux:
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
</dependency>