您当前的位置: 首页 >  分布式

52分布式电商项目 - ActiveMQ例子

杨林伟 发布时间:2019-07-13 16:18:35 ,浏览量:3

代码已上传值Github 地址:https://github.com/ylw-github/ActiveMQ-Demo.git

点对点模式

点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向 ActiveMQ发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在 activemq 服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上 ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息。

消息生产者

创建工程 jmsDemo ,引入依赖

 
            org.apache.activemq
            activemq-all
            5.11.2
    

创建类 QueueProducer main 方法代码如下:

/**
 * =======================================================
 *
 * @desc: 消息生产者
 * @version: V1.0
 * @FileName: com.pyg.p2p QueueProducer
 * @date: 2019/7/13 15:17
 * 

* ======================================================= */ public class QueueProducer { public static void main(String[] args) { Connection connection = null; Session session = null; MessageProducer producer = null; try { //1.创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616"); //2.获取连接 connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式) session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建队列对象 Queue queue = session.createQueue("test-queue"); //6.创建消息生产者 producer = session.createProducer(queue); //7.创建消息 TextMessage textMessage = session.createTextMessage("欢迎来到神奇的品优购世界"); //8.发送消息 producer.send(textMessage); } catch (Exception e) { e.printStackTrace(); } finally { //9.关闭资源 if (producer != null) { try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }

上述代码中第 4 步创建 session 的两个参数: 第 1 个参数 是否使用事务 第 2 个参数 消息的确认模式

  • AUTO_ACKNOWLEDGE = 1 自动确认
  • CLIENT_ACKNOWLEDGE = 2 客户端手动确认
  • DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
  • SESSION_TRANSACTED = 0 事务提交并确认

运行后通过 ActiveMQ 管理界面查询 在这里插入图片描述

消息消费者
/**
 * =======================================================
 *
 * @desc: 消息消费者
 * @version: V1.0
 * @FileName: com.pyg.p2p QueueConsumer1
 * @date: 2019/7/13 15:17
 * 

* ======================================================= */ public class QueueConsumer2 { public static void main(String[] args) { Connection connection; Session session; MessageConsumer consumer; try { //1.创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616"); //2.获取连接 connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式) session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建队列对象 Queue queue = session.createQueue("test-queue"); //6.创建消息消费 consumer = session.createConsumer(queue); //7.监听消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收到消息:" + textMessage.getText()); // 8.等待键盘输入 System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { //9.关闭资源 if (consumer != null) { try { consumer.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }); } catch (Exception e) { e.printStackTrace(); } } }

运行测试

同时开启 2 个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现只有 一个消费者会接收到消息。

发布/订阅模式 消息生产者

创建类 TopicProducer ,main 方法代码如下:

public class TopicProducer {


    public static void main(String[] args) {
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        try {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new
                    ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
            //2.获取连接
            connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式)
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建主题对象
            Topic topic = session.createTopic("test-topic");
            //6.创建消息生产者
            producer = session.createProducer(topic);
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("欢迎来到神奇的品优购世界");
            //8.发送消息
            producer.send(textMessage);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //9.关闭资源
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {

                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }


    }
}

消息消费者

创建类 TopicConsumer ,main 方法代码如下:

public class TopicConsumer1 {


    public static void main(String[] args) {


        try {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
            //2.获取连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取
            // session (参数 1:是否启动事务,参数 2:消息确认模式)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建主题对象
            // Queue queue = session.createQueue("test-queue");
            Topic topic = session.createTopic("test-topic");
            //6.创建消息消费
            MessageConsumer consumer = session.createConsumer(topic);
            //7.监听消息
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("接收到消息:" + textMessage.getText());
                        // 8.等待键盘输入
                        System.in.read();
                    } catch (Exception e) {

                    } finally {
                        //9.关闭资源
                        if (consumer != null) {
                            try {
                                consumer.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                        if (session != null) {
                            try {
                                session.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                        if (connection != null) {
                            try {
                                connection.close();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行测试

同时开启 2 个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现每个消费者会接收到消息。

关注
打赏
1688896170
查看更多评论

杨林伟

暂无认证

  • 3浏览

    0关注

    3183博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文
立即登录/注册

微信扫码登录

0.0515s