一、RocketMQ事务消息
RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。 注意:
主要针对消息发送者。
RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
1、RocketMQ事务消息流程概要 上图说明了事务消息的大致方案,其中分为两个流程:
- 正常事务消息的发送及提交
- 事务消息的补偿流程。
- (1) 发送消息(half 消息):图1。
- (2) 服务端响应消息写入结果:图2。
- (3) 根据发送结果执行本地事务(如果写入失败,此时 half 消息对业务不可见,本地逻辑不执行):图3。
- (4) 根据本地事务状态执行 Commit 或者 Rollback(Commit操作生成消息索引,消息对消费者可见):图4。
- (1) 对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”:图5。
- (2) Producer 收到回查消息,检查回查消息对应的本地事务的状态:图6。
- (3) 根据本地事务状态,重新 Commit 或者 Rollback::图7。
其中,补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。
2、 事务消息状态事务消息共有三种状态,提交状态、回滚状态、中间状态:
- TransactionStatus.
CommitTransaction
: 提交状态,它允许消费者消费此消息(完成了图中 1,2,3,4 步,第 4 步是 Commit)。 - TransactionStatus.
RollbackTransaction
: 回滚状态,它代表该消息将被删除,不允许被消费(完成了图中 1,2,3,4 步, 第 4 步是 Rollback)。 - TransactionStatus.
Unknown
: 中间状态,它代表需要检查消息队列来确定状态(完成了图中 1,2,3 步, 但是没有 4 或者没有 7,无法 Commit 或 Rollback)。
- 事务消息不支持延时消息和批量消息。
- 事务回查的间隔时间:BrokerConfig. transactionCheckInterval 通过 Broker 的配置文件设置好。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认 情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
- 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用 户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
- 事务性消息可能不止一次被检查或消费。
- 事务性消息中用到了生产者群组,这种就是一种高可用机制,用来确保事务消息的可靠性。
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事 务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ 服务器能通过它们的生产者 ID 查询到消费者。
使用 TransactionMQProducer
类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态。
public class TransactionProducer {
public static void main(String[] args) throws Exception {
// 1.创建消息生产者 producer,并指定生产者组名
TransactionMQProducer producer = new TransactionMQProducer("mq_client_TransactionProducerGroup");
// 2.指定 Nameserver 地址,集群的话使用;分隔
producer.setNamesrvAddr("192.168.xxx.xxx:9876");
//创建线程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
// 设置生产者回查线程池
producer.setExecutorService(executorService);
// 生产者设置事务监听器
producer.setTransactionListener(new TransactionListenerImpl());
// 3.启动 producer
producer.start();
for (int i = 0; i
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?