Manjusaka

Manjusaka

ネットワークイベントにおける驚群効果について話しましょう

驚群問題については、実は昨年から注目していました。そして、CPython にselectorの驚群問題を解決するためのパッチ BPO-35517 を提出しました。さて、驚群問題について少しお話ししましょう。

驚群問題の過去#

驚群問題とは?#

驚群問題は驚群効果とも呼ばれます。簡単に言うと、複数のプロセスまたはスレッドが同じイベントを待っているとき、イベントが発生すると、すべてのスレッドとプロセスがカーネルによって起こされます。起こされた後、通常は 1 つのプロセスだけがそのイベントを取得して処理を行い、他のプロセスはイベントの取得に失敗したことに気づき、再び待機状態に入ります。これにより、ある程度システムの性能が低下します。

多くの人が疑問に思うかもしれませんが、驚群効果はなぜシステムリソースを消費し、システム性能を低下させるのでしょうか?

  1. マルチプロセス / スレッドの起こしには、コンテキストスイッチの問題が関わっています。頻繁なコンテキストスイッチによって、データがレジスタと実行キューの間で頻繁に流動します。極端な場合、時間はプロセス / スレッドのスケジューリングに多く消費され、実行には消費されません。

次に、ネットワークプログラミングでよく見られる驚群問題についてお話ししましょう。

よくある驚群問題#

Linux では、acceptselectpollepollなどのシステム提供の API を使用してネットワーク接続を処理する際に、よく見られる驚群効果が発生します。

accept の驚群#

まず、従来のacceptの使用方法を復習するためにフローチャートを見てみましょう。

image

ここでの状況は、リクエストが到着したときに、すべてのプロセス / スレッドがacceptを開始しますが、最終的には 1 つだけが成功するというものです。コードを見てみましょう。

#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define SERVER_ADDRESS "0.0.0.0"
#define SERVER_PORT 10086
#define WORKER_COUNT 4

int worker_process(int listenfd, int i) {
  while (1) {
    printf("私は作業 %d です、私のPIDは %d です、接続を受け入れ始めます \n", i,
           getpid());
    struct sockaddr_in client_info;
    socklen_t client_info_len = sizeof(client_info);
    int connection =
        accept(listenfd, (struct sockaddr *)&client_info, &client_info_len);
    if (connection != -1) {
      printf("ワーカー %d が成功しました\n", i);
      printf("IP :%s\t", inet_ntoa(client_info.sin_addr));
      printf("ポート: %d \n", client_info.sin_port);
    } else {
      printf("ワーカー %d が失敗しました", i);
    }
    close(connection);
  }

  return 0;
}

int main() {
  int i = 0;
  struct sockaddr_in address;
  bzero(&address, sizeof(address));
  address.sin_family = AF_INET;
  inet_pton(AF_INET, SERVER_ADDRESS, &address.sin_addr);
  address.sin_port = htons(SERVER_PORT);
  int listenfd = socket(PF_INET, SOCK_STREAM, 0);
  int ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
  ret = listen(listenfd, 5);
  for (i = 0; i < WORKER_COUNT; i++) {
    printf("ワーカー %d を作成します\n", i + 1);
    pid_t pid = fork();
    /*子プロセス */
    if (pid == 0) {
      worker_process(listenfd, i);
    }
    if (pid < 0) {
      printf("forkエラー");
    }
  }

  /*子プロセスを待つ*/
  int status;
  wait(&status);
  return 0;
}

実行結果を見てみましょう。

image

え?どうしたの?なぜここで私たちが望んでいた現象(1 つのプロセスが成功し、3 つのプロセスが失敗する)が現れないのでしょうか?理由は、Linux 2.6 以降、Accept の驚群問題がカーネルレベルで処理されたからです。

では、次に進みましょう。

select/poll/epoll の驚群#

epollを例にとって、従来の作業モードを見てみましょう。

image

さて、コードを見てみましょう。

#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define SERVER_ADDRESS "0.0.0.0"
#define SERVER_PORT 10087
#define WORKER_COUNT 4
#define MAXEVENTS 64

static int create_and_bind_socket() {
  int fd = socket(PF_INET, SOCK_STREAM, 0);
  struct sockaddr_in server_address;
  server_address.sin_family = AF_INET;
  inet_pton(AF_INET, SERVER_ADDRESS, &server_address.sin_addr);
  server_address.sin_port = htons(SERVER_PORT);
  bind(fd, (struct sockaddr *)&server_address, sizeof(server_address));
  return fd;
}

static int make_non_blocking_socket(int sfd) {
  int flags, s;
  flags = fcntl(sfd, F_GETFL, 0);
  if (flags == -1) {
    perror("fcntlエラー");
    return -1;
  }
  flags |= O_NONBLOCK;
  s = fcntl(sfd, F_SETFL, flags);
  if (s == -1) {
    perror("fcntl設定エラー");
    return -1;
  }
  return 0;
}

int worker_process(int listenfd, int epoll_fd, struct epoll_event *events,
                   int k) {
  while (1) {
    int n;
    n = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
    printf("ワーカー %d のPIDは %d で、epoll_waitから値を取得しました\n", k, getpid());
    for (int i = 0; i < n; i++) {
      if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) ||
          (!(events[i].events & EPOLLIN))) {
        printf("%d\n", i);
        fprintf(stderr, "epollエラー\n");
        close(events[i].data.fd);
        continue;
      } else if (listenfd == events[i].data.fd) {
        struct sockaddr in_addr;
        socklen_t in_len;
        int in_fd;
        in_len = sizeof(in_addr);
        in_fd = accept(listenfd, &in_addr, &in_len);
        if (in_fd == -1) {
          printf("ワーカー %d が失敗しました\n", k);
          break;
        }
        printf("ワーカー %d が成功しました\n", k);
        close(in_fd);
      }
    }
  }

  return 0;
}

int main() {
  int listen_fd, s;
  int epoll_fd;
  struct epoll_event event;
  struct epoll_event *events;
  listen_fd = create_and_bind_socket();
  if (listen_fd == -1) {
    abort();
  }
  s = make_non_blocking_socket(listen_fd);
  if (s == -1) {
    abort();
  }
  s = listen(listen_fd, SOMAXCONN);
  if (s == -1) {
    abort();
  }
  epoll_fd = epoll_create(MAXEVENTS);
  if (epoll_fd == -1) {
    abort();
  }
  event.data.fd = listen_fd;
  event.events = EPOLLIN;
  s = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event);
  if (s == -1) {
    abort();
  }
  events = calloc(MAXEVENTS, sizeof(event));
  for (int i = 0; i < WORKER_COUNT; i++) {
    printf("ワーカー %d を作成します\n", i);
    int pid = fork();
    if (pid == 0) {
      worker_process(listen_fd, epoll_fd, events, i);
    }
  }
  int status;
  wait(&status);
  free(events);
  close(listen_fd);
  return EXIT_SUCCESS;
}

次に、telnetを使って TCP リクエストを送信し、結果を見てみましょう。

image

ええ、リクエストが到着したとき、4 つのプロセスがすべて起こされたのが見えます。今、より直感的にこのプロセスを見えるようにするために、straceを使ってプロファイルを取ってみましょう。

image

やはり、4 つのプロセスがすべて起こされましたが、Worker 3 だけがacceptに成功し、他のプロセスはacceptの際にEAGAINエラーを取得しました。

そして、Linux ドキュメントでは、EAGAINの説明は次のようになっています。

ソケットはノンブロッキングとしてマークされており、受け入れるための接続が存在しません。POSIX.1-2001 および POSIX.1-2008 では、この場合にどちらのエラーが返されるかを許可しており、これらの定数が同じ値を持つことを要求していないため、ポータブルなアプリケーションは両方の可能性を確認する必要があります。

これで、EPOLL の驚群問題について直感的な理解が得られましたね。では、驚群問題をどのように解決するか見てみましょう。

驚群問題の現在#

カーネルからの驚群問題の解決#

まず、前述のように、Accept の驚群問題は Linux Kernel 2.6 以降、カーネルレベルで解決されました。しかし、EPOLL はどうでしょうか?2016 年 1 月、Linux の父 Linus はカーネルにパッチを提出しました。

参照:epoll: add EPOLLEXCLUSIVE flag

その中の重要なコードは次の通りです。

		if (epi->event.events & EPOLLEXCLUSIVE)
			add_wait_queue_exclusive(whead, &pwq->wait);
		else
			add_wait_queue(whead, &pwq->wait);

簡単に言うと、EPOLLEXCLUSIVEフラグを補助として追加します。ユーザーがEPOLLEXCLUSIVEを有効にした場合、カーネルの待機キューに追加する際にadd_wait_queue_exclusiveを使用し、そうでない場合はadd_wait_queueを使用します。

これらの関数の使い方については、この記事を参照してくださいHanding wait queues

その中には次のような説明があります。

add_wait_queue()関数は、待機キューリストの最初の位置に非排他的プロセスを挿入します。add_wait_queue_exclusive()関数は、待機キューリストの最後の位置に排他的プロセスを挿入します。

さて、私たちのコードを修正してみましょう(カーネルバージョンは Linux Kernel 4.5 以降)。

#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define SERVER_ADDRESS "0.0.0.0"
#define SERVER_PORT 10086
#define WORKER_COUNT 4
#define MAXEVENTS 64

static int create_and_bind_socket() {
  int fd = socket(PF_INET, SOCK_STREAM, 0);
  struct sockaddr_in server_address;
  server_address.sin_family = AF_INET;
  inet_pton(AF_INET, SERVER_ADDRESS, &server_address.sin_addr);
  server_address.sin_port = htons(SERVER_PORT);
  bind(fd, (struct sockaddr *)&server_address, sizeof(server_address));
  return fd;
}

static int make_non_blocking_socket(int sfd) {
  int flags, s;
  flags = fcntl(sfd, F_GETFL, 0);
  if (flags == -1) {
    perror("fcntlエラー");
    return -1;
  }
  flags |= O_NONBLOCK;
  s = fcntl(sfd, F_SETFL, flags);
  if (s == -1) {
    perror("fcntl設定エラー");
    return -1;
  }
  return 0;
}

int worker_process(int listenfd, int epoll_fd, struct epoll_event *events,
                   int k) {
  while (1) {
    int n;
    n = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
    printf("ワーカー %d のPIDは %d で、epoll_waitから値を取得しました\n", k, getpid());
    sleep(0.2);
    for (int i = 0; i < n; i++) {
      if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) ||
          (!(events[i].events & EPOLLIN))) {
        printf("%d\n", i);
        fprintf(stderr, "epollエラー\n");
        close(events[i].data.fd);
        continue;
      } else if (listenfd == events[i].data.fd) {
        struct sockaddr in_addr;
        socklen_t in_len;
        int in_fd;
        in_len = sizeof(in_addr);
        in_fd = accept(listenfd, &in_addr, &in_len);
        if (in_fd == -1) {
          printf("ワーカー %d が失敗しました\n", k);
          break;
        }
        printf("ワーカー %d が成功しました\n", k);
        close(in_fd);
      }
    }
  }

  return 0;
}

int main() {
  int listen_fd, s;
  int epoll_fd;
  struct epoll_event event;
  struct epoll_event *events;
  listen_fd = create_and_bind_socket();
  if (listen_fd == -1) {
    abort();
  }
  s = make_non_blocking_socket(listen_fd);
  if (s == -1) {
    abort();
  }
  s = listen(listen_fd, SOMAXCONN);
  if (s == -1) {
    abort();
  }
  epoll_fd = epoll_create(MAXEVENTS);
  if (epoll_fd == -1) {
    abort();
  }
  event.data.fd = listen_fd;
  // EPOLLEXCLUSIVEサポートを追加
  event.events = EPOLLIN | EPOLLEXCLUSIVE;
  s = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event);
  if (s == -1) {
    abort();
  }
  events = calloc(MAXEVENTS, sizeof(event));
  for (int i = 0; i < WORKER_COUNT; i++) {
    printf("ワーカー %d を作成します\n", i);
    int pid = fork();
    if (pid == 0) {
      worker_process(listen_fd, epoll_fd, events, i);
    }
  }
  int status;
  wait(&status);
  free(events);
  close(listen_fd);
  return EXIT_SUCCESS;
}

では、結果を見てみましょう。

image

ええ?なぜまだ 2 つのプロセスが起こされているのでしょうか?理由は、EPOLLEXCLUSIVEは起こされたプロセスの数が有効にしたプロセスの数以下であることを保証するだけであり、すべてのプロセスを直接起こすわけでもなく、1 つのプロセスだけを起こすことを保証するわけでもありません。

公式の説明を見てみましょう。

対象ファイルディスクリプタにアタッチされている epoll ファイルディスクリプタの排他的起動モードを設定します。起動イベントが発生し、複数の epoll ファイルディスクリプタが同じターゲットファイルに EPOLLEXCLUSIVE を使用してアタッチされている場合、1 つ以上の epoll ファイルディスクリプタが epoll_wait (2) でイベントを受け取ります。このシナリオでは、EPOLLEXCLUSIVE が設定されていない場合のデフォルトは、すべての epoll ファイルディスクリプタがイベントを受け取ることです。したがって、EPOLLEXCLUSIVE は特定のシナリオで驚群問題を回避するのに役立ちます。

つまり、現時点では、システムは驚群問題の解決を厳密に保証することはできません。多くの場合、アプリケーション層自身の設計に依存する必要があります。

アプリケーション層での解決#

現在、アプリケーションでの驚群問題の解決には 2 つの戦略があります。

  1. これは受け入れ可能なコストであり、しばらくは無視します。これは私たちの大多数の戦略です。

  2. ロックやその他の手段を使用してこの問題を解決します。最も典型的な例は Nginx です。

Nginx がこの問題をどのように解決しているか見てみましょう。

void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
    ngx_uint_t  flags;
    ngx_msec_t  timer, delta;

    if (ngx_timer_resolution) {
        timer = NGX_TIMER_INFINITE;
        flags = 0;

    } else {
        timer = ngx_event_find_timer();
        flags = NGX_UPDATE_TIME;
    }

    if (ngx_use_accept_mutex) {
        if (ngx_accept_disabled > 0) {
            ngx_accept_disabled--;

        } else {
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
                return;
            }

            if (ngx_accept_mutex_held) {
                flags |= NGX_POST_EVENTS;

            } else {
                if (timer == NGX_TIMER_INFINITE
                    || timer > ngx_accept_mutex_delay)
                {
                    timer = ngx_accept_mutex_delay;
                }
            }
        }
    }

    delta = ngx_current_msec;

    (void) ngx_process_events(cycle, timer, flags);

    delta = ngx_current_msec - delta;

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "timer delta: %M", delta);

    ngx_event_process_posted(cycle, &ngx_posted_accept_events);

    if (ngx_accept_mutex_held) {
        ngx_shmtx_unlock(&ngx_accept_mutex);
    }

    if (delta) {
        ngx_event_expire_timers();
    }

    ngx_event_process_posted(cycle, &ngx_posted_events);
}

ここで、Nginx の主な考え方はロックの形式でこの問題を処理することです。各プロセスは FD イベントをリッスンする前に、まずngx_trylock_accept_mutexを使用してグローバルロックを取得する必要があります。ロックを取得できた場合、ngx_process_eventsを使用してイベントの処理を試みます。ロックを取得できなかった場合は、今回の操作を放棄します。したがって、ある FD に対して、Nginx は同時に 1 つのワーカーだけが FD 上のイベントを処理することを保証し、驚群を回避します。

まとめ#

この記事は昨年から現在まで長い間引き延ばされてきました。驚群問題は私たちの日常業務で直面する問題であり、私自身、昨年から現在までの学びを記録する詳細なメモを書く必要があると感じています。これでほぼ終わりです。皆さんが楽しんで読んでいただけることを願っています。

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。