您当前的位置: 首页 >  rabbitmq

$驽马十驾$

暂无认证

  • 0浏览

    0关注

    31博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

RabbitMQ 之 优先级队列

$驽马十驾$ 发布时间:2022-09-23 16:19:06 ,浏览量:0

RabbitMQ 之 优先级队列
  • 1. 使用场景
  • 2. 实战
    • 2.1 设置优先级队列
    • 2.2 消息生产者
    • 2.3 消息消费者
    • 2.4 验证结果

1. 使用场景

在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款,那么就会给用户推送一条短信提醒,很简单的一个功能对吧?但是,天猫商家对我们来说肯定要分大客户和小客户的对吧,比如像苹果、小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 Redis 来存放的定时轮询,大家都知道 Redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级。

2. 实战 2.1 设置优先级队列
package com.wpp.rabbitmq_csdn.priority;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author wpp
 * @Date 2021/09/28/15:01
 * @Description
 */
@Configuration
public class PriorityQueueConfig {

    public static final String EXCHANGE_NAME = "X";
    public static final String QUEUE_NAME = "Q";


    @Bean("exchange")
    public DirectExchange exchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }


    @Bean("queue")
    public Queue queue() {
        Map args = new HashMap();
        // 最大优先级为 10
        args.put("x-max-priority", 10);
        return QueueBuilder.durable(QUEUE_NAME).withArguments(args).build();
    }

    @Bean
    public Binding queueBindingExchange(@Qualifier("queue") Queue queue, @Qualifier("exchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("XQ");
    }

}

2.2 消息生产者
package com.wpp.rabbitmq_csdn.priority;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author wpp
 * @Date 2022/08/30/14:11
 * @Description
 */
@Slf4j
@RestController
public class PriorityProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostMapping(value = "sendPriorityMessage")
    public void sendPriorityMessage() {
        for(int i = 1;i  {
                    correlationData.getMessageProperties().setPriority(5);
                    return correlationData;
                });
            } else {
                rabbitTemplate.convertAndSend("X", "XQ",message);
            }
        }
    }

}

2.3 消息消费者

注意:消费者代码我们需要写在另外一个项目中,因为测试的时候需要先不启动消费者

package com.wpp.rabbitmq_csdn.priority;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;


/**
 * @Author wpp
 * @Date 2022/08/30/14:10
 * @Description
 */
@Slf4j
@Component
public class PriorityConsumer {

    @RabbitListener(queues = "Q")
    public void receiveD(Channel channel, Message message) {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到的消息{}", new Date().toString(), msg);

    }


}

2.4 验证结果

我们先启动生产者和队列所在项目,给消息在队列里一个排序的时间,然后再启动消费者,结果如下: 在这里插入图片描述

关注
打赏
1663377658
查看更多评论
立即登录/注册

微信扫码登录

0.0578s