swoole_process 主要是用来代替 PHP 的 pcntl 扩展。我们知道 pcntl 是用来进行多进程编程的,而 pcntl 只提供了 fork 这样原始的接口,容易使用错误,并且没有提供进程间通信以及重定向标准输入输出的功能。

而 swoole_process 则提供了比 pcntl 更强大的功能,更易用的API,使PHP在多进程编程方面更加轻松。

本文使用 swoole_process 与 EventLoop 完成一个 php 的进程池,并且支持动态创建新进程。

EventLoop

swoole 有一个 Reactor 线程,这个线程可以说是对 epoll 模型的封装,可以设置 read 事件和 write 事件的监听回调函数。

下面会用到一个函数:


bool swoole_event_add(mixed $sock, mixed $read_callback, mixed $write_callback = null, int $flags = null);
  • 参数1为一个文件描述符,包括swoole_client->$sockswoole_process->$pipe或者其他 fd(socket_create 创建的资源 , stream_socket_client/fsockopen创建的资源)
  • 参数2为可读事件回调函数
  • 参数3为可写事件回调函数

多进程编程少不了进程之间的通讯,swoole 的进程之间有两种通信方式,一种是消息队列(queue),另一种是管道(pipe)。那么本文使用的是 pipe 的方式。

下面是一个定时向进程池投递任务的例子。

代码:


<?php
class ProcessPool{ private $process; /**
* Worker 进程数组
* @var array
*/
private $process_list = []; /**
* 正在被使用的进程
* @var array
*/
private $process_use = []; /**
* 最少进程数量
* @var int
*/
private $min_worker_num = 3; /**
* 最多进程数量
* @var int
*/
private $max_worker_num = 6; /**
* 当前进程数量
* @var int
*/
private $current_num; public function __construct()
{
$this->process = new swoole_process(array($this, 'run'), false, 2);
$this->process->start();
swoole_process::wait();
} public function run()
{
$this->current_num = $this->min_worker_num;
//创建所有的worker进程
for($i = 0; $i < $this->current_num; $i++){
$process = new swoole_process(array($this, 'task_run'), false, 2);
$pid = $process->start();
$this->process_list[$pid] = $process;
$this->process_use[$pid] = 0;
} foreach($this->process_list as $process){
swoole_event_add($process->pipe, function ($pipe) use ($process){
$data = $process->read();
var_dump($data . '空闲');
//接收子进程处理完成的信息,并且重置为空闲
$this->process_use[$data] = 0;
});
} //每秒定时向worker管道投递任务
swoole_timer_tick(1000 ,function ($timer_id){
static $index = 0;
$index = $index + 1;
$flag = true; //是否新建worker
foreach ($this->process_use as $pid => $used){
if($used == 0){
$flag = false;
//标记为正在使用
$this->process_use[$pid] = 1;
// 在父进程内调用write,子进程可以调用read接收此数据
$this->process_list[$pid]->write($index. "hello");
break;
}
} if($flag && $this->current_num < $this->max_worker_num){
//没有闲置worker,新建worker来处理
$process = new swoole_process(array($this, 'task_run'), false, 2);
$pid = $process->start();
$this->process_list[$pid] = $process;
$this->process_use[$pid] = 1;
$this->process_list[$pid]->write($index. "hello");
$this->current_num++;
}
var_dump('第' .$index. '个任务');
if($index == 10){
foreach($this->process_list as $process){
$process->write("exit");
}
swoole_timer_clear($timer_id);
$this->process->exit();
} });
} /**
* 子进程处理
* @param $worker
*/
public function task_run($worker)
{
swoole_event_add($worker->pipe, function($pipe)use($worker){
$data = $worker->read();
var_dump($worker->pid . ':' . $data);
if($data == 'exit'){
$worker->exit();
exit;
}
//模拟耗时任务
sleep(5);
//告诉主进程处理完成
//在子进程内调用write,父进程可以调用read接收此数据
$worker->write($worker->pid);
});
} } new ProcessPool();

首先定义几个重要的属性:

  • $process_list :Worker 进程数组
  • $process_use:正在被使用的进程
  • $min_worker_num :最少进程数量
  • $max_worker_num :最多进程数量
  • $current_num :当前进程数量
  • $process : 主进程

在实例化的时候创建主进程,并且运行 run 方法,在 run 方法里面先创建所有的 worker 进程,并且设置为空闲状态。

接着遍历所有的 worker 进程,并且加入 EventLoop 中,设置可读事件,用于接收子进程的空闲信号。

最后每隔一秒向 worker 进程投递任务。动态扩充进程池则在这里实现,如果没有闲置的进程,而此时又有新的任务,则需要动态创建一个新的进程并且置为繁忙状态。由于只模拟了十次任务,则第十个任务完成之后在父进程中发送 exit 使所有子进程退出。

运行效果与图解:

参考链接:

https://wiki.swoole.com/wiki/...
https://opso.coding.me/2018/0...

原文地址:https://segmentfault.com/a/1190000016824172

最新文章

  1. Unix NetWork Programming(unix环境编程)——环境搭建(解决unp.h等源码编译问题)
  2. javascript判断变量是不是空值
  3. 用栈解决Largest Rectangle问题
  4. 【转载】 Pyqt QStackedWidget堆栈窗体
  5. ADF_Data Binding系列1_使用Bean Data Control
  6. Smarty3学习笔记
  7. Cocos2d-X3.0 刨根问底(五)----- Node类及显示对象列表源码分析
  8. Codeforces Round #230 (Div. 2) 解题报告
  9. HDU 4850
  10. Oracle bbed 实用示例-----File Header Reset
  11. HTTPS通信机制
  12. phpcms V9静态判断会员登录状态的方法
  13. 程序异常捕获库 - CrashRpt
  14. PAIP.提升效率----论项目知识库的建设。。
  15. WinPcap编程(三)
  16. Counting Triangles(hd1396)
  17. 转:apache 的mod-status
  18. uva12519
  19. C# 如何添加Excel页眉页脚(图片、文字、奇偶页不同)
  20. hadoop集群无法找到datanode节点问题解决

热门文章

  1. P3809 【模版】后缀排序
  2. javascript实现自动添加文本框功能
  3. 接口测试工具篇--jmeter
  4. CF17E Palisection(manacher)
  5. docker-ce-17.03.2 离线安装RPM包
  6. 大O时间复杂度
  7. JAVA SSL
  8. layDate1.0正式公布,您一直在寻找的的js日期控件
  9. QTP11.5公布,改名UFT
  10. OpenCASCADE License FAQs