地方エンジニアの学習日記

興味ある技術の雑なメモだったりを書いてくブログ。たまに日記とガジェット紹介。

【Redis】コードリーディングメモ イベント駆動エンジンまで

イベント駆動エンジンまでのエントリポイントのメモ。メモしようと思ったきっかけは以下のQAをみた時。

  • Q: Redisはソケット I/O を処理するためにそのようなオープンソースのイベント ライブラリを使いますか?
  • A: いいえ。様々な理由から、Redis は独自のイベント ライブラリを使います。

あ、独自なんですねって感じで読み始めることにしました。とりあえずエントリー部分だけでもメモ。read/writeなりの先の処理は今後読みたいときに読みます。Node.jsといいNginxといいlibev*系のOSSライブラリを素直に使ったイベント駆動アーキテクチャOSSってあんま無いんですかね。(memcachedはlibeventですが。。)

mogile.web.fc2.com

main()

エントリポイント。configやらの適用や起動オプションなんかを解析して設定を行っている。CPUアフィニティとかOOMのスコアなんかもここで設定している。

https://github.com/redis/redis/blob/6.2/src/server.c#L5856

mainにもあるのですが起動時にテストモードと呼ばれる物がRedisにも存在していて低レイヤのRedisのエンジン部分のリーディングにも使えそうなので覚えておくと良さそうでした。ガッツリ開発する人向けなんでしょうが出力情報が結構増えて迷子になりそうなときにも便利そうに思いました。

#ifdef REDIS_TEST
    if (argc == 3 && !strcasecmp(argv[1], "test")) {
        if (!strcasecmp(argv[2], "ziplist")) {
            return ziplistTest(argc, argv);
        } else if (!strcasecmp(argv[2], "quicklist")) {
            quicklistTest(argc, argv);
        } else if (!strcasecmp(argv[2], "intset")) {
            return intsetTest(argc, argv);
        } else if (!strcasecmp(argv[2], "zipmap")) {
            return zipmapTest(argc, argv);
        } else if (!strcasecmp(argv[2], "sha1test")) {
            return sha1Test(argc, argv);
        } else if (!strcasecmp(argv[2], "util")) {
            return utilTest(argc, argv);
        } else if (!strcasecmp(argv[2], "endianconv")) {
            return endianconvTest(argc, argv);
        } else if (!strcasecmp(argv[2], "crc64")) {
            return crc64Test(argc, argv);
        } else if (!strcasecmp(argv[2], "zmalloc")) {
            return zmalloc_test(argc, argv);
        } else if (!strcasecmp(argv[2], "sds")) {
            return sdsTest(argc, argv);
        }

        return -1; /* test not found */
    }
#endif

起動パラメータのエラーなんかもmainで処理していて単純な起動mainって感じではなくinit処理なんかのハンドリングもここでやってるので凝った設定とか書いた場合はここ読むのも良さそうな感じもしました。

aeMain()

イベントループの起動部分。起動するのみの薄いラッパー的な感じ。

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

https://github.com/redis/redis/blob/6.2/src/ae.c#L485-L492

以下構造体でeventLoopは管理していてstopフラグが立つまではループする。

typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;

aeProcessEvents()

イベントの監視を行いつつイベント発生時のコールバックの起動なんかを行う関数。この辺からが所謂イベント駆動エンジンになるんでしょうか?ファイルディスクリプたを監視しつつreadableなら読み取り関数をコールしたりと言った処理を行っている。

イベントのフラグは以下のように定数で定義されている。監視自体はepollなりselectなりがOSごとに設定可能となっている。(ビルド時に指定?)

#define AE_NONE 0 // イベントは登録されていません
#define AE_READABLE 1 // 記述子が読み取り可能になったときに発生
#define AE_WRITABLE 2 // 記述子が書き込み可能である場合に起動

読み取り書き込みのイベントの起動部分は以下のようになっています。前述したフラグによる振り分けを行っています。

            int invert = fe->mask & AE_BARRIER;

            /* 通常、最初に読み取り可能なイベントを実行し、後で書き込み可能なイベントを実行します。
            * これは、クエリの処理直後にクエリの応答を提供できる場合があるため、便利です。
            * ただし、AE_BARRIERがマスクに設定されている場合、アプリケーションは逆のことを行うように要求します。
            * 読み取り可能なイベントの後に書き込み可能なイベントを発生させないでください。
            * このような場合、呼び出しを反転します。
            * これは、たとえば、クライアントに応答する前に、ファイルをディスクに同期するなど、
            * beforeSleep()フックで処理を実行する場合に役立ちます。
            */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            /* 書き込み可能なイベントを起動 */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* 呼び出しを反転する必要がある場合は、書き込み可能なイベントの後に読み取り可能なイベントを発生させます。*/
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

謎の最後のreturnは何用なんでしょうか。

    return processed; /* return the number of processed file/time events */
}

メモ

とりあえず何かしらの処理の先が読みたいみたいな場合にはこれを見返せば追っていくヒントにはなるはず。