1、RabbitMQ官网发布订阅(即publish–subscribe)模式的架构图
org.springframework.amqp
spring-rabbit
1.4.0.RELEASE
com.rabbitmq
amqp-client
3.4.1
3、MQ获取连接工具类代码如下:
package com.rf.rabiitmq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @description: mq连接工具类
* @author: xiaozhi
* @create: 2020-04-21 09:03
*/
public class ConnectionUtil {
/**
* @Description: 获取mq连接方法
* @Param: []
* @Author: xz
* @return: com.rabbitmq.client.Connection
* @Date: 2020/4/21 9:19
*/
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory connectionFactory=new ConnectionFactory();
//以下信息,在安装mq后,登录mq客户端进行配置的信息
connectionFactory.setHost("localhost");//设置服务地址
connectionFactory.setPort(5672);//设置端口
connectionFactory.setVirtualHost("xzVirtualHosts");//设置虚拟主机名称
connectionFactory.setUsername("xz");//设置用户名
connectionFactory.setPassword("xz");//设置密码
//通过连接工厂获取连接
Connection connection = connectionFactory.newConnection();
return connection;
}
}
4、消息发送者,即服务端代码如下:
package com.rf.rabiitmq.pubsub;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rf.rabiitmq.util.ConnectionUtil;
/**
* @description: 发布订阅模式 消息发布者
* @author: xiaozhi
* @create: 2020-04-21 14:34
*/
public class PubsubSend {
private final static String EXCHANGE_NAME="exchange_name";//声明交换机名称
public static void main(String[] args) throws Exception{
//获取连接及通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
/**
* 声明exchange
* @param1 交换交换的名字
* @param2 输入交换类型
* */
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//发布消息
String message="更新用户id=2的信息。";
/**
* @param1 交换机名称
* @param2 路由key
* @param3 其他属性
* @param4 body消息体
* */
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
System.out.println("后台系统发布消息,"+message);
//关闭通道及连接
channel.close();
connection.close();
}
}
5、消息接收者,即客户端1代码如下:
package com.rf.rabiitmq.pubsub;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rf.rabiitmq.util.ConnectionUtil;
/**
* @description: 发布订阅模式 客户端1
* @author: xiaozhi
* @create: 2020-04-21 14:59
*/
public class PubsubRecv1 {
private final static String QUEUE_NAME="pubsub_queue1";//声明队列名称
private final static String EXCHANGE_NAME="exchange_name";//声明交换机名称
public static void main(String[] args) throws Exception{
//获取连接及通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
/**
* 声明队列
* @param1 队列的名称
* @param2 是否持久化 如果true(该队列将在服务器重启后继续存在)
* @param3 是否排外 即只允许该channel访问该队列 一般等于true的话用于一个队列只能有一个消费者来消费的场景
* @param4 如果我们声明一个自动删除队列(服务器将在不再使用时删除它)
* @param5 参数队列的其他属性(构造参数)
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
* 绑定队列到交换机
* @param1 交换机的名称
* @param2 队列的名称
* @param3 用于绑定的例程键
*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//定义队列的消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
/**
* 监听队列 手动返回完成
* @param1 队列名称
* @param2 如果true,服务器应该考虑消息;如果是false,手动返回完成
* @param3 回调消费者对象的接口
* */
channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
//接收消息内容
while(true){
//等待下一个消息传递并返回它
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
//检索消息体
String message = new String(delivery.getBody());
System.out.println("客户端1接收消息:"+message);
Thread.sleep(10);
// 返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
6、消息接收者,即客户端2代码如下:
package com.rf.rabiitmq.pubsub;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rf.rabiitmq.util.ConnectionUtil;
/**
* @description: 发布订阅模式 客户端2
* @author: xiaozhi
* @create: 2020-04-21 15:28
*/
public class PubsubRecv2 {
private final static String QUEUE_NAME="pubsub_queue2";//声明队列名称
private final static String EXCHANGE_NAME="exchange_name";//声明交换机名称
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
/**
* 声明队列
* @param1 队列的名称
* @param2 是否持久化 如果true(该队列将在服务器重启后继续存在)
* @param3 是否排外 即只允许该channel访问该队列 一般等于true的话用于一个队列只能有一个消费者来消费的场景
* @param4 如果我们声明一个自动删除队列(服务器将在不再使用时删除它)
* @param5 参数队列的其他属性(构造参数)
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 绑定队列到交换机
* @param1 交换机的名称
* @param2 队列的名称
* @param3 用于绑定的例程键
*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//定义队列的消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
/**
* 监听队列 手动返回完成
* @param1 队列名称
* @param2 如果true,服务器应该考虑消息;如果是false,手动返回完成
* @param3 回调消费者对象的接口
* */
channel.basicConsume(QUEUE_NAME,false,queueingConsumer);
while(true){
//等待下一个消息传递并返回它
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
//检索消息体
String message = new String(delivery.getBody());
System.out.println("客户端2接收消息:"+message);
Thread.sleep(10);
// 返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
7、先启动服务端、客户端1和客户端2代码程序,之后关闭服务端、客户端1和客户端2代码程序
- 原因:为了先创建队列和交换机
1)、启动消息接收者,即客户端1代码程序。输出如下图: 2)、启动消息接收者,即客户端2代码程序。输出如下图:
1)、启动消息发送者,即服务端的代码程序。控制台输出如下图: 2)查看客户端1即(消费者1)和客户端2(即消费者2)的控制台输出如下:
客户端1和客户端2同时接收相同的消息