学习使用,老鸟飞过,欢迎评论交流
为什么要用到延迟队列在开发项目的时候我们通常会遇到这么一个问题,比如商城项目有一下单逻辑,下单成功数据保存在数据库中,下单成功后需要用户进行支付,如果在30分钟内支付失败,需要修改订单的支付状态为“支付超时”并关闭订单以及回退库存操作,那如何在下单30后准时检查支付结果处理订单状态呢?
你可能想到了一个最简单的方法,就是使用定时任务扫描订单表,判断时间是否支付超时,这样的方式无疑是一种很消耗性能的做法,你试想一下,定时扫描一张数据量很大的表去判断时间和状态,而且99%的扫描都是无效的操作。
那么该如何优雅的解决上述问题呢?我们可以采用延迟队列来实现,Redis和MQ都可以做到,本文章采用RabbitMQ的延迟队列来实现。
延迟队列实现原理说到延迟队列就要说一说消息的过期时间(存活时间)TTL,RabbitMQ可以给队列设置过期时间,也可以单独给每个消息设置过期时间,如果到了过期时间消息没被消费该消息就会标记为死信消息。
除此之外还有那些消息会成为死信消息?
- 一是设置了TTL的消息到了TTL过期时间还没被消费,会成为死信
- 二是消息被消费者拒收,并且reject方法的参数里requeue是false,意味这这个消息不会重回队列,该消息会成为死信,
- 三是由于队列大小限制,新的消息进来队列可能满了,MQ会淘汰掉最老的消息,这些消息可能会成为死信消息
成为死信的消息会进入一个死信交换机(Dead Letter Exchange)中,死信交换机也是一个普通的交换机而已,根据这一特点,我们可以准备一个队列来接收死信交换机中的死信消息,然后准备一个消费者来消费该队列中的消息,这样一来我们的延迟队列就有思路了,还是按照订单为例流程如下:
- 下单成功(生产者),加入下单消息到队列(order.message)
- 队列设置TTL过期时间(10000毫秒),同时指定了死信交换机“delay-exchange”和死信交换机转发消息的队列“delay-message”
- 消息进入队列,等待一段时间,如果TTL时间到,订单消息会被MQ扔给死信交换机,死信交换机会把消息扔给指定的死信队列delay-message
- 消费者正好监听了死信队列delay-message,就可以获取到消息进行消费,比如检查该消息对应的订单是否支付,做出退库存处理等。
整体效果就是,消息进入order.message队列 延迟 10秒后就 会进入delay-message队列然后被消费者消费处理,这就是一个延迟队列的效果。
注意,这里的delay-exchange死信交换机其实就是一个普通的交换机而已,所以我们可以把上面的两个交换机合并成一个,如下:
第一步,你需要集成RabbitMQ,我这里使用的是SpringBoot集成MQ
org.springframework.boot
spring-boot-starter-amqp
第二步,对MQ做一些配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtualHost: /
第三步,定义交换机和队列
import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
//rabbitMQ的配置
@Configuration
public class MQConfig {
//交换机
public static final String EXCHNAGE_DELAY = "EXCHNAGE_DELAY";
//订单队列,该队列中的消息设置过期时间
public static final String QUEUE_ORDER = "QUEUE_ORDER";
//该队列用来接收死信交换机转发过来的消息
public static final String QUEUE_DELAY = "QUEUE_DELAY";
//队列的路由键,该路由键用来接收订单消息传出到订单队列
public static final String ROUTINGKEY_QUEUE_ORDER = "ROUTINGKEY_QUEUE_ORDER";
//该路由键用来接收死信交换机转发过来的消息
public static final String ROUTINGKEY_QUEUE_DELAY = "ROUTINGKEY_QUEUE_DELAY";
//定义交换机
@Bean
public Exchange exchangeDelay(){
return ExchangeBuilder.topicExchange(EXCHNAGE_DELAY).durable(true).build();
}
//该队列中的消息需要设置ttl
@Bean(QUEUE_ORDER)
public Queue queueOrder(){
Map map = new HashMap();
map.put("x-dead-letter-exchange", EXCHNAGE_DELAY); //过期的消息给哪个交换机的名字
map.put("x-dead-letter-routing-key", ROUTINGKEY_QUEUE_DELAY); //死信交换机把消息个哪个个routingkey
map.put("x-message-ttl", 10000); //队列过期时间10s
return new Queue(QUEUE_ORDER,true,false,false,map);
}
//该队列接收死信交换机转发过来的消息
@Bean(QUEUE_DELAY)
public Queue queueDelay(){
return new Queue(QUEUE_DELAY,true);
}
@Bean
public Binding queueOrderBinding(){
return BindingBuilder.bind(queueOrder()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_ORDER).noargs();
}
@Bean
public Binding queueDelayBinding(){
return BindingBuilder.bind(queueDelay()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_DELAY).noargs();
}
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
第四部,写一个消息发送者
@RunWith(SpringRunner.class)
@SpringBootTest(classes = MQApplication.class)
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate ;
@Test
public void sendDelayMessage() throws InterruptedException {
System.out.println("发送消息:我是一个延迟消息,开始时间:"+System.currentTimeMillis());
rabbitTemplate.convertAndSend(
MQConfig.EXCHNAGE_DELAY,
MQConfig.ROUTINGKEY_QUEUE_ORDER,
"我是一个延迟消息"
);
Thread.sleep(20000);
}
}
第五步,写一个消费者
@Component
public class Consumer {
@RabbitListener(queues = MQConfig.QUEUE_DELAY)
public void handler(String message){
System.out.println("收到消息:"+message+",结束时间:"+System.currentTimeMillis());
}
}
第六步,测试效果
-
生产者执行后,观察MQ,QUEUE_ORDER中有消息
-
等待10s之后,消息进入QUEUE_DELAY队列
-
控制台打印效果
Producer: 发送消息:我是一个延迟消息,开始时间:1606295976347
Consumer: 收到消息:我是一个延迟消息,结束时间:1606295986418
发送消息到收到消息的时间差为 10071 , 忽略网络开销,延迟时间差不多就是我们设置的TTL时间
文章结束,希望对你有所帮助