1、RabbitMQ官网路由(即Routing)模式的架构图
org.springframework.amqp
spring-rabbit
1.4.0.RELEASE
com.rabbitmq
amqp-client
3.4.1
3、MQ获取连接工具类代码如下:
package com.rf.rabiitmq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @description: mq连接工具类
* @author: xiaozhi
* @create: 2020-04-21 09:03
*/
public class ConnectionUtil {
/**
* @Description: 获取mq连接方法
* @Param: []
* @Author: xz
* @return: com.rabbitmq.client.Connection
* @Date: 2020/4/21 9:19
*/
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory connectionFactory=new ConnectionFactory();
//以下信息,在安装mq后,登录mq客户端进行配置的信息
connectionFactory.setHost("localhost");//设置服务地址
connectionFactory.setPort(5672);//设置端口
connectionFactory.setVirtualHost("xzVirtualHosts");//设置虚拟主机名称
connectionFactory.setUsername("xz");//设置用户名
connectionFactory.setPassword("xz");//设置密码
//通过连接工厂获取连接
Connection connection = connectionFactory.newConnection();
return connection;
}
}
4、消息发送者,即服务端代码如下:
package com.rf.rabiitmq.routing;
import com.rabbitmq.client.Connection;
import com.rf.rabiitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
/**
* @description: 路由模式 发布消息
* @author: xiaozhi
* @create: 2020-04-21 15:40
*/
public class RoutSend {
private final static String EXCHANGE_NAME = "routing_exchange_name";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
/**
* 声明exchange
* @param1 交换交换的名字
* @param2 输入交换类型
* */
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 消息内容
String message = "删除用户id=10的用户信息。";
/**
* @param1 交换机名称
* @param2 指定路由key
* @param3 其他属性
* @param4 body消息体
* */
channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
System.out.println(" 后台系统发布消息:'" + message + "'");
channel.close();
connection.close();
}
}
5、消息接收者,即客户端1代码如下:
package com.rf.rabiitmq.routing;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rf.rabiitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
/**
* @description: 客户端1
* @author: xiaozhi
* @create: 2020-04-21 17:11
*/
public class RoutRescv1 {
private final static String QUEUE_NAME = "routing_queue_name1";
private final static String EXCHANGE_NAME = "routing_exchange_name";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,并指定路由key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "select");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("客户端1接收消息: '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
6、消息接收者,即客户端2代码如下:
package com.rf.rabiitmq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rf.rabiitmq.util.ConnectionUtil;
/**
* @description: 客户端2
* @author: xiaozhi
* @create: 2020-04-21 17:12
*/
public class RoutRescv2 {
private final static String QUEUE_NAME = "routing_queue_name2";
private final static String EXCHANGE_NAME = "routing_exchange_name";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,并指定路由key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "select");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("客户端2接收消息: '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
7、先启动服务端、客户端1和客户端2代码程序,之后关闭服务端、客户端1和客户端2代码程序
- 原因:为了先创建队列和交换机
1)、启动消息接收者,即客户端1代码程序。输出如下图: 2)、启动消息接收者,即客户端2代码程序。输出如下图:
1)、启动消息发送者,即服务端的代码程序。控制台输出如下图: 2)查看客户端1即(消费者1)和客户端2(即消费者2)的控制台输出如下:
- 客户端1即(消费者1)接收到消息
- 客户端2即(消费者2)未接收到消息
1)、客户端1即(消费者1)绑定队列到交换机中路由key指定了select、insert、update、delete 4种方式,如下图: 2)、客户端2即(消费者2)绑定队列到交换机中路由key指定了select、update 2种方式,如下图:
3)服务端程序代码发送消息时绑定路由key是delete,所以匹配到了客户端1中绑定的路由key是delete,而客户端2只有select和update,所以客户端1接收到了消息,如下图: