基于我的笔记 :基于Swoft2.x框架实现php操作rabbitMQ 的工作队列
RabbitMq重启之后queue_declare队列不会丢失- 为了不让队列消失,需要把队列声明为持久化(durable)。为此我们通过queue_declare的第三参数为true:
- queue_declare必须在生产者(producer)和消费者(consumer)对应的代码中修改
$channel->queue_declare('queue', false, true, false, false);
消息持久化
注意:消息持久化
将消息设为持久化并不能完全保证不会丢失。以上代码只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间还是有一个很小的间隔时间。因为RabbitMq并不是所有的消息都使用fsync(2)——它有可能只是保存到缓存中,并不一定会写到硬盘中。并不能保证真正的持久化,但已经足够应付我们的简单工作队列。如果你一定要保证持久化,你可以使用publisher confirms。
$msg = new AMQPMessage($data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
手动应答
- 回调函数中设置手动应答
- 执行回调配置
# 消息
$msg = new AMQPMessage('swoft_queue_test'.time(),[
'content_type' => 'text/plain',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, // 消息持久化
]);
// 第四参数为自动应答, 设置为false
$channel->basic_consume('swoft_queue_test','',false,false,false,false,$callback);
公平调度
比如有两个工作者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息。
这时因为RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。它盲目的把第n-th条消息发给第n-th个消费者。
我们可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。
$channel->basic_qos(null, 1, null);
完成所有的配置后 , 我们的代码就变成了:
完整代码 发送数据 /**
* @RequestMapping(route="index")
*/
public function index()
{
$connect = new AMQPStreamConnection('39.105.106.191',5672,'guest','guest');
$channel = $connect->channel();
// 第三参数持久化
$channel->queue_declare('swoft_queue_test', false,true,false,false);
# 消息
$msg = new AMQPMessage('swoft_queue_test'.time(),[
'content_type' => 'text/plain',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, // 消息持久化
]);
$channel->confirm_select(); // 发布确认模式
//推送成功
$channel->set_ack_handler(
function (AMQPMessage $message) {
echo "发送成功: " . $message->body . PHP_EOL;
}
);
//推送失败
$channel->set_nack_handler(
function (AMQPMessage $message) {
echo "发送失败: " . $message->body . PHP_EOL;
}
);
# 发送
$channel->basic_publish($msg,'','swoft_queue_test');
$channel->wait_for_pending_acks();
$channel->close();
$connect->close();
return ['code'=>0 ,"msg"=>"发送成功"];
}
接收数据
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?