Redis配合ThinkPHP(tp6.0.8)实现消息队列
运行环境
Windows 10
本地环境
PHP 7.2.9
ThinkPHP v6.0.8
Redis v3.2.100
安装
tp6.0只能通过composer方式安装,而且默认框架不带ORM和Queue,都需要通过composer方式安装。tp6.0环境默认安装的是最新版think-queue,目前是3.X版本。composer require topthink/think-queue
注意:如果是tp5.1,只能安装2.X版本,命令为:composer require topthink/think-queue=2.*
验证是否安装成功,只需命令行执行:php think queue:listen -h
,看到如下画面,说明安装成功。
配置
安装成功之后,在config
目录下会有一个queue.php
文件,我们选择redis
驱动类型,默认驱动类型是sync
,即同步执行。
创建任务类
由于我的本地环境是单应用模式,所以只需要在app/controller
目录下新建任务类文件Myjob.php
,内容如下:
<?php
namespace app\controller;
use think\facade\Db;
use think\queue\Job;
class Myjob{
public function fire(Job $job, $data)
{
// 这里执行具体的任务
$isJobDone = $this->doJob($data);
if ($isJobDone) {
// 如果任务执行成功,删除任务
$job->delete();
print("<info>Job has been done and deleted"."</info>\n");
} else {
// 通过这个方法可以检查这个任务已经重试了几次了
if ($job->attempts() > 3) {
print("<warn>Job has been retried more than 3 times!"."</warn>\n");
$job->delete();
}
}
}
public function failed($data)
{
// 任务达到最大重试次数后,失败了
}
/*
* user表中有一个username字段
*/
public function doJob($data)
{
$name = $data['name'];
$res = Db::table('user')->insert(['username'=>$name]);
return $res;
}
}
任务发布
think\facade\Queue::push($job, $data = '', $queue = null)
和think\facade\Queue::later($delay, $job, $data = '', $queue = null)
两个方法,前者是立即执行,后者是在$delay秒后执行
新建了任务类文件之后,我们还需要有动作触发任务的执行,所以我还创建了JobTest.php
文件用于测试,内容如下:
<?php
namespace app\controller;
use think\facade\Queue;
class JobTest
{
public function publish(){
// 指定处理任务的类
$jobHandlerClassName = 'app\controller\Myjob';
// 队列名称
$jobQueueName = "jobQueue";
// 当前任务所需的业务数据
// 不能为 resource 类型,其他类型最终将转化为json形式的字符串
// ( jobData 为对象时,需要在先在此处手动序列化,否则只存储其public属性的键值对)
$jobData = [ 'name'=>'job' . mt_rand()] ;
// 将该任务推送到消息队列,等待对应的消费者去执行,下面是两种方式
// $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
$isPushed = Queue::later(5, $jobHandlerClassName , $jobData , $jobQueueName);
// database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false
if( $isPushed !== false ) {
echo date('Y-m-d H:i:s') . " a new Job is Pushed to the MQ"."<br>";
} else {
echo 'Oops, something went wrong.';
}
}
}
监听任务并执行
开启Redis服务:redis-server
。
执行队列监听:php think queue:listen --queue jobQueue
。
当代码执行 app\controller\jobtest\publish
方法时,即触发了一次任务,监听程序监听到之后就会按照预定程序进行执行:立即执行或者延迟执行。至此,一个简单的消息队列完成。
可配合supervisor使用,保证进程常驻