子プロセス制御ふたたび : PHP Advent Calendar jp 2011 Day 8

はい、7日目の @ さんのエントリ「DateTimeクラスの落とし穴と対策 : PHP Advent Calendar jp 2011 Day 7」から引き続いて、PHP Advent Calendar jp 2011の8日目なわけです。


今回は何を書こうかずいぶん悩んで、ちょうど昨晩開催されたPHP忘年会2011@関東でネタ募集したところ、@さんが口走ったphpQueryネタをパクるという案もあったのですが、やはり正攻法で手持ちのネタでいくことにしました。

子プロセスfork

このはてなダイアリーでの数少ないPHPヒットネタとして「pcntl extensionを使って一定個数の子プロセスに作業させる方法 - Blog::koyhoge」という記事がありまして、公開した2007年以来ぼちぼちとずっとアクセスを稼いでくれております。


ただこの時に書いたサンプルコードも、話を単純にするためにグローバル空間にベタ書きですし、今となってはあまり良いサンプルとは呼べません。実は今年になって、とある目的できちんとクラス化したものが作ったのでそれを紹介しようと思います。

きっかけは Amazon SQS+SDB のワーカー

その目的というのは、AMN の広告配信ログサーバを改善する際の途中成果として作成した、キューからデータを取ってきて DB に入れるワーカー処理です。11月15日に開催された「第7回 MongoDB 勉強会 in Tokyo」で発表した以下のスライドで、そこに至る背景と経緯を説明しています。

ProcessForker クラス

では早速コードを見ていきましょう。

<?php
class ProcessForker {
    // defaults
    protected $_options = array(
        'max_children' => 10,  // max number of child processes
        'process_limit' => 0,  // return when this count tasks are finished
        'loop_task' => false,  // reuse tasks
        'sleep' => 0, // microseconds
        'single_execution' => null,
        );

    protected $_idx_task = 0;

    public function __construct($options = null) {
        if (!empty($options)) {
            $this->_options = array_merge($this->_options, $options);
        }
    }

    public function fetchTask(&$tasks) {
        if ($this->_options['loop_task']) {
            $idx = $this->_idx_task;
            $task = $tasks[$idx];

            // circulation in tasks
            ++$idx;
            if ($idx >= count($tasks)) {
                $idx = 0;
            }
            $this->_idx_task = $idx;
        } else {
            $task = array_shift($tasks);
        }
        return $task;
    }

    public function run($tasks) {
        // number of current running child processes
        $nchildren = 0;
        // number of finished task
        $nfinished = 0;
        for (;;) {
            if (empty($tasks)) {
                break;
            }
            $maxproc = $this->_options['process_limit'];
            if (($maxproc > 0) && ($maxproc <= $nfinished)) {
                break;
            }
            if ($nchildren <= $this->_options['max_children']) {
                $task = $this->fetchTask($tasks);

                $pid = pcntl_fork();
                if ($pid === -1) {
                    throw new Exception('pcntl_fork faild');
                } else if ($pid) {
                    // parent process
                    ++$nchildren;
                } else {
                    $exit_code = 0;
                    // child process
                    $func = $task[0];
                    $arg = $task[1];
                    if (!is_array($arg)) {
                        $arg = array($arg);
                    }

                    try {
                        call_user_func_array($func, $arg);
                    } catch (Exception $e) {
                        $exit_code = -1;
                    }
                    // care singleExecution:
                    // child process must not unlock
                    $se = $this->_options['single_execution'];
                    if ($se !== null) {
                        $se->setDoUnlock(false);
                    }
                    exit($exit_code);
                }

                $sleep = $this->_options['sleep'];
                if ($sleep > 0) {
                    usleep($sleep);
                }
            } else {
                $pid = pcntl_waitpid(0, $status, 0);
                --$nchildren;
                ++$nfinished;
            }
        }
    }
}

前回は決め打ちだった各種パラメータや実際に実行する処理を、すべて外部から渡せるようにしてあります。

使用例

この ProcessForker クラスは以下のように使用します。

<?php
require_once 'ProcessForker.php';

function hoge($str) {
    srand();
    $rand = rand(1, 5);
    echo "$rand: $str\n";
    sleep($rand);
}
$opts = array(
    'max_children' => 4,
    'process_limit' => 50,
    'loop_task' => true,
    );
$pf = new ProcessForker($opts);

$tasks = array(
    array('hoge', 'a'),
    array('hoge', 'b'),
    );
$pf->run($tasks);

まずはオプションを指定してProcessForkerオブジェクトを作ります。それぞのれオプションの意味は以下になります。

max_children 子プロセスの最大同時実行数
process_limit いくつのタスクを処理したら終了するか
loop_task 渡されたタスクを繰り返すかどうか
sleep 親プロセスが処理を行うたびにスリープする時間(us)
single_execution singleExecutionオブジェクト(後述)

この例では、

  • 最大4つの子プロセスを起動し (max_children => 4)
  • 全部で50回の処理を行い (process_limit => 50)
  • タスクを繰り返しながら (loop_task => true)

実行するという意味になります。

タスク指定

子プロセスとして処理するタスクは、配列の形で渡します。

array(<callable>, <引数>)

callablePHPis_callable() でtrueとなるもので、以下のどれかになります。

タスクへの引数は、ひとつの場合はそのままで構いませんが、複数の場合はそれも配列にして渡します。

繰り返し処理

loop_task オプションが true になっていると、渡されたタスクをすべて処理し終えてても終了せずに、またタスクリストの先頭から処理を行います。同じ処理を何度も繰り返したいときは、タスクを1つだけ用意して loop_task を true にすれば良いです。

single_execution 対応

常にプロセスが動いていて欲しい場合は cron で毎分キックするということをよくやりますが、しかし大元の親プロセスが複数立ち上がってしまうのは好ましくありません。そこで @ さんが作られた single_execution を利用して、同時起動チェックを行いました。しかし単に ProcessForker と single_execution を組み合わせるだけではどうも上手く動いてくれません。


調べたところ ProcessForker から起動された子プロセスが、プロセス終了時に singleExecution の管理しているロックファイルを削除してしまっていることが解りました。single_execution はそこから更に子プロセスが fork されることなど想定していないので当然ですね。

ということで、single_execution.php にパッチを当ててプロセス終了時のロック解除を選択できるように変更しました。ProcessForker 作成時の single_execution オプションに singleExecution オブジェクトを渡せば、子プロセスの場合はロックを解除せずに終了するようになっています。改変した single_execution.php は以下に置いてあります。

https://gist.github.com/1446922

これですでにプロセスが動作しているかどうか気にせずに、cronからスクリプトを定期的に起動するだけで、複数プロセスで動的に処理を行なってくれるようになりました。


明日の Advent Calendar は @takuya_1st さんです。