摘要
主要是为讲述RabbitMQ与python的连接。不过这个MQ是一个docker的镜像。
RabbitMQ在Docker中的下载拉取镜像
docker pull rabbitmq:3-management
启动镜像(默认用户名密码),默认guest 用户,密码也是 guest
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
启动镜像(设置用户名密码
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:3-management
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@version: 1.0
@author: xjl
@file: send_message_mq.py
@time: 2021/9/16 22:04
"""
import json
import pika
import datetime
# 生成消息入口处
def get_message():
for i in range(10): # 生成10条消息
message = json.dumps(
{'id': "10000%s" % i, "amount": 100 * i, "name": "tony", "createtime": str(datetime.datetime.now())})
producter(message)
# 消息生产者
def producter(message): # 消息生产者
# 获取与rabbitmq 服务的连接,虚拟队列需要指定参数 virtual_host,如果是默认的可以不填(默认为/),也可以自己创建一个
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='192.168.25.128', port=5672, credentials=pika.PlainCredentials('guest', 'guest')))
# 创建一个 AMQP 信道(Channel),建造一个大邮箱,隶属于这家邮局的邮箱
channel = connection.channel()
# 声明消息队列tester,消息将在这个队列传递,如不存在,则创建
channel.queue_declare(queue='tester')
# 向队列插入数值 routing_key的队列名为tester,body 就是放入的消息内容,exchange指定消息在哪个队列传递,这里是空的exchange但仍然能够发送消息到队列中,因为我们使用的是我们定义的空字符串“”exchange(默认的exchange)
channel.basic_publish(exchange='', routing_key='tester', body=message)
# 关闭连接
connection.close()
if __name__ == "__main__":
get_message() # 程序执行入口
Python消费MQ中数据
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@version: 1.0
@author: xjl
@file: comuse_message.py
@time: 2021/9/16 23:05
"""
import pika
import time
# 远程rabbitmq服务的配置信息
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.25.128', '5672', '/', credentials))
channel = connection.channel()
# 消费成功的回调函数
def callback(ch, method, properties, body):
print(" [%s] Received %r" % (time.time(), body))
# 开始依次消费balance队列中的消息
channel.basic_consume(queue='tester', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 启动消费