消息
1. JMS
这jakarta.jms.ConnectionFactory
接口提供了一种创建jakarta.jms.Connection
用于与 JMS 代理交互。
虽然 Spring 需要一个ConnectionFactory
要使用 JMS,您通常不需要自己直接使用它,而是可以依赖更高级别的消息传递抽象。
(有关详细信息,请参阅 Spring Framework 参考文档的相关部分。
Spring Boot 还会自动配置发送和接收消息所需的基础设施。
1.1. ActiveMQ Artemis 支持
Spring Boot 可以自动配置ConnectionFactory
当它检测到 ActiveMQ Artemis 在类路径上可用时。
如果代理存在,则会自动启动和配置嵌入式代理(除非已显式设置 mode 属性)。
支持的模式包括embedded
(明确需要嵌入式代理,并且如果代理在类路径上不可用,则应发生错误)和native
(要使用netty
传输协议)。
配置后者时,Spring Boot 会配置一个ConnectionFactory
连接到具有默认设置的本地计算机上运行的代理。
如果您使用spring-boot-starter-artemis ,提供了连接到现有 ActiveMQ Artemis 实例所需的依赖项,以及与 JMS 集成的 Spring 基础设施。
添加org.apache.activemq:artemis-jakarta-server 允许您使用嵌入式模式。 |
ActiveMQ Artemis 配置由spring.artemis.*
.
例如,您可以在application.properties
:
spring.artemis.mode=native
spring.artemis.broker-url=tcp://192.168.1.210:9876
spring.artemis.user=admin
spring.artemis.password=secret
spring:
artemis:
mode: native
broker-url: "tcp://192.168.1.210:9876"
user: "admin"
password: "secret"
嵌入代理时,您可以选择是否要启用持久性并列出应提供的目标。
这些可以指定为逗号分隔的列表以使用默认选项创建它们,或者您可以定义类型为org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration
或org.apache.activemq.artemis.jms.server.config.TopicConfiguration
,分别用于高级队列和主题配置。
默认情况下,一个CachingConnectionFactory
包装原生ConnectionFactory
使用合理的设置,您可以通过spring.jms.*
:
spring.jms.cache.session-cache-size=5
spring:
jms:
cache:
session-cache-size: 5
如果您更愿意使用本机池,可以通过添加依赖项来实现org.messaginghub:pooled-jms
并配置JmsPoolConnectionFactory
因此,如以下示例所示:
spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
spring:
artemis:
pool:
enabled: true
max-connections: 50
看ArtemisProperties
以获取更多支持的选项。
不涉及 JNDI 查找,并且使用其中一种name
属性或通过配置提供的名称。
1.2. 使用 JNDI ConnectionFactory
如果您在应用程序服务器中运行应用程序,Spring Boot 会尝试查找 JMSConnectionFactory
通过使用 JNDI。
默认情况下,java:/JmsXA
和java:/XAConnectionFactory
位置被检查。
您可以使用spring.jms.jndi-name
属性,如果需要指定替代位置,如以下示例所示:
spring.jms.jndi-name=java:/MyConnectionFactory
spring:
jms:
jndi-name: "java:/MyConnectionFactory"
1.3. 发送消息
Spring的JmsTemplate
是自动配置的,您可以将其直接自动连接到您自己的 bean,如以下示例所示:
@Component
public class MyBean {
private final JmsTemplate jmsTemplate;
public MyBean(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
}
@Component
class MyBean(private val jmsTemplate: JmsTemplate) {
}
JmsMessagingTemplate 可以以类似的方式注射。
如果DestinationResolver 或MessageConverter bean 时,它会自动与自动配置的JmsTemplate . |
1.4. 接收消息
当 JMS 基础设施存在时,可以使用@JmsListener
以创建侦听器端点。
如果没有JmsListenerContainerFactory
,则会自动配置默认值。
如果DestinationResolver
一个MessageConverter
或jakarta.jms.ExceptionListener
bean 被定义,它们会自动与默认工厂相关联。
默认情况下,默认工厂是事务性的。
如果您在JtaTransactionManager
存在,则默认情况下它与侦听器容器相关联。
如果没有,则sessionTransacted
标志已启用。
在后一种情况下,可以通过添加@Transactional
在您的侦听器方法(或其委托)上。
这可确保在本地事务完成后确认传入消息。
这还包括发送已在同一 JMS 会话上执行的响应消息。
以下组件在someQueue
目的地:
@Component
public class MyBean {
@JmsListener(destination = "someQueue")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@JmsListener(destination = "someQueue")
fun processMessage(content: String?) {
// ...
}
}
看Javadoc 的@EnableJms 了解更多详情。 |
如果您需要创建更多JmsListenerContainerFactory
实例,或者如果你想覆盖默认值,Spring Boot 提供了一个DefaultJmsListenerContainerFactoryConfigurer
可用于初始化DefaultJmsListenerContainerFactory
使用与自动配置的设置相同的设置。
例如,以下示例公开了另一个使用特定MessageConverter
:
@Configuration(proxyBeanMethods = false)
public class MyJmsConfiguration {
@Bean
public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
@Configuration(proxyBeanMethods = false)
class MyJmsConfiguration {
@Bean
fun myFactory(configurer: DefaultJmsListenerContainerFactoryConfigurer): DefaultJmsListenerContainerFactory {
val factory = DefaultJmsListenerContainerFactory()
val connectionFactory = getCustomConnectionFactory()
configurer.configure(factory, connectionFactory)
factory.setMessageConverter(MyMessageConverter())
return factory
}
fun getCustomConnectionFactory() : ConnectionFactory? {
return ...
}
}
然后您可以在任何@JmsListener
-annotated 方法如下所示:
@Component
public class MyBean {
@JmsListener(destination = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@JmsListener(destination = "someQueue", containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}
2. AMQP
高级消息队列协议 (AMQP) 是一种平台中立的线级协议,用于面向消息的中间件。
Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。
Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了多种便利,包括spring-boot-starter-amqp
“入门”。
2.1. RabbitMQ 支持
RabbitMQ 是一个基于 AMQP 协议的轻量级、可靠、可扩展且可移植的消息代理。 Spring 使用 RabbitMQ 通过 AMQP 协议进行通信。
RabbitMQ 配置由spring.rabbitmq.*
.
例如,您可以在application.properties
:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
rabbitmq:
host: "localhost"
port: 5672
username: "admin"
password: "secret"
或者,您可以使用addresses
属性:
spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
rabbitmq:
addresses: "amqp://admin:secret@localhost"
以这种方式指定地址时,host 和port 属性被忽略。
如果地址使用amqps 协议,SSL 支持会自动启用。 |
看RabbitProperties
了解更多受支持的基于属性的配置选项。
配置 RabbitMQ 的较低级别详细信息ConnectionFactory
Spring AMQP 使用的,定义一个ConnectionFactoryCustomizer
豆。
如果ConnectionNameStrategy
bean 存在于上下文中,它将自动用于命名由自动配置的CachingConnectionFactory
.
有关更多详细信息,请参阅了解 AMQP,RabbitMQ 使用的协议。 |
2.2. 发送消息
Spring的AmqpTemplate
和AmqpAdmin
自动配置,您可以将它们直接自动连接到您自己的 bean,如以下示例所示:
@Component
public class MyBean {
private final AmqpAdmin amqpAdmin;
private final AmqpTemplate amqpTemplate;
public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}
}
@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {
}
RabbitMessagingTemplate 可以以类似的方式注射。
如果MessageConverter bean 时,它会自动与自动配置的AmqpTemplate . |
如有必要,任何org.springframework.amqp.core.Queue
定义为 bean 自动用于在 RabbitMQ 实例上声明相应的队列。
要重试作,您可以在AmqpTemplate
(例如,如果代理连接丢失):
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
rabbitmq:
template:
retry:
enabled: true
initial-interval: "2s"
默认情况下,重试处于禁用状态。
您还可以自定义RetryTemplate
通过声明RabbitRetryTemplateCustomizer
豆。
如果您需要创建更多RabbitTemplate
实例,或者如果你想覆盖默认值,Spring Boot 提供了一个RabbitTemplateConfigurer
bean,您可以使用它来初始化RabbitTemplate
与自动配置使用的工厂设置相同。
2.3. 向流发送消息
要向特定流发送消息,请指定流的名称,如以下示例所示:
spring.rabbitmq.stream.name=my-stream
spring:
rabbitmq:
stream:
name: "my-stream"
如果MessageConverter
,StreamMessageConverter
或ProducerCustomizer
bean 时,它会自动与自动配置的RabbitStreamTemplate
.
如果您需要创建更多RabbitStreamTemplate
实例,或者如果你想覆盖默认值,Spring Boot 提供了一个RabbitStreamTemplateConfigurer
bean,您可以使用它来初始化RabbitStreamTemplate
与自动配置使用的工厂设置相同。
2.4. 接收消息
当 Rabbit 基础设施存在时,任何 bean 都可以使用@RabbitListener
以创建侦听器端点。
如果没有RabbitListenerContainerFactory
已定义,默认值为SimpleRabbitListenerContainerFactory
自动配置,您可以使用spring.rabbitmq.listener.type
财产。
如果MessageConverter
或MessageRecoverer
bean 时,它会自动与默认工厂相关联。
以下示例组件在someQueue
队列:
@Component
public class MyBean {
@RabbitListener(queues = "someQueue")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"])
fun processMessage(content: String?) {
// ...
}
}
看Javadoc 的@EnableRabbit 了解更多详情。 |
如果您需要创建更多RabbitListenerContainerFactory
实例,或者如果你想覆盖默认值,Spring Boot 提供了一个SimpleRabbitListenerContainerFactoryConfigurer
和DirectRabbitListenerContainerFactoryConfigurer
可用于初始化SimpleRabbitListenerContainerFactory
和DirectRabbitListenerContainerFactory
与自动配置使用的工厂设置相同。
您选择哪种容器类型并不重要。 这两个 bean 由自动配置公开。 |
例如,以下配置类公开另一个使用特定MessageConverter
:
@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {
@Bean
fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
val factory = SimpleRabbitListenerContainerFactory()
val connectionFactory = getCustomConnectionFactory()
configurer.configure(factory, connectionFactory)
factory.setMessageConverter(MyMessageConverter())
return factory
}
fun getCustomConnectionFactory() : ConnectionFactory? {
return ...
}
}
然后您可以在任何@RabbitListener
-annotated 方法,如下所示:
@Component
public class MyBean {
@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}
您可以启用重试来处理侦听器引发异常的情况。
默认情况下,RejectAndDontRequeueRecoverer
,但您可以定义一个MessageRecoverer
你自己的。
当重试用尽时,消息将被拒绝,并丢弃或路由到死信交换(如果代理配置为这样做)。
默认情况下,重试处于禁用状态。
您还可以自定义RetryTemplate
通过声明RabbitRetryTemplateCustomizer
豆。
默认情况下,如果禁用重试并且侦听器抛出异常,则将无限期重试传递。
您可以通过两种方式修改此行为:将defaultRequeueRejected 属性设置为false 以便尝试零重新传递或抛出AmqpRejectAndDontRequeueException 以表示应拒绝消息。
后者是启用重试并达到最大传递尝试次数时使用的机制。 |
3. Apache Kafka 支持
Apache Kafka 通过提供自动配置spring-kafka
项目。
Kafka 配置由spring.kafka.*
.
例如,您可以在application.properties
:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring:
kafka:
bootstrap-servers: "localhost:9092"
consumer:
group-id: "myGroup"
要在启动时创建主题,请添加类型为NewTopic . 如果主题已经存在,则忽略 bean。 |
看KafkaProperties
以获取更多支持的选项。
3.1. 发送消息
Spring的KafkaTemplate
自动配置,您可以直接在自己的 bean 中自动连接它,如以下示例所示:
@Component
public class MyBean {
private final KafkaTemplate<String, String> kafkaTemplate;
public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
}
@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {
}
如果属性spring.kafka.producer.transaction-id-prefix 定义时,一个KafkaTransactionManager 自动配置。此外,如果RecordMessageConverter bean 时,它会自动与自动配置的KafkaTemplate . |
3.2. 接收消息
当 Apache Kafka 基础设施存在时,任何 bean 都可以使用@KafkaListener
以创建侦听器端点。
如果没有KafkaListenerContainerFactory
,则默认的自动配置为spring.kafka.listener.*
.
以下组件在someTopic
主题:
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@KafkaListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
如果KafkaTransactionManager
bean 时,它会自动关联到容器工厂。
同样,如果RecordFilterStrategy
,CommonErrorHandler
,AfterRollbackProcessor
或ConsumerAwareRebalanceListener
bean 时,它会自动与默认工厂相关联。
根据侦听器类型,一个RecordMessageConverter
或BatchMessageConverter
bean 与默认工厂相关联。如果只有一个RecordMessageConverter
bean 存在于批处理侦听器中,它被包装在BatchMessageConverter
.
一个定制ChainedKafkaTransactionManager 必须标记@Primary 因为它通常引用自动配置的KafkaTransactionManager 豆。 |
3.3. Kafka 流
Spring for Apache Kafka 提供了一个工厂 bean 来创建一个StreamsBuilder
对象并管理其流的生命周期。Spring Boot 自动配置所需的KafkaStreamsConfiguration
bean 只要kafka-streams
位于类路径上,并且 Kafka Streams 由@EnableKafkaStreams
注解。
启用 Kafka Streams 意味着必须设置应用程序 ID 和引导服务器。前者可以使用spring.kafka.streams.application-id
,默认为spring.application.name
如果未设置。后者可以全局设置或仅针对流专门覆盖。
使用专用属性可以使用几个附加属性;其他任意 Kafka 属性可以使用spring.kafka.streams.properties
Namespace。 另请参阅其他 Kafka 属性以获取更多信息。
要使用工厂豆,请电线StreamsBuilder
进入您的@Bean
如以下示例所示:
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
return stream;
}
private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
return new KeyValue<>(key, value.toUpperCase());
}
}
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {
@Bean
fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
val stream = streamsBuilder.stream<Int, String>("ks1In")
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), JsonSerde()))
return stream
}
private fun uppercaseValue(key: Int, value: String): KeyValue<Int?, String?> {
return KeyValue(key, value.uppercase())
}
}
默认情况下,由StreamBuilder
对象自动启动。您可以使用spring.kafka.streams.auto-startup
财产。
3.4. 其他 Kafka 属性
自动配置支持的属性显示在附录的“集成属性”部分中。请注意,在大多数情况下,这些属性(连字符或驼峰命名法)直接映射到 Apache Kafka 虚线属性。有关详细信息,请参阅 Apache Kafka 文档。
不包含客户端类型的属性 (producer
,consumer
,admin
或streams
) 在其名称中被视为通用并适用于所有客户端。如果需要,可以针对一种或多种客户端类型覆盖大多数这些公共属性。
Apache Kafka 指定重要性为 HIGH、MEDIUM 或 LOW 的属性。Spring Boot 自动配置支持所有 HIGH 重要性属性、一些选定的 MEDIUM 和 LOW 属性以及任何没有默认值的属性。
只有 Kafka 支持的属性子集可直接通过KafkaProperties
类。 如果要使用不直接支持的其他属性配置单个客户端类型,请使用以下属性:
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
spring:
kafka:
properties:
"[prop.one]": "first"
admin:
properties:
"[prop.two]": "second"
consumer:
properties:
"[prop.three]": "third"
producer:
properties:
"[prop.four]": "fourth"
streams:
properties:
"[prop.five]": "fifth"
这设置了共同的prop.one
Kafka 属性设置为first
(适用于生产者、使用者、管理员和流),则prop.two
admin 属性设置为second
这prop.three
consumer 属性设置为third
这prop.four
producer 属性设置为fourth
和prop.five
streams 属性设置为fifth
.
您还可以配置 Spring KafkaJsonDeserializer
如下:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
spring:
kafka:
consumer:
value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
properties:
"[spring.json.value.default.type]": "com.example.Invoice"
"[spring.json.trusted.packages]": "com.example.main,com.example.another"
同样,您可以禁用JsonSerializer
在标头中发送类型信息的默认行为:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
spring:
kafka:
producer:
value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
properties:
"[spring.json.add.type.headers]": false
以这种方式设置的属性会覆盖 Spring Boot 显式支持的任何配置项。 |
3.5. 使用嵌入式 Kafka 进行测试
Spring for Apache Kafka 提供了一种使用嵌入式 Apache Kafka 代理测试项目的便捷方法。要使用此功能,请使用@EmbeddedKafka
从spring-kafka-test
模块。 有关更多信息,请参阅 Spring for Apache Kafka 参考手册。
要使 Spring Boot 自动配置与上述嵌入式 Apache Kafka 代理一起使用,您需要重新映射嵌入式代理地址的系统属性(由EmbeddedKafkaBroker
) 添加到 Apache Kafka 的 Spring Boot 配置属性中。有几种方法可以做到这一点:
-
提供系统属性以将嵌入式代理地址映射到
spring.kafka.bootstrap-servers
在测试类中:
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
init {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
}
-
在
@EmbeddedKafka
注解:
@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
-
在配置属性中使用占位符:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring:
kafka:
bootstrap-servers: "${spring.embedded.kafka.brokers}"
4. RS袜子
RSocket 是一种用于字节流传输的二进制协议。 它通过通过单个连接传递异步消息来实现对称交互模型。
这spring-messaging
Spring Framework 的模块在客户端和服务器端都为 RSocket 请求者和响应者提供支持。
有关更多详细信息,请参阅 Spring Framework 参考的 RSocket 部分,包括 RSocket 协议的概述。
4.1. RSocket 策略自动配置
Spring Boot 自动配置RSocketStrategies
提供编码和解码 RSocket 有效负载所需的所有基础设施的 bean。
默认情况下,自动配置将尝试配置以下内容(按顺序):
-
CBOR 编解码器与 Jackson
-
带有 Jackson 的 JSON 编解码器
这spring-boot-starter-rsocket
starter 提供了这两个依赖项。
请参阅 Jackson 支持部分,了解有关自定义可能性的更多信息。
开发人员可以自定义RSocketStrategies
组件,通过创建实现RSocketStrategiesCustomizer
接口。
请注意,他们的@Order
很重要,因为它决定了编解码器的顺序。
4.2. RSocket 服务器自动配置
Spring Boot 提供 RSocket 服务器自动配置。
所需的依赖项由spring-boot-starter-rsocket
.
Spring Boot 允许从 WebFlux 服务器通过 WebSocket 公开 RSocket,或建立独立的 RSocket 服务器。 这取决于应用程序的类型及其配置。
对于 WebFlux 应用程序(类型为WebApplicationType.REACTIVE
),仅当以下属性匹配时,RSocket 服务器才会插入 Web 服务器:
spring.rsocket.server.mapping-path=/rsocket
spring.rsocket.server.transport=websocket
spring:
rsocket:
server:
mapping-path: "/rsocket"
transport: "websocket"
仅 Reactor Netty 支持将 RSocket 插入 Web 服务器,因为 RSocket 本身是使用该库构建的。 |
或者,RSocket TCP 或 websocket 服务器作为独立的嵌入式服务器启动。 除了依赖项要求之外,唯一需要的配置是为该服务器定义一个端口:
spring.rsocket.server.port=9898
spring:
rsocket:
server:
port: 9898
4.3. Spring Messaging RSocket 支持
Spring Boot 将为 RSocket 自动配置 Spring Messaging 基础设施。
这意味着 Spring Boot 将创建一个RSocketMessageHandler
bean 的 bean,它将处理对应用程序的 RSocket 请求。
4.4. 使用 RSocketRequester 调用 RSocket 服务
一旦RSocket
在服务器和客户端之间建立通道,任何一方都可以向另一方发送或接收请求。
作为服务器,您可以被注入RSocketRequester
RSocket 的任何处理程序方法上的实例@Controller
.
作为客户端,您需要先配置并建立 RSocket 连接。
Spring Boot 自动配置RSocketRequester.Builder
对于具有预期编解码器的此类情况,并应用任何RSocketConnectorConfigurer
豆。
这RSocketRequester.Builder
instance 是一个原型 bean,这意味着每个注入点都会为您提供一个新实例。
这是故意这样做的,因为此构建器是有状态的,您不应使用同一实例创建具有不同设置的请求者。
以下代码显示了一个典型的示例:
@Service
public class MyService {
private final RSocketRequester rsocketRequester;
public MyService(RSocketRequester.Builder rsocketRequesterBuilder) {
this.rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898);
}
public Mono<User> someRSocketCall(String name) {
return this.rsocketRequester.route("user").data(name).retrieveMono(User.class);
}
}
@Service
class MyService(rsocketRequesterBuilder: RSocketRequester.Builder) {
private val rsocketRequester: RSocketRequester
init {
rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898)
}
fun someRSocketCall(name: String): Mono<User> {
return rsocketRequester.route("user").data(name).retrieveMono(
User::class.java
)
}
}
5. 弹簧集成
Spring Boot 为使用 Spring Integration 提供了多种便利,包括spring-boot-starter-integration
“入门”。
Spring Integration 提供了对消息传递和其他传输(如 HTTP、TCP 等)的抽象。
如果 Spring Integration 在您的类路径上可用,则通过@EnableIntegration
注解。
Spring Integration轮询逻辑依赖于。在自动配置的TaskScheduler
.
默认值PollerMetadata
(每秒轮询无限数量的消息)可以使用spring.integration.poller.*
配置属性。
Spring Boot 还配置了一些由其他 Spring Integration 模块的存在触发的功能。
如果spring-integration-jmx
也在类路径上,消息处理统计信息通过 JMX 发布。
如果spring-integration-jdbc
可用,则可以在启动时创建默认数据库模式,如下行所示:
spring.integration.jdbc.initialize-schema=always
spring:
integration:
jdbc:
initialize-schema: "always"
如果spring-integration-rsocket
可用,开发人员可以使用"spring.rsocket.server.*"
属性并让它使用IntegrationRSocketEndpoint
或RSocketOutboundGateway
组件来处理传入的 RSocket 消息。
该基础设施可以处理 Spring Integration RSocket 通道适配器和@MessageMapping
处理程序(给定"spring.integration.rsocket.server.message-mapping-enabled"
已配置)。
Spring Boot 还可以自动配置ClientRSocketConnector
使用配置属性:
# Connecting to a RSocket server over TCP
spring.integration.rsocket.client.host=example.org
spring.integration.rsocket.client.port=9898
# Connecting to a RSocket server over TCP
spring:
integration:
rsocket:
client:
host: "example.org"
port: 9898
# Connecting to a RSocket Server over WebSocket
spring.integration.rsocket.client.uri=ws://example.org
# Connecting to a RSocket Server over WebSocket
spring:
integration:
rsocket:
client:
uri: "ws://example.org"
6. Web套接字
Spring Boot 为嵌入式 Tomcat、Jetty 和 Undertow 提供 WebSockets 自动配置。 如果将 war 文件部署到独立容器,Spring Boot 会假定容器负责配置其 WebSocket 支持。
Spring Framework 为 MVC Web 应用程序提供了丰富的 WebSocket 支持,可以通过spring-boot-starter-websocket
模块。
WebSocket 支持也可用于响应式 Web 应用程序,并且需要将 WebSocket API 包含在spring-boot-starter-webflux
:
<dependency>
<groupId>jakarta.websocket</groupId>
<artifactId>jakarta.websocket-api</artifactId>
</dependency>