您当前的位置: 首页 >  spring

10 Spring集成MQTT(生产者与消费者)

杨林伟 发布时间:2019-09-04 11:42:49 ,浏览量:2

代码已上传至我的Github仓库:https://github.com/ylw-github/Spring-MQTT-Demo.git

整个代码的结构: 在这里插入图片描述

生产者(producer)

依赖:



    4.0.0
    
        com.ylw
        spring-mqtt-demo
        1.0-SNAPSHOT
    

    producer

    
        
            org.springframework.integration
            spring-integration-core
            4.3.9.RELEASE
        
        
            org.springframework.integration
            spring-integration-mqtt
            4.3.9.RELEASE
        
        
            org.eclipse.paho
            org.eclipse.paho.client.mqttv3
            1.2.0
        

    


配置文件(mqtt_producer.xml):




    
        
        
        
            
                tcp://127.0.0.1:1883
            
        
    

    
        
        
    



单元测试类(ProducerTestUnit.java):

package com.ylw;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:mqtt_producer.xml")
public class ProducerTestUnit {
    @Autowired
    private MqttPahoMessageHandler mqtt;

    @Test
    public void sendTextQueue() {
        sendMqttMsg("testTopic", 2, "hello world send...");

    }

    /**
     * @param topicName 主题名字
     * @param message   发送的消息
     * @author YangLinWei
     * @date 2019/9/4  10:17
     * @qos 请求服务质量,0:至多一次,1:至少一次,2:刚好一次
     */
    private void sendMqttMsg(String topicName, int qos, String message) {
        Message messages = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topicName)
                .setHeader(MqttHeaders.QOS, qos)
                .setHeader(MqttHeaders.RETAINED, true).build();

        mqtt.handleMessage(messages);
    }
}
消费者(consumer):

依赖:



    
        spring-mqtt-demo
        com.ylw
        1.0-SNAPSHOT
    
    4.0.0

    consumer
    
        
            org.springframework.integration
            spring-integration-core
            4.3.9.RELEASE
        
        
            org.springframework.integration
            spring-integration-mqtt
            4.3.9.RELEASE
        
        
            org.eclipse.paho
            org.eclipse.paho.client.mqttv3
            1.2.0
        
    



配置文件(mqtt_consumer.xml):





    

    
        
        
        
        
        
    

    
        
        
        
    



单元测试类(ConsumerTestUnit.java):

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:mqtt_consumer.xml")
public class ConsumerTestUnit {

    @Autowired
    MqttClient client;

    @Autowired
    private MqttConnectOptions options;

    @Test
    public void testQueue() {
        try {
            client.connect(options);
            client.subscribe("testTopic", 2);
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息监听类(MqttReciever.java):

package com.ylw;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class MqttReciever implements MqttCallback {
    @Override
    public void connectionLost(Throwable throwable) {

    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("Client 接收消息主题 : " + topic);
        System.out.println("Client 接收消息Qos : " + message.getQos());
        System.out.println("Client 接收消息内容 : " + new String(message.getPayload()));

    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

    }
}

关注
打赏
1688896170
查看更多评论

杨林伟

暂无认证

  • 2浏览

    0关注

    3183博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文
立即登录/注册

微信扫码登录

0.2805s