您当前的位置: 首页 >  rabbitmq

庄小焱

暂无认证

  • 3浏览

    0关注

    805博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

RabbitMQ——RabbitMQ工作模式实战

庄小焱 发布时间:2021-09-23 23:39:24 ,浏览量:3

摘要

主要是讲解Rabbit的实战代码和分析。主要的代码在:https://github.com/2462612540/Senior-Architect/tree/a_1.0/RabbitMQ

简答模式下项目结构

简单模式下生产者代码
package com.xjl.mq.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Classname Producer_HelloWorld
 * @Description TODO
 * @Date 2021/9/21 8:49
 * @Created by xjl
 */
public class Producer_HelloWorld {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("192.168.25.128");//默认值localhost
        factory.setPort(5672);//设置的端口号
        factory.setVirtualHost("/"); //默认值是 /
        factory.setUsername("guest");
        factory.setPassword("guest");

        //3.创建连接connection
        Connection connection = factory.newConnection();

        //4.创建channel
        Channel channel = connection.createChannel();

        //5.创建队列Queue
        /**
         * (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
         * queue 表示队列的名称
         * durable 表示的持久化 当MQ 重启之后还在
         * exclusive
         *      是否独占。只能有一个消费者监听这队列当connection关闭时,
         *      是否删除队列
         * autoDelete:是否自动删除。当没有consumer时,自动删除掉
         * arguments:参数。
         */
        //如果没有一个名字叫hello_worLd的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("hello_world", true, false, false, null);

        // 6.发送消息
        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         *   exchange: 交换机的名称简单模式下交换机会使用默认
         *   routingKey:路由的名称
         *   props:配置信息
         *   body:发送的真实消息数据
         *
         */
        String body = "hello xjl……";
        channel.basicPublish("", "hello_world", null, body.getBytes());

        //7 释放资源
        channel.close();
        connection.close();
    }
}
简单模式下消费者代码
package com.xjl.mq.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Classname Producer_HelloWorld
 * @Description TODO
 * @Date 2021/9/21 8:49
 * @Created by xjl
 */
public class Consumer_HelloWorld {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("192.168.25.128");//默认值localhost
        factory.setPort(5672);//设置的端口号
        factory.setVirtualHost("/"); //默认值是 /
        factory.setUsername("guest");
        factory.setPassword("guest");

        //3.创建连接connection
        Connection connection = factory.newConnection();

        //4.创建channel
        Channel channel = connection.createChannel();

        //5.创建队列Queue
        /**
         * (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
         * queue 表示队列的名称
         * durable 表示的持久化 当MQ 重启之后还在
         * exclusive
         *      是否独占。只能有一个消费者监听这队列当connection关闭时,
         *      是否删除队列
         * autoDelete:是否自动删除。当没有consumer时,自动删除掉
         * arguments:参数。
         */
        //如果没有一个名字叫hello_worLd的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("hello_world", true, false, false, null);

        // 6.接受消息
        /**
         * basicconsume ( String queue, boolean autoAck,Consumer callback)
         *   queue:队列的名称
         *   autoback:是否自动确认
         *   callback:回调对象
         *
         */
        DefaultConsumer Consumer = new DefaultConsumer(channel) {
            /**
             * @description 这是一个回调方法 收到消息后会自动的执行该方法
             * @param: consumerTag 标识
             * @param: envelope 获取一些信息 交换机的路由的key
             * @param: properties
             * @param: body
             * @date: 2021/9/21 9:24
             * @return: void
             * @author: xjl
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag" + consumerTag);
                System.out.println("getExchange" + envelope.getExchange());
                System.out.println("getRoutingKey" + envelope.getRoutingKey());
                System.out.println("properties" + properties);
                System.out.println("body" + new String(body));
            }
        };
        channel.basicConsume("hello_world", true, Consumer);

        //7 释放资源
        // 不要关闭消费者 如果是关闭的资源,在下一次消息来的时候还怎么样的实现
    }
}
WorkQueue模式 workqueue模式下的生产者代码
package com.xjl.mq.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Classname Producer_HelloWorld
 * @Description TODO
 * @Date 2021/9/21 8:49
 * @Created by xjl
 */
public class Producer_WorkQueues {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置参数
        factory.setHost("192.168.25.128");//默认值localhost
        factory.setPort(5672);//设置的端口号
        factory.setVirtualHost("/"); //默认值是 /
        factory.setUsername("guest");
        factory.setPassword("guest");

        //3.创建连接connection
        Connection connection = factory.newConnection();

        //4.创建channel
        Channel channel = connection.createChannel();

        //5.创建队列Queue
        /**
         * (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)
         * queue 表示队列的名称
         * durable 表示的持久化 当MQ 重启之后还在
         * exclusive
         *      是否独占。只能有一个消费者监听这队列当connection关闭时,
         *      是否删除队列
         * autoDelete:是否自动删除。当没有consumer时,自动删除掉
         * arguments:参数。
         */
        //如果没有一个名字叫hello_worLd的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("work_queues", true, false, false, null);

        // 6.发送消息
        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         *   exchange: 交换机的名称简单模式下交换机会使用默认
         *   routingKey:路由的名称
         *   props:配置信息
         *   body:发送的真实消息数据
         *
         */
        String message = "hello work_queue……";
        for (int i = 1; i             
关注
打赏
1657692713
查看更多评论
0.0411s