NginxではWebSocketのプロキシーも対応している。その中でもngx_http_upstream_process_upgradedは、Nginx HTTPモジュールにおけるプロトコルアップグレードの処理を担当しています。主な役割は、クライアントとアップストリームサーバー間のデータ転送を管理することです。以下に、関数の主要な動作を書いておきます。
- 接続のタイムアウト処理
- クライアント側または上流サーバー側の接続でタイムアウトが発生した場合、適切なエラーレスポンスを返します
- データ転送の仕組み
- from_upstream パラメータに応じて、データの送信元と送信先を決定します
- クライアントから上流サーバー、または上流サーバーからクライアントへのデータ転送を処理します
- バッファ管理
- データ転送用のバッファを動的に確保し、データの読み取りと書き込みを効率的に行います
- バッファが一杯になった場合は、自動的にリセットされます
- イベント駆動の処理
- 読み取りと書き込みイベントを継続的に監視し、データの可用性に応じて処理を行います
- 接続の終了(EOF)や エラー状態を検出し、適切に対応します
- タイマー管理
- 読み取りおよび書き込みイベントにタイムアウトを設定し、接続の健全性を確保します
またNginx 自体は WebSocket のプロトコルを理解しているわけではありませんが、プロキシとして WebSocket のデータを転送することができます。これは、Nginx がプロトコル非依存で TCP ストリームを処理しているからです。
実装を追ってみる
static void ngx_http_upstream_process_upgraded(ngx_http_request_t *r, ngx_uint_t from_upstream, ngx_uint_t do_write) { size_t size; ssize_t n; ngx_buf_t *b; ngx_uint_t flags; ngx_connection_t *c, *downstream, *upstream, *dst, *src; ngx_http_upstream_t *u; ngx_http_core_loc_conf_t *clcf; c = r->connection; u = r->upstream; ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0, "http upstream process upgraded, fu:%ui", from_upstream); downstream = c; upstream = u->peer.connection;
タイムアウト処理
// タイムアウトフラグが設定されている場合はタイムアウト if (downstream->write->timedout) { c->timedout = 1; ngx_connection_error(c, NGX_ETIMEDOUT, "client timed out"); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_REQUEST_TIME_OUT); return; } if (upstream->read->timedout || upstream->write->timedout) { ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out"); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_GATEWAY_TIME_OUT); return; }
Nginxのアップグレードされた接続において、データ転送の方向を決定する
// データが上流サーバーから下流クライアントに転送されるかどうかを判断 if (from_upstream) { src = upstream; dst = downstream; b = &u->buffer; } else { // クライアントから上流サーバーへのデータ転送を扱う src = downstream; dst = upstream; b = &u->from_client; if (r->header_in->last > r->header_in->pos) { // クライアントからまだ読み込んでいないHTTPヘッダデータが存在する場合を判定 b = r->header_in; b->end = b->last; do_write = 1; } // 必要に応じてバッファを初期化します if (b->start == NULL) { b->start = ngx_palloc(r->pool, u->conf->buffer_size); if (b->start == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; } b->pos = b->start; b->last = b->start; b->end = b->start + u->conf->buffer_size; b->temporary = 1; b->tag = u->output.tag; } }
データの実際の転送とイベント処理を行う
for ( ;; ) { // このループ内で、データの読み書きとイベント処理を繰り返し行う if (do_write) { // フラグが真の場合、データの書き込み処理を行う size = b->last - b->pos; if (size && dst->write->ready) { n = dst->send(dst, b->pos, size); if (n == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; } if (n > 0) { b->pos += n; if (b->pos == b->last) { b->pos = b->start; b->last = b->start; } } } } size = b->end - b->last; if (size && src->read->ready) { n = src->recv(src, b->last, size); if (n == NGX_AGAIN || n == 0) { break; } if (n > 0) { do_write = 1; b->last += n; if (from_upstream) { u->state->bytes_received += n; } continue; } if (n == NGX_ERROR) { src->read->eof = 1; } } break; }
接続終了の判定
if ((upstream->read->eof && u->buffer.pos == u->buffer.last) || (downstream->read->eof && u->from_client.pos == u->from_client.last) || (downstream->read->eof && upstream->read->eof)) { ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "http upstream upgraded done"); ngx_http_upstream_finalize_request(r, u, 0); return; } clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); if (ngx_handle_write_event(upstream->write, u->conf->send_lowat) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; }
イベントの再登録とタイマーの設定
// upstream 書き込みイベントがアクティブで、まだ準備ができていない場合 if (upstream->write->active && !upstream->write->ready) { // 書き込みタイムアウト用のタイマーを追加 ngx_add_timer(upstream->write, u->conf->send_timeout); } else if (upstream->write->timer_set) { // タイマーがすでに設定されている場合は削除 ngx_del_timer(upstream->write); } // upstream の読み取りが EOF またはエラーを検出した場合 if (upstream->read->eof || upstream->read->error) { // 接続終了イベントフラグを設定 flags = NGX_CLOSE_EVENT; } else { // フラグはクリア flags = 0; } // upstream の読み取りイベントをハンドル(処理)する if (ngx_handle_read_event(upstream->read, flags) != NGX_OK) { // エラーが発生した場合、リクエストを終了 ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; } // upstream の読み取りイベントがアクティブで、まだ準備ができていない場合 if (upstream->read->active && !upstream->read->ready) { // 読み取りタイムアウト用のタイマーを追加 ngx_add_timer(upstream->read, u->conf->read_timeout); } else if (upstream->read->timer_set) { // タイマーがすでに設定されている場合は削除 ngx_del_timer(upstream->read); } // downstream 書き込みイベントをハンドル(処理)する if (ngx_handle_write_event(downstream->write, clcf->send_lowat) != NGX_OK) { // エラーが発生した場合、リクエストを終了 ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; } // downstream の読み取りが EOF またはエラーを検出した場合 if (downstream->read->eof || downstream->read->error) { // 接続終了イベントフラグを設定 flags = NGX_CLOSE_EVENT; } else { // フラグはクリア flags = 0; } // downstream の読み取りイベントをハンドル(処理)する if (ngx_handle_read_event(downstream->read, flags) != NGX_OK) { // エラーが発生した場合、リクエストを終了 ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; } // downstream の書き込みイベントがアクティブで、まだ準備ができていない場合 if (downstream->write->active && !downstream->write->ready) { // 書き込みタイムアウト用のタイマーを追加 ngx_add_timer(downstream->write, clcf->send_timeout); } else if (downstream->write->timer_set) { // タイマーがすでに設定されている場合は削除 ngx_del_timer(downstream->write); }