一、生产者
1.1 pom
4.0.0
com.baizhi
rabbitmq_springboot
pom
0.0.1-SNAPSHOT
rabbitmq-producer
rabbitmq-consumer
rabbitmq_springboot
Demo project for Spring Boot
1.8
UTF-8
UTF-8
2.1.14.RELEASE
org.springframework.boot
spring-boot-starter-amqp
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-test
test
org.springframework.boot
spring-boot-starter-logging
org.springframework.amqp
spring-rabbit-test
test
org.springframework.boot
spring-boot-dependencies
${spring-boot.version}
pom
import
org.apache.maven.plugins
maven-compiler-plugin
1.8
1.8
UTF-8
org.springframework.boot
spring-boot-maven-plugin
1.2 yaml
server:
port: 44000
spring:
application:
name: rabbitmq-producer
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
1.3 主启动
package com.best;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author Jiang Akang
* employeeId: BG435424
* @date 2020/11/20
**/
@SpringBootApplication
public class RabbitMqProducerApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMqProducerApplication.class, args);
}
}
1.4 配置类
package com.best.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Jiang Akang
* employeeId: BG435424
* @date 2020/11/20
**/
@Configuration
public class RabbitmqConfig {
public static final String QUEUE_INFORMA_EMAIL = "queue_inform_email";
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
public static final String ROUTINGKEY_EMAIL = "inform.#.email.#";
public static final String ROUTINGKEY_SMS = "inform.#.sms.#";
//声明交换机
@Bean(EXCHANGE_TOPICS_INFORM)
public Exchange EXCHANGE_TOPICS_INFORM() {
//durable(true)持久化,mq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
}
//声明QUEUE_INFORMA_EMAIL队列
@Bean(QUEUE_INFORMA_EMAIL)
public Queue QUEUE_INFORMA_EMAIL() {
return new Queue(QUEUE_INFORMA_EMAIL);
}
//声明QUEUE_INFORM_SMS队列
@Bean(QUEUE_INFORM_SMS)
public Queue QUEUE_INFORM_SMS() {
return new Queue(QUEUE_INFORM_SMS);
}
//@Qualifier根据名称注入bean
//ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
//.noargs绑定的时候不需要指定参数
@Bean
public Binding BINGDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORMA_EMAIL) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
}
//ROUTINGKEY_SMS队列绑定交换机,指定routingKey
@Bean
public Binding BIDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
}
}
1.5 测试类
package com.best;
import com.best.config.RabbitmqConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author Jiang Akang
* employeeId: BG435424
* @date 2020/11/20
**/
@SpringBootTest
@RunWith(SpringRunner.class)
public class Producer {
@Autowired
RabbitTemplate rabbitTemplate;
//使用rabbitTemplate发送消息
@Test
public void testSendEmail() {
//匹配ROUTINGKEY_EMAIL路由键,发送消息到ROUTINGKEY_EMAIL队列
String message = "send email message to ROUTINGKEY_EMAIL";
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.email", message);
}
//使用rabbitTemplate发送消息
@Test
public void testSendEmail1() {
//匹配ROUTINGKEY_SMS路由键,发送消息到ROUTINGKEY_SMS队列
String message = "send email message to ROUTINGKEY_SMS";
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.sms", message);
}
}
二、消费者
1.1 yaml
server:
port: 44001
spring:
application:
name: rabbitmq-consumer
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
1.2 主启动
package com.best;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author Jiang Akang
* employeeId: BG435424
* @date 2020/11/20
**/
@SpringBootApplication
public class RabbitMqConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMqConsumerApplication.class, args);
}
}
1.3 配置类
package com.best.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Jiang Akang
* employeeId: BG435424
* @date 2020/11/20
**/
@Configuration
public class RabbitmqConfig {
public static final String QUEUE_INFORMA_EMAIL = "queue_inform_email";
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
public static final String ROUTINGKEY_EMAIL = "inform.#.email.#";
public static final String ROUTINGKEY_SMS = "inform.#.sms.#";
//声明交换机
@Bean(EXCHANGE_TOPICS_INFORM)
public Exchange EXCHANGE_TOPICS_INFORM() {
//durable(true)持久化,mq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
}
//声明QUEUE_INFORMA_EMAIL队列
@Bean(QUEUE_INFORMA_EMAIL)
public Queue QUEUE_INFORMA_EMAIL() {
return new Queue(QUEUE_INFORMA_EMAIL);
}
//声明QUEUE_INFORM_SMS队列
@Bean(QUEUE_INFORM_SMS)
public Queue QUEUE_INFORM_SMS() {
return new Queue(QUEUE_INFORM_SMS);
}
//@Qualifier根据名称注入bean
//ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
//.noargs绑定的时候不需要指定参数
@Bean
public Binding BINGDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORMA_EMAIL) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
}
//ROUTINGKEY_SMS队列绑定交换机,指定routingKey
@Bean
public Binding BIDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
}
}
1.4 业务类
package com.best.rabbitmq;
import com.best.config.RabbitmqConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author Jiang Akang
* employeeId: BG435424
* @date 2020/11/20
**/
@Component
public class ReceiveHandler {
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORMA_EMAIL})
public void send_email(String msg, Message message, Channel channel) {
System.out.println("receive message is: " + msg);
}
}
三、测试
运行生产者,往里面发送几条消息
打开rabbitMq: http://localhost:15672/#/queues
可以看到消息的内容
启动消费者,消费者监听着QUEUE_INFORMA_EMAIL队列消息,如果有消息,就会接受到
源码