5.5 连接池

连接池的重要性,这时就不赘述了,下面具体介绍框架中实现的哪些连接池。

Redis连接池

主要特性

  • 支持异步+协程
  • 支持断线重连
  • 支持自动提取和归还连接
  • 统一同步和异步调用方式

配置

<?php
/**
 * 本地环境
 */
$config['redis']['p1']['ip']               = '127.0.0.1';
$config['redis']['p1']['port']             = 6379;
$config['redis']['p1']['min_conn']         = 3;
$config['redis']['p1']['max_conn']         = 6;
$config['redis']['p1']['max_time']         = 3600;
//$config['redis']['p1']['password']       = 'xxxx';
//$config['redis']['p1']['select']         = 1;
// Redis序列化选项等同于phpredis序列化的各个选项如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY
//$config['redis']['p1']['redisSerialize'] = \Redis::SERIALIZER_PHP;
// PHP序列化选项,为了兼容yii迁移项目的set,get,mset,mget,选项如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY
//$config['redis']['p1']['phpSerialize']   = \Redis::SERIALIZER_NONE;
// 是否将key md5后储存,默认为0,开启为1
//$config['redis']['p1']['hashKey']        = 1;
// 设置key的前缀
//$config['redis']['p1']['keyPrefix']      = 'demo_';

return $config;

示例配置代码:

./php-msf-demo/app/config/docker/redis.php

  • $config['redis']

代表Redis连接池相关配置

  • p1,p2,p3,p4,p5,p6

这里的p仅代表一台或者一组Redis服务器,在使用连接池时会用到,如果Redis服务器端分片(比如twemproxy)就填写为集群导出的地址与端口等信息。

  • ip

Redis服务器地址

  • port

Redis服务器端口

  • min_conn

维持最小连接数

  • max_conn

最大连接数

  • max_time

连接有效时间

  • password

Redis认证密钥

  • select

Redis DB

  • redisSerialize

Redis序列化选项等同于phpredis序列化的各个选项如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY

  • phpSerialize

PHP序列化选项,为了兼容yii迁移项目的set,get,mset,mget,选项如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY

  • hashKey

是否将key md5后储存,默认为0,开启为1

  • keyPrefix

设置key的前缀

Redis连接池的使用

/**
 * Redis示例控制器
 *
 * @author camera360_server@camera360.com
 * @copyright Chengdu pinguo Technology Co.,Ltd.
 */

namespace App\Controllers;

use PG\MSF\Controllers\Controller;
use App\Models\Demo as DemoModel;

class Redis extends Controller
{
    // Redis连接池读写示例
    public function actionPoolSetGet()
    {
        yield $this->getRedisPool('p1')->set('key1', 'val1');
        $val = yield $this->getRedisPool('p1')->get('key1');

        $this->outputJson($val);
    }
}
  1. $this->getRedisPool($name)

获取连接池对象,并选择名为$name的连接池,$name由配置文件中声明,比如上述配置中的tw

  1. 连接池对象的所有方法映射为标准的Redis操作指令

如:SETEX key seconds value映射为$this->getRedisPool($name)->setex($key, $seconds, $value)

  1. string类型的简化操作

$this->getRedisPool($name)->cache($key, $value = '', $expire = 0),$key为redis key,$value为缓存的值,$expire为过期时间,默认不会过期。

  1. 执行lua脚本

$this->getRedisPool($name)->evalMock($script, $args = array(), $numKeys = 0)

如:

<?php
function luaExample()
{
    $num = 100;
    $lua = "
           local allWorks = {}
           local recWorks = {}
           local random = nil
           for k, v in pairs(KEYS) do
               local works = redis.call('sRandMember', v, '" . $num . "')
               if works ~= nil then
                   for key, val in pairs(works) do
                       table.insert(allWorks, val)
                   end
               end
           end
           while #recWorks < " . $num . " and #allWorks > 0 do
               random = math.random(#allWorks)
               table.insert(recWorks, allWorks[random])
               table.remove(allWorks, random)
           end
           return cjson.encode(recWorks)
       ";
    $keys = ['feedId1', 'feedId2', 'feedId3'];
    $this->getRedisPool('tw')->evalMock($lua, $keys, count($keys));
}

Redis代理

在Redis连接池的基本上,MSF框架还实现了Redis代理的基本功能,主要特性有:

  • 支持分布式自动分片
  • 支持master-slave读写分离
  • 支持故障自动failover

配置

<?php
/**
 * 本地环境
 */
$config['redis']['p1']['ip']               = '127.0.0.1';
$config['redis']['p1']['port']             = 6379;
$config['redis']['p1']['min_conn']         = 3;
$config['redis']['p1']['max_conn']         = 6;
$config['redis']['p1']['max_time']         = 3600;
//$config['redis']['p1']['password']       = 'xxxx';
//$config['redis']['p1']['select']         = 1;
// Redis序列化选项等同于phpredis序列化的各个选项如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY
//$config['redis']['p1']['redisSerialize'] = \Redis::SERIALIZER_PHP;
// PHP序列化选项,为了兼容yii迁移项目的set,get,mset,mget,选项如:\Redis::SERIALIZER_PHP,\Redis::SERIALIZER_IGBINARY
//$config['redis']['p1']['phpSerialize']   = \Redis::SERIALIZER_NONE;
// 是否将key md5后储存,默认为0,开启为1
//$config['redis']['p1']['hashKey']        = 1;
// 设置key的前缀
//$config['redis']['p1']['keyPrefix']      = 'demo_';

$config['redis']['p2']['ip']               = '127.0.0.1';
$config['redis']['p2']['port']             = 6380;
$config['redis']['p2']['min_conn']         = 3;
$config['redis']['p2']['max_conn']         = 6;
$config['redis']['p3']['max_time']         = 3600;

$config['redis']['p3']['ip']               = '127.0.0.1';
$config['redis']['p3']['port']             = 6381;
$config['redis']['p3']['min_conn']         = 3;
$config['redis']['p3']['max_conn']         = 6;
$config['redis']['p3']['max_time']         = 3600;

$config['redis']['p4']['ip']               = '127.0.0.1';
$config['redis']['p4']['port']             = 7379;

$config['redis']['p5']['ip']               = '127.0.0.1';
$config['redis']['p5']['port']             = 7380;
$config['redis']['p5']['min_conn']         = 3;
$config['redis']['p5']['max_conn']         = 6;
$config['redis']['p5']['max_time']         = 3600;

$config['redis']['p6']['ip']               = '127.0.0.1';
$config['redis']['p6']['port']             = 7381;
$config['redis']['p6']['min_conn']         = 3;
$config['redis']['p6']['max_conn']         = 6;
$config['redis']['p6']['max_time']         = 3600;

$config['redis_proxy']['master_slave'] = [
    'pools' => ['p1', 'p2', 'p3'],
    'mode' => \PG\MSF\Marco::MASTER_SLAVE,
];

$config['redis_proxy']['cluster'] = [
    'pools' => [
        'p4' => 1,
        'p5' => 1,
        'p6' => 1
    ],
    'mode' => \PG\MSF\Marco::CLUSTER,
];

return $config;

示例配置代码:

https://github.com/pinguo/php-msf-demo/app/config/docker/redis.php

  • $config['redis_proxy']

代表Redis代理相关配置

  • cluster

这里的cluster仅代表一组Redis服务器集群,是一个标识

  • mode

Redis集群类型,\PG\MSF\Marco::CLUSTER代表分布式的Redis集群;\PG\MSF\Marco::MASTER_SLAVE代表主从结构的Redis集群

  • pools

当mode设置为\PG\MSF\Marco::CLUSTER时,pools为array,他的key表示Redis连接池名称,value表示Redis连接池权重;当mode设置为\PG\MSF\Marco::MASTER_SLAVE,pools为英文逗号分隔的Redis连接池名称列表。

Redis代理的使用

<?php
/**
 * Redis示例控制器
 *
 * @author camera360_server@camera360.com
 * @copyright Chengdu pinguo Technology Co.,Ltd.
 */

namespace App\Controllers;

use PG\MSF\Controllers\Controller;
use App\Models\Demo as DemoModel;

class Redis extends Controller
{
    // Redis代理使用示例(分布式)
    public function actionProxySetGet()
    {
        for ($i = 0; $i <= 100; $i++) {
            yield $this->getRedisProxy('cluster')->set('proxy' . $i, $i);
        }

        $val = yield $this->getRedisProxy('cluster')->get('proxy22');
        $this->outputJson($val);
    }

    // Redis代理使用示例(主从)
    public function actionMaserSlaveSetGet()
    {
        for ($i = 0; $i <= 100; $i++) {
            yield $this->getRedisProxy('master_slave')->set('M' . $i, $i);
        }

        $val = yield $this->getRedisProxy('master_slave')->get('M66');
        $this->outputJson($val);
    }
}

Redis连接池与代理的关系

Redis连接池与代表的关系

MySQL连接池

配置

<?php
/**
 * Docker环境
 */
$config['mysql']['master']['host']            = '127.0.0.1';
$config['mysql']['master']['port']            = 3306;
$config['mysql']['master']['user']            = 'root';
$config['mysql']['master']['password']        = '123456';
$config['mysql']['master']['charset']         = 'utf8';
$config['mysql']['master']['database']        = 'demo';
$config['mysql']['master']['min_conn']       = 10;
$config['mysql']['master']['max_conn']       = 60;
$config['mysql']['master']['max_time']       = 3600;

$config['mysql']['slave1']['host']           = '127.0.0.1';
$config['mysql']['slave1']['port']           = 3306;
$config['mysql']['slave1']['user']           = 'root';
$config['mysql']['slave1']['password']       = '123456';
$config['mysql']['slave1']['charset']        = 'utf8';
$config['mysql']['slave1']['database']       = 'demo';
$config['mysql']['slave1']['min_conn']       = 10;
$config['mysql']['slave1']['max_conn']       = 60;
$config['mysql']['slave1']['max_time']       = 3600;

$config['mysql']['slave2']['host']           = '127.0.0.1';
$config['mysql']['slave2']['port']           = 3306;
$config['mysql']['slave2']['user']           = 'root';
$config['mysql']['slave2']['password']       = '123456';
$config['mysql']['slave2']['charset']        = 'utf8';
$config['mysql']['slave2']['database']       = 'demo';
$config['mysql']['slave2']['min_conn']       = 10;
$config['mysql']['slave2']['max_conn']       = 60;
$config['mysql']['slave2']['max_time']       = 3600;

$config['mysql_proxy']['master_slave'] = [
    'pools' => [
        'master' => 'master',
        'slaves' => ['slave1', 'slave2'],
    ],
    'mode' => \PG\MSF\Marco::MASTER_SLAVE,
];

return $config;

示例配置代码:

https://github.com/pinguo/php-msf-demo/app/config/docker/mysql.php

执行SQL

<?php
/**
 * MySQL示例控制器
 *
 * app/data/demo.sql可以导入到mysql再运行示例方法
 *
 * @author camera360_server@camera360.com
 * @copyright Chengdu pinguo Technology Co.,Ltd.
 */

namespace App\Controllers;

use PG\MSF\Controllers\Controller;

class MySQL extends Controller
{
    // MySQL连接池示例
    public function actionBizLists()
    {
        // SQL DBBuilder更多参考 https://github.com/jstayton/Miner
        $bizLists  = yield $this->getMysqlPool('master')->select("*")->from('biz')->go();
        $this->outputJson($bizLists);
    }

    // 直接执行sql
    public function actionShowDB()
    {
        /**
         * @var \PG\MSF\Pools\Miner $DBBuilder
         */
        $dbs = yield $this->getMysqlPool('master')->go(null, 'show databases');
        $this->outputJson($dbs);
    }

    // 事务示例
    public function actionTransaction()
    {
        /**
         * @var \PG\MSF\Pools\Miner|\PG\MSF\Pools\MysqlAsynPool $mysqlPool
         */
        $mysqlPool = $this->getMysqlPool('master');
        // 开启一个事务,并返回事务ID
        $id = yield $mysqlPool->goBegin();
        $up = yield $mysqlPool->update('user')->set('name', '徐典阳-1')->where('id', 3)->go($id);
        $ex = yield $mysqlPool->select('*')->from('user')->where('id', 3)->go($id);
        if ($ex['result']) {
            yield $mysqlPool->goCommit($id);
            $this->outputJson('commit');
        } else {
            yield $mysqlPool->goRollback($id);
            $this->outputJson('rollback');
        }
    }
}

示例代码:

https://github.com/pinguo/php-msf-demo/app/Controllers/MySQL.php

DBQueryBuilder

目前php-msf整合的是DB Query Builder是Miner,更多SQL的拼装请参考它。

另外,$this->getMysqlPool('连接池配置名'),获取的连接池对象,可以在上面直接调用Miner的相关方法,进行sql拼装。

关于 go($id = null, $sql = null)

go($id = null, $sql = null)是以协程方法执行SQL,它会创建一个MySQL协程,其中$id为事务ID,如果未启用事务,默认为null。$sql为手工书写待执行的SQL。

事务

事务操作的一般流程为:

  1. 开启一个事务,并返回事务ID
  2. 执行一个SQL,设置事务ID,执行一个SQL,设置事务ID,...
  3. 提交(回滚)事务

用代码实现即:

try {
    $id   = yield $mysqlPool->goBegin();
    $res1 = yield $mysqlPool->update($table)->set($filed, $value)->go($id)
    $res1 = yield $mysqlPool->update($table)->set($filed, $value)->go($id)
} catch (\Exception $e) {
    yield $mysqlPool->goRollback($id);
    throw $e;
}
yield $mysqlPool->goCommit($id);

MySQL代理

在MySQL连接池的基本上,MSF框架还实现了MySQL代理的基本功能,主要特性有:

  • 支持master-slave读写分离
  • 支持事务

配置代理

如上述配置代码

$config['mysql_proxy']['master_slave'] = [
    'pools' => [
        'master' => 'master',
        'slaves' => ['slave1', 'slave2'],
    ],
    'mode' => \PG\MSF\Marco::MASTER_SLAVE,
];
  • $config['mysql_proxy']

代表MySQL代理相关配置

  • master_slave

这里的master_slave仅代表一组MySQL服务器集群,是一个标识

  • mode

MySQL集群类型\PG\MSF\Marco::MASTER_SLAVE代表主从结构的MySQL集群

  • pools

当mode设置为\PG\MSF\Marco::MASTER_SLAVE, pools.master表示MySQL主节点对应的连接池标识;pools.slaves为数字索引MySQL从节点对应的连接池标识列表

MySQL代理的使用

<?php
/**
 * MySQL示例控制器
 *
 * app/data/demo.sql可以导入到mysql再运行示例方法
 *
 * @author camera360_server@camera360.com
 * @copyright Chengdu pinguo Technology Co.,Ltd.
 */

namespace App\Controllers;

use PG\MSF\Controllers\Controller;

class MySQL extends Controller
{
    // MySQL代理使用示例
    public function actionProxy()
    {
        /**
         * @var \PG\MSF\Pools\Miner|\PG\MSF\Pools\MysqlAsynPool $mysqlProxy
         */
        $mysqlProxy = $this->getMysqlProxy('master_slave');
        $bizLists   = yield $mysqlProxy->select("*")->from('user')->go();
        $up         = yield $mysqlProxy->update('user')->set('name', '徐典阳-6')->where('id', 3)->go();
        $this->outputJson($bizLists);
    }

    // MySQL代理事务,事务只会在主节点上执行
    public function actionProxyTransaction()
    {
        /**
         * @var \PG\MSF\Pools\Miner|\PG\MSF\Pools\MysqlAsynPool $mysqlProxy
         */
        $mysqlProxy = $this->getMysqlProxy('master_slave');
        // 开启一个事务,并返回事务ID
        $id = yield $mysqlProxy->goBegin();
        $up = yield $mysqlProxy->update('user')->set('name', '徐典阳-2')->where('id', 3)->go($id);
        $ex = yield $mysqlProxy->select('*')->from('user')->where('id', 3)->go($id);
        if ($ex['result']) {
            yield $mysqlProxy->goCommit($id);
            $this->outputJson('commit');
        } else {
            yield $mysqlProxy->goRollback($id);
            $this->outputJson('rollback');
        }
    }
}

MySQL代理基于连接池,它和连接池的使用唯一区别在于从$this->getMysqlPool切换为$this->getMysqlProxy,所有的调用方式和连接池保持一致,就是这么简单。

MySQL同步模式

有一些场景,需要用到MySQL同步查询数据,比如Task在Tasker进程中执行,由于Tasker是同步阻塞的进程模型,在处理数据过程中又需要查询数据库中的数据,然后再计算相关数据,这个时候就需要使用MySQL同步模式。

php-msf框架内部已经将异步和同步查询MySQL数据的差异屏蔽,同步模式下采用长连接,如果连接断开,驱动会自动重连,唯一的区别在于同步模式不需要添加yield关键字,如:

MySQL同步Task

<?php
/**
 * Demo Task
 *
 * 注意理论上本文件代码应该在Tasker进程中执行
 */

namespace App\Tasks;

use \PG\MSF\Tasks\Task;

/**
 * Class Demo
 * @package App\Tasks
 */
class Demo extends Task
{
    /**
     * 连接池执行同步查询
     *
     * @return array
     */
    public function syncMySQLPool()
    {
        $user = $this->getMysqlPool('master')->select("*")->from("user")->go();
        return $user;
    }

    /**
     * 代理执行同步查询
     *
     * @return array
     */
    public function syncMySQLProxy()
    {
        $user = $this->getMysqlProxy('master_slave')->select("*")->from("user")->go();
        return $user;
    }

    /**
     * 连接池执行同步事务
     *
     * @return boolean
     */
    public function syncMySQLPoolTransaction()
    {
        $mysqlPool = $this->getMysqlPool('master');
        $id = $mysqlPool->begin();
        // 开启一个事务,并返回事务ID
        $up = $mysqlPool->update('user')->set('name', '徐典阳-1')->where('id', 3)->go($id);
        $ex = $mysqlPool->select('*')->from('user')->where('id', 3)->go($id);
        if ($ex['result']) {
            $mysqlPool->commit();
            return true;
        } else {
            $mysqlPool->rollback();
            return false;
        }
    }

    /**
     * 代理执行同步事务
     *
     * @return boolean
     */
    public function syncMySQLProxyTransaction()
    {
        $mysqlPool = $this->getMysqlProxy('master_slave');
        $id = $mysqlPool->begin();
        // 开启一个事务,并返回事务ID
        $up = $mysqlPool->update('user')->set('name', '徐典阳-1')->where('id', 3)->go($id);
        $ex = $mysqlPool->select('*')->from('user')->where('id', 3)->go($id);
        if ($ex['result']) {
            $mysqlPool->commit();
            return true;
        } else {
            $mysqlPool->rollback();
            return false;
        }
    }
}

示例代码:

https://github.com/pinguo/php-msf-demo/app/Tasks/Demo.php

调用MySQL同步查询数据

<?php
/**
 * MySQL示例控制器
 *
 * app/data/demo.sql可以导入到mysql再运行示例方法
 *
 * @author camera360_server@camera360.com
 * @copyright Chengdu pinguo Technology Co.,Ltd.
 */

namespace App\Controllers;

use PG\MSF\Controllers\Controller;
use App\Tasks\Demo as DemoTask;

class MySQL extends Controller
{
    // 通过Task,同步执行MySQL查询(连接池)
    public function actionSyncMySQLPoolTask()
    {
        /**
         * @var DemoTask $demoTask
         */
        $demoTask = $this->getObject(DemoTask::class);
        $user     = yield $demoTask->syncMySQLPool();
        $this->outputJson($user);
    }

    // 通过Task,同步执行MySQL查询(代理)
    public function actionSyncMySQLProxyTask()
    {
        /**
         * @var DemoTask $demoTask
         */
        $demoTask = $this->getObject(DemoTask::class);
        $user     = yield $demoTask->syncMySQLProxy();
        $this->outputJson($user);
    }

    // 通过Task,同步执行MySQL事务查询(连接池)
    public function actionSyncMySQLPoolTaskTransaction()
    {
        /**
         * @var DemoTask $demoTask
         */
        $demoTask = $this->getObject(DemoTask::class);
        $user     = yield $demoTask->syncMySQLPoolTransaction();
        $this->outputJson($user);
    }

    // 通过Task,同步执行MySQL事务查询(代理)
    public function actionSyncMySQLProxyTaskTransaction()
    {
        /**
         * @var DemoTask $demoTask
         */
        $demoTask = $this->getObject(DemoTask::class);
        $user     = yield $demoTask->syncMySQLProxyTransaction();
        $this->outputJson($user);
    }
}

links

results matching ""

    No results matching ""