RabbitMQ 之 优先级队列
1. 使用场景
- 1. 使用场景
- 2. 实战
- 2.1 设置优先级队列
- 2.2 消息生产者
- 2.3 消息消费者
- 2.4 验证结果
在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款,那么就会给用户推送一条短信提醒,很简单的一个功能对吧?但是,天猫商家对我们来说肯定要分大客户和小客户的对吧,比如像苹果、小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 Redis 来存放的定时轮询,大家都知道 Redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级。
2. 实战 2.1 设置优先级队列package com.wpp.rabbitmq_csdn.priority;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @Author wpp
* @Date 2021/09/28/15:01
* @Description
*/
@Configuration
public class PriorityQueueConfig {
public static final String EXCHANGE_NAME = "X";
public static final String QUEUE_NAME = "Q";
@Bean("exchange")
public DirectExchange exchange() {
return new DirectExchange(EXCHANGE_NAME);
}
@Bean("queue")
public Queue queue() {
Map args = new HashMap();
// 最大优先级为 10
args.put("x-max-priority", 10);
return QueueBuilder.durable(QUEUE_NAME).withArguments(args).build();
}
@Bean
public Binding queueBindingExchange(@Qualifier("queue") Queue queue, @Qualifier("exchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("XQ");
}
}
2.2 消息生产者
package com.wpp.rabbitmq_csdn.priority;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author wpp
* @Date 2022/08/30/14:11
* @Description
*/
@Slf4j
@RestController
public class PriorityProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping(value = "sendPriorityMessage")
public void sendPriorityMessage() {
for(int i = 1;i {
correlationData.getMessageProperties().setPriority(5);
return correlationData;
});
} else {
rabbitTemplate.convertAndSend("X", "XQ",message);
}
}
}
}
2.3 消息消费者
注意:消费者代码我们需要写在另外一个项目中,因为测试的时候需要先不启动消费者
package com.wpp.rabbitmq_csdn.priority;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* @Author wpp
* @Date 2022/08/30/14:10
* @Description
*/
@Slf4j
@Component
public class PriorityConsumer {
@RabbitListener(queues = "Q")
public void receiveD(Channel channel, Message message) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到的消息{}", new Date().toString(), msg);
}
}
2.4 验证结果
我们先启动生产者和队列所在项目,给消息在队列里一个排序的时间,然后再启动消费者,结果如下: