Redis配合ThinkPHP(tp6.0.8)实现消息队列

运行环境

Windows 10 本地环境
PHP 7.2.9
ThinkPHP v6.0.8
Redis v3.2.100

安装

tp6.0只能通过composer方式安装,而且默认框架不带ORM和Queue,都需要通过composer方式安装。tp6.0环境默认安装的是最新版think-queue,目前是3.X版本
composer require topthink/think-queue

注意:如果是tp5.1,只能安装2.X版本,命令为:composer require topthink/think-queue=2.*

验证是否安装成功,只需命令行执行:php think queue:listen -h,看到如下画面,说明安装成功。
微信截图_20210708154210.png

配置

安装成功之后,在config目录下会有一个queue.php文件,我们选择redis驱动类型,默认驱动类型是sync,即同步执行。
微信截图_20210708154515.png

创建任务类

由于我的本地环境是单应用模式,所以只需要在app/controller目录下新建任务类文件Myjob.php,内容如下:

<?php
namespace app\controller;

use think\facade\Db;
use think\queue\Job;

class Myjob{
    
    public function fire(Job $job, $data)
    {
    
            // 这里执行具体的任务
            $isJobDone = $this->doJob($data);
            
            if ($isJobDone) {
                // 如果任务执行成功,删除任务
                $job->delete();
                print("<info>Job has been done and deleted"."</info>\n");
            } else {
                // 通过这个方法可以检查这个任务已经重试了几次了
                if ($job->attempts() > 3) {
                    print("<warn>Job has been retried more than 3 times!"."</warn>\n");
                    $job->delete();
                }
            }
          
    }
    
    public function failed($data)
    {
    
        // 任务达到最大重试次数后,失败了
    }

    /*
     * user表中有一个username字段
     */
    public function doJob($data)
    {
        $name = $data['name'];
        $res = Db::table('user')->insert(['username'=>$name]);
        return $res;
    }

}

任务发布

think\facade\Queue::push($job, $data = '', $queue = null)think\facade\Queue::later($delay, $job, $data = '', $queue = null) 两个方法,前者是立即执行,后者是在$delay秒后执行
新建了任务类文件之后,我们还需要有动作触发任务的执行,所以我还创建了JobTest.php文件用于测试,内容如下:
<?php
namespace app\controller;

use think\facade\Queue;

class JobTest
{
    
    public function publish(){
        // 指定处理任务的类
        $jobHandlerClassName  = 'app\controller\Myjob';

        // 队列名称
        $jobQueueName        = "jobQueue";

        // 当前任务所需的业务数据
        // 不能为 resource 类型,其他类型最终将转化为json形式的字符串
        //   ( jobData 为对象时,需要在先在此处手动序列化,否则只存储其public属性的键值对)
        $jobData             = [ 'name'=>'job' . mt_rand()] ;

        // 将该任务推送到消息队列,等待对应的消费者去执行,下面是两种方式
        // $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
        $isPushed = Queue::later(5, $jobHandlerClassName , $jobData , $jobQueueName);

        // database 驱动时,返回值为 1|false  ;   redis 驱动时,返回值为 随机字符串|false
        if( $isPushed !== false ) {
            echo date('Y-m-d H:i:s') . " a new Job is Pushed to the MQ"."<br>";
        } else {
            echo 'Oops, something went wrong.';
        }
    }

}

监听任务并执行

开启Redis服务:redis-server
执行队列监听:php think queue:listen --queue jobQueue

当代码执行 app\controller\jobtest\publish 方法时,即触发了一次任务,监听程序监听到之后就会按照预定程序进行执行:立即执行或者延迟执行。至此,一个简单的消息队列完成。

可配合supervisor使用,保证进程常驻

标签: thinkphp, 消息队列, redis

添加新评论