您当前的位置: 首页 > 

墨家巨子@俏如来

暂无认证

  • 1浏览

    0关注

    188博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

你还在用定时任务处理过期数据吗?看我MQ延时队列如何优雅的解决

墨家巨子@俏如来 发布时间:2020-11-25 17:30:49 ,浏览量:1

学习使用,老鸟飞过,欢迎评论交流

为什么要用到延迟队列

在开发项目的时候我们通常会遇到这么一个问题,比如商城项目有一下单逻辑,下单成功数据保存在数据库中,下单成功后需要用户进行支付,如果在30分钟内支付失败,需要修改订单的支付状态为“支付超时”并关闭订单以及回退库存操作,那如何在下单30后准时检查支付结果处理订单状态呢?

你可能想到了一个最简单的方法,就是使用定时任务扫描订单表,判断时间是否支付超时,这样的方式无疑是一种很消耗性能的做法,你试想一下,定时扫描一张数据量很大的表去判断时间和状态,而且99%的扫描都是无效的操作。

那么该如何优雅的解决上述问题呢?我们可以采用延迟队列来实现,Redis和MQ都可以做到,本文章采用RabbitMQ的延迟队列来实现。

延迟队列实现原理

说到延迟队列就要说一说消息的过期时间(存活时间)TTL,RabbitMQ可以给队列设置过期时间,也可以单独给每个消息设置过期时间,如果到了过期时间消息没被消费该消息就会标记为死信消息。

除此之外还有那些消息会成为死信消息?

  • 一是设置了TTL的消息到了TTL过期时间还没被消费,会成为死信
  • 二是消息被消费者拒收,并且reject方法的参数里requeue是false,意味这这个消息不会重回队列,该消息会成为死信,
  • 三是由于队列大小限制,新的消息进来队列可能满了,MQ会淘汰掉最老的消息,这些消息可能会成为死信消息

成为死信的消息会进入一个死信交换机(Dead Letter Exchange)中,死信交换机也是一个普通的交换机而已,根据这一特点,我们可以准备一个队列来接收死信交换机中的死信消息,然后准备一个消费者来消费该队列中的消息,这样一来我们的延迟队列就有思路了,还是按照订单为例流程如下: 在这里插入图片描述

  1. 下单成功(生产者),加入下单消息到队列(order.message)
  2. 队列设置TTL过期时间(10000毫秒),同时指定了死信交换机“delay-exchange”和死信交换机转发消息的队列“delay-message”
  3. 消息进入队列,等待一段时间,如果TTL时间到,订单消息会被MQ扔给死信交换机,死信交换机会把消息扔给指定的死信队列delay-message
  4. 消费者正好监听了死信队列delay-message,就可以获取到消息进行消费,比如检查该消息对应的订单是否支付,做出退库存处理等。

整体效果就是,消息进入order.message队列 延迟 10秒后就 会进入delay-message队列然后被消费者消费处理,这就是一个延迟队列的效果。

注意,这里的delay-exchange死信交换机其实就是一个普通的交换机而已,所以我们可以把上面的两个交换机合并成一个,如下: 在这里插入图片描述

延迟队列实战

第一步,你需要集成RabbitMQ,我这里使用的是SpringBoot集成MQ


      org.springframework.boot
      spring-boot-starter-amqp

第二步,对MQ做一些配置

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtualHost: /

第三步,定义交换机和队列

import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

//rabbitMQ的配置
@Configuration
public class MQConfig {
    //交换机
    public static final String EXCHNAGE_DELAY = "EXCHNAGE_DELAY";
    //订单队列,该队列中的消息设置过期时间
    public static final String QUEUE_ORDER = "QUEUE_ORDER";
    //该队列用来接收死信交换机转发过来的消息
    public static final String QUEUE_DELAY = "QUEUE_DELAY";
    //队列的路由键,该路由键用来接收订单消息传出到订单队列
    public static final String ROUTINGKEY_QUEUE_ORDER = "ROUTINGKEY_QUEUE_ORDER";
    //该路由键用来接收死信交换机转发过来的消息
    public static final String ROUTINGKEY_QUEUE_DELAY = "ROUTINGKEY_QUEUE_DELAY";

    //定义交换机
    @Bean
    public Exchange exchangeDelay(){
        return ExchangeBuilder.topicExchange(EXCHNAGE_DELAY).durable(true).build();
    }
    //该队列中的消息需要设置ttl
    @Bean(QUEUE_ORDER)
    public Queue queueOrder(){
        Map map = new HashMap();
        map.put("x-dead-letter-exchange", EXCHNAGE_DELAY);    //过期的消息给哪个交换机的名字
        map.put("x-dead-letter-routing-key", ROUTINGKEY_QUEUE_DELAY);   //死信交换机把消息个哪个个routingkey
        map.put("x-message-ttl", 10000);    //队列过期时间10s
        return new Queue(QUEUE_ORDER,true,false,false,map);
    }
    //该队列接收死信交换机转发过来的消息
    @Bean(QUEUE_DELAY)
    public Queue queueDelay(){
        return new Queue(QUEUE_DELAY,true);
    }
    @Bean
    public Binding queueOrderBinding(){
        return BindingBuilder.bind(queueOrder()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_ORDER).noargs();
    }
    @Bean
    public Binding queueDelayBinding(){
        return BindingBuilder.bind(queueDelay()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_DELAY).noargs();
    }
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

第四部,写一个消息发送者

@RunWith(SpringRunner.class)
@SpringBootTest(classes = MQApplication.class)
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate ;

   

    @Test
    public void sendDelayMessage() throws InterruptedException {
        System.out.println("发送消息:我是一个延迟消息,开始时间:"+System.currentTimeMillis());
        rabbitTemplate.convertAndSend(
                MQConfig.EXCHNAGE_DELAY,
                MQConfig.ROUTINGKEY_QUEUE_ORDER,
                "我是一个延迟消息"
        );

        Thread.sleep(20000);
    }
}

第五步,写一个消费者

@Component
public class Consumer {

    @RabbitListener(queues = MQConfig.QUEUE_DELAY)
    public void handler(String message){
        System.out.println("收到消息:"+message+",结束时间:"+System.currentTimeMillis());
    }
}

第六步,测试效果

  • 生产者执行后,观察MQ,QUEUE_ORDER中有消息 在这里插入图片描述

  • 等待10s之后,消息进入QUEUE_DELAY队列在这里插入图片描述

  • 控制台打印效果

Producer:   发送消息:我是一个延迟消息,开始时间:1606295976347
Consumer: 收到消息:我是一个延迟消息,结束时间:1606295986418

发送消息到收到消息的时间差为 10071 , 忽略网络开销,延迟时间差不多就是我们设置的TTL时间

文章结束,希望对你有所帮助

关注
打赏
1651329177
查看更多评论
立即登录/注册

微信扫码登录

0.0742s