在一般的 Server 程序中都会有一些耗时的任务,比如:发送邮件、聊天服务器发送广播等。如果我们采用同步阻塞的防水去执行这些任务,那么这肯定会非常的慢。

Swoole 的 TaskWorker 进程池可以用来执行一些异步的任务,而且不会影响接下来的任务,很适合处理以上场景。

那么什么是异步任务呢?

可以从下面的图示中来简单了解一下。(来源于网络,侵删)

我们上一个 Swoole 的文章介绍了如何创建一个简单的服务器,并且知道了几个核心的回调函数的使用方法。

要实现上述的异步处理,只需要增加两个事件回调即可:onTask 和 onFinish, 这两个回调函数分别用于执行 Task 任务和处理 Task 任务的返回结果。另外还需要在 set 方法中设置 task 进程数量。

使用示例:


class Server
{
private $serv;
public function __construct() {
$this->serv = new swoole_server("0.0.0.0", 9501);
$this->serv->set(array(
'worker_num' => 4,
'daemonize' => false,
'task_worker_num' => 8
));
$this->serv->on('Start', array($this, 'onStart'));
$this->serv->on('Connect', array($this, 'onConnect'));
$this->serv->on('Receive', array($this, 'onReceive'));
$this->serv->on('Close', array($this, 'onClose'));
$this->serv->on('Task', array($this, 'onTask'));
$this->serv->on('Finish', array($this, 'onFinish'));
$this->serv->start();
} public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
echo "Get Message From Client {$fd}:{$data}\n";
// 发送任务到Task进程
$param = array(
'fd' => $fd
);
$serv->task( json_encode( $param ) );
echo "继续处理之后的逻辑\n";
} public function onTask($serv, $task_id, $from_id, $data) {
echo "This Task {$task_id} from Worker {$from_id}\n";
echo "Data: {$data}\n";
for($i = 0 ; $i < 5 ; $i ++ ) {
sleep(1);
echo "Task {$task_id} Handle {$i} times...\n";
}
$fd = json_decode( $data , true )['fd'];
$serv->send( $fd , "Data in Task {$task_id}");
return "Task {$task_id}'s result";
}
public function onFinish($serv,$task_id, $data) {
echo "Task {$task_id} finish\n";
echo "Result: {$data}\n";
}
public function onStart( $serv ) {
echo "Server Start\n";
}
public function onConnect( $serv, $fd, $from_id ) {
echo "Client {$fd} connect\n";
}
public function onClose( $serv, $fd, $from_id ) {
echo "Client {$fd} close connection\n";
}
}
$server = new Server();

通过上述示例可以看到,发起一个异步任务只需要调用 swoole_server 的 task 方法就可以。发送之后会触发 onTask 回调,可以通过 $task_id 和 $from_id 处理不同进程的不同任务。最后可以通过 return 一个字符串来将执行结果返回给 Worker 进程,Worker 进程通过 onFinish 回调来处理结果。

那么基于上述代码就可以实现异步操作 mysql。异步操作 mysql 较适合以下场景:

  • 并发的读写操作
  • 没有时序上的严格关系
  • 不影响主线程逻辑

好处:

  • 提高并发
  • 降低 IO 消耗

数据库的压力主要在于 mysql 维持的连接数,如果存在 1000 个并发,那么 mysql 就需要建立对应数量的连接。而采用长连接的方式,mysql 的连接一直维持在进程中,减少了创建连接的损耗。可以通过 swoole 开启多个 task 进程,每一个进程内维持一个mysql 长连接,那么这样子也可以引申出来 mysql 连接池技术。还需要注意的是,mysql 服务器如果检测到长时间没有没有查询,则会断开连接回收资源,所以要有断线重连的机制。

以下是一个简单的异步操作 mysql 的示例:

还是以上的代码,我们只需要修改 onReceive、onTask、onFinish 三个函数。


class Server
{
private $serv;
public function __construct() {
$this->serv = new swoole_server("0.0.0.0", 9501);
$this->serv->set(array(
'worker_num' => 4,
'daemonize' => false,
'task_worker_num' => 8 // task进程数量 即为维持的MySQL连接的数量
));
$this->serv->on('Start', array($this, 'onStart'));
$this->serv->on('Connect', array($this, 'onConnect'));
$this->serv->on('Receive', array($this, 'onReceive'));
$this->serv->on('Close', array($this, 'onClose'));
$this->serv->on('Task', array($this, 'onTask'));
$this->serv->on('Finish', array($this, 'onFinish'));
$this->serv->start();
} public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
echo "收到数据". $data . PHP_EOL;
// 发送任务到Task进程
$param = array(
'sql' => $data, // 接收客户端发送的 sql
'fd' => $fd
);
$serv->task( json_encode( $param ) ); // 向 task 投递任务
echo "继续处理之后的逻辑\n";
} public function onTask($serv, $task_id, $from_id, $data) {
echo "This Task {$task_id} from Worker {$from_id}\n";
echo "recv SQL: {$data['sql']}\n";
static $link = null;
$sql = $data['sql'];
$fd = $data['fd'];
HELL:
if ($link == null) {
$link = @mysqli_connect("127.0.0.1", "root", "root", "test");
}
$result = $link->query($sql);
if (!$result) { //如果查询失败
if(in_array(mysqli_errno($link), [2013, 2006])){
//错误码为2013,或者2006,则重连数据库,重新执行sql
$link = null;
goto HELL;
}
}
if(preg_match("/^select/i", $sql)){//如果是select操作,就返回关联数组
$data = array();
while ($fetchResult = mysqli_fetch_assoc($result) ){
$data['data'][] = $fetchResult;
}
}else{//否则直接返回结果
$data['data'] = $result;
}
$data['status'] = "OK";
$data['fd'] = $fd;
$serv->finish(json_encode($data));
}
public function onFinish($serv, $task_id, $data) {
echo "Task {$task_id} finish\n";
$result = json_decode($result, true);
if ($result['status'] == 'OK') {
$this->serv->send($result['fd'], json_encode($result['data']) . "\n");
} else {
$this->serv->send($result['fd'], $result);
}
}
public function onStart( $serv ) {
echo "Server Start\n";
}
public function onConnect( $serv, $fd, $from_id ) {
echo "Client {$fd} connect\n";
}
public function onClose( $serv, $fd, $from_id ) {
echo "Client {$fd} close connection\n";
}
}
$server = new Server();

以上代码在 onReceive 时直接接收一条 sql,之后直接发送到 Task 任务中。这个时候下一步的流程紧接着输出,这里也就体现出了异步。然后 onTask 和 onFinish 分别用来向数据库发送 sql,处理 task 执行结果。

参考链接:

https://wiki.swoole.com
http://rango.swoole.com/archi...

原文地址:https://segmentfault.com/a/1190000016706048

最新文章

  1. 进击的Python【第二十章】
  2. iOS7.0后隐藏状态栏(UIStatusBar)
  3. tp5文件上传
  4. Category和Extension
  5. objective-C运算符和表达式
  6. OC三种方法实现定时器
  7. windows套接字相关函数
  8. linux下Java环境的配置
  9. net.sf.json.JSONException: java.lang.reflect.InvocationTargetException Caused by: java.lang.IllegalArgumentException at java.sql.Date.getHours(Unknown Source)
  10. Spring源码:IOC原理解析(二)
  11. MyBatis Generator 自定义生成注释
  12. Akka-CQRS(2)- 安装部署cassandra cluster,ubuntu-16.04.1-LTS and MacOS mojave
  13. Hdoj 1232.畅通工程 题解
  14. 6486: An Ordinary Game(规律)
  15. bat杂记 cmd
  16. 菜鸟nginx源码剖析
  17. IPv4到IPv6的三种过渡技术
  18. 软工alpha阶段个人总结
  19. duilib 修复CTreeViewUI复选功能判断不准确的bug
  20. python 爬图

热门文章

  1. codevs 3945 完美拓印 (KMP)
  2. 使用Storyboard实现复杂界面
  3. qt 透明化方法汇总
  4. 《一个民企CEO的职场阳谋》–读书总结(上)
  5. SpringMVC上传文件(图片)并保存到本地
  6. oracle查询表空间的位置
  7. excel2013超链接进不去,提示“您的组织策略不允许...”
  8. 百度语音识别服务 —— 语音识别 REST API 开发笔记
  9. java 基础概念 -- 数组与内存控制
  10. C++实现页码数字统计