您当前的位置: 首页 >  rabbitmq

消息中间件系列教程(09) -RabbitMQ -案例代码(发布订阅模式)

杨林伟 发布时间:2019-12-10 20:34:03 ,浏览量:3

引言

代码已上传至Github,有兴趣的同学可以下载看看:https://github.com/ylw-github/RabbitMQ-Demo

前面博客讲解了RabbitMQ的五种队列形式《消息中间件系列教程(06) -RabbitMQ -五种队列形式》,主要讲解一下五种队列的代码实现。

主要分为:

  1. 点对点队列模式(简单)
  2. 工作队列模式(公平性)
  3. 发布订阅模式
  4. 路由模式Routing
  5. 通配符模式Topics

本文主要讲解发布订阅模式。

发布订阅模式

在这里插入图片描述 功能实现:一个生产者发送消息,多个消费者获取消息(同样的消息),包括一个生产者,一个交换机,多个队列,多个消费者。

思路解读:

  1. 一个生产者,多个消费者
  2. 每一个消费者都有自己的一个队列
  3. 生产者没有直接发消息到队列中,而是发送到交换机
  4. 每个消费者的队列都绑定到交换机上
  5. 消息通过交换机到达每个消费者的队列

该模式就是Fanout Exchange(扇型交换机)将消息路由给绑定到它身上的所有队列。注意:交换机没有存储消息功能,如果消息发送到没有绑定消费队列的交换机,消息则丢失。

1.用户发邮件案例讲解

1.新建Maven项目RabbitMQ-Demo

2.添加Maven依赖:


	
		com.rabbitmq
		amqp-client
		3.6.5
	

3.连接工具类

package com.ylw.rabbitmq;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQConnecUtils {

    public static Connection newConnection() throws IOException, TimeoutException {
        // 1.定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        // 2.设置服务器地址
        factory.setHost("127.0.0.1");

        // 3.设置协议端口号
        factory.setPort(5672);

        // 4.设置vhost
        factory.setVirtualHost("OrderHost");

        // 5.设置用户名称
        factory.setUsername("OrderAdmin");

        // 6.设置用户密码
        factory.setPassword("123456");

        // 7.创建新的连接
        Connection newConnection = factory.newConnection();
        return newConnection;
    }
}

1.1 生产者
public class Producer {
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建新的连接
        Connection connection = RabbitMQConnecUtils.newConnection();

        // 2.创建通道
        Channel channel = connection.createChannel();
        // 3.绑定的交换机 参数1交互机名称 参数2 exchange类型
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String msg = "fanout_exchange_msg";
        // 4.发送消息
        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
        // System.out.println("生产者发送msg:" + msg);
        // // 5.关闭通道、连接
        // channel.close();
        // connection.close();
        // 注意:如果消费没有绑定交换机和队列,则消息会丢失

    }
}

1.2 消费者

邮件消费者:

public class ConsumerEmailFanout {
    private static final String QUEUE_NAME = "consumerFanout_email";
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建新的连接
        Connection connection = RabbitMQConnecUtils.newConnection();
        // 2.创建通道
        Channel channel = connection.createChannel();
        // 3.消费者关联队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("邮件消费者获取生产者消息:" + msg);
            }
        };
        // 5.消费者监听队列消息
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

短信消费者:

public class ConsumerSMSFanout {
    private static final String QUEUE_NAME = "ConsumerFanout_sms";
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建新的连接
        Connection connection = RabbitMQConnecUtils.newConnection();
        // 2.创建通道
        Channel channel = connection.createChannel();
        // 3.消费者关联队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("短信消费者获取生产者消息:" + msg);
            }
        };
        // 5.消费者监听队列消息
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}

3. 测试

启动生产者,并关闭,让其在RabbitMQ里面注册交换机,在控制台可以看出注册成功(如果不启动,可以手动注册,如下图Add a new exchange): 在这里插入图片描述 启动邮件消费者和短信消费者,在控制台可以看出有两个队列: 在这里插入图片描述 再启动生产者,可以看到消费者消费信息: 在这里插入图片描述 在这里插入图片描述

关注
打赏
1688896170
查看更多评论

杨林伟

暂无认证

  • 3浏览

    0关注

    3279博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文
立即登录/注册

微信扫码登录

0.3524s