关键词搜索

源码搜索 ×
×

tp6 think-queue 消息队列/任务 安装使用

发布2022-02-17浏览5523次

详情内容

0、启用命令方式

listen、work只能运行一种,运行work,再运行listen,那么就会停止work

第一种:指定任务名称运行

  1. php think queue:listen --queue 任务名称
  2. php think queue:work --queue 任务名称

缺点:每次创建的任务都需要人工在命令窗口中监听运行才能生效

例子:先创建A任务了,已经运行监听php think queue:listen --queue A任务,如果需要创建另一个B任务并运行,那么还需要再一次运行监听php think queue:listen --queue B任务,否则B任务不可能进行。

第二种:不指定任务名称运行(推荐)

  1. php think queue:listen
  2. php think queue:work

优点:只要在命令窗口中运行监听一次,

只要运行监听php think queue:listen或php think queue:work 一次,那么开发人员随时创建的任务都可以监听和运行,例子,这个月创建A了任务,下个月再创建B任务,只要推送就不需要再次运行监听命令;

停止命令:

php think queue:restart

一、前言
为了实现订单超时删除功能,使用到think-queue队列,本文记录安装和使用think-php,Redis及进程常驻Supervisor

thinkphp-queue 是thinkphp 官方提供的一个消息队列服务,它支持消息队列的一些基本特性:

消息的发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等
队列的多队列, 内存限制 ,启动,停止,守护等
消息队列可降级为同步执行

二、安装
安装ThinkPHP6
使用composer安装。

composer create-project topthink/think tp

安装think-queue
在ThinkPHP6的安装目录下输入下面命令安装。

composer require topthink/think-queue


安装Redis
下载Redis,可以从官网下载redis,本文以redis-6.0.6为例。

  1. wget http://download.redis.io/releases/redis-6.0.6.tar.gz
  2. tar xvf redis-6.0.6.tar.gz

编译并安装Redis
编译时使用make MALLOC=libc编译,使用make编译时可能会报找不到jemalloc/jemalloc.h的错误,或者think-queue使用redis驱动时无法工作。为此坑曾经付出了2天时间的代价。

  1. cd redis-6.0.6
  2. make MALLOC=libc
  3. make install

配置Redis并启动Redis服务
配置conf,将redis.conf里的“daemonize no”改为“daemonize yes”,让redis在后台运行

  1. mkdir /etc/redis
  2. cp redis-6.0.6/redis.conf /etc/redis/redis.conf
  3. #将redis.conf里的“daemonize no”改为“daemonize yes”,让redis在后台运行。


启动redis

redis-server /etc/redis/redis.conf #启动redis服务


查看redis服务是否运行

ps -ef |grep redis #查看redis是否运行


停止redis

redis-cli shutdown #停止redis服务


安装phpredis
PHP使用Redis需要安装Redis扩展库phpredis,最好下载发布版本。本文以5.3.1为例。

下载phpredis

  1. wget https://github.com/phpredis/phpredis/archive/5.3.1.tar.gz
  2. tar xvf 5.3.1.tar.gz


编译phpredis

  1. phpize
  2. whereis php-config #命令返回 php-config: /opt/lampp/bin/php-config,下一步要用
  3. ./configure --with-php-config=/opt/lampp/bin/php-config #设置php-config
  4. make
  5. make install


php添加redis扩展
在php.ini里增加redis.so。然后可以从phpinfo查看是否有

  1. echo "extension=redis.so" >> /opt/lampp/etc/php.ini
  2. php -i | grep redis #查看是否支持redis扩展

三、配置queue.php(重点注意)

安装好,首先修改默认配置\config\queue.php

配置消息队列,将config/queue.php将’default’ => ‘sync’改为’default’ => ‘redis’,使用Redis驱动。

如果不修改这里,在命令窗口启动的时候,不会有任何的提示

四、测试(队列生产者)

参考:【消息队列学习一】TP6 基于 redis 实现消息队列和延迟队列 - 奇点原生 - 博客园

消息队列实现过程流程图
1、通过生产者推送消息到消息队列服务中
2、消息队列服务将收到的消息存入redis队列中(zset)
3、消费者进行监听队列,当监听到队列有新的消息时,获取队列第一条
4、处理获取下来的消息调用业务类进行处理相关业务
5、业务处理后,需要从队列中删除消息

4.1调用创建一个生产者

  1. <?php
  2. namespace app\api\controller;
  3. use app\BaseController;
  4. use think\facade\Queue;
  5. class Index extends BaseController
  6. {
  7. public function index()
  8. {
  9. // echo phpinfo();exit();
  10. // 1.当前任务由哪个类来负责处理
  11. // 当轮到该任务时,系统将生成该类的实例,并调用其fire方法
  12. $jobHandlerClassName = 'app\api\controller\Job1';
  13. // 2.当任务归属的队列名称,如果为新队列,会自动创建
  14. $jobQueueName = "helloJobQueue";
  15. // 3.当前任务所需业务数据,不能为resource类型,其他类型最终将转化为json形式的字符串
  16. $jobData = ['ts' => time(), 'bizId' => uniqid(), 'a' => 1];
  17. // 4.将该任务推送到消息列表,等待对应的消费者去执行
  18. // 入队列,later延迟执行,单位秒,push立即执行
  19. $isPushed = Queue::later(10, $jobHandlerClassName, $jobData, $jobQueueName);
  20. // database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false
  21. if ($isPushed !== false) {
  22. echo '推送成功';
  23. } else {
  24. echo '推送失败';
  25. }
  26. }
  27. }

4.2消费者和删除。(任务处理)

  1. <?php
  2. namespace app\api\controller;
  3. use think\facade\Log;
  4. use think\queue\Job;
  5. class Job1
  6. {
  7. /**
  8. * fire方法是消息队列默认调用的方法
  9. * @param Job $job 当前的任务对象
  10. * @param array $data 发布任务时自定义的数据
  11. */
  12. public function fire(Job $job, array $data)
  13. {
  14. // 有些任务在到达消费者时,可能已经不再需要执行了
  15. $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
  16. if (!$isJobStillNeedToBeDone) {
  17. $job->delete();
  18. return;
  19. }
  20. $isJobDone = $this->doHelloJob($data);
  21. if ($isJobDone){
  22. $job->delete();
  23. echo "删除任务" . $job->attempts() . '\n';
  24. }else{
  25. if ($job->attempts() > 3){
  26. $job->delete();
  27. echo "超时任务删除" . $job->attempts() . '\n';
  28. }
  29. }
  30. }
  31. /**
  32. * 有些消息在到达消费者时,可能已经不再需要执行了
  33. * @param array $data
  34. * @return bool
  35. */
  36. private function checkDatabaseToSeeIfJobNeedToBeDone(array $data) {
  37. return true;
  38. }
  39. /**
  40. * 根据消息中的数据进行实际的业务处理...
  41. * @param array $data
  42. * @return bool
  43. */
  44. private function doHelloJob(array $data)
  45. {
  46. echo '执行业务逻辑:' . $data['bizId'] . '\n';
  47. return true;
  48. }
  49. }

4.3 测试:在浏览器上浏览刚刚创建的方法:http://www.xxx.com/api/index/index

如果成功了,只会输出"推送成功"四个字,有多余的字符都是不成功的

 访问后,可以看到已经向消息队列服务推送了消息,此时我们需要在项目根目录下运行命令创建工作进程来处理队列中的消息

4.4 运行命令

php think queue:work --queue helloJobQueue

 通过上述可以看到,当我们开启了work进程时,就会从队列中获取任务,然后找到消费者执行后续的业务逻辑。

因为这里我采用的push 表示立即执行,所以只要队列中有就会立马执行,如果我们需要使用到延时场景,例如订单支付超时,这时我们就可以使用later即可

五、多模块多功能实现

修改生产者代码

  1. <?php
  2. namespace app\api\controller;
  3. use app\BaseController;
  4. use think\facade\Queue;
  5. class Index extends BaseController
  6. {
  7. public function index()
  8. {
  9. // echo phpinfo();exit();
  10. // 1.当前任务由哪个类来负责处理
  11. // 当轮到该任务时,系统将生成该类的实例,并调用其fire方法
  12. $jobHandlerClassName = 'app\api\controller\Job1';
  13. // 2.当任务归属的队列名称,如果为新队列,会自动创建
  14. $jobQueueName = "helloJobQueue";
  15. // 3.当前任务所需业务数据,不能为resource类型,其他类型最终将转化为json形式的字符串
  16. $jobData = ['ts' => time(), 'bizId' => uniqid(), 'a' => 1];
  17. // 4.将该任务推送到消息列表,等待对应的消费者去执行
  18. // 入队列,later延迟发送,单位秒,push立即发送
  19. $isPushed = Queue::later(10, $jobHandlerClassName, $jobData, $jobQueueName);
  20. // database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false
  21. if ($isPushed !== false) {
  22. echo '推送成功';
  23. } else {
  24. echo '推送失败';
  25. }
  26. }
  27. /**
  28. * 多模块延迟队列实现
  29. */
  30. public function pay(){
  31. $orderData = [
  32. "orderId" => uniqid()
  33. ];
  34. $isPushed = Queue::later(60, "app\api\controller\PayMessage", json_encode($orderData), "helloJobQueue");
  35. if ($isPushed)echo "\n 订单支付成功 \n";
  36. $email = [
  37. "email" => "1234567890@qq.com"
  38. ];
  39. $isPushed = Queue::later(120, "app\api\controller\EmailMessage", json_encode($email), "helloJobQueue");
  40. if ($isPushed)echo "\n 邮件发送成功 \n";
  41. }
  42. }

新增支付消息消费者

  1. <?php
  2. namespace app\api\controller;
  3. use think\queue\Job;
  4. class PayMessage
  5. {
  6. public function fire(Job $job, $data){
  7. $data = json_decode($data, true);
  8. if ($this->doJob($data)){
  9. $job->delete();
  10. }else{
  11. if ($job->attempts() > 3){
  12. print_r("订单超时:" . $data['orderId']);
  13. $job->delete();
  14. }
  15. }
  16. }
  17. public function doJob($data){
  18. print_r("发送支付成功通知:" . $data['orderId'] );
  19. return true;
  20. }
  21. }

新增邮箱发送消费者

  1. <?php
  2. namespace app\api\controller;
  3. use think\queue\Job;
  4. class EmailMessage
  5. {
  6. public function fire(Job $job, $data){
  7. $data = json_decode($data, true);
  8. if ($this->doJob($data)){
  9. $job->delete();
  10. }else{
  11. if ($job->attempts() > 3){
  12. print_r("\n 邮件发送超时:" . $data['orderId'] . '\n ');
  13. $job->delete();
  14. }
  15. }
  16. }
  17. public function doJob($data){
  18. print_r("\n 发送邮件:" . $data['email'] .'\n ');
  19. return true;
  20. }
  21. }

通过浏览器模拟访问

 因为本次我们使用的是延时队列所以我们可以到redis中查看

  1. 127.0.0.1:6379> keys *
  2. 1) "{queues:helloJobQueue}:delayed"

 当延时时间到了后,我们可以继续看到工作进程及时的进行消费

六、如果一个任务类里有多个小任务的话,如上面的例子二,需要用@+方法名app\lib\job\Job2@task1app\lib\job\Job2@task2

  1. namespace app\lib\job;
  2. use think\queue\Job;
  3. class Job2{
  4. public function task1(Job $job, $data){
  5. }
  6. public function task2(Job $job, $data){
  7. }
  8. public function failed($data){
  9. }
  10. }

七、官方例子:

创建任务类

单模块项目推荐使用 app\job 作为任务类的命名空间 多模块项目可用使用 app\module\job 作为任务类的命名空间 也可以放在任意可以自动加载到的地方

任务类不需继承任何类,如果这个类只有一个任务,那么就只需要提供一个fire方法就可以了,如果有多个小任务,就写多个方法,下面发布任务的时候会有区别
每个方法会传入两个参数 think\queue\Job $job(当前的任务对象) 和 $data(发布任务时自定义的数据)

还有个可选的任务失败执行的方法 failed 传入的参数为$data(发布任务时自定义的数据)

下面写两个例子

namespace app\job;

use think\queue\Job;

class Job1{
    
    public function fire(Job $job, $data){
    
            //....这里执行具体的任务 
            
             if ($job->attempts() > 3) {
                  //通过这个方法可以检查这个任务已经重试了几次了
             }
            
            
            //如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法
            $job->delete();
            
            // 也可以重新发布这个任务
            $job->release($delay); //$delay为延迟时间
          
    }
    
    public function failed($data){
    
        // ...任务达到最大重试次数后,失败了
    }

}
namespace app\lib\job;

use think\queue\Job;

class Job2{
    
    public function task1(Job $job, $data){
    
          
    }
    
    public function task2(Job $job, $data){
    
          
    }
    
    public function failed($data){
    
          
    }

}

发布任务

think\facade\Queue::push($job, $data = '', $queue = null) 和 think\facade\Queue::later($delay, $job, $data = '', $queue = null) 两个方法,前者是立即执行,后者是在$delay秒后执行

$job 是任务名
单模块的,且命名空间是app\job的,比如上面的例子一,写Job1类名即可
多模块的,且命名空间是app\module\job的,写model/Job1即可
其他的需要些完整的类名,比如上面的例子二,需要写完整的类名app\lib\job\Job2
如果一个任务类里有多个小任务的话,如上面的例子二,需要用@+方法名app\lib\job\Job2@task1app\lib\job\Job2@task2

$data 是你要传到任务里的参数

$queue 队列名,指定这个任务是在哪个队列上执行,同下面监控队列的时候指定的队列名,可不填

监听任务并执行

&> php think queue:listen --queue 任务名称

&> php think queue:work --queue  任务名称

两种,具体的可选参数可以输入命令加 --help 查看

可配合supervisor使用,保证进程常驻

八、work 模式和 listen 模式的区别

两者都可以用于处理消息队列中的任务

区别在于:

  • 2.3.1 执行原理不同

    • work 命令是单进程的处理模式。

      按照是否设置了 --daemon 参数,work命令又可分为单次执行和循环执行两种模式。

      • 单次执行:不添加 --daemon参数,该模式下,work进程在处理完下一个消息后直接结束当前进程。当队列为空时,会sleep一段时间然后退出。
      • 循环执行:添加了 --daemon参数,该模式下,work进程会循环地处理队列中的消息,直到内存超出参数配置才结束进程。当队列为空时,会在每次循环中sleep一段时间。
    • listen 命令是 双进程 + 管道 的处理模式。

      listen命令所在的进程会循环地创建 单次执行模式的 work 进程,每次创建的 work 进程只消费一个消息就会结束, 然后 listen 进程再创建一个新的 work 进程,

      • listen 进程会定时检查当前的 work 进程执行时间是否超过了 --timeout 参数的值, 如果已超时, 则 listen 进程会 kill 掉 work 进程, 然后抛出异常
      • listen 进程会通过管道来监听当前的 work 进程的输出, 当 work 进程有输出时, listen 进程会将输出写入到 stdout / stderr
      • listen 进程会定时通过 proc_get_status() 来监控当前的 work 进程是否仍在运行, work 进程消费完一个任务之后, work 进程就结束了,其状态会变成 terminated, 此时 listen 进程就会重新创建一个新的 work 进程并对其计时, 新的 work 进程开始消费下一个任务
  • 2.3.2 结束时机不同

    • work 命令的结束时机在上面的执行原理部分已叙述,此处不再重复
    • listen 命令中,listen 进程和 work 进程会在以下情况下结束:
      • listen 进程会定时检查当前的 work 进程的执行时间是否超过了 --timeout 参数的值,如果已超时, 此时 listen 进程会先 kill 掉当前的 work 进程, 然后抛出一个 ProcessTimeoutException 异常并结束 listen 进程
      • listen 进程会定时检查自身使用的内存是否超过了 --memory 参数的值,如果已超过, 此时 listen 进程会直接 die 掉, work 进程也会自动结束.
  • 2.3.3 性能不同

    • work 命令是在脚本内部做循环,框架脚本在命令执行的初期就已加载完毕;

    • 而listen模式则是处理完一个任务之后新开一个work进程,此时会重新加载框架脚本。

      因此: work 模式的性能会比listen模式高

      注意:当代码有更新时,work 模式下需要手动去执行 php think queue:restart 命令重启队列来使改动生效;而listen 模式会自动生效,无需其他操作。

  • 2.3.4 超时控制能力

    • work 模式本质上既不能控制进程自身的运行时间,也无法限制执行中的任务的执行时间。

      举例来说,假如你在某次上线之后,在上文中的 \application\index\job\Hello.php 消费者的fire方法中添加了一段死循环 :

      public function fire(){
         while(true){ //死循环
             $consoleOutPut->writeln("<info>I am looping forever inside a job.</info> \n");
             sleep(1);
         }
      }  

      那么这个循环将永远不能停止,直到任务所在的进程超过内存限制或者由管理员手动结束。这个过程不会有任何的告警。更严重的是,如果你配置了expire ,那么这个死循环的任务可能会污染到同样处理 helloJobQueue 队列的其他work进程,最后好几个work进程将被卡死在这段死循环中。详情后文会说明。

      work 模式下的超时控制能力,实际上应该理解为 多个work 进程配合下的过期任务重发能力。

    • 而 listen 命令可以限制 listen 进程创建的 work 进程的最大执行时间。

      listen 命令可通过 --timeout 参数限制 work 进程允许运行的最长时间,超过该时间限制后, work 进程会被强制 kill 掉, listen 进程本身也会抛出异常并结束;

    • 这里有必要补充一下 expire 和 timeout 之间的区别:

      • expire 在配置文件中设置,timeout 在 listen命令 的命令行参数中设置,而且,expire 和 timeout 是两个不同层次上的概念:

      • expire 是指任务的过期时间。这个时间是全局的,影响到所有的work进程。(不管是独立的work命令还是 listen 模式下创建的的 work 进程) 。expire 针对的对象是 任务

      • timeout 是指 work 进程的超时时间。这个时间只对当前执行的 listen 命令有效。timeout 针对的对象是 work 进程

  • 2.3.5 使用场景不同

    根据上面的介绍,可以看到,

    work 命令的适用场景是:

    • 任务数量较多
    • 性能要求较高
    • 任务的执行时间较短
    • 消费者类中不存在死循环,sleep() ,exit() ,die() 等容易导致bug的逻辑

    listen命令的适用场景是:

    • 任务数量较少
    • 任务的执行时间较长(如生成大型的excel报表等),
    • 任务的执行时间需要有严格限制

2.4 消息队列的开始,停止与重启

  • 开始一个消息队列:

    php think queue:work
  • 停止所有的消息队列:

    php think queue:restart
  • 重启所有的消息队列:

    php think queue:restart 
    php think queue:work 

九、特别例子()

路径:app\job\controller\Test

调用:

$isPushed = \think\facade\Queue::later(3, '\job\BaseJob',  ['class'=>'app\job\controller\Test2','method'=>'orders','attempts'=>10,'delayed'=>3,'data'=>['test' => 'testvalue']], 'default');

1、'\job\BaseJob':处理公共事务部分

2、'class'=>'app\job\controller\Test2' :处理自己部分业务的代码文件

3、'method'=>'orders',:处理自己部分业务的代码文件的方法

    const ATTEMPTS=10;//重试次数 默认=10

    const DELAYED=10;//重试延迟时间 单位秒

  1. <?php
  2. namespace app\job\controller;
  3. use app\BaseController;
  4. /**
  5. * @name 用于测试
  6. * @date 2021-11-29
  7. * @ruturn array
  8. */
  9. class Index extends BaseController
  10. {
  11. public function index3()
  12. {
  13. $isPushed = \think\facade\Queue::later(3, '\job\BaseJob', ['class'=>'app\job\controller\Test2','method'=>'orders','attempts'=>10,'delayed'=>3,'data'=>['test' => 'testvalue']], 'default');
  14. // database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false
  15. if ($isPushed !== false) {
  16. echo '推送成功3';
  17. } else {
  18. echo '推送失败';
  19. }
  20. }
  21. }

\job\BaseJob 路径:\extend\job\BaseJob.php

代码:

  1. <?php
  2. /*
  3. * @Description: 消息队列think-queue
  4. * @Date: 2021-09-13 17:02:21
  5. * @LastEditTime: 2021-09-13 18:17:27
  6. */
  7. namespace job;
  8. use Exception;
  9. use think\queue\Job;
  10. class BaseJob
  11. {
  12. const ATTEMPTS=10;//重试次数 默认=10
  13. const DELAYED=10;//重试延迟时间 单位秒
  14. /**
  15. * @name 创建队列(创建任务类)
  16. * @method Model
  17. * @date 2021-11-29
  18. * @param string/array [必填] $consumer 默认固定=
  19. * @param string/array [必填] $queueName 队列名称自定义但要与启动的进程一致
  20. * @param int/string/array [必填/选填]
  21. * @调用
  22. * @ruturn
  23. */
  24. function fire(Job $job, $data){
  25. $data['attempts']=($data['attempts']??self::ATTEMPTS)-1;
  26. $data['DELAYED']=$data['DELAYED']??self::DELAYED;
  27. $works = $job->attempts();//任务已经重试了几次
  28. try{
  29. $handle=new $data['class'];
  30. $result=$handle?->{$data['method']}($data['data']);
  31. if($result===null&&$works<$data['attempts']){
  32. $job->release($data['delayed']);//重新发布这个任务 $data['delayed']=延迟时间
  33. }else{
  34. $job->delete();
  35. }
  36. }catch(Exception $e){
  37. $job->delete();
  38. }
  39. }
  40. }

路径:app\job\controller\Test2

  1. <?php
  2. namespace app\job\controller;
  3. use think\facade\Db;
  4. class Test2{
  5. public function orders($data)
  6. {
  7. Db::name('err')->insert(['msg'=>1]);
  8. return true;
  9. }
  10. }

参考:​​​​​​ThinkPHP6+Redis+think-queue+Supervisor实现进程常驻消息队列_lxfdmwin的博客-CSDN博客_thinkphp6 消息队列

【消息队列学习一】TP6 基于 redis 实现消息队列和延迟队列 - 奇点原生 - 博客园

TP6队列thinkphp-queue使用 - 苏晓信个人博客网站

TP5:think-queue官网文档:think-queue/doc at master · tp5er/think-queue · GitHub

redis参考地址:Redis 教程 | 菜鸟教程

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载