安装好RabbitMQ,guest用户仅限本地主机连接,对外连接的话,我们需要创建用户,并设置好 Virtual host,一个broker里可以添加多个vhost。
忘记配置Tags,无法在rabbitmq的UI界面登陆,但是使用没问题,这里修改配置Tags。
下面使用 charge用户来连接。传送门:RabbitMQ简介和交换机入门使用
二、 SpringBoot 整合RabbitMQ创建一个 SpringBoot 项目,引入依赖
org.springframework.boot
spring-boot-starter-amqp
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-test
test
application.yml配置文件配置RabbitMQ
server:
port: 8088
spring:
#配置rabbitMq 服务器
rabbitmq:
virtual-host: /charge
host: 127.0.0.1
port: 5672
username: charge
password: charge
1、Direct Exchange交换机
直连型交换机,消息携带的路由键与队列名要完全匹配。
1. 编写RabbitConfig配置类
声明什么样功能的队列,交换机,自己决定,我这里声明持久的。
import org.springframework.amqp.core.*;
@Configuration
public class RabbitDirectConfig {
public static final String DIRECT_QUEUE1 = "TestDirectQueue";
public static final String DIRECT_EXCHANGE1 = "TestDirectExchange";
public static final String DIRECT_ROUTING_KEY1 = "TestDirectRoutingKey";
//队列
@Bean
public Queue getDirectQueue1() {
// 这里声明一个持久队列,则为true(该队列将在服务器重启后保留下来)
return new Queue(DIRECT_QUEUE1,true);
}
//交换机
@Bean
DirectExchange getDirectExchange1() {
return new DirectExchange(DIRECT_EXCHANGE1,true,false);
}
//绑定 将队列和交换机绑定,并设置路由KEY
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(getDirectQueue1()).to(getDirectExchange1()).with(DIRECT_ROUTING_KEY1);
}
}
2. 编写消息的消费者
@Component
@RabbitListener(queues = RabbitDirectConfig.DIRECT_QUEUE1)
public class RabbitDirectReceiver {
@RabbitHandler
public void handle1(String message){ //参数为消息内容
System.out.println("RabbitDirectReceiver handle1 >>>>>>" + message);
}
}
3.编写消息的生产者,这里通过测试类来发送消息
@Test
public void testDirect(){
rabbitTemplate.convertAndSend(
RabbitDirectConfig.DIRECT_EXCHANGE1, // 交换
RabbitDirectConfig.DIRECT_ROUTING_KEY1, // 路由key
" testDirect message "); // 发送的消息内容
}
启动项目,运行测试方法,消费者可获得消息内容
2、Topic Exchange交换机
主题交换机,消息携带的路由键与队列名属于模糊匹配。
- * (星号) :表示一个单词 (必须出现的)
- # (井号) :表示任意数量(零个或多个)单词
1. 编写RabbitConfig配置类
@Configuration
public class RabbitTopicConfig {
public static final String TOPIC_QUEUE1 = "TestTopicQueue1";
public static final String TOPIC_QUEUE2 = "TestTopicQueue2";
public static final String TOPIC_QUEUE3 = "TestTopicQueue3";
public static final String TOPIC_EXCHANGE1 = "TestTopicExchange";
//队列
@Bean
public Queue getQueue1() {
return new Queue(TOPIC_QUEUE1,true);
}
@Bean
public Queue getQueue2() {
return new Queue(TOPIC_QUEUE2,true);
}
@Bean
public Queue getQueue3() {
return new Queue(TOPIC_QUEUE3,true);
}
//Topic交换机
@Bean
TopicExchange getExchange1() {
return new TopicExchange(TOPIC_EXCHANGE1,true,false);
}
//绑定 将队列和交换机绑定到路由KEY
@Bean
Binding binding1() {
return BindingBuilder.bind(getQueue1()).to(getExchange1()).with("one.topic.*");
}
@Bean
Binding binding2() {
return BindingBuilder.bind(getQueue2()).to(getExchange1()).with("*.two.topic");
}
@Bean
Binding binding3() {
return BindingBuilder.bind(getQueue3()).to(getExchange1()).with("#.topic.#");
}
}
2. 编写消息的消费者
@Component
public class RabbitTopicReceiver {
//监听的队列
@RabbitListener(queues = RabbitTopicConfig.TOPIC_QUEUE1)
public void handle1(String message){ //参数为消息内容
System.out.println(" RabbitTopicReceiver handle1 >>>>>>" + message);
}
@RabbitListener(queues = RabbitTopicConfig.TOPIC_QUEUE2)
public void handle2(String message){
System.out.println("RabbitTopicReceiver handle2 >>>>>>" + message);
}
@RabbitListener(queues = RabbitTopicConfig.TOPIC_QUEUE3)
public void handle3(String message){
System.out.println("RabbitTopicReceiver handle3 >>>>>>" + message);
}
}
3.编写消息的生产者,这里通过测试类来发送消息
@Test
public void testTopic() {
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_EXCHANGE1, "one.topic.dfasf", "testDirect message==one.topic.dfasf");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_EXCHANGE1, "sadas.two.topic", "testDirect message==sadas.two.topic");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_EXCHANGE1, "one.topic", "testDirect message==one.topic");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPIC_EXCHANGE1, "topic.two", "testDirect message==topic.two");
}
把消费者注释掉,启动项目,生产消息,看到 队列1有两条,说明 * 表示一个必须出现的单词 约束不对吗?队列3有四条正确。
把消费者注释拿到,启动项目,看到 队列1确实消费了有两条,开发中多使用 # 。
3、Fanout Exchange交换机
扇型交换机,这个交换机没有路由键概念,绑了绑定路由键无所谓。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
1. 编写RabbitConfig配置类
@Configuration
public class RabbitFanoutConfig {
public static final String FANOUT_QUEUE1 = "TestFanoutQueue1";
public static final String FANOUT_QUEUE2 = "TestFanoutQueue2";
public static final String FANOUT_EXCHANGE1 = "TestFanoutExchange";
//队列
@Bean
public Queue getFanoutQueue1() {
return new Queue(FANOUT_QUEUE1,true);
}
@Bean
public Queue getFanoutQueue2() {
return new Queue(FANOUT_QUEUE2,true);
}
//Fanout交换机
@Bean
FanoutExchange getFanoutExchange1() {
return new FanoutExchange(FANOUT_EXCHANGE1,true,false);
}
//绑定 将队列和交换机绑定
@Bean
Binding bindingFanout1() {
return BindingBuilder.bind(getFanoutQueue1()).to(getFanoutExchange1());
}
@Bean
Binding bindingFanout2() {
return BindingBuilder.bind(getFanoutQueue2()).to(getFanoutExchange1());
}
}
2. 编写消息的消费者
@Component
public class RabbitFanoutReceiver {
//监听的队列
@RabbitListener(queues = "TestFanoutQueue1")
public void handle1(String message){ //参数为消息内容
System.out.println(" RabbitFanoutReceiver handle1 >>>>>>" + message);
}
@RabbitListener(queues = "TestFanoutQueue2")
public void handle2(String message){
System.out.println("RabbitFanoutReceiver handle2 >>>>>>" + message);
}
}
3. 编写消息的生产者,这里通过测试类来发送消息
@Test
public void testFanout() {
rabbitTemplate.convertAndSend(RabbitFanoutConfig.FANOUT_EXCHANGE1, null, " testFanout message");
}
生产一个消息,绑定的所有队列都可以消费
4、Headers Exchange交换机
headers交换器允许你匹配AMQP消息的header而非路由键,除此之外headers交换器和direct交换器完全一致,但性能却很差。使用较少。
1. 编写RabbitConfig配置类
@Configuration
public class RabbitHeadersConfig {
//队列
@Bean
public Queue getHeadersQueue1() {
return new Queue("Headers_QUEUE1",true);
}
@Bean
public Queue getHeadersQueue2() {
return new Queue("Headers_QUEUE2",true);
}
//交换机
@Bean
HeadersExchange getHeadersExchange1() {
return new HeadersExchange("Headers_EXCHANGE1",true,false);
}
//绑定,路由和规则自定义,这里简单使用下
@Bean
Binding bindingDirect1() {
Map map = new HashMap();
map.put("name", "Headers");
return BindingBuilder.bind(getHeadersQueue1()).to(getHeadersExchange1()).whereAny(map).match();
}
@Bean
Binding bindingDirect2() {
// 只要存在 Headers_2,就可路由到。
return BindingBuilder.bind(getHeadersQueue2()).to(getHeadersExchange1()).where("Headers_2").exists();
}
}
2. 编写消息的消费者
@Component
public class RabbitHeadersReceiver {
//监听的队列
@RabbitListener(queues = "Headers_QUEUE1")
public void handle1(byte[] msg){
String message = new String(msg, 0,msg.length);
System.out.println(" RabbitHeadersReceiver handle1 >>>>>>" + message);
}
@RabbitListener(queues = "Headers_QUEUE2")
public void handle2(byte[] msg){
String message = new String(msg, 0, msg.length);
System.out.println("RabbitHeadersReceiver handle2 >>>>>>" + message);
}
}
3. 编写消息的生产者,这里通过测试类来发送消息
@Test
public void testHeaders() {
Message queue1msg = MessageBuilder
.withBody("queue1msg sjd".getBytes()) // 消息内容
.setHeader("name", "Headers") // 请求头
.build();
rabbitTemplate.send("Headers_EXCHANGE1", null, queue1msg);
Message queue1msg11 = MessageBuilder
.withBody("queue1msg11s asaljlfjd".getBytes()) // 消息内容
.setHeader("name", "Headers123123") // 请求头不一致的话,路由不到消息
.build();
rabbitTemplate.send("Headers_EXCHANGE1", null, queue1msg11);
Message queue2msg = MessageBuilder
.withBody("queue2msg sda 1000".getBytes()) // 消息内容
.setHeader("Headers_2", "1dfs0") // 请求头
.build();
rabbitTemplate.send("Headers_EXCHANGE1", null, queue2msg);
}
queue1msg11 请求头不符合要求。
—— Stay Hungry. Stay Foolish. 求知若饥,虚心若愚。