Spring Cloud Stream 在 Spring Cloud 体系内用于构建高度可扩展的基于事件驱动的微服务,其目的是为了简化消息在 Spring Cloud 应用程序中的开发。Spring Cloud Stream (后面以 SCS 代替 Spring Cloud Stream) 本身内容很多,而且它还有很多外部的依赖,想要熟悉 SCS,必须要先了解 Spring Messaging 和 Spring Integration 这两个项目,接下来文章将从以下几点跟大家进行介绍:
- 什么是 Spring Messaging;
- 什么是 Spring Integration;
- 什么是 SCS及其功能;
Spring Messaging 是 Spring Framework 中的一个模块,其作用就是统一消息的编程模型。
- 比如消息
Messaging
对应的模型就包括一个消息体 Payload 和消息头 Header:
package org.springframework.messaging;
public interface Message {
T getPayload();
MessageHeaders getHeaders();
}
- 消息通道
MessageChannel
用于接收消息,调用send
方法可以将消息发送至该消息通道中 :
@FunctionalInterface
public interface MessageChannel {
long INDEFINITE_TIMEOUT = -1;
default boolean send(Message message) {
return send(message, INDEFINITE_TIMEOUT);
}
boolean send(Message message, long timeout);
}
消息通道里的消息如何被消费呢?
- 由消息通道的子接口可订阅的消息通道
SubscribableChannel
实现,被MessageHandler
消息处理器所订阅:
public interface SubscribableChannel extends MessageChannel {
boolean subscribe(MessageHandler handler);
boolean unsubscribe(MessageHandler handler);
}
- 由
MessageHandler
真正地消费/处理消息:
@FunctionalInterface
public interface MessageHandler {
void handleMessage(Message message) throws MessagingException;
}
Spring Messaging 内部在消息模型的基础上衍生出了其它的一些功能,如:
- 消息接收参数及返回值处理:消息接收参数处理器
HandlerMethodArgumentResolver
配合@Header
,@Payload
等注解使用;消息接收后的返回值处理器HandlerMethodReturnValueHandler
配合@SendTo
注解使用; - 消息体内容转换器
MessageConverter
; - 统一抽象的消息发送模板
AbstractMessageSendingTemplate
; - 消息通道拦截器
ChannelInterceptor
;
Spring Integration 提供了 Spring 编程模型的扩展用来支持企业集成模式(Enterprise Integration Patterns),是对 Spring Messaging 的扩展。它提出了不少新的概念,包括消息的路由 MessageRoute
、消息的分发 MessageDispatcher
、消息的过滤 Filter
、消息的转换 Transformer
、消息的聚合 Aggregator
、消息的分割 Splitter
等等。同时还提供了 MessageChannel
和MessageHandler
的实现,分别包括 DirectChannel
、ExecutorChannel
、PublishSubscribeChannel
和MessageFilter
、ServiceActivatingHandler
、MethodInvokingSplitter
等内容。
首先为大家介绍几种消息的处理方式:
- 消息的分割:
- 消息的聚合:
- 消息的过滤:
- 消息的分发:
接下来,我们以一个最简单的例子来尝试一下 Spring Integration:
SubscribableChannel messageChannel = new DirectChannel(); // 1
messageChannel.subscribe(msg -> { // 2
System.out.println("receive: " + msg.getPayload());
});
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build()); // 3
- 构造一个可订阅的消息通道
messageChannel
; - 使用
MessageHandler
去消费这个消息通道里的消息; - 发送一条消息到这个消息通道,消息最终被消息通道里的
MessageHandler
所消费,最后控制台打印出:receive: msg from alibaba
;
DirectChannel
内部有个 UnicastingDispatcher
类型的消息分发器,会分发到对应的消息通道 MessageChannel
中,从名字也可以看出来,UnicastingDispatcher
是个单播的分发器,只能选择一个消息通道。那么如何选择呢? 内部提供了 LoadBalancingStrategy
负载均衡策略,默认只有轮询的实现,可以进行扩展。
我们对上段代码做一点修改,使用多个 MessageHandler
去处理消息:
SubscribableChannel messageChannel = new DirectChannel();
messageChannel.subscribe(msg -> {
System.out.println("receive1: " + msg.getPayload());
});
messageChannel.subscribe(msg -> {
System.out.println("receive2: " + msg.getPayload());
});
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
由于 DirectChannel
内部的消息分发器是 UnicastingDispatcher
单播的方式,并且采用轮询的负载均衡策略,所以这里两次的消费分别对应这两个 MessageHandler
。控制台打印出:
receive1: msg from alibaba
receive2: msg from alibaba
既然存在单播的消息分发器 UnicastingDispatcher
,必然也会存在广播的消息分发器,那就是 BroadcastingDispatcher
,它被 PublishSubscribeChannel
这个消息通道所使用。广播消息分发器会把消息分发给所有的 MessageHandler
:
SubscribableChannel messageChannel = new PublishSubscribeChannel();
messageChannel.subscribe(msg -> {
System.out.println("receive1: " + msg.getPayload());
});
messageChannel.subscribe(msg -> {
System.out.println("receive2: " + msg.getPayload());
});
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
发送两个消息,都被所有的 MessageHandler
所消费。控制台打印:
receive1: msg from alibaba
receive2: msg from alibaba
receive1: msg from alibaba
receive2: msg from alibaba
Spring Cloud Stream
SCS与各模块之间的关系是:
- SCS 在 Spring Integration 的基础上进行了封装,提出了
Binder
,Binding
,@EnableBinding
,@StreamListener
等概念; - SCS 与 Spring Boot Actuator 整合,提供了
/bindings
,/channels
endpoint; - SCS 与 Spring Boot Externalized Configuration 整合,提供了
BindingProperties
,BinderProperties
等外部化配置类; - SCS 增强了消息发送失败的和消费失败情况下的处理逻辑等功能。
- SCS 是 Spring Integration 的加强,同时与 Spring Boot 体系进行了融合,也是 Spring Cloud Bus 的基础。它屏蔽了底层消息中间件的实现细节,希望以统一的一套 API 来进行消息的发送/消费,底层消息中间件的实现细节由各消息中间件的 Binder 完成。
Binder
是提供与外部消息中间件集成的组件,为构造 Binding
提供了 2 个方法,分别是 bindConsumer
和 bindProducer
,它们分别用于构造生产者和消费者。目前官方的实现有 Rabbit Binder 和 Kafka Binder, Spring Cloud Alibaba 内部已经实现了 RocketMQ Binder。
从图中可以看出,Binding
是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产。我们来看一个最简单的使用 RocketMQ Binder 的例子,然后分析一下它的底层处理原理:
- 启动类及消息的发送:
@SpringBootApplication
@EnableBinding({ Source.class, Sink.class }) // 1
public class SendAndReceiveApplication {
public static void main(String[] args) {
SpringApplication.run(SendAndReceiveApplication.class, args);
}
@Bean // 2
public CustomRunner customRunner() {
return new CustomRunner();
}
public static class CustomRunner implements CommandLineRunner {
@Autowired
private Source source;
@Override
public void run(String... args) throws Exception {
int count = 5;
for (int index = 1; index
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?