- 1. 延迟队列
- 2. 插件实现延迟队列
- 2.1 插件安装
- 2.2 代码实现
- 3. DLX 实现延迟队列
- 3.1 DLX + TTL
- 3.2 代码示例
- 3.3 小结
延迟队列,顾名思义,就是让队列中的消息不要立刻被消费,而是要延迟一定的时间。
延迟?是不是想到了定时任务,在日常开发中,定时任务肯定并不陌生,SpringBoot
也提供很好的支持,比如定时统计任务、定时日志备份……
这样看来,延迟队列作用似乎和定时任务相同,二者可以相互替代,但仔细想想二者区别,还是适用于不同场景
对于任务开始时间确定的需求,用定时任务没有问题,但如果任务开始时间不确定呢?比如:
-
在购买商品时,下完订单后
30
分钟内要付款,要不然订单会取消 -
在抢购商品时,可以设置商品开始抢购提醒
-
送外卖时,如果没有按照指定时间送达,临近超时,会提醒外卖小哥
上面这些情况,用定时任务似乎很难办到,因为任务开始的时间并不确定,而用延迟队列的话,很容易实现
在RabbitMQ
上实现定时任务有两种方式:
- 使用
RabbitMQ
插件rabbitmq_delayed_message_exchange
插件来实现定时任务 - 利用
RabbitMQ
自带的消息过期和死信队列机制,实现定时任务
定时任务不适合开始时间不确定的情况
2. 插件实现延迟队列rabbitmq_delayed_message_exchange
插件是RabbitMQ
提供的开源项目,可以用来实现延迟消费消息,从名字是就能看出其意思,下载地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
注意要选择与RabbitMQ
相同或者相近的版本
由于RabbitMQ
是按装在Docker
上,所以要将插件拷贝到Docker
容器中
docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez myrabbitmq:/plugins
第一个参数是宿主机上的文件地址,第二个参数是拷贝到容器的位置
接下来再执行如下命令进入到 RabbitMQ
容器中:
docker exec -it myrabbitmq /bin/bash
进入到容器之后,查看所有的插件,发现并没有启用:
rabbitmq-plugins list
启用插件命令,再查看:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
这样插件就按照好了
2.2 代码实现配置文件:
server.port=8889
spring.rabbitmq.host=192.168.43.86
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
配置类:
@Configuration
public class PluginRabbitMQConfig {
// 交换机的名称
public static final String SCORPIOS_EXCHANGE_NAME = "scorpios_exchange_name";
// 队列名称
public static final String SCORPIOS_MSG_QUEUE = "scorpios_msg_queue";
// 交换机类型,固定值x-delayed-message
public static final String SCORPIOS_EXCHANGE_TYPE = "x-delayed-message";
@Bean
CustomExchange customExchange(){
Map setting = new HashMap();
setting.put("x-delayed-type", "direct");
return new CustomExchange(PluginRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,PluginRabbitMQConfig.SCORPIOS_EXCHANGE_TYPE,true,false, setting);
}
@Bean
Queue queue() {
return new Queue(PluginRabbitMQConfig.SCORPIOS_MSG_QUEUE,true,false,false);
}
@Bean
Binding bindingMsg(){
return BindingBuilder.bind(queue()).to(customExchange()).with(PluginRabbitMQConfig.SCORPIOS_MSG_QUEUE).noargs();
}
}
上面创建了一个CustomExchange
交换机 ,这是一个 Spring
提供的交换机,创建需要参数如下:
- 交换机名称
- 交换机类型,固定值
- 交换机是否持久化
- 如果没有队列绑定到交换机,交换机是否删除
- 其他参数
最后一个setting
参数中,指定交换机消息分发的类型,就是交换机的那几种类型:direct
、fanout
、topic
以及 header
,选择哪种类型,交换机分发消息方式就按哪种方式。
启动项目后,打开RabbitMQ Web
页面,可以看到:
消息消费者:
@Slf4j
@Component
public class Consumer {
@RabbitListener(queues = PluginRabbitMQConfig.SCORPIOS_MSG_QUEUE)
public void consume(String msg) {
log.info("队列收到的消息为:{}", msg);
}
}
此处创建CustomExchange时,需要指定一个交换机类型,此值为固定值:x-delayed-message
否则控制台会报connection reset错误!
消息发送:
@Slf4j
@RestController
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send/message")
public String send() {
log.info("发送消息....");
// 创建消息对象,在消息头中延迟10秒
Message msg = MessageBuilder.withBody(("delay message plugin..." + new Date()).getBytes(StandardCharsets.UTF_8)).setHeader("x-delay", 10000).build();
rabbitTemplate.convertAndSend(PluginRabbitMQConfig.SCORPIOS_EXCHANGE_NAME, PluginRabbitMQConfig.SCORPIOS_MSG_QUEUE, msg);
return "success";
}
}
打开浏览器,输入地址:http://localhost:8889/send/message,控制台输入如下:
上面使用了RabbitMQ
提供的插件rabbitmq_delayed_message_exchange
实现了延迟队列,细细想来,延迟队列不就是让消息延迟指定的时间再去被消费么?而之前已经了解过死信交换机和死信队列,再来回忆下。
死信交换机用来接收死信消息(Dead Message
)的,一般消息变成死信消息有如下几种情况:
- 消息被拒绝(
Basic.Reject/Basic.Nack
) ,并且设置requeue
参数为false
- 消息过期
- 队列达到最大长度
死信队列:绑定死信交换机的消息队列是死信队列。当消息在队列中变成了死信消息后,此时就会被发送到死信交换机。
换句话说,没人消费的消息,最终会进入死信队列。DLX
+ TTL
是不是可以实现延迟队列
假设一条消息需要延迟30
分钟执行,那么就设置这条消息的有效期为30
分钟,同时为这条消息配置死信交换机和死信routing_key
,并且不为这个消息队列设置消费者,那么30
分钟后,这条消息由于没有被消费者消费而进入死信队列,此时死信队列的消费者就会消费这条过期的消息
配置文件:
server.port=8889
spring.rabbitmq.host=192.168.43.86
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
配置文件:
@Configuration
public class DLXRabbitMQConfig {
// 交换机的名称
public static final String SCORPIOS_DLX_EXCHANGE = "scorpios_dlx_exchange";
// 发送队列名称
public static final String SCORPIOS_DLX_QUEUE = "scorpios_dlx_queue";
public static final String SCORPIOS_MSG_QUEUE = "scorpios_msg_queue";
// 死信交换机
@Bean
DirectExchange dlxDirectExchange(){
return new DirectExchange(DLXRabbitMQConfig.SCORPIOS_DLX_EXCHANGE,true,false);
}
// 死信队列
@Bean
Queue dlxQueue() {
return new Queue(DLXRabbitMQConfig.SCORPIOS_DLX_QUEUE,true,false,false);
}
// 将死信队列和死信交换机绑定
@Bean
Binding dlxBinding(){
return BindingBuilder.bind(dlxQueue()).to(dlxDirectExchange()).with(DLXRabbitMQConfig.SCORPIOS_DLX_QUEUE);
}
// 创建一个普通队列,并把它与死信交换机
@Bean
Queue msgQueue() {
Map setting = new HashMap();
// 设置死信交换机
setting.put("x-dead-letter-exchange", DLXRabbitMQConfig.SCORPIOS_DLX_EXCHANGE);
// 设置死信 routing_key 与队列名称相同
setting.put("x-dead-letter-routing-key", DLXRabbitMQConfig.SCORPIOS_DLX_QUEUE);
return new Queue(DLXRabbitMQConfig.SCORPIOS_MSG_QUEUE, true, false, false, setting);
}
}
上面的代码复用了介绍死信交换机和死信队列时的代码,不多说
死信队列配置一个消费者:
@Slf4j
@Component
public class Consumer {
// 死信队列的消费者
@RabbitListener(queues = DLXRabbitMQConfig.SCORPIOS_DLX_QUEUE)
public void dlxConsume(String msg) {
log.info("死信队列收到的消息为:{}", msg);
}
}
消息发送者:
@Slf4j
@RestController
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send/message")
public String send() {
log.info("客户端发送消息");
// 创建消息对象
Message message = MessageBuilder.withBody("delay message dlx and ttl ...".getBytes(StandardCharsets.UTF_8))
.setExpiration("10000")
.build();
rabbitTemplate.convertAndSend(DLXRabbitMQConfig.SCORPIOS_MSG_QUEUE,message);
return "success";
}
}
打开浏览器,输入地址:http://localhost:8889/send/message,控制台输入如下:
使用DLX Exchange + TTL
实现延迟队列,核心思想就是:给消息设置指定的过期时间,而消息队列并没有消费者,当过期时间到了以后,就会进入到死信交换机,最终被死信队列的消费者消费。