您当前的位置: 首页 >  rabbitmq

小志的博客

暂无认证

  • 2浏览

    0关注

    1217博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

RabbitMQ的路由(即Routing)模式示例代码

小志的博客 发布时间:2020-04-22 16:28:37 ,浏览量:2

1、RabbitMQ官网路由(即Routing)模式的架构图

在这里插入图片描述

2、pom文件需要引入有关MQ的2个依赖包
 
   org.springframework.amqp
     spring-rabbit
     1.4.0.RELEASE
 
 
     com.rabbitmq
     amqp-client
     3.4.1
 

3、MQ获取连接工具类代码如下:
package com.rf.rabiitmq.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @description: mq连接工具类
 * @author: xiaozhi
 * @create: 2020-04-21 09:03
 */
public class ConnectionUtil {
    /** 
    * @Description: 获取mq连接方法
    * @Param: [] 
    * @Author: xz  
    * @return: com.rabbitmq.client.Connection
    * @Date: 2020/4/21 9:19
    */ 
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory connectionFactory=new ConnectionFactory();
        //以下信息,在安装mq后,登录mq客户端进行配置的信息
        connectionFactory.setHost("localhost");//设置服务地址
        connectionFactory.setPort(5672);//设置端口
        connectionFactory.setVirtualHost("xzVirtualHosts");//设置虚拟主机名称
        connectionFactory.setUsername("xz");//设置用户名
        connectionFactory.setPassword("xz");//设置密码
        //通过连接工厂获取连接
        Connection connection = connectionFactory.newConnection();
        return connection;
    }
}

4、消息发送者,即服务端代码如下:
package com.rf.rabiitmq.routing;
import com.rabbitmq.client.Connection;
import com.rf.rabiitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;

/**
 * @description: 路由模式   发布消息
 * @author: xiaozhi
 * @create: 2020-04-21 15:40
 */
public class RoutSend {
    private final static String EXCHANGE_NAME = "routing_exchange_name";
    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        /**
         * 声明exchange
         * @param1 交换交换的名字
         * @param2 输入交换类型
         * */
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 消息内容
        String message = "删除用户id=10的用户信息。";
        /**
         * @param1 交换机名称
         * @param2 指定路由key
         * @param3 其他属性
         * @param4 body消息体
         * */
        channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
        System.out.println(" 后台系统发布消息:'" + message + "'");
        channel.close();
        connection.close();
    }
}
5、消息接收者,即客户端1代码如下:
package com.rf.rabiitmq.routing;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rf.rabiitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
/**
 * @description: 客户端1
 * @author: xiaozhi
 * @create: 2020-04-21 17:11
 */
public class RoutRescv1 {
    private final static String QUEUE_NAME = "routing_queue_name1";
    private final static String EXCHANGE_NAME = "routing_exchange_name";
    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 绑定队列到交换机,并指定路由key
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "select");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("客户端1接收消息: '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

6、消息接收者,即客户端2代码如下:
package com.rf.rabiitmq.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rf.rabiitmq.util.ConnectionUtil;

/**
 * @description: 客户端2
 * @author: xiaozhi
 * @create: 2020-04-21 17:12
 */
public class RoutRescv2 {
    private final static String QUEUE_NAME = "routing_queue_name2";
    private final static String EXCHANGE_NAME = "routing_exchange_name";
    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 绑定队列到交换机,并指定路由key
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "select");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("客户端2接收消息: '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

7、先启动服务端、客户端1和客户端2代码程序,之后关闭服务端、客户端1和客户端2代码程序
  • 原因:为了先创建队列和交换机 在这里插入图片描述在这里插入图片描述
8、启动客户端代码程序:

1)、启动消息接收者,即客户端1代码程序。输出如下图: 在这里插入图片描述 2)、启动消息接收者,即客户端2代码程序。输出如下图: 在这里插入图片描述

9、启动服务端代码程序

1)、启动消息发送者,即服务端的代码程序。控制台输出如下图: 在这里插入图片描述2)查看客户端1即(消费者1)和客户端2(即消费者2)的控制台输出如下: 在这里插入图片描述在这里插入图片描述

10、结论
  • 客户端1即(消费者1)接收到消息
  • 客户端2即(消费者2)未接收到消息
11、结论的原因

1)、客户端1即(消费者1)绑定队列到交换机中路由key指定了select、insert、update、delete 4种方式,如下图: 在这里插入图片描述2)、客户端2即(消费者2)绑定队列到交换机中路由key指定了select、update 2种方式,如下图: 在这里插入图片描述 3)服务端程序代码发送消息时绑定路由key是delete,所以匹配到了客户端1中绑定的路由key是delete,而客户端2只有select和update,所以客户端1接收到了消息,如下图: 在这里插入图片描述

关注
打赏
1661269038
查看更多评论
立即登录/注册

微信扫码登录

0.1023s