- 1. 消息有效期
- 1.1 默认情况
- 1.2 关于TTL
- 1.1.1 单条消息过期示例
- 1.1.2 消息队列过期示例
- 1.3 死信队列
- 1.3.1 死信交换机
- 1.3.2 死信队列
- 1.3.3 代码示例
- 2. 消息发送可靠性
- 2.1 RabbitMQ 消息发送机制
- 2.2 确认消息成功方法
- 2.2.1 开启事务机制
- 2.2.2 发送方确认机制
- 2.3 失败重试
- 2.3.1 自带重试机制
- 2.3.2 消息发送失败重试
- 3. 消息消费可靠性
- 3.1 消息消费两种模式
- 3.2 消息消费成功方法
- 3.3 消息拒绝
- 3.4 消息确认
- 3.4.1 自动确认
- 3.4.2 手动确认
- 3.4.2.1 推模式手动确认
- 3.4.2.2 拉模式手动确认
- 3.5 幂等性问题
你知道RabbitMQ
如何保证消息可靠性吗?这是面试时的一个问题,下面就来了解下RabbitMQ
中,如何保证消息的可靠性。先从消息有效期开始吧~
在RabbitMQ
中,默认情况下,如果在发送消息时,不设置消息过期相关参数,那么消息是永不过期的,即使消息没有被消费掉,消息也会一直存储在队列中
TTL
(Time-To-Live
),表示消息存活时间,即消息的有效期
可以通过给消息设置 TTL
让消息存活一定的时间,如果消息存活时间超过了 TTL
并且还没有被消息,那么消息就会变成死信,那什么是死信呢?带着疑问继续
在RabbitMQ
中,有两种方式给消息设置TTL
:
- 方式一:在声明队列时,可以给队列设置消息的有效期,这样所有进入该队列的消息都会有一个相同的有效期
- 方式二:在发送消息时,设置消息的有效期,这样不同的消息就具有不同的有效期
- 如果两个都设置了,以时间短的为准
当给消息设置有效期后,如果消息过期没有被消费就会被从队列中删除,进入到死信队列,但是这两种方式对应的删除时机有点差异:
- 方式一:当为消息队列设置过期时间时,消息过期了就会被删除,因为消息进入
RabbitMQ
后是存在一个消息队列中,队列的头部是最早要过期的消息,所以RabbitMQ
只需要一个定时任务,从头部开始扫描是否有过期消息,有的话就直接删除 - 方式二:当消息过期后并不会立马被删除,而是当消息要投递给消费者的时候才会被删除,因为第二种方式,每条消息的过期时间都不一样,想要知道哪条消息过期,必须要遍历队列中的所有消息才能实现,当消息比较多时这样就比较耗费性能,因此对于第二种方式,当消息要投递给消费者的时候才去删除
单条消息设置过期时间,就是在消息发送的时候设置一下消息有效期即可
配置文件 application.properties
:
server.port=8889
spring.rabbitmq.host=192.168.43.86
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
配置类TTLRabbitMQConfig
:
@Configuration
public class TTLRabbitMQConfig {
// 交换机的名称
public static final String SCORPIOS_TTL_EXCHANGE_NAME = "scorpios_ttl_exchange_name";
// 发送队列名称
public static final String SCORPIOS_TTL_MSG_QUEUE = "scorpios_ttl_msg_queue";
@Bean
DirectExchange directExchange(){
return new DirectExchange(TTLRabbitMQConfig.SCORPIOS_TTL_EXCHANGE_NAME,true,false);
}
/**
* 第一个参数是消息队列的名字
* 第二个参数表示消息是否持久化
* 第三个参数表示消息队列是否排他,一般我们都是设置为 false,即不排他
* 第四个参数表示如果该队列没有任何订阅的消费者的话,该队列会被自动删除,一般适用于临时队列
* @return
*/
@Bean
Queue queue() {
return new Queue(TTLRabbitMQConfig.SCORPIOS_TTL_MSG_QUEUE,true,false,false);
}
@Bean
Binding bindingMsg(){
return BindingBuilder.bind(queue()).to(directExchange()).with(TTLRabbitMQConfig.SCORPIOS_TTL_MSG_QUEUE);
}
}
在创建队列时,第三个从参数表示排他性,如果设置为 true
,则该消息队列只有创建它的 Connection
才能访问,其他的 Connection
都不能访问该消息队列,如果试图在不同的连接中重新声明或者访问排他性队列,那么系统会报一个资源被锁定的错误。另一方面,对于排他性队列而言,当连接断掉的时候,该消息队列也会自动删除,无论该队列是否被声明为持久性队列都会被删除
消息发送着:
@Slf4j
@RestController
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send/message")
public String send() {
// 创建消息对象
Message message = MessageBuilder.withBody("message set ttl ...".getBytes(StandardCharsets.UTF_8))
.setExpiration("20000")
.build();
rabbitTemplate.convertAndSend(TTLRabbitMQConfig.SCORPIOS_TTL_EXCHANGE_NAME,TTLRabbitMQConfig.SCORPIOS_TTL_MSG_QUEUE,message);
return "success";
}
}
在创建 Message
对象时,设置消息过期时间,这里设置消息过期时间为 20 秒
启动项目,在浏览器中输入:http://localhost:8889/send/message
当消息发送成功之后,由于没有消费者,所以这条消息并不会被消费,打开 RabbitMQ Web
管理页面,查看 Queues
选项卡,20
秒之后,会发现消息会被删除
一旦给消息队列设置消息过期时间,所有进入到该队列的消息都有一个相同的过期时间,下面来看看给消息队列设置过期时间
配置文件同上,在创建队列时,给队列设置消息过期时间,如下:
/**
* 第一个参数是消息队列的名字
* 第二个参数表示消息是否持久化
* 第三个参数表示消息队列是否排他,一般我们都是设置为 false,即不排他
* 第四个参数表示如果该队列没有任何订阅的消费者的话,该队列会被自动删除,一般适用于临时队列
* 第五个参数表示给队列设置参数
* @return
*/
@Bean
Queue queue() {
Map setting = new HashMap();
setting.put("x-message-ttl", 20000);
return new Queue(TTLRabbitMQConfig.SCORPIOS_TTL_MSG_QUEUE,true,false,false,setting);
}
消息发送者:
@Slf4j
@RestController
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send/message")
public String send() {
// 创建消息对象
Message message = MessageBuilder.withBody("message set ttl ...".getBytes(StandardCharsets.UTF_8))
.build();
rabbitTemplate.convertAndSend(TTLRabbitMQConfig.SCORPIOS_TTL_EXCHANGE_NAME,TTLRabbitMQConfig.SCORPIOS_TTL_MSG_QUEUE,message);
return "success";
}
}
在创建消息时,不要再为消息设置过期时间,启动项目,在浏览器中输入:http://localhost:8889/send/message
打开 RabbitMQ Web
管理页面,查看 Queues
选项卡,可以看到,消息队列的 Features
属性为 D
和 TTL
,D 表示消息队列中消息持久化,TTL
则表示消息会过期,20
秒后刷新RabbitMQ Web
页面,发现消息数量已经恢复为 0
如果把消息的过期时间设置为0
呢?是什么意思呢?
这表示如果消息不能立马消费则会被立即丢掉
1.3 死信队列在上文提到,如果消息的过期时间到了,消息就会被删除,那么被删除的消息去哪了?真的被删除了吗?
1.3.1 死信交换机死信交换机,Dead-Letter-Exchange
即 DLX
。死信交换机用来接收死信消息(Dead Message
)的,那什么是死信消息呢?一般消息变成死信消息有如下几种情况:
- 消息被拒绝(
Basic.Reject/Basic.Nack
) ,并且设置requeue
参数为false
- 消息过期
- 队列达到最大长度
当消息在队列中变成了死信消息后,此时就会被发送到 DLX
,绑定 DLX
的消息队列则称为死信队列
DLX
本质上也是一个普普通通的交换机,可以为任意队列指定 DLX
,当该队列中存在死信消息时,RabbitMQ
就会自动的将这个死信消息发布到 DLX
上去,进而被路由到另一个绑定了 DLX
的队列上,即死信队列。
绑定了死信交换机的队列就是死信队列
1.3.3 代码示例配置文件 application.properties
:
server.port=8889
spring.rabbitmq.host=192.168.43.86
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
配置类:定义死信交换机和死信队列和普通的交换机、队列没啥区别
@Configuration
public class DLXRabbitMQConfig {
// 交换机的名称
public static final String SCORPIOS_DLX_EXCHANGE = "scorpios_dlx_exchange";
// 发送队列名称
public static final String SCORPIOS_DLX_QUEUE = "scorpios_dlx_queue";
public static final String SCORPIOS_MSG_QUEUE = "scorpios_msg_queue";
@Bean
DirectExchange dlxDirectExchange(){
return new DirectExchange(DLXRabbitMQConfig.SCORPIOS_DLX_EXCHANGE,true,false);
}
@Bean
Queue dlxQueue() {
return new Queue(DLXRabbitMQConfig.SCORPIOS_DLX_QUEUE,true,false,false);
}
// 将死信队列和死信交换机绑定
@Bean
Binding dlxBinding(){
return BindingBuilder.bind(dlxQueue()).to(dlxDirectExchange()).with(DLXRabbitMQConfig.SCORPIOS_DLX_QUEUE);
}
// 创建一个普通队列,并配置死信交换机
@Bean
Queue msgQueue() {
Map setting = new HashMap();
// 设置死信交换机
setting.put("x-dead-letter-exchange", DLXRabbitMQConfig.SCORPIOS_DLX_EXCHANGE);
// 设置死信 routing_key 与队列名称相同
setting.put("x-dead-letter-routing-key", DLXRabbitMQConfig.SCORPIOS_DLX_QUEUE);
return new Queue(DLXRabbitMQConfig.SCORPIOS_MSG_QUEUE, true, false, false, setting);
}
}
上面需要注意的是,定义一个普通队列,然后配置死信交换机,配置死信交换机有两个参数:
x-dead-letter-exchange
:配置死信交换机x-dead-letter-routing-key
:配置死信routing_key
如果发送到这个消息队列上的消息,发生了 Basic.Reject/Basic.Nack
或者过期等问题,就会被发送到 DLX
上,进入到与 DLX
绑定的消息队列上。
如果为死信队列配置消费者,那么这条消息最终会被死信队列的消费者所消费。死信消息队列消费者:
@Slf4j
@Component
public class Consumer {
@RabbitListener(queues = DLXRabbitMQConfig.SCORPIOS_DLX_QUEUE)
public void dlxConsume(String msg) {
log.info("死信队列收到的消息为:{}", msg);
}
}
消息发送者:
@Slf4j
@RestController
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send/message")
public String send() {
log.info("客户端发送消息");
// 创建消息对象
Message message = MessageBuilder.withBody("message set ttl ...".getBytes(StandardCharsets.UTF_8))
.setExpiration("5000") // 5秒过期
.build();
rabbitTemplate.convertAndSend(DLXRabbitMQConfig.SCORPIOS_DLX_EXCHANGE,DLXRabbitMQConfig.SCORPIOS_MSG_QUEUE,message);
return "success";
}
}
启动程序,先查看RabbitMQ Web
客户端:
在浏览器中输入:http://localhost:8889/send/message
,查看控制台:
在发送消息时,设置了TTL
为5
秒,5
秒后消息没有被消费,则被死信队列的消费者消费
在 RabbitMQ
中,消息的发送首先到达交换机上,然后再根据既定的路由规则,由交换机将消息路由到不同的 Queue
(队列)中,再由不同的消费者(Consumer
)去消费
从消息发送者的角度考虑消息可靠性,主要从两方面:
- 消息成功到达
Exchange
- 消息成功到达
Queue
如果这两步都没问题,那么就可以认为消息是发送成功,如果这两步中任何一步出问题,那么消息就没有成功送达
如何知道上面两步是否成功呢?可以引入确认机制,要确保消息成功发送,可以做以下三个步骤:
- 确认消息到达
Exchange
- 确认消息到达
Queue
- 开启定时任务,定时投递那些发送失败的消息
上面三个步骤,前两步 RabbitMQ
则有现成的解决方案,第三步需要自己实现。确保消息成功到达 RabbitMQ
,有两种方法:
- 开启事务机制
- 发送方确认机制
两种方法,不可以同时开启,只能选择其中之一
2.2.1 开启事务机制开启RabbitMQ
事务机制,需要先提供一个RabbitMQ
事务管理器,配置类如下:
@Configuration
public class TransactionRabbitMQConfig {
// 交换机的名称
public static final String SCORPIOS_EXCHANGE_NAME = "scorpios_exchange_name";
// 发送队列名称
public static final String SCORPIOS_MSG_QUEUE = "scorpios_msg_queue";
@Bean
DirectExchange directExchange(){
return new DirectExchange(TransactionRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,true,false);
}
@Bean
Queue queue() {
return new Queue(TransactionRabbitMQConfig.SCORPIOS_MSG_QUEUE,true,false,false);
}
@Bean
Binding binding(){
return BindingBuilder.bind(queue()).to(directExchange()).with(TransactionRabbitMQConfig.SCORPIOS_MSG_QUEUE);
}
// 自定义RabbitMQ事务管理器
@Bean
RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory){
return new RabbitTransactionManager(connectionFactory);
}
}
消息生产者需要添加事务注解并设置通信信道为事务模式:
@Service
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
// 添加事务注解
@Transactional
public void sendMessage(){
// 设置通信信道为事务模式
rabbitTemplate.setChannelTransacted(true);
// 创建消息对象
Message message = MessageBuilder.withBody("message transaction ...".getBytes(StandardCharsets.UTF_8))
.build();
rabbitTemplate.convertAndSend(TransactionRabbitMQConfig.SCORPIOS_EXCHANGE_NAME, TransactionRabbitMQConfig.SCORPIOS_MSG_QUEUE,message);
// 模拟异常发生
int i = 1/0;
}
}
下面做两种操作:
- 在方法的最后,人为制造一个运行时异常 ,运行程序,发现消息并未发送成功
- 把
@Transactional
注解和setChannelTransacted
方法删除,运行程序,发现即使发生运行时异常,消息依然能发送到RabbitMQ
开启事务机制的测试结果:抛异常,RabbitMQ Web
客户端没有收到消息
未开启事务机制的测试结果:抛异常,RabbitMQ Web
客户端收到消息
当RabbitMQ
开启事务之后,RabbitMQ
生产者发送消息会多出四个步骤:
- 客户端发出请求,将信道设置为事务模式
- 服务端给出回复,同意将信道设置为事务模式
- 客户端发送消息
- 客户端提交事务
- 服务端给出响应,确认事务提交
上面的步骤,除了第三步是本来就有的,其他几个步骤都是开启事务之后多出来的,这样分析,是不是觉得事务模式效率有点低
下面来看一下另一种方法:消息确认机制(publisher confirm
)
其实发送方消息确认机制,在之前的文章中有用过,先看下面配置,是不是很眼熟:
server.port=8889
spring.rabbitmq.host=192.168.43.86
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# 消息到达交换器的确认回调
spring.rabbitmq.publisher-confirm-type=correlated
# 开启消息到达队列的回调
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirm-type
属性配置有三值:
none
:表示禁用发布确认模式,默认correlated
:表示成功发布消息到交换器后会触发的回调方法simple
:类似correlated
,并且支持waitForConfirms()
和waitForConfirmsOrDie()
方法的调用
在配置类中添加回调的监听,配置类如下:
@Slf4j
@Configuration
public class ConfirmRabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback, InitializingBean {
@Autowired
RabbitTemplate rabbitTemplate;
// 交换机的名称
public static final String SCORPIOS_EXCHANGE_NAME = "scorpios_exchange_name";
// 发送队列名称
public static final String SCORPIOS_MSG_QUEUE = "scorpios_msg_queue";
@Bean
DirectExchange directExchange(){
return new DirectExchange(ConfirmRabbitMQConfig.SCORPIOS_EXCHANGE_NAME,true,false);
}
@Bean
Queue queue() {
return new Queue(ConfirmRabbitMQConfig.SCORPIOS_MSG_QUEUE,true,false,false);
}
@Bean
Binding binding(){
return BindingBuilder.bind(queue()).to(directExchange()).with(ConfirmRabbitMQConfig.SCORPIOS_MSG_QUEUE);
}
// 为RabbitTemplate绑定回调
@Override
public void afterPropertiesSet() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
// 消息到达交换机时回调
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("{}:消息成功到达交换机",correlationData.getId());
}else{
log.error("{}:消息发送失败", correlationData.getId());
}
}
// 消息路由到队列失败时回调
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("{}:消息未成功路由到队列",returned.getMessage().getMessageProperties().getMessageId());
}
}
下面验证两个回到函数是否会被执行,先把消息发到不存在的交换机上:
@GetMapping("/send/exchange")
public String sendExchange() {
String uuid = UUID.randomUUID().toString();
rabbitTemplate.convertAndSend("TestExchange", ConfirmRabbitMQConfig.SCORPIOS_MSG_QUEUE,"message confirm callback ...".getBytes(StandardCharsets.UTF_8),new CorrelationData(uuid));
return "success";
}
注意第一个参数是一个字符串,不是变量,这个交换器并不存在,此时控制台日志:
再把消息发到不存在的队列上:
@GetMapping("/send/queue")
public String sendQueue() {
String uuid = UUID.randomUUID().toString();
rabbitTemplate.convertAndSend(ConfirmRabbitMQConfig.SCORPIOS_EXCHANGE_NAME, "TestQueue","message confirm callback ...".getBytes(StandardCharsets.UTF_8),new CorrelationData(uuid));
return "success";
}
可以看到,消息虽然成功达到交换器了,但是没有成功路由到队列,因为队列不存在,控制台日志:
如果是消息批量处理,那么发送成功的回调监听是一样的,这就是 publisher-confirm
模式
相比于事务,这种模式下的消息吞吐量会得到极大的提升
2.3 失败重试失败重试有两种情况,一种是没找到 RabbitMQ
导致的失败重试,一种是找到 RabbitMQ
但是消息发送失败重试
对于第一种失败重试,就是发送方连不上RabbitMQ
,这种情况很好理解,只要你把RabbitMQ
的连接地址写错,启动项目,控制台就会一直报重连日志,这个重试机制和 RabbitMQ
本身没有关系,是利用 Spring 中的 retry 机制来完成的,可做如下配置:
# 开启重试机制
spring.rabbitmq.template.retry.enabled=true
# 重试起始间隔时间
spring.rabbitmq.template.retry.initial-interval=1000ms
# 最大重试次数
spring.rabbitmq.template.retry.max-attempts=10
# 最大重试间隔时间
spring.rabbitmq.template.retry.max-interval=10000ms
# 间隔时间乘数,此处为2,表示第一次间隔时间 1 秒,第二次重试间隔时间 2 秒,第三次 4 秒,以此类推
spring.rabbitmq.template.retry.multiplier=2
2.3.2 消息发送失败重试
对于第二种失败重试,消息发送失败重试主要是针对消息没有到达交换器的情况
如果消息没有成功到达交换器,可以用RabbitTemplate.ConfirmCallback
来触发消息发送失败回调,在这个回调中,可以做很多事情,比如可以把失败消息存入数据库,写个定时任务,不断重试,也可以把所有消息都做记录。
具体场景具体分析吧~
3. 消息消费可靠性上述确保了消息发送的可靠性,但还是要考虑一个问题:消息消费的可靠性
3.1 消息消费两种模式RabbitMQ
消息消费,有两种方式:
- 推(
push
):MQ
主动将消息推送给消费者,此方式需要消费者设置一个缓冲区去缓存消息,对于消费者来说,内存中总是有一堆需要处理的消息,这种方式效率比较高,也是目前大多数应用采用的消费方式 - 拉(
pull
):消费者主动从MQ
拉取消息,这种方式效率并不是很高,不过有的时候如果消费者需要批量拉取消息,可以采用这种方式
对于MQ
推(push
)方式,上面的例子都是这一种,也就是通过 @RabbitListener
注解去标记消费者,当监听的队列中有消息时,就会触发该方法:
@Slf4j
@Component
public class Consumer {
@RabbitListener(queues = RabbitConfig.SCORPIOS_MSG_QUEUE)
public void consume(String msg) {
log.info("收到的消息为:{} ",msg);
}
}
对于消费者拉(pull
)取消息方式,使用RabbitTemplate
中的方法:
public void consume() throws UnsupportedEncodingException {
// 从指定队列拉取消息
Object message = rabbitTemplate.receiveAndConvert(RabbitConfig.SCORPIOS_MSG_QUEUE);
log.info("message: {} " + new String(((byte[]) message),"UTF-8"));
}
receiveAndConvert
方法执行完后,会从 MQ
上拉取一条消息下来,如果该方法返回值为 null
,表示该队列上没有消息
receiveAndConvert
方法有一个重载方法,可以在重载方法中传入一个等待超时时间N
秒,如果队列中没有消息了,则 receiveAndConvert
方法会阻塞N
秒,N
秒内如果队列中有了新消息就返回,N
秒后如果队列中还是没有新消息,就返回 null
,这个等待超时时间要是不设置的话,默认为 0
如果需要从消息队列中持续获得消息,就可以使用推模式;如果只是单纯的消费一条消息,则使用拉模式即可
3.2 消息消费成功方法为了确保消息能被消费者成功消费,RabbitMQ
中提供了消息消费确认机制,当消费者去消费消息时,可以通过设置 autoAck
参数来表示消息消费的确认方式
- 当
autoAck
为false
时,即使消费者已经收到消息,RabbitMQ
也不会立即移除消息,而是等待消费者显式的回复确认信号后,才会将消息打上删除标记,然后再删除 - 当
autoAck
为true
时,消费者就会自动把发送出去的消息设置为确认,然后将消息移除(从内存或者磁盘中),即使消息并没有到达消费者
在 RabbitMQ Web
管理页面中:
Ready
表示待消费的消息数量Unacked
表示已经发送给消费者,但是还没收到消费者ack
的消息数量
当把 autoAck
设置为 false
时,对于 RabbitMQ
来说,消费消息分成了两个部分:
- 待消费的消息
- 已经投递给消费者,但是还没有被消费者确认的消息
换言之,当设置 autoAck
为 false
时,消费者将有足够的时间去处理这条消息,当消息正常处理完成后,再手动 ack
, RabbitMQ
收到确认的ack
才会认为这条消息消费成功了。如果 RabbitMQ
一直没有收到消费者的反馈,并且此时客户端也已经断开连接了,那么 RabbitMQ
就会将刚刚的消息重新放回队列中,等待下一次被消费
确保消息被成功消费有两种方式:手动 Ack
或者自动 Ack
,无论哪一种,最终都有可能导致消息被重复消费,所以还需要在处理消息时,解决幂等性问题
当消费者接收到消息时,可以选择消费这条消息,也可以选择拒绝这条消息:
@Component
public class Consumer {
@RabbitListener(queues = RabbitConfig.SCORPIOS_MSG_QUEUE)
public void consume(Channel channel, Message message) {
// 第一步:获取消息编号
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 第二步:拒绝消息
channel.basicReject(deliveryTag, true);
} catch (IOException e) {
e.printStackTrace();
}
}
}
上面代码调用 basicReject
方法来拒绝消费消息,方法第二个参数为布尔值,表示是否将消息重新放入队列,为 true
,被拒绝的消息会重新进入到消息队列中,等待下一次被消费;为 false
,被拒绝的消息就会被丢掉,不会有新的消费者去消费它,需要注意的是,basicReject
方法一次只能拒绝一条消息
消息确认分为自动确认和手动确认
3.4.1 自动确认默认情况下,消息消费就是自动确认的,下面的消费方法已经出现过很多次了
@Slf4j
@Component
public class Consumer{
@RabbitListener(queues = RabbitConfig.SCORPIOS_MSG_QUEUE)
public void consume(String msg) {
log.info("收到的消息为:{}",msg);
int i = 1 / 0;
}
}
通过 @RabbitListener
注解来标记一个消息消费方法,默认情况下,消息消费方法自带事务,即如果该方法在执行过程中抛出异常,那么被消费的消息会重新回到队列中等待下一次被消费,如果该方法正常执行完没有抛出异常,则这条消息就算是被消费了
手动确认又可以分为两种:推模式手动确认与拉模式手动确认
3.4.2.1 推模式手动确认开启手动确认,需要在配置文件中开启:
# 表示将消息的确认模式改为手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
消息消费者:
@RabbitListener(queues = RabbitConfig.SCORPIOS_MSG_QUEUE)
public void consume(Message message,Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 消费消息
String s = new String(message.getBody());
log.info("收到的消息为:{} ",s);
// 消费完成后,手动 ack
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
//手动 nack
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
上面的代码进行了异常处理,如果消息正常消费成功,会执行 basicAck
方法完成确认
如果消息消费失败,会进入异常处理中,执行 basicNack
方法,告诉 RabbitMQ
消息消费失败
basicAck()
:表示手动确认消息已经成功消费,该方法有两个参数:参数一表示消息的id
,参数二multiple
如果为false
,表示仅确认当前消息消费成功,如果为true
,则表示当前消息之前所有未被当前消费者确认的消息都消费成功basicNack()
:表示当前消息未被成功消费,该方法有三个参数:前两个参数意义同上,第三个参数requeue
上面也解释过,被拒绝的消息是否重新入队
当 basicNack
中最后一个参数设置为 false 的时候,还涉及到一个死信队列的问题
拉模式手动 ack
比较麻烦,在Spring
中封装的 RabbitTemplate
中并未找到对应的方法,所以需要用原生方法:
public void consume() {
Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);
long deliveryTag = 0L;
try {
GetResponse getResponse = channel.basicGet(RabbitConfig.SCORPIOS_MSG_QUEUE, false);
deliveryTag = getResponse.getEnvelope().getDeliveryTag();
log.info("接受到的消息为:{}",new String((getResponse.getBody()), "UTF-8"));
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
3.5 幂等性问题
上面说过,确保消息被成功消费有两种方式:手动 Ack
或者自动 Ack
,但无论哪一种,最终都有可能导致消息被重复消费,这样就存在解决幂等性问题。
幂等性指的是多次操作,结果是一致的,比如多次操作数据库数据是一致的,幂等性是分布式环境下常见的问题。
消息被重复消费情况:
消费者在消费完一条消息后,向 RabbitMQ
发送一个 ack
确认,如果此时网络断开或者其他原因导致 RabbitMQ
并没有收到这个确认ack
,RabbitMQ
并不会将该条消息删除,当重新建立起连接后,消费者还是会再次收到该条消息,这就造成了消息的重复消费。同样,消息在发送的时候,同一条消息也可能会发送两次。
幂等性问题基本上都可以从业务上来处理,常见的解决幂等性的方式有以下:
- 唯一索引:保证插入的数据只有一条,可以用
Redis
实现 Token
机制:每次接口请求前先获取一个token
,然后再下次请求的时候在请求的header
体中加上这个token
,后台进行验证,如果验证通过删除token
,下次请求再次判断token
- 悲观锁或者乐观锁:悲观锁可以保证每次
for update
时其他sql
无法update
数据(在数据库引擎是innodb
的时候,select
的条件必须是唯一索引,防止锁全表)【分布锁思路】 - 先查询后判断:首先通过查询数据库是否存在数据,如果存在证明已经请求过了,直接拒绝该请求,如果没有存在,就证明是第一次进来,直接放行
代码地址:https://github.com/Hofanking/springboot-rabbitmq-example