您当前的位置: 首页 >  rabbitmq

消息中间件系列教程(12) -RabbitMQ-消息确认机制

杨林伟 发布时间:2019-12-11 10:49:15 ,浏览量:3

引言

代码已上传到Github,有兴趣的同学可以下载来看看:https://github.com/ylw-github/RabbitMQ-Demo

场景:生产者发送消息出去之后,不知道到底有没有发送到RabbitMQ服务器, 默认是不知道的。而且有的时候我们在发送消息之后,后面的逻辑出问题了,我们不想要发送之前的消息了,需要撤回该怎么做。

这个熟悉的场景容易的让我们想到了“事务”,其实RabbitMQ也是有事务机制的。

解决方案:

  1. AMQP 事务机制
  2. Confirm 模式

事务模式:

  • 「txSelect」 :将当前channel设置为transaction模式
  • 「txCommit」 :提交当前事务
  • 「txRollback」 :事务回滚
1. AMQP 事务机制案例

1.新建Maven项目RabbitMQ-Demo

2.添加Maven依赖:


	
		com.rabbitmq
		amqp-client
		3.6.5
	

3.连接工具类

package com.ylw.rabbitmq;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQConnecUtils {

    public static Connection newConnection() throws IOException, TimeoutException {
        // 1.定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        // 2.设置服务器地址
        factory.setHost("127.0.0.1");

        // 3.设置协议端口号
        factory.setPort(5672);

        // 4.设置vhost
        factory.setVirtualHost("OrderHost");

        // 5.设置用户名称
        factory.setUsername("OrderAdmin");

        // 6.设置用户密码
        factory.setPassword("123456");

        // 7.创建新的连接
        Connection newConnection = factory.newConnection();
        return newConnection;
    }
}

4.生产者

public class Producer {
    private static final String QUEUE_NAME = "test_trans_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.获取连接
        Connection newConnection = RabbitMQConnecUtils.newConnection();
       
        // 2.创建通道
        Channel channel = newConnection.createChannel();
       
        // 3.创建队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
       
        // 将当前管道设置为 txSelect 将当前channel设置为transaction模式 开启事务
        channel.txSelect();
        String msg = "test transaction msg ...";
        try {
            // 4.发送消息
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            // int i = 1 / 0;
            channel.txCommit();// 提交事务
            System.out.println("生产者发送消息:" + msg);
        } catch (Exception e) {
            System.out.println("消息进行回滚操作");
            channel.txRollback();// 回滚事务
        } finally {
            channel.close();
            newConnection.close();
        }

    }
}

5.消费者

public class Consumer {
    private static final String QUEUE_NAME = "test_trans_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.获取连接
        Connection newConnection = RabbitMQConnecUtils.newConnection();

        // 2.获取通道
        Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("消费者获取消息->" + msgString);
            }
        };
        // 3.监听队列
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }
}

6.依次启动消费者和生产者,可以看到消费者能获取到消息: 在这里插入图片描述 7.现在模拟异常,把生产者的异常代码打开: 在这里插入图片描述 8.启动生产者,发现消费者没有获取到消息: 在这里插入图片描述

2. Confirm机制

和上面的代码一样,需要修改一下生产者,我们重新新建一个类ConfirmProducer

public class ConfirmProducer {

    private static final String QUEUE_NAME = "test_trans_queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1.获取连接
        Connection newConnection = RabbitMQConnecUtils.newConnection();

        // 2.创建通道
        Channel channel = newConnection.createChannel();

        // 3.创建队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // confirm机制
        channel.confirmSelect();
        String msg = "test confirm msg ...";

        // 4.发送消息
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        System.out.println("生产者发送消息:" + msg);
        if (!channel.waitForConfirms()) {
            System.out.println("消息发送失败!");
        } else {
            System.out.println("消息发送成功!");
        }
        channel.close();
        newConnection.close();
    }
}

依次启动消费者和新建的生产者,可以看到生产者发送消息成功,消费者消费消息也成功: 在这里插入图片描述 在这里插入图片描述

本文完!

关注
打赏
1688896170
查看更多评论

杨林伟

暂无认证

  • 3浏览

    0关注

    3279博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文
立即登录/注册

微信扫码登录

0.1453s