PHP进程间通信探究

PHP作为解释器运行通过线程或者进程都能实现(如果使用Apache,那么就可能使用多线程模型。使用php-fpm,就是使用多进程模型,这里以多进程模型解释)。服务器每接收到一个请求就要起一个PHP进程,平均一个PHP进程消耗内存2M左右(默认最大为8M,参数可以设置)。独立的进程让PHP能专一的做自己的解释工作,程序员也从复杂的代码逻辑中走出来,不用担心资源的竞争和各种锁问题。独立进程虽好但这也导致想通过多进程或者异步来提速成本非常的高(主要是开发难度)。如果一定要通过PHP实现多进程和异步其实是很容易做到的。

PHP有很多第三方扩展,比如Swoole能让PHP像Node一样实现异步。PHP官方扩展库pcntl_*能很简单的实现多进程。扩展虽好,但实际应用时切忌要慎重,便利的同时风险也来了。比如对多进程的控制,处理不好很容易导致程序死锁,CPU内存爆表、服务器宕机。异步回调的Coding方式与PHP本身的编程思想有一定出入,驾驭不好也是灾难。

当然也不能说的太吓人,在实际的项目中我们有很多场景不得不考虑通过多进程或者异步来优化程序。这里举一个很常见的例子『发送消息通知』,比如短信和邮件。这里说一个实际的场景:企业需要给200W用户发短信通知,短信接口支持最大100次/秒的调用频率,短信接口每次调用耗时300毫秒。如果单进程跑脚本的话,需要7天才能把短信发完。如果我们起30个进程,每秒能发送100条短信,6个小时内能发完,能提速30倍。优化方案确定之后,我们再看如何通过PHP去实现这样一个脚本。

一. pcntl扩展初探;

  1. 通过pcntl扩展创建多进程,参见如下代码;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
        function demo(array $phoneList){
    $cnt = count($phoneList); //测试数组大小
    $slice = 30; //需要调用的进程数量
    $master = array_chunk($phoneList,floor($cnt/$slice));
    $childList = [];

    while($slice >= 0)
    {
    $pid = pcntl_fork();
    if($pid > 0){
    $childList[$pid] = 1;
    //$pid>0表示当前还在执行父进程的代码
    //这里最好啥都不做,每次执行pcntl_fork都会执行这里的代码。
    //这里的代码执行完之后 会将$pid设置为0,然后jump到pcntl_fork代码之后,重新做判断;
    }elseif($pid == 0){
    //这里写我们的逻辑
    foreach($master[$slice] as $val)
    {
    //这里发生短信
    echo sprintf("%s Child:%s \r\n",$slice,$val);
    }
    //子进程执行完之后务必需要关闭;
    exit();
    }else
    {
    //程序发生错误也需要关闭程序
    exit();
    }
    $slice--;
    }

    // 等待所有子进程结束后回收资源
    while(!empty($childList)){
    $childPid = pcntl_wait($status);
    if ($childPid > 0){
    unset($childList[$childPid]);
    }
    }
    }

    /** 运行的结果如下,phone不是连续的
    Slice id:19,phone:66558
    Slice id:23,phone:79921
    Slice id:19,phone:66559
    Slice id:23,phone:79922
    Slice id:19,phone:66560
    Slice id:23,phone:79923
    Slice id:19,phone:66561
    Slice id:23,phone:79924
    Slice id:19,phone:66562
    Slice id:23,phone:79925
    **/

    通过pcntl扩展,几句代码就使用多进程将发消息通知的功能提速了30倍。不过这么简单的多进程编码,我为什么会在文章开始形容的如此复杂呢?

    重点和难点还是进程间通信,因为我们给用户发短信的每个子进程是相对独立的,进程之间没有通信,不会互相传递数据状态。所以不会发生资源抢占与锁问题。假如需求发生变化,我们需要按用户的活跃度高低给用户发短信,该怎么做?

    通俗点解释如下:一个盘子里有30个苹果,需要发给30个人,由3个人负责发苹果。最简单的办法就是我们先把苹果分成3份,3个人一人一份,很快就能发完。但是如果我们要按照苹果的大小顺序去发,把大苹果先发出去,此时我们就没办法分成3份了,只能三个人互相去挣当前最大的,很容易就打起来。那该怎么做呢?最常见的办法就是使用一个工具把所有苹果按由大到下的顺序放在里面,每次只能取一个,这样就解决了资源抢占的问题。

    关于进程间资源抢占的问题非常的复杂,编码难度非常高,这也是为什么很少使用PHP跑多进程的原因。当需要用到多进程时我们更愿意去使用Python或者Java,它们对多线程封装的更好。需要重点说的是PHP并不是不能写多进程的程序,也不是像其他人说的不稳定,而是编码费时,维护成本高。

二. 进程间通信

常见的进程通信方式有:消息队列、共享内存与信号量、管道、socket,我将一一举例说明。

  1. 消息队列
    『消息队列』是在消息的传输过程中保存消息的容器。消息队列管理器相当于消息发送者和接收者的中介。消息队列的主要目的是创建路由并且保证消息可靠传递;如果发送消息时接收者不可用,消息队列会保留消息,直到有人接收它。

    消息队列可提供临时存储的功能并且能保证消息可靠的传递,我们正好使用它实现进程间通信。当然消息队列不单单用于进程间通信,他的应用领域非常广。比如消息队列非常适用于解决消费者和生产者的问题,因为生产者和消费者之间总会存在『速度差』。比如生产者突然少了10个,两边处理的速度就会不平衡,会导致排队阻塞,服务不可用。这肯定不是我们想看到的,如果这时候引入消息队列将两个系统解耦,无论谁慢了都不会影响整体业务。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    function demo(array $phoneList){
    global $msgQueue;
    $cnt = count($phoneList); //测试数组大小
    $slice = 3; //需要调用的进程数量
    $childList = [];
    //主进程先发送一条消息,告诉子进程可以发送第一条短信了
    msg_send($msgQueue,MSG_TYPE,0);

    while($slice >= 0)
    {
    $pid = pcntl_fork();
    if($pid > 0){
    $childList[$pid] = 1;
    //父进程什么都不用做

    }elseif($pid == 0){
    //子进程不停的请求,直到所有短信发送完成
    while(msg_receive($msgQueue,MSG_TYPE,$msgType,1024,$message))
    {
    if($cnt>intval($message))
    {
    printf("Slice id:%s,phone:%s \r\n",$slice,$phoneList[$message]);
    $message = $message + 1;
    msg_send($msgQueue,MSG_TYPE,$message);
    }else
    {
    //通知其他进程一切都结束了
    msg_send($msgQueue,MSG_TYPE,$cnt);
    exit();
    }
    }
    }else
    {
    //程序发生错误也需要关闭程序
    exit();
    }
    $slice--;
    }

    // 等待所有子进程结束后回收资源
    while(!empty($childList)){
    $childPid = pcntl_wait($status);
    if ($childPid > 0){
    unset($childList[$childPid]);
    }
    }
    }

    const MSG_TYPE = 1;
    //创建消息队列
    $id = ftok(__FILE__,'m');
    $msgQueue = msg_get_queue($id);

    demo(range(0,900));

    /**运行结果,按大小输出
    Slice id:1,phone:895
    Slice id:1,phone:896
    Slice id:2,phone:897
    Slice id:3,phone:898
    Slice id:3,phone:899
    **/
  2. 共享内存与信号量
    『共享内存』很容易理解,就是在内存中找一块区域,所有进程都能读写。『信号量』是系统提供的一种原子操作,进程在开启信号和结束信号之间拥有共享内存的『绝对占有』权,这样能有效的防止多个进程读取同一个资源时发生死锁。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    	function demo(array $phoneList){
    global $shareMemory;
    global $signal;
    $cnt = count($phoneList); //测试数组大小
    $slice = 3; //需要调用的进程数量
    $childList = [];

    while($slice >= 0)
    {
    $pid = pcntl_fork();
    if($pid > 0){
    $childList[$pid] = 1;
    //父进程什么都不用做

    }elseif($pid == 0){

    while(true)
    {
    // 标记信号量,这里被我承包了
    sem_acquire($signal);
    //检测共享内存是否存在
    if (shm_has_var($shareMemory,SHARE_KEY)){
    //从共享内存中拿数据
    $val = shm_get_var($shareMemory,SHARE_KEY);
    if($val>=$cnt)
    {
    sem_release($signal);
    break;
    }else
    {
    printf("Slice id:%s,phone:%s \r\n",$slice,$phoneList[$val]);
    $val ++;
    //再将数据写入共享内存
    shm_put_var($shareMemory,SHARE_KEY,$val);
    }
    }else{
    // 无值会,先初始化
    shm_put_var($shareMemory,SHARE_KEY,0);
    }
    // 用完释放
    sem_release($signal);
    }
    exit();
    }else
    {
    //程序发生错误也需要关闭程序
    exit();
    }
    $slice--;
    }

    // 等待所有子进程结束后回收资源
    while(!empty($childList)){
    $childPid = pcntl_wait($status);
    if ($childPid > 0){
    unset($childList[$childPid]);
    }
    }
    }

    const SHARE_KEY = 1;

    // 创建一块共享内存
    $shm_id = ftok(__FILE__,'a');
    $shareMemory = shm_attach($shm_id);

    // 创建一个信号量
    $sem_id = ftok(__FILE__,'b');
    $signal = sem_get($sem_id);

    demo(range(0,900));

    // 释放共享内存与信号量
    shm_remove($shareMemory);
    sem_remove($signal);
    /**运行结果,按大小输出
    Slice id:1,phone:775
    Slice id:3,phone:776
    Slice id:3,phone:777
    Slice id:3,phone:778
    Slice id:0,phone:779
    Slice id:0,phone:780
    **/
  3. 管道
    管道是比较常用的进程间通信手段,管道又分为匿名管道(pipe)与具名管道(mkfifo),匿名管道只能用于具有亲缘关系的进程间通信,而具名管道可以用于同一主机上任意进程。

    pipe与mkfifo的主要差别是mkfifo会创建一个特殊的FIFO物理文件,这个FIFO文件其他进程都可以像读写一般文件一样读写。再写下去文章就太长了,之后写下一篇吧。

    未完待续……

PS:所有代码都放到了GitHub:php_thread_demo