安装主程序
scoop 安装页面:https://scoop.sh/
scoop bucket add extras # 安装扩展库
scoop install RabbitMQ
安装服务
管理员身份运行
rabbitmq-service install
启动服务
管理员身份运行
rabbitmq-service start #启动服务
rabbitmq-service stop #停止服务
下载安装插件
rabbitmq-plugins enable rabbitmq_management
管理后台 地址:http://127.0.0.1:15672/使用用户名/密码(guest/guest)
下载插延迟队列插件
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
文件放到插件目录中:
%UserProFile%\scoop\apps\RabbitMQ\3.10.5\plugins
执行安装命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange # 启用
# rabbitmq-plugins disable rabbitmq_delayed_message_exchange # 禁用
重启服务
安装 python 包
pip install Pika
conda activate base
pip install Pika
代码调用
发送端 send.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test-exchange') #创建队列
#创建 暂存区
channel.exchange_declare(exchange='test-exchange',
exchange_type='x-delayed-message',
arguments={"x-delayed-type": "direct"})
channel.queue_bind(exchange='test-exchange', queue='test-exchange', routing_key='test-exchange') # 队列 与 暂存区 绑定
# 发送消息
channel.basic_publish(exchange='test-exchange',
routing_key='test-exchange',
properties=pika.BasicProperties(
headers={'x-delay': 6000} # 设置4000毫秒延迟
),
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close() # 关闭链接
接收端 receive.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# channel.queue_declare(queue='test-exchange') 防止空调用,建议先执行 发送
#接收回调
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
print(" [x] Received %r" % str(properties))
# 接收消息
channel.basic_consume(queue='test-exchange',
auto_ack=True,
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
参考:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
https://juejin.cn/post/7095011255085662244
https://www.rabbitmq.com/tutorials/tutorial-one-python.html
https://www.cloudamqp.com/blog/what-is-a-delayed-message-exchange-in-rabbitmq.html