您当前的位置: 首页 > 

苗先生的PHP记录

暂无认证

  • 0浏览

    0关注

    190博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

think-queue

苗先生的PHP记录 发布时间:2021-06-16 10:17:06 ,浏览量:0

这是我看到最全的队列了,可以读一下

以下为手动复制的快照 , 避免日后删掉看不到

前言 当前笔记中的内容针对的是 thinkphp-queue 的 v1.1.2 版本,现在官方已经更新到了 v1.1.3 版本, 下文中提到的几个Bug在最新的master分支上均已修复。 笔记中的部分内容还未更新。

传统的程序执行流程一般是 即时|同步|串行的,在某些场景下,会存在并发低,吞吐量低,响应时间长等问题。在大型系统中,一般会引入消息队列的组件,将流程中部分任务抽离出来放入消息队列,并由专门的消费者作针对性的处理,从而降低系统耦合度,提高系统性能和可用性。

一般来说,可以抽离的任务具有以下的特点:

允许延后|异步|并行处理 (相对于传统的 即时|同步|串行 的执行方式)

允许延后:

抢购活动时,先快速缓冲有限的参与人数到消息队列,后续再排队处理实际的抢购业务;

允许异步:

业务处理过程中的邮件,短信等通知

允许并行:

用户支付成功之后,邮件通知,微信通知,短信通知可以由多个不同的消费者并行执行,通知到达的时间不要求先后顺序。

允许失败和重试

强一致性的业务放入核心流程处理 无一致性要求或最终一致即可的业务放入队列处理 thinkphp-queue 是thinkphp 官方提供的一个消息队列服务,它支持消息队列的一些基本特性:

消息的发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等 队列的多队列, 内存限制 ,启动,停止,守护等 消息队列可降级为同步执行 thinkphp-queue 内置了 Redis,Database,Topthink ,Sync这四种驱动。本文主要介绍 thinkphp-queue 结合其内置的 redis 驱动的使用方式和基本原理。

注1:如无特殊说明,下文中的 ‘消息’ 和 ‘任务’两个词指代的是同一个概念,即队列中的一个成员。该成员对消息队列而言是其内部保存的消息; 对业务应用而言是一个待执行的任务。请根据语境区分。

注2:本文编写时(2017-02-15)使用的 thinkphp-queue 的版本号是 v1.1.2 。该版本中部分功能并未全部完成,如 subscribe 模式,以及存在几个bug(稍后会提及)。如有变更,请以官方最新版为准。

一 代码示例 先通过一段代码,了解一下 thinkphp-queue 的基本使用流程。

目标:

在业务控制器中推送一个新消息到一个名为 ‘helloJobQueue’ 的队列中,该消息中包含我们自定义的业务数据,然后,编写一个名为 Hello 的消费者类,并通过命令行去调用该消费者类获取这个消息,拿到定义的数据。

1.1 安装 thinkphp-queue composer install thinkphp-queue 1.2 搭建消息队列的存储环境 使用 Redis [推荐]

安装并启动 Redis 服务 使用数据库 [不推荐]

CREATE TABLE prefix_jobs ( id int(11) NOT NULL AUTO_INCREMENT, queue varchar(255) NOT NULL, payload longtext NOT NULL, attempts tinyint(3) unsigned NOT NULL, reserved tinyint(3) unsigned NOT NULL, reserved_at int(10) unsigned DEFAULT NULL, available_at int(10) unsigned NOT NULL, created_at int(10) unsigned NOT NULL, PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; 1.3 配置消息队列的驱动 根据选择的存储方式,在 \application\extra\queue.php 这个配置文件中,添加消息队列对应的驱动配置

   return [
       'connector'  => 'Redis',		    // Redis 驱动
       'expire'     => 60,				// 任务的过期时间,默认为60秒; 若要禁用,则设置为 null 
       'default'    => 'default',		// 默认的队列名称
       'host'       => '127.0.0.1',	    // redis 主机ip
       'port'       => 6379,			// redis 端口
       'password'   => '',				// redis 密码
       'select'     => 0,				// 使用哪一个 db,默认为 db0
       'timeout'    => 0,				// redis连接的超时时间
       'persistent' => false,			// 是否是长连接
     
   //    'connector' => 'Database',   // 数据库驱动
   //    'expire'    => 60,           // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
   //    'default'   => 'default',    // 默认的队列名称
   //    'table'     => 'jobs',       // 存储消息的表名,不带前缀
   //    'dsn'       => [],

   //    'connector'   => 'Topthink',	// ThinkPHP内部的队列通知服务平台 ,本文不作介绍
   //    'token'       => '',
   //    'project_id'  => '',
   //    'protocol'    => 'https',
   //    'host'        => 'qns.topthink.com',
   //    'port'        => 443,
   //    'api_version' => 1,
   //    'max_retries' => 3,
   //    'default'     => 'default',

   //    'connector'   => 'Sync',		// Sync 驱动,该驱动的实际作用是取消消息队列,还原为同步执行
   ];

1.3.1 配置文件中的 expire 参数说明

expire 参数指的是任务的过期时间。 过期的任务,其准确的定义是

任务的状态为执行中 任务的开始执行的时刻 + expire > 当前时刻 expire 不为null 时 ,thinkphp-queue 会在每次获取下一个任务之前检查并重发过期(执行超时)的任务。

expire 为null 时,thinkphp-queue 不会检查过期的任务,性能相对较高一点。但是需要注意:

这些执行超时的任务会一直留在消息队列中,需要开发者另行处理(删除或者重发)! [ Bug ]在redis 驱动下,expire 设置为 null 时,无法实现任务的延迟执行! (Database 驱动下无影响) 对expire 参数理解或者使用不当时,很容易产生一些bug,后面会举例提到。

1.4 消息的创建与推送 我们在业务控制器中创建一个新的消息,并推送到 helloJobQueue 队列

新增 \application\index\controller\JobTest.php 控制器,在该控制器中添加 actionWithHelloJob 方法

            
关注
打赏
1665468453
查看更多评论
0.0368s