驚群問題については、実は昨年から注目していました。そして、CPython にselector
の驚群問題を解決するためのパッチ BPO-35517 を提出しました。さて、驚群問題について少しお話ししましょう。
驚群問題の過去#
驚群問題とは?#
驚群問題は驚群効果とも呼ばれます。簡単に言うと、複数のプロセスまたはスレッドが同じイベントを待っているとき、イベントが発生すると、すべてのスレッドとプロセスがカーネルによって起こされます。起こされた後、通常は 1 つのプロセスだけがそのイベントを取得して処理を行い、他のプロセスはイベントの取得に失敗したことに気づき、再び待機状態に入ります。これにより、ある程度システムの性能が低下します。
多くの人が疑問に思うかもしれませんが、驚群効果はなぜシステムリソースを消費し、システム性能を低下させるのでしょうか?
- マルチプロセス / スレッドの起こしには、コンテキストスイッチの問題が関わっています。頻繁なコンテキストスイッチによって、データがレジスタと実行キューの間で頻繁に流動します。極端な場合、時間はプロセス / スレッドのスケジューリングに多く消費され、実行には消費されません。
次に、ネットワークプログラミングでよく見られる驚群問題についてお話ししましょう。
よくある驚群問題#
Linux では、accept
やselect
、poll
、epoll
などのシステム提供の API を使用してネットワーク接続を処理する際に、よく見られる驚群効果が発生します。
accept の驚群#
まず、従来のaccept
の使用方法を復習するためにフローチャートを見てみましょう。
ここでの状況は、リクエストが到着したときに、すべてのプロセス / スレッドがaccept
を開始しますが、最終的には 1 つだけが成功するというものです。コードを見てみましょう。
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#define SERVER_ADDRESS "0.0.0.0"
#define SERVER_PORT 10086
#define WORKER_COUNT 4
int worker_process(int listenfd, int i) {
while (1) {
printf("私は作業 %d です、私のPIDは %d です、接続を受け入れ始めます \n", i,
getpid());
struct sockaddr_in client_info;
socklen_t client_info_len = sizeof(client_info);
int connection =
accept(listenfd, (struct sockaddr *)&client_info, &client_info_len);
if (connection != -1) {
printf("ワーカー %d が成功しました\n", i);
printf("IP :%s\t", inet_ntoa(client_info.sin_addr));
printf("ポート: %d \n", client_info.sin_port);
} else {
printf("ワーカー %d が失敗しました", i);
}
close(connection);
}
return 0;
}
int main() {
int i = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, SERVER_ADDRESS, &address.sin_addr);
address.sin_port = htons(SERVER_PORT);
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
int ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
ret = listen(listenfd, 5);
for (i = 0; i < WORKER_COUNT; i++) {
printf("ワーカー %d を作成します\n", i + 1);
pid_t pid = fork();
/*子プロセス */
if (pid == 0) {
worker_process(listenfd, i);
}
if (pid < 0) {
printf("forkエラー");
}
}
/*子プロセスを待つ*/
int status;
wait(&status);
return 0;
}
実行結果を見てみましょう。
え?どうしたの?なぜここで私たちが望んでいた現象(1 つのプロセスが成功し、3 つのプロセスが失敗する)が現れないのでしょうか?理由は、Linux 2.6 以降、Accept の驚群問題がカーネルレベルで処理されたからです。
では、次に進みましょう。
select/poll/epoll の驚群#
epoll
を例にとって、従来の作業モードを見てみましょう。
さて、コードを見てみましょう。
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#define SERVER_ADDRESS "0.0.0.0"
#define SERVER_PORT 10087
#define WORKER_COUNT 4
#define MAXEVENTS 64
static int create_and_bind_socket() {
int fd = socket(PF_INET, SOCK_STREAM, 0);
struct sockaddr_in server_address;
server_address.sin_family = AF_INET;
inet_pton(AF_INET, SERVER_ADDRESS, &server_address.sin_addr);
server_address.sin_port = htons(SERVER_PORT);
bind(fd, (struct sockaddr *)&server_address, sizeof(server_address));
return fd;
}
static int make_non_blocking_socket(int sfd) {
int flags, s;
flags = fcntl(sfd, F_GETFL, 0);
if (flags == -1) {
perror("fcntlエラー");
return -1;
}
flags |= O_NONBLOCK;
s = fcntl(sfd, F_SETFL, flags);
if (s == -1) {
perror("fcntl設定エラー");
return -1;
}
return 0;
}
int worker_process(int listenfd, int epoll_fd, struct epoll_event *events,
int k) {
while (1) {
int n;
n = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
printf("ワーカー %d のPIDは %d で、epoll_waitから値を取得しました\n", k, getpid());
for (int i = 0; i < n; i++) {
if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN))) {
printf("%d\n", i);
fprintf(stderr, "epollエラー\n");
close(events[i].data.fd);
continue;
} else if (listenfd == events[i].data.fd) {
struct sockaddr in_addr;
socklen_t in_len;
int in_fd;
in_len = sizeof(in_addr);
in_fd = accept(listenfd, &in_addr, &in_len);
if (in_fd == -1) {
printf("ワーカー %d が失敗しました\n", k);
break;
}
printf("ワーカー %d が成功しました\n", k);
close(in_fd);
}
}
}
return 0;
}
int main() {
int listen_fd, s;
int epoll_fd;
struct epoll_event event;
struct epoll_event *events;
listen_fd = create_and_bind_socket();
if (listen_fd == -1) {
abort();
}
s = make_non_blocking_socket(listen_fd);
if (s == -1) {
abort();
}
s = listen(listen_fd, SOMAXCONN);
if (s == -1) {
abort();
}
epoll_fd = epoll_create(MAXEVENTS);
if (epoll_fd == -1) {
abort();
}
event.data.fd = listen_fd;
event.events = EPOLLIN;
s = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event);
if (s == -1) {
abort();
}
events = calloc(MAXEVENTS, sizeof(event));
for (int i = 0; i < WORKER_COUNT; i++) {
printf("ワーカー %d を作成します\n", i);
int pid = fork();
if (pid == 0) {
worker_process(listen_fd, epoll_fd, events, i);
}
}
int status;
wait(&status);
free(events);
close(listen_fd);
return EXIT_SUCCESS;
}
次に、telnet
を使って TCP リクエストを送信し、結果を見てみましょう。
ええ、リクエストが到着したとき、4 つのプロセスがすべて起こされたのが見えます。今、より直感的にこのプロセスを見えるようにするために、strace
を使ってプロファイルを取ってみましょう。
やはり、4 つのプロセスがすべて起こされましたが、Worker 3 だけがaccept
に成功し、他のプロセスはaccept
の際にEAGAIN
エラーを取得しました。
そして、Linux ドキュメントでは、EAGAIN
の説明は次のようになっています。
ソケットはノンブロッキングとしてマークされており、受け入れるための接続が存在しません。POSIX.1-2001 および POSIX.1-2008 では、この場合にどちらのエラーが返されるかを許可しており、これらの定数が同じ値を持つことを要求していないため、ポータブルなアプリケーションは両方の可能性を確認する必要があります。
これで、EPOLL の驚群問題について直感的な理解が得られましたね。では、驚群問題をどのように解決するか見てみましょう。
驚群問題の現在#
カーネルからの驚群問題の解決#
まず、前述のように、Accept の驚群問題は Linux Kernel 2.6 以降、カーネルレベルで解決されました。しかし、EPOLL はどうでしょうか?2016 年 1 月、Linux の父 Linus はカーネルにパッチを提出しました。
参照:epoll: add EPOLLEXCLUSIVE flag
その中の重要なコードは次の通りです。
if (epi->event.events & EPOLLEXCLUSIVE)
add_wait_queue_exclusive(whead, &pwq->wait);
else
add_wait_queue(whead, &pwq->wait);
簡単に言うと、EPOLLEXCLUSIVE
フラグを補助として追加します。ユーザーがEPOLLEXCLUSIVE
を有効にした場合、カーネルの待機キューに追加する際にadd_wait_queue_exclusive
を使用し、そうでない場合はadd_wait_queue
を使用します。
これらの関数の使い方については、この記事を参照してくださいHanding wait queues。
その中には次のような説明があります。
add_wait_queue()
関数は、待機キューリストの最初の位置に非排他的プロセスを挿入します。add_wait_queue_exclusive()
関数は、待機キューリストの最後の位置に排他的プロセスを挿入します。
さて、私たちのコードを修正してみましょう(カーネルバージョンは Linux Kernel 4.5 以降)。
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#define SERVER_ADDRESS "0.0.0.0"
#define SERVER_PORT 10086
#define WORKER_COUNT 4
#define MAXEVENTS 64
static int create_and_bind_socket() {
int fd = socket(PF_INET, SOCK_STREAM, 0);
struct sockaddr_in server_address;
server_address.sin_family = AF_INET;
inet_pton(AF_INET, SERVER_ADDRESS, &server_address.sin_addr);
server_address.sin_port = htons(SERVER_PORT);
bind(fd, (struct sockaddr *)&server_address, sizeof(server_address));
return fd;
}
static int make_non_blocking_socket(int sfd) {
int flags, s;
flags = fcntl(sfd, F_GETFL, 0);
if (flags == -1) {
perror("fcntlエラー");
return -1;
}
flags |= O_NONBLOCK;
s = fcntl(sfd, F_SETFL, flags);
if (s == -1) {
perror("fcntl設定エラー");
return -1;
}
return 0;
}
int worker_process(int listenfd, int epoll_fd, struct epoll_event *events,
int k) {
while (1) {
int n;
n = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
printf("ワーカー %d のPIDは %d で、epoll_waitから値を取得しました\n", k, getpid());
sleep(0.2);
for (int i = 0; i < n; i++) {
if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN))) {
printf("%d\n", i);
fprintf(stderr, "epollエラー\n");
close(events[i].data.fd);
continue;
} else if (listenfd == events[i].data.fd) {
struct sockaddr in_addr;
socklen_t in_len;
int in_fd;
in_len = sizeof(in_addr);
in_fd = accept(listenfd, &in_addr, &in_len);
if (in_fd == -1) {
printf("ワーカー %d が失敗しました\n", k);
break;
}
printf("ワーカー %d が成功しました\n", k);
close(in_fd);
}
}
}
return 0;
}
int main() {
int listen_fd, s;
int epoll_fd;
struct epoll_event event;
struct epoll_event *events;
listen_fd = create_and_bind_socket();
if (listen_fd == -1) {
abort();
}
s = make_non_blocking_socket(listen_fd);
if (s == -1) {
abort();
}
s = listen(listen_fd, SOMAXCONN);
if (s == -1) {
abort();
}
epoll_fd = epoll_create(MAXEVENTS);
if (epoll_fd == -1) {
abort();
}
event.data.fd = listen_fd;
// EPOLLEXCLUSIVEサポートを追加
event.events = EPOLLIN | EPOLLEXCLUSIVE;
s = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event);
if (s == -1) {
abort();
}
events = calloc(MAXEVENTS, sizeof(event));
for (int i = 0; i < WORKER_COUNT; i++) {
printf("ワーカー %d を作成します\n", i);
int pid = fork();
if (pid == 0) {
worker_process(listen_fd, epoll_fd, events, i);
}
}
int status;
wait(&status);
free(events);
close(listen_fd);
return EXIT_SUCCESS;
}
では、結果を見てみましょう。
ええ?なぜまだ 2 つのプロセスが起こされているのでしょうか?理由は、EPOLLEXCLUSIVE
は起こされたプロセスの数が有効にしたプロセスの数以下であることを保証するだけであり、すべてのプロセスを直接起こすわけでもなく、1 つのプロセスだけを起こすことを保証するわけでもありません。
公式の説明を見てみましょう。
対象ファイルディスクリプタにアタッチされている epoll ファイルディスクリプタの排他的起動モードを設定します。起動イベントが発生し、複数の epoll ファイルディスクリプタが同じターゲットファイルに EPOLLEXCLUSIVE を使用してアタッチされている場合、1 つ以上の epoll ファイルディスクリプタが epoll_wait (2) でイベントを受け取ります。このシナリオでは、EPOLLEXCLUSIVE が設定されていない場合のデフォルトは、すべての epoll ファイルディスクリプタがイベントを受け取ることです。したがって、EPOLLEXCLUSIVE は特定のシナリオで驚群問題を回避するのに役立ちます。
つまり、現時点では、システムは驚群問題の解決を厳密に保証することはできません。多くの場合、アプリケーション層自身の設計に依存する必要があります。
アプリケーション層での解決#
現在、アプリケーションでの驚群問題の解決には 2 つの戦略があります。
-
これは受け入れ可能なコストであり、しばらくは無視します。これは私たちの大多数の戦略です。
-
ロックやその他の手段を使用してこの問題を解決します。最も典型的な例は Nginx です。
Nginx がこの問題をどのように解決しているか見てみましょう。
void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
ngx_uint_t flags;
ngx_msec_t timer, delta;
if (ngx_timer_resolution) {
timer = NGX_TIMER_INFINITE;
flags = 0;
} else {
timer = ngx_event_find_timer();
flags = NGX_UPDATE_TIME;
}
if (ngx_use_accept_mutex) {
if (ngx_accept_disabled > 0) {
ngx_accept_disabled--;
} else {
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}
if (ngx_accept_mutex_held) {
flags |= NGX_POST_EVENTS;
} else {
if (timer == NGX_TIMER_INFINITE
|| timer > ngx_accept_mutex_delay)
{
timer = ngx_accept_mutex_delay;
}
}
}
}
delta = ngx_current_msec;
(void) ngx_process_events(cycle, timer, flags);
delta = ngx_current_msec - delta;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"timer delta: %M", delta);
ngx_event_process_posted(cycle, &ngx_posted_accept_events);
if (ngx_accept_mutex_held) {
ngx_shmtx_unlock(&ngx_accept_mutex);
}
if (delta) {
ngx_event_expire_timers();
}
ngx_event_process_posted(cycle, &ngx_posted_events);
}
ここで、Nginx の主な考え方はロックの形式でこの問題を処理することです。各プロセスは FD イベントをリッスンする前に、まずngx_trylock_accept_mutex
を使用してグローバルロックを取得する必要があります。ロックを取得できた場合、ngx_process_events
を使用してイベントの処理を試みます。ロックを取得できなかった場合は、今回の操作を放棄します。したがって、ある FD に対して、Nginx は同時に 1 つのワーカーだけが FD 上のイベントを処理することを保証し、驚群を回避します。
まとめ#
この記事は昨年から現在まで長い間引き延ばされてきました。驚群問題は私たちの日常業務で直面する問題であり、私自身、昨年から現在までの学びを記録する詳細なメモを書く必要があると感じています。これでほぼ終わりです。皆さんが楽しんで読んでいただけることを願っています。