地方エンジニアの学習日記

興味ある技術の雑なメモだったりを書いてくブログ。たまに日記とガジェット紹介。

【Redis】Blockコマンドの中身

mogile.web.fc2.com

BRPOPとかBLPOPとかその辺の中身がどんなことやってるのか気になったので見てみたメモ。

ちなみにクライアント側は空のリストへBRPOPなりを実行するとstraceの結果よりread(2)でブロックする。ローカルの検証だがコネクションはこんな感じで維持されている。

クライアントアプリとしてはredis-cliを使っている。keepaliveが付いてるのはそのせいで特に変わったことは無さそう。

root@k3s-node:~# netstat -antupo | grep redis-cli
tcp        0      0 127.0.0.1:47888         127.0.0.1:6379          ESTABLISHED 29080/redis-cli      keepalive (8.94/0/0)

クライアント側に関しては特に疑問はなくてそりゃそうだくらい。気になるのはサーバ側の実装。

疑問点

  • クライアントがBRPOPをサーバ側へ発行してブロックする処理はどうやってるの?
  • どのデータをクライアントがブロックして待ってるかはどうやって判定するの?

という2点。それぞれ調べてみる。

BRPOPの実装あたり

調査にあたってのコマンド実行のエントリポイントを貼っておく。

/* Blocking RPOP/LPOP */
void blockingPopGenericCommand(client *c, int where) {
    robj *o;
    mstime_t timeout;
    int j;

    // クライアントにCLIENT_BLOCKEDフラグを設定
    for (j = 1; j < c->argc-1; j++) {
        o = lookupKeyWrite(c->db,c->argv[j]);
        if (o != NULL) {
            if (checkType(c,o,OBJ_LIST)) {
                return;
            } else {
                if (listTypeLength(o) != 0) {
                    /* Non empty list, this is like a normal [LR]POP. */
                    robj *value = listTypePop(o,where);
                    serverAssert(value != NULL);

                    addReplyArrayLen(c,2);
                    addReplyBulk(c,c->argv[j]);
                    addReplyBulk(c,value);
                    decrRefCount(value);
                    listElementsRemoved(c,c->argv[j],where,o,1);

                    /* Replicate it as an [LR]POP instead of B[LR]POP. */
                    rewriteClientCommandVector(c,2,
                        (where == LIST_HEAD) ? shared.lpop : shared.rpop,
                        c->argv[j]);
                    return;
                }
            }
        }
    }

    // 指定されたキー(リスト、zset、またはストリーム)に対してクライアントをブロックモードに設定
    blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,&pos,NULL);
}

クライアントがBRPOPをサーバ側へ発行してブロックする処理はどうやってるの?

内部的に追おうとしたがC特有のマクロがよくわからなすぎて断念。どうやって読めば良いんですかねこれ。。

とりあえずクライアントがブロッキングなコマンドを実行してきたらサーバ側はRedisModule_BlockClientを呼び出す。reply_callbackにブロック解除時に呼び出すコールバックを登録しておくという感じ。

REDISMODULE_API 
RedisModuleBlockedClient * 
(*RedisModule_BlockClient) (RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) 
REDISMODULE_ATTR;

#ifndef REDISMODULE_ATTR_COMMON
#    if defined(__GNUC__) && !defined(__clang__)
#        define REDISMODULE_ATTR_COMMON __attribute__((__common__))
#    else
#        define REDISMODULE_ATTR_COMMON
#    endif
#endif

attributeは以下を読んだがよくわからない。。

blueeyes.sakura.ne.jp

ちなみにクライアントがブロックされると、以下のAPIでブロックを解除することができるもよう。

int RedisModule_UnblockClient(RedisModuleBlockedClient *bc, void *privdata);

どのデータをクライアントがブロックして待ってるかはどうやって判定するの?

ブロックされている状態はサーバ側が以下の構造体で保持しているらしい。

github.com

クライアント自体は以下の構造体

github.com

接続ごとにstrucr clientを生成。コマンドがblockingならstruct blockingStateに情報を書き込む。ブロックされているキーは情報として持っているのでそのキーへの操作が発生したらブロッククライアントのリストからクライアントへデータを返す。ような流れになりそう。

所感

大雑把すぎるがジョブキューとしてどんな仕組みでブロック/解除をしてるのかがなんとなく分かったのでよかった。redisのクライアント管理は構造体も大きく理解しづらい点が多々あるので暇な時読んでいきたいと思う。とりあえずまとめると

  • クライアントはreadでブロックする
  • サーバ側はクライアントを管理する構造を持っていてブロックしているキーにイベントが発生したらコールバックでクライアントへ返すような仕組み

って理解で良さそう。

あとイベント駆動というかそもそもC自体のリーディングが大変なのでもうちょっと早く正確に読めるようになろう。