您当前的位置: 首页 >  rabbitmq

苗先生的PHP记录

暂无认证

  • 0浏览

    0关注

    190博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

PHP RabbitMQ消息持久化以及手动应答

苗先生的PHP记录 发布时间:2022-06-07 19:03:25 ,浏览量:0

基于我的笔记 :基于Swoft2.x框架实现php操作rabbitMQ 的工作队列

RabbitMq重启之后queue_declare队列不会丢失
  1. 为了不让队列消失,需要把队列声明为持久化(durable)。为此我们通过queue_declare的第三参数为true:
  2. queue_declare必须在生产者(producer)和消费者(consumer)对应的代码中修改
$channel->queue_declare('queue', false, true, false, false);
消息持久化 注意:消息持久化

将消息设为持久化并不能完全保证不会丢失。以上代码只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间还是有一个很小的间隔时间。因为RabbitMq并不是所有的消息都使用fsync(2)——它有可能只是保存到缓存中,并不一定会写到硬盘中。并不能保证真正的持久化,但已经足够应付我们的简单工作队列。如果你一定要保证持久化,你可以使用publisher confirms。

$msg = new AMQPMessage($data,
       array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
 );
手动应答
  1. 回调函数中设置手动应答
  2. 执行回调配置
# 消息
        $msg = new AMQPMessage('swoft_queue_test'.time(),[
            'content_type' => 'text/plain',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, // 消息持久化
        ]);

 // 第四参数为自动应答, 设置为false
        $channel->basic_consume('swoft_queue_test','',false,false,false,false,$callback);
公平调度

比如有两个工作者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息。

这时因为RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。它盲目的把第n-th条消息发给第n-th个消费者。

我们可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。

$channel->basic_qos(null, 1, null);

完成所有的配置后 , 我们的代码就变成了:

完整代码 发送数据
 /**
     * @RequestMapping(route="index")
     */
    public function index()
    {
        $connect = new AMQPStreamConnection('39.105.106.191',5672,'guest','guest');
        $channel = $connect->channel();
        // 第三参数持久化
        $channel->queue_declare('swoft_queue_test', false,true,false,false);
        # 消息
        $msg = new AMQPMessage('swoft_queue_test'.time(),[
            'content_type' => 'text/plain',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, // 消息持久化
        ]);

        $channel->confirm_select(); // 发布确认模式

        //推送成功
        $channel->set_ack_handler(
            function (AMQPMessage $message) {
                echo "发送成功: " . $message->body . PHP_EOL;
            }
        );

        //推送失败
        $channel->set_nack_handler(
            function (AMQPMessage $message) {
                echo "发送失败: " . $message->body . PHP_EOL;
            }
        );
        # 发送
        $channel->basic_publish($msg,'','swoft_queue_test');
        $channel->wait_for_pending_acks();

        $channel->close();
        $connect->close();
        return ['code'=>0 ,"msg"=>"发送成功"];
    }
接收数据
            
关注
打赏
1665468453
查看更多评论
0.0445s