FuelPHPでMongoDBをちょびっと便利に使う

MongoDB Advent Calendar 2013の14日目です。まぁ途中で一度途切れているので気楽に行きましょうw


さて、このエントリはここ連続で続いている FuelPHP ネタでもあります。

MongoDBでSQL的なシーケンスをどうするか

FuelPHPでMongoDBを使うには、Coreに含まれている Mongo_Dbクラスを使うのが普通だと思います。基本的なメソッドはひと通り用意されていて、通常使う文にはまあ困らないでしょう。ただしちょっと突っ込んだことをやろうとすると、そのままでは使えなくて何らかの拡張を自分ですることになるのもよくあることです。


MongoDBをアプリケーションデータの格納に使うとして、SQL的なシーケンス(SEQUENCE)が欲しいことがあります。MySQLでいうところのSERIAL属性と同様で、一意な値を自動採番していくれる仕組みです。むろんMongoDBでもちょっとした仕掛けで実現可能で、そのための手法はちょっとググれば簡単に見つかります。ただし FuelPHP の Mongo_Db クラスでは、PHP の Mongo オブジェクトがプライベート変数になっていて直接アクセスできるメソッドもないので、直接アクセスするには Mongo_Db クラスを継承したクラスを用意する必要があります。そうやって作ったのが、以下の Util_MongoPlus クラスです。

<?php

class Util_MongoPlus extends \Fuel\Core\Mongo_Db
{
    protected $seq_collection = 'sequences';

    public function getRawMongo()
    {
        return $this->db;
    }

    public function seqNext($name)
    {
        $query = array(
            '_id' => $name,
            );
        $update = array(
            '$inc' => array('seq' => 1),
            );
        $options = array(
            'new' => true,
            'upsert' => true,
            );

        $result = $this->db->{$this->seq_collection}->findAndModify(
            $query,
            $update,
            null,
            $options
            );
        return $result['seq'];
    }
} 

直接 Mongo オブジェクトにアクセスする getRawMongo メソッドと、シーケンスを実現する seqNext メソッドを持っています。シーケンスは sequences というコレクションを作り、そこにシーケンス名がキーのレコードを作成して、アトミックにインクリメントを行うことで実現しています。


※ とここまで書いていて、Mongo_Db::get_collection メソッドを用いれば任意のコレクションにアクセスできるので、直接 Mongo オブジェクトを使う必要がないことに気がつきました。まあいいやw

MongoDBをベースにしたモデルのベースクラスを作る

で、ここまでできたらもういっそ Model のベースクラスまで作ってしまえということで、できたのが以下です。

<?php
use Fuel\Core\Date;

class Model_Mongobase
{
    static protected $table_name = null;
    static protected $uniq_id = null;

    static protected $timestamp = null;

    static public function getMongo()
    {
        return \Util_MongoPlus::instance();
    }

    static public function getTable()
    {
        return static::$table_name;
    }

    static public function getSeqName()
    {
        // dummy
        return null;
    }

    static public function newId()
    {
        $mongo = static::getMongo();
        $seq = static::getSeqName();
        if (empty($seq)) {
            throw new Exception('empty sequence');
        }
        return $mongo->seqNext($seq);
    }

    // base methods
    static public function get($id)
    {
        if (static::$uniq_id === null) {
            throw new Exception('Unique key is not found');
        }

        $mongo = static::getMongo(); 
        $conds = array(
            static::$uniq_id => (int)$id,
            );
        $result = $mongo->get_where(static::getTable(), $conds);
        if (count($result) === 0) {
            throw new Exception('Item not found');
        }
        return $result[0];
    }

    static public function all($options = null)
    {
        $conds = array_val($options, 'conditions');
        $order_by = array_val($options, 'order_by');
        $use_cursor = array_val($options, 'use_cursor', false);

        $mongo = static::getMongo();
        if (!empty($conds)) {
            $mongo->where($conds);
        }
        if (!empty($order_by)) {
            $mongo->order_by($order_by);
        }
        if ($use_cursor) {
            $result = $mongo->get_cursor(static::getTable());
        } else {
            $result = $mongo->get(static::getTable());
        }
        return $result;
    }

    static public function save($data)
    {
        if (static::$uniq_id === null) {
            $result = static::saveWithNonuniq($data);
        } else {
            $result = static::saveWithUniq($data);
        }
        return $result;
    }
    static public function saveWithNonuniq($data)
    {
        // call hook
        static::beforeInsert($data);

        $mongo = static::getMongo();
        return $mongo->insert(static::getTable(), $data);
    }

    static public function saveWithUniq($data)
    {
        $mongo = static::getMongo();

        $id_name = static::$uniq_id;

        $id = array_val($data, $id_name);
        if (empty($id)) {
            $id = static::newId();
            $data[$id_name] = $id;

            static::beforeInsert($data);
            $result = $mongo->insert(static::getTable(), $data);
        } else {
            $conds = array(
                $id_name => (int)$id,
                );

            static::beforeUpdate($data);
            $result = $mongo->where($conds)->update(static::getTable(), $data);
        }
        return $result;
    }

    static public function deleteWithUniq($id)
    {
        $mongo = static::getMongo();

        if (static::$uniq_id === null) {
            $id_key = '_id';
        } else {
            $id_key = static::$uniq_id;
        }
        $conds = array(
            $id_key => (int)$id,
            );
        return $mongo->where($conds)->delete(static::getTable());
    }

    static public function deleteAll($conds)
    {
        $mongo = static::getMongo();

        return $mongo->where($conds)->delete_all(static::getTable());
    }

    static protected function beforeInsert(&$data)
    {
        $hook = array_val(static::$timestamp, 'before_insert');
        if (empty($hook)) {
            return;
        }
        $key = array_val($hook, 'key');
        static::handleTimestamp($data, $key);
    }

    static protected function beforeUpdate(&$data)
    {
        $hook = array_val(static::$timestamp, 'before_update');
        if (empty($hook)) {
            return;
        }
        $key = array_val($hook, 'key');
        static::handleTimestamp($data, $key);
    }

    static public function handleTimestamp(&$data, $key)
    {
        foreach ($key as $k) {
            $data[$k] = Date::forge()->format('mysql');
        }
    }
}

あー、いつものarray_val()を使ってますね。ここで紹介している自作の便利関数です。


使用するには、例えば以下の様な Model_Mongobase を継承したクラスを作って

<?php

class Model_Item extends Model_Mongobase
{
    static protected $table_name = 'item';
    static protected $sequence = 'item';
    static protected $uniq_id = 'id';

    static protected $timestamp = array(
        'before_insert' => array(
            'key' => array('created_at', 'updated_at'),
            ),
        'before_update' => array(
            'key' => array('updated_at'),
            ),
        );

    static public function getSeqName()
    {
        return static::$sequence;
    }
}

あとは任意のデータを格納するだけです。

<?php
  :
    Model_Item::save(array('fuga' => 'hoge'));

継承したクラスで

static protected $uniq_id = 'id';

のように $uniq_id プロパティが定義されていれば、それに対してシーケンスを適用してくれます。


いつもなら Package にまとめて github で公開したりするんですが、これに関してはちょっとどういうまとめ方が良いのか悩んでいるところもあって、とりあえず生ソースをブログで公開することにしました。


明日のアドカレは@さんです。

「第3回 MongoDB 勉強会 in Tokyo」に参加してきたよ

ここ1ヶ月位、AMN の広告配信システムのログ保存を MongoDB に切り替える作業をしていて、最近は MongoDB の素晴らしさに驚かされっぱなしだったわけですが、調度良いタイミングで勉強会が開催されたので参加してきました。

しょっぱなの @ さんの MongoDB の機能を俯瞰するセッションが素晴らしくて、時間を大幅にオーバーしたにも関わらず、参加者からの不満は全く聞かれませんでした。(まぁどうせ後ろは懇親会ですし、時間オーバー気にしないw)

新しいコミュニティがいい感じで育っている

比較的新しめの技術である MongoDB は、運用ノウハウなどもまださほど溜まっている状態ではなく、使っている人が手探りで試しながら、そこから得られた知見を積極的に交換している状況です。みんな楽しそうに疑問点などについてディスカッションしていて、とてもいい雰囲気のコミュニティが育っているのを肌で感じました。

MongoDB はシングルスレッドか?

イベントの中で「MongoDB はシングルスレッドだから...」という発言が幾度もあって、エラーログに "can't create new thread" なんてメッセージを良く見ていた自分としては、本当かなぁと思っていたわけです。それで ps -eLf で見てみました。

# ps -eLf | grep mongod
mongod   22359     1 22359  0   20 May10 ?        00:34:24 /usr/bin/mongod -f /etc/mongod.conf
mongod   22359     1 22360  0   20 May10 ?        00:00:00 /usr/bin/mongod -f /etc/mongod.conf
mongod   22359     1 22361  0   20 May10 ?        00:00:15 /usr/bin/mongod -f /etc/mongod.conf
mongod   22359     1 22363  0   20 May10 ?        00:00:00 /usr/bin/mongod -f /etc/mongod.conf
mongod   22359     1 22364  0   20 May10 ?        00:00:04 /usr/bin/mongod -f /etc/mongod.conf
mongod   22359     1 22365  0   20 May10 ?        00:00:02 /usr/bin/mongod -f /etc/mongod.conf
mongod   22359     1 22367  0   20 May10 ?        00:14:54 /usr/bin/mongod -f /etc/mongod.conf
mongod   22359     1 22369  0   20 May10 ?        00:00:00 /usr/bin/mongod -f /etc/mongod.conf
mongod   22359     1 22382  0   20 May10 ?        00:00:32 /usr/bin/mongod -f /etc/mongod.conf
mongod   22359     1 22383  0   20 May10 ?        00:00:31 /usr/bin/mongod -f /etc/mongod.conf
mongod   22359     1 22384  0   20 May10 ?        00:00:12 /usr/bin/mongod -f /etc/mongod.conf
mongod   22359     1 22385  0   20 May10 ?        00:00:08 /usr/bin/mongod -f /etc/mongod.conf
mongod   22359     1 22386  0   20 May10 ?        00:00:00 /usr/bin/mongod -f /etc/mongod.conf
mongod   22359     1 22388  0   20 May10 ?        00:00:23 /usr/bin/mongod -f /etc/mongod.conf
mongod   22359     1 22392  0   20 May10 ?        00:00:24 /usr/bin/mongod -f /etc/mongod.conf
mongod   22359     1 22397  0   20 May10 ?        00:00:07 /usr/bin/mongod -f /etc/mongod.conf
mongod   22359     1  4806  3   20 16:32 ?        00:00:11 /usr/bin/mongod -f /etc/mongod.conf
mongod   22359     1  4818  0   20 16:32 ?        00:00:01 /usr/bin/mongod -f /etc/mongod.conf
mongod   22359     1 14798  0   20 16:37 ?        00:00:00 /usr/bin/mongod -f /etc/mongod.conf
mongod   22359     1 14799  0   20 16:37 ?        00:00:00 /usr/bin/mongod -f /etc/mongod.conf

やはりどうやらマルチスレッドで動いているぽい。


ではということでソースにあたってみます。先程の "can't create new thread" というメッセージで grep をかけたらあっさり見つかりました。


1.8.1 では db/db.c の 111 行目、OurListener::accepted() の中で

            try {
                boost::thread thr(boost::bind(&connThread,mp));
            }
            catch ( boost::thread_resource_error& ) {
                log() << "can't create new thread, closing connection" << endl;
                mp->shutdown();
                delete mp;
            }

と boost::thread を使ってスレッドを生成しています。多少コードを追ってみると、接続一つごとにスレッドを割り当てて処理しているようです。そこでboost::thread で grep すると、他にも様々な場所でスレッドを使っていることが分かりました。


これらから「MongoDB がシングススレッド」という意味は、一つのオペレーションは複数のスレッドに分割されて処理するわけではなく、一つのスレッドが割り当てられてそのスレッドのみが処理する、という意だと理解しました。


まぁ1プロセス1スレッドで動いているとしたら、あれだけの速度で動作するのは超絶職人芸的なプログラミングが必要だろうなと思っていたので、ある意味納得です。