代码已上传至我的Github仓库:https://github.com/ylw-github/Spring-MQTT-Demo.git
整个代码的结构:
依赖:
4.0.0
com.ylw
spring-mqtt-demo
1.0-SNAPSHOT
producer
org.springframework.integration
spring-integration-core
4.3.9.RELEASE
org.springframework.integration
spring-integration-mqtt
4.3.9.RELEASE
org.eclipse.paho
org.eclipse.paho.client.mqttv3
1.2.0
配置文件(mqtt_producer.xml):
tcp://127.0.0.1:1883
单元测试类(ProducerTestUnit.java):
package com.ylw;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:mqtt_producer.xml")
public class ProducerTestUnit {
@Autowired
private MqttPahoMessageHandler mqtt;
@Test
public void sendTextQueue() {
sendMqttMsg("testTopic", 2, "hello world send...");
}
/**
* @param topicName 主题名字
* @param message 发送的消息
* @author YangLinWei
* @date 2019/9/4 10:17
* @qos 请求服务质量,0:至多一次,1:至少一次,2:刚好一次
*/
private void sendMqttMsg(String topicName, int qos, String message) {
Message messages = MessageBuilder.withPayload(message).setHeader(MqttHeaders.TOPIC, topicName)
.setHeader(MqttHeaders.QOS, qos)
.setHeader(MqttHeaders.RETAINED, true).build();
mqtt.handleMessage(messages);
}
}
消费者(consumer):
依赖:
spring-mqtt-demo
com.ylw
1.0-SNAPSHOT
4.0.0
consumer
org.springframework.integration
spring-integration-core
4.3.9.RELEASE
org.springframework.integration
spring-integration-mqtt
4.3.9.RELEASE
org.eclipse.paho
org.eclipse.paho.client.mqttv3
1.2.0
配置文件(mqtt_consumer.xml):
单元测试类(ConsumerTestUnit.java):
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:mqtt_consumer.xml")
public class ConsumerTestUnit {
@Autowired
MqttClient client;
@Autowired
private MqttConnectOptions options;
@Test
public void testQueue() {
try {
client.connect(options);
client.subscribe("testTopic", 2);
System.in.read();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消息监听类(MqttReciever.java):
package com.ylw;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MqttReciever implements MqttCallback {
@Override
public void connectionLost(Throwable throwable) {
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Client 接收消息主题 : " + topic);
System.out.println("Client 接收消息Qos : " + message.getQos());
System.out.println("Client 接收消息内容 : " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}