地方エンジニアの学習日記

興味ある技術の雑なメモだったりを書いてくブログ。たまに日記とガジェット紹介。

【Nginx】ngx_http_upstream_process_upgradedのメモ

NginxではWebSocketのプロキシーも対応している。その中でもngx_http_upstream_process_upgradedは、Nginx HTTPモジュールにおけるプロトコルアップグレードの処理を担当しています。主な役割は、クライアントとアップストリームサーバー間のデータ転送を管理することです。以下に、関数の主要な動作を書いておきます。

  • 接続のタイムアウト処理
    • クライアント側または上流サーバー側の接続でタイムアウトが発生した場合、適切なエラーレスポンスを返します
  • データ転送の仕組み
    • from_upstream パラメータに応じて、データの送信元と送信先を決定します
    • クライアントから上流サーバー、または上流サーバーからクライアントへのデータ転送を処理します
  • バッファ管理
    • データ転送用のバッファを動的に確保し、データの読み取りと書き込みを効率的に行います
    • バッファが一杯になった場合は、自動的にリセットされます
  • イベント駆動の処理
    • 読み取りと書き込みイベントを継続的に監視し、データの可用性に応じて処理を行います
    • 接続の終了(EOF)や エラー状態を検出し、適切に対応します
  • タイマー管理
    • 読み取りおよび書き込みイベントにタイムアウトを設定し、接続の健全性を確保します

またNginx 自体は WebSocket のプロトコルを理解しているわけではありませんが、プロキシとして WebSocket のデータを転送することができます。これは、Nginx がプロトコル非依存で TCP ストリームを処理しているからです。

nginx.org

実装を追ってみる

static void
ngx_http_upstream_process_upgraded(ngx_http_request_t *r,
    ngx_uint_t from_upstream, ngx_uint_t do_write)
{
    size_t                     size;
    ssize_t                    n;
    ngx_buf_t                 *b;
    ngx_uint_t                 flags;
    ngx_connection_t          *c, *downstream, *upstream, *dst, *src;
    ngx_http_upstream_t       *u;
    ngx_http_core_loc_conf_t  *clcf;

    c = r->connection;
    u = r->upstream;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http upstream process upgraded, fu:%ui", from_upstream);

    downstream = c;
    upstream = u->peer.connection;

タイムアウト処理

    // タイムアウトフラグが設定されている場合はタイムアウト
    if (downstream->write->timedout) {
        c->timedout = 1;
        ngx_connection_error(c, NGX_ETIMEDOUT, "client timed out");
        ngx_http_upstream_finalize_request(r, u, NGX_HTTP_REQUEST_TIME_OUT);
        return;
    }

    if (upstream->read->timedout || upstream->write->timedout) {
        ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
        ngx_http_upstream_finalize_request(r, u, NGX_HTTP_GATEWAY_TIME_OUT);
        return;
    }

Nginxのアップグレードされた接続において、データ転送の方向を決定する

    // データが上流サーバーから下流クライアントに転送されるかどうかを判断
    if (from_upstream) {
        src = upstream;
        dst = downstream;
        b = &u->buffer;
    } else { // クライアントから上流サーバーへのデータ転送を扱う
        src = downstream;
        dst = upstream;
        b = &u->from_client;

        if (r->header_in->last > r->header_in->pos) { // クライアントからまだ読み込んでいないHTTPヘッダデータが存在する場合を判定
            b = r->header_in;
            b->end = b->last;
            do_write = 1;
        }

        // 必要に応じてバッファを初期化します
        if (b->start == NULL) {
            b->start = ngx_palloc(r->pool, u->conf->buffer_size);
            if (b->start == NULL) {
                ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
                return;
            }

            b->pos = b->start;
            b->last = b->start;
            b->end = b->start + u->conf->buffer_size;
            b->temporary = 1;
            b->tag = u->output.tag;
        }
    }

データの実際の転送とイベント処理を行う

    for ( ;; ) { // このループ内で、データの読み書きとイベント処理を繰り返し行う

        if (do_write) { // フラグが真の場合、データの書き込み処理を行う

            size = b->last - b->pos;

            if (size && dst->write->ready) {

                n = dst->send(dst, b->pos, size);

                if (n == NGX_ERROR) {
                    ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
                    return;
                }

                if (n > 0) {
                    b->pos += n;

                    if (b->pos == b->last) {
                        b->pos = b->start;
                        b->last = b->start;
                    }
                }
            }
        }

        size = b->end - b->last;

        if (size && src->read->ready) {

            n = src->recv(src, b->last, size);

            if (n == NGX_AGAIN || n == 0) {
                break;
            }

            if (n > 0) {
                do_write = 1;
                b->last += n;

                if (from_upstream) {
                    u->state->bytes_received += n;
                }

                continue;
            }

            if (n == NGX_ERROR) {
                src->read->eof = 1;
            }
        }

        break;
    }

接続終了の判定

    if ((upstream->read->eof && u->buffer.pos == u->buffer.last)
        || (downstream->read->eof && u->from_client.pos == u->from_client.last)
        || (downstream->read->eof && upstream->read->eof))
    {
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                       "http upstream upgraded done");
        ngx_http_upstream_finalize_request(r, u, 0);
        return;
    }

    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

    if (ngx_handle_write_event(upstream->write, u->conf->send_lowat)
        != NGX_OK)
    {
        ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
        return;
    }

イベントの再登録とタイマーの設定

// upstream 書き込みイベントがアクティブで、まだ準備ができていない場合
if (upstream->write->active && !upstream->write->ready) {
    // 書き込みタイムアウト用のタイマーを追加
    ngx_add_timer(upstream->write, u->conf->send_timeout);

} else if (upstream->write->timer_set) {
    // タイマーがすでに設定されている場合は削除
    ngx_del_timer(upstream->write);
}

// upstream の読み取りが EOF またはエラーを検出した場合
if (upstream->read->eof || upstream->read->error) {
    // 接続終了イベントフラグを設定
    flags = NGX_CLOSE_EVENT;

} else {
    // フラグはクリア
    flags = 0;
}

// upstream の読み取りイベントをハンドル(処理)する
if (ngx_handle_read_event(upstream->read, flags) != NGX_OK) {
    // エラーが発生した場合、リクエストを終了
    ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
    return;
}

// upstream の読み取りイベントがアクティブで、まだ準備ができていない場合
if (upstream->read->active && !upstream->read->ready) {
    // 読み取りタイムアウト用のタイマーを追加
    ngx_add_timer(upstream->read, u->conf->read_timeout);

} else if (upstream->read->timer_set) {
    // タイマーがすでに設定されている場合は削除
    ngx_del_timer(upstream->read);
}

// downstream 書き込みイベントをハンドル(処理)する
if (ngx_handle_write_event(downstream->write, clcf->send_lowat) != NGX_OK) {
    // エラーが発生した場合、リクエストを終了
    ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
    return;
}

// downstream の読み取りが EOF またはエラーを検出した場合
if (downstream->read->eof || downstream->read->error) {
    // 接続終了イベントフラグを設定
    flags = NGX_CLOSE_EVENT;

} else {
    // フラグはクリア
    flags = 0;
}

// downstream の読み取りイベントをハンドル(処理)する
if (ngx_handle_read_event(downstream->read, flags) != NGX_OK) {
    // エラーが発生した場合、リクエストを終了
    ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
    return;
}

// downstream の書き込みイベントがアクティブで、まだ準備ができていない場合
if (downstream->write->active && !downstream->write->ready) {
    // 書き込みタイムアウト用のタイマーを追加
    ngx_add_timer(downstream->write, clcf->send_timeout);

} else if (downstream->write->timer_set) {
    // タイマーがすでに設定されている場合は削除
    ngx_del_timer(downstream->write);
}