4.5 同步任务

同步任务即Task,用来做一些异步的慢速任务,比如发邮件、批量任何、任何不支持异步Client的服务调用等等。MSF框架是纯异步Server架构,也就是说任何耗时的任务的代码不能放到Worker进程中执行,所以我们利用Swoole的Task模块来完成耗时任务。

Task进程

同步任务代码是在Task进程中执行的,Task进程具有以下的特性:

  • 同步阻塞
  • 支持定时器

目前MSF框架封装了异步的Http Client、Redis Client、MySQL Client,除了这几种原生支持异步外,任何其他的非异步Client的网络IO均要封装成Task,比如MongoDB Task。

Task示例

框架内封装了Task基类\PG\MSF\Tasks\Task,自定义的Task都应该继承此类。另外,框架内置一个MongoDbTask类,是操作MongoDB的任务类,封装了查询和修改MongoDB数据库的一些基本方法,如:

composer安装依赖

编辑项目的composer.json,加入依赖alcaeus/mongo-php-adapter

{
  "require": {
    "alcaeus/mongo-php-adapter": "^1.0"
  }
}

或者

$composer require alcaeus/mongo-php-adapter

MongoDB配置

<?php
/**
 * MongoDb配置文件
 *
 * @author camera360_server@camera360.com
 * @copyright Chengdu pinguo Technology Co.,Ltd.
 */

$config['mongodb']['test']['server'] = 'mongodb://192.168.1.106:27017';
$config['mongodb']['test']['options'] = ['connect' => true];
$config['mongodb']['test']['driverOptions'] = [];

return $config;

示例代码:

./php-msf-demo/config/docker/mongodb.php

Task业务逻辑类

<?php
namespace App\Tasks;

use \PG\MSF\Tasks\MongoDbTask;

class Idallloc extends MongoDbTask
{
    /**
     * 当前要用的配置  配置名,db名,collection名
     * @var array
     */
    protected $mongoConf = ['test', 'demo', 'idalloc'];

    public function getNextId($key)
    {
        $condition = [
            '_id' => $key,
        ];
        $update = [
            '$inc' => [
                'last' => 1,
            ],
        ];
        $options = [
            'new' => true,
            'upsert' => true,
        ];


        $doc = $this->mongoCollection->findAndModify($condition, $update, [], $options);

        return isset($doc['last']) ? $doc['last'] : false;
    }
}

示例代码:

https://github.com/pinguo/php-msf-demo/app/Tasks/Idallloc.php

调用Task

<?php
/**
 * MongoDB操作示例
 *
 * @author camera360_server@camera360.com
 * @copyright Chengdu pinguo Technology Co.,Ltd.
 */

namespace App\Controllers;

use PG\MSF\Controllers\Controller;
use App\Tasks\Idallloc;

class MongoDBTest extends Controller
{
    public function actionGetNewId()
    {
        /**
         * @var Idallloc $idAlloc
         */
        $idAlloc = $this->getObject(Idallloc::class);
        $newId   = yield $idAlloc->getNextId('test');
        $this->output($newId);
    }
}

示例代码:

https://github.com/pinguo/php-msf-demo/app/Controllers/MongoDBTest.php

我们需要认识到:

  1. Worker进程只是将任务投递给Tasker进程后立即返回,即是非阻塞的投递;

  2. Tasker进程执行相应的业务逻辑,在这里就是从MongoDB获取新的一个ID;

  3. Worker进程是通过协程获取到Tasker执行结果,即调用需要加yield关键字;

访问接口

[worker@newdev ~]$ curl http://127.0.0.1:8000/MongoDBTest/GetNewId
1

links

results matching ""

    No results matching ""