您当前的位置: 首页 > 

墨家巨子@俏如来

暂无认证

  • 0浏览

    0关注

    188博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

三.RocketMQ极简入门-RocketMQ普通消息发送

墨家巨子@俏如来 发布时间:2021-09-12 10:04:38 ,浏览量:0

前言

RocketMQ已经写了两章了,一章是RocketMQ认识和安装,一章是RocketMQ的工作流程和核心概念,本章我们开始使用RocketMQ来发送和接收消息。RocketMQ的消息种类非常多,比如:普通消息,顺序消息,延迟消息,批量发送,消息过滤等等。本篇文章来探讨一下 普通消息的发送 在这里插入图片描述

文章目录
    • 前言
    • 普通消息发送
      • 同步发送
      • 异步发送
      • 单向发送
      • 消费者案例
      • 总结

普通消息发送

普通消息这里介绍三种发送方式,同步发送,异步发送,单向发送。我们先导入需要的依赖,版本尽量和RocketMQ的安装版本一致。


     
         org.apache.rocketmq
         rocketmq-client
         4.8.0
     
 

【注意】请保持 RocketMQ和Name Server 是启动状态,见《RocketMQ安装》`

同步发送

同步消息是发送者发送消息,需要等待结果的返回,才能继续发送第二条消息,这是一种阻塞式模型,虽然消息可靠性高,但是阻塞导致性能低下。API : SendResult result = producer.send(message); 发送者代码示例:

public class Producer {

    //演示消息同步发送
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        //生产者
        DefaultMQProducer producer = new DefaultMQProducer("syn-producerGroup");
        //设置name server地址
        producer.setNamesrvAddr("127.0.0.1:9876");
		//设置队列数量为2,默认为4,根据情况设置
        producer.setDefaultTopicQueueNums(2); 
        //启动
        producer.start();
		//发16个消息
        for (int i = 0 ; i  SendCallback 回调来接收发送的结果,回调中包括了:onSuccess 和 onException两个回调方法来表示成功和失败。

单向发送

这种方式指的是发送者发送消息后无需等待Broker的结果返回,Broker也不会返回结果,消息是单向的,该方式性能最高,但是消息可靠性低。API : producer.sendOneway(message) 示例代码:

//. . .上面案例一样,部分代码省略. . .
Message message = new Message("oneway-topic", "sms", "我是消息".getBytes(CharsetUtil.UTF_8));
producer.sendOneway(message);

通过 :producer.sendOneway(message)来发送消息是没有返回结果的,也无需任何等待,性能是最高的,但是数据的安全性最低,所以对于一些可被丢失的消息,比如:操作日志等就可以使用这种模式了。

消费者案例

下面是消费者端的代码

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //创建消费者
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("syn-consumerGroup");
        //设置name server 地址
        defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
        //从开始位置消费
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //广播模式
        defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
        //订阅
        defaultMQPushConsumer.subscribe("syn-topic","sms");
		//注册消息监听器
        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

                list.forEach(message->{
                    System.out.println(new String(message.getBody(), CharsetUtil.UTF_8));
                });

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        defaultMQPushConsumer.start();
    }
}

解释一下相关的类

  • DefaultMQPushConsumer : 消费者组,基于push模式

  • defaultMQPushConsumer.setNamesrvAddr(“127.0.0.1:9876”) : name server地址

  • defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET):从哪个位置开始消费,FIRST代表最前面

  • defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING) : 消息的消费模式广播模式,默认是 MessageModel.CLUSTERING 集群 模式。

  • defaultMQPushConsumer.subscribe(“syn-topic”,“sms”) :订阅哪个Topic中的哪个Tags中的消息

  • defaultMQPushConsumer.registerMessageListener :注册消息监听并处理消息,通常支持 MessageListenerConcurrently 并发和 MessageListenerOrderly 顺序 两种监听器。当监听器监听到消息,通过回调监听器中的 consumeMessage 方法来传递和处理消息。

    1. List list : 消息列表 MessageExt中包含了消息的Body,消息的storeSzie,queueId等信息
    2. ConsumeConcurrentlyContext : 消费者上下文
    3. ConsumeConcurrentlyStatus :消息应答(签收),包括 CONSUME_SUCCESS 消费成功和 RECONSUME_LATER 消费失败两种结果。
  • defaultMQPushConsumer.start() :启动消费者,这个代码要写在注册了监听器的后面。

总结

下面对三种发送方式做一个对比

  • 可靠性最高: 同步发送 > 异步发送 > 单向发送
  • 性能最高:单向发送 > 异步发送 > 同步发送

使用场景建议如下

  • 如果是比较重要的不可丢失的消息,且对时效性要去不高建议使用同步发送,如转账消息
  • 如果是不重要的可失败的消息,比如日志消息,建议使用单向发送
  • 如果对时效性要求比较高,且消息不能丢失,可以尝试使用异步发送

文章到这就结束了,点赞还是要求一下的,万一屏幕面前的大帅哥,或者大漂亮一不小心就一键三连了啦,那我就是熬夜到头发掉光,也出下章。

关注
打赏
1651329177
查看更多评论
0.0780s