python 环境下 RabbitMQ 延时队列插件安装使用示例

安装主程序

scoop 安装页面:https://scoop.sh/

scoop bucket add extras # 安装扩展库
scoop install RabbitMQ 

安装服务

管理员身份运行

rabbitmq-service install

启动服务

管理员身份运行

rabbitmq-service start  #启动服务
rabbitmq-service stop   #停止服务

下载安装插件

rabbitmq-plugins enable rabbitmq_management  

管理后台 地址:http://127.0.0.1:15672/使用用户名/密码(guest/guest)

下载插延迟队列插件

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

文件放到插件目录中:

%UserProFile%\scoop\apps\RabbitMQ\3.10.5\plugins

执行安装命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange # 启用
# rabbitmq-plugins disable rabbitmq_delayed_message_exchange # 禁用

重启服务

安装 python 包

pip install Pika
conda activate base
pip install Pika

代码调用

发送端 send.py

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='test-exchange')  #创建队列 

 #创建 暂存区
channel.exchange_declare(exchange='test-exchange', 
                           exchange_type='x-delayed-message',
                           arguments={"x-delayed-type": "direct"})

channel.queue_bind(exchange='test-exchange', queue='test-exchange', routing_key='test-exchange')  # 队列 与 暂存区 绑定

# 发送消息
channel.basic_publish(exchange='test-exchange',  
                        routing_key='test-exchange',
                        properties=pika.BasicProperties(
                            headers={'x-delay': 6000}   # 设置4000毫秒延迟
                        ),
                        body='Hello World!')
print(" [x] Sent 'Hello World!'")

connection.close() # 关闭链接

接收端 receive.py

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# channel.queue_declare(queue='test-exchange')   防止空调用,建议先执行 发送

#接收回调
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    print(" [x] Received %r" % str(properties))



# 接收消息
channel.basic_consume(queue='test-exchange',
                      auto_ack=True,
                      on_message_callback=callback)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

参考:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

https://juejin.cn/post/7095011255085662244

https://www.rabbitmq.com/tutorials/tutorial-one-python.html

https://www.cloudamqp.com/blog/what-is-a-delayed-message-exchange-in-rabbitmq.html

下一篇