Manjusaka

Manjusaka

聊聊網絡事件中的驚群效應

關於驚群問題,其實我是在去年開始去關注的。然後向 CPython 提了一個關於解決 selector 的驚群問題的補丁 BPO-35517。現在大概來聊聊關於驚群問題那點事吧

驚群問題的過去#

驚群問題是什麼?#

驚群問題又名驚群效應。簡單來說就是多個進程或者線程在等待同一個事件,當事件發生時,所有線程和進程都會被內核喚醒。喚醒後通常只有一個進程獲得了該事件並進行處理,其他進程發現獲取事件失敗後又繼續進入了等待狀態,在一定程度上降低了系統性能。

可能很多人想問,驚群效應為什麼會佔用系統資源?降低系統性能?

  1. 多進程 / 線程的喚醒,涉及到的一個問題是上下文切換問題。頻繁的上下文切換帶來的一個問題是數據將頻繁的在寄存器與運行隊列中流轉。極端情況下,時間更多的消耗在進程 / 線程的調度上,而不是執行

接下來我們來聊聊我們網路編程中常見的驚群問題。

常見的驚群問題#

在 Linux 下,我們常見的驚群效應發生於我們使用 accept 以及我們 selectpollepoll 等系統提供的 API 來處理我們的網路連接。

accept 驚群#

首先我們用一個流程圖來複習下我們傳統的 accept 使用方式

image

那麼在這裡存在一種情況,即當一個請求到達時,所有進程 / 線程都開始 accept ,但是最終只有一個獲取成功,我們來寫段代碼看看

#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define SERVER_ADDRESS "0.0.0.0"
#define SERVER_PORT 10086
#define WORKER_COUNT 4

int worker_process(int listenfd, int i) {
  while (1) {
    printf("I am work %d, my pid is %d, begin to accept connections \n", i,
           getpid());
    struct sockaddr_in client_info;
    socklen_t client_info_len = sizeof(client_info);
    int connection =
        accept(listenfd, (struct sockaddr *)&client_info, &client_info_len);
    if (connection != -1) {
      printf("worker %d accept success\n", i);
      printf("ip :%s\t", inet_ntoa(client_info.sin_addr));
      printf("port: %d \n", client_info.sin_port);
    } else {
      printf("worker %d accept failed", i);
    }
    close(connection);
  }

  return 0;
}

int main() {
  int i = 0;
  struct sockaddr_in address;
  bzero(&address, sizeof(address));
  address.sin_family = AF_INET;
  inet_pton(AF_INET, SERVER_ADDRESS, &address.sin_addr);
  address.sin_port = htons(SERVER_PORT);
  int listenfd = socket(PF_INET, SOCK_STREAM, 0);
  int ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
  ret = listen(listenfd, 5);
  for (i = 0; i < WORKER_COUNT; i++) {
    printf("Create worker %d\n", i + 1);
    pid_t pid = fork();
    /*child  process */
    if (pid == 0) {
      worker_process(listenfd, i);
    }
    if (pid < 0) {
      printf("fork error");
    }
  }

  /*wait child process*/
  int status;
  wait(&status);
  return 0;
}

我們來看看運行的結果

image

誒?怎麼回事?為什麼這裡沒有出現我們想要的現象(一個進程 accept 成功,三個進程 accept 失敗)?原因在於在 Linux 2.6 之後,Accept 的驚群問題從內核上被處理了

好,我們接著往下看

select/poll/epoll 驚群#

我們以 epoll 為例,我們來看看傳統的工作模式

image

好了,我們來看段代碼

#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define SERVER_ADDRESS "0.0.0.0"
#define SERVER_PORT 10087
#define WORKER_COUNT 4
#define MAXEVENTS 64

static int create_and_bind_socket() {
  int fd = socket(PF_INET, SOCK_STREAM, 0);
  struct sockaddr_in server_address;
  server_address.sin_family = AF_INET;
  inet_pton(AF_INET, SERVER_ADDRESS, &server_address.sin_addr);
  server_address.sin_port = htons(SERVER_PORT);
  bind(fd, (struct sockaddr *)&server_address, sizeof(server_address));
  return fd;
}

static int make_non_blocking_socket(int sfd) {
  int flags, s;
  flags = fcntl(sfd, F_GETFL, 0);
  if (flags == -1) {
    perror("fcntl error");
    return -1;
  }
  flags |= O_NONBLOCK;
  s = fcntl(sfd, F_SETFL, flags);
  if (s == -1) {
    perror("fcntl set error");
    return -1;
  }
  return 0;
}

int worker_process(int listenfd, int epoll_fd, struct epoll_event *events,
                   int k) {
  while (1) {
    int n;
    n = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
    printf("Worker %d pid is %d get value from epoll_wait\n", k, getpid());
    for (int i = 0; i < n; i++) {
      if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) ||
          (!(events[i].events & EPOLLIN))) {
        printf("%d\n", i);
        fprintf(stderr, "epoll err\n");
        close(events[i].data.fd);
        continue;
      } else if (listenfd == events[i].data.fd) {
        struct sockaddr in_addr;
        socklen_t in_len;
        int in_fd;
        in_len = sizeof(in_addr);
        in_fd = accept(listenfd, &in_addr, &in_len);
        if (in_fd == -1) {
          printf("worker %d accept failed\n", k);
          break;
        }
        printf("worker %d accept success\n", k);
        close(in_fd);
      }
    }
  }

  return 0;
}

int main() {
  int listen_fd, s;
  int epoll_fd;
  struct epoll_event event;
  struct epoll_event *events;
  listen_fd = create_and_bind_socket();
  if (listen_fd == -1) {
    abort();
  }
  s = make_non_blocking_socket(listen_fd);
  if (s == -1) {
    abort();
  }
  s = listen(listen_fd, SOMAXCONN);
  if (s == -1) {
    abort();
  }
  epoll_fd = epoll_create(MAXEVENTS);
  if (epoll_fd == -1) {
    abort();
  }
  event.data.fd = listen_fd;
  event.events = EPOLLIN;
  s = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event);
  if (s == -1) {
    abort();
  }
  events = calloc(MAXEVENTS, sizeof(event));
  for (int i = 0; i < WORKER_COUNT; i++) {
    printf("create worker %d\n", i);
    int pid = fork();
    if (pid == 0) {
      worker_process(listen_fd, epoll_fd, events, i);
    }
  }
  int status;
  wait(&status);
  free(events);
  close(listen_fd);
  return EXIT_SUCCESS;
}

然後,我們用 telnet 發送一下 TCP 請求,看看效果,,我們能得到這樣的結果

image

嗯,我們能看到當一個請求到達時,我們四個進程都被喚醒了。現在為了更直觀的看到這一個過程,我們用 strace 來 profile 一下

image

我們還是能看到,四個進程都被喚醒,但是只有 Worker 3 成功 accept ,而其餘的進程在 accept 的時候,都獲取到了 EAGAIN 錯誤,

Linux 文檔 對於 EAGAIN 的描述是

The socket is marked nonblocking and no connections are present to be accepted. POSIX.1-2001 and POSIX.1-2008 allow
either error to be returned for this case, and do not require these constants to have the same value, so a portable
application should check for both possibilities.

現在我們對於 EPOLL 的驚群問題是不是有了直觀的了解?那麼怎麼樣去解決驚群問題呢?

驚群問題的現在#

從內核解決驚群問題#

首先如前面所說,Accept 的驚群問題在 Linux Kernel 2.6 之後就被從內核的層面上解決了。但是 EPOLL 怎麼辦?在 2016 年一月,Linux 之父 Linus 向內核提交了一個補丁

參見 epoll: add EPOLLEXCLUSIVE flag

其中的關鍵代碼是

		if (epi->event.events & EPOLLEXCLUSIVE)
			add_wait_queue_exclusive(whead, &pwq->wait);
		else
			add_wait_queue(whead, &pwq->wait);

簡而言之,通過增加一個 EPOLLEXCLUSIVE 標誌位作為輔助。如果用戶開啟了 EPOLLEXCLUSIVE ,那麼在加入內核等待隊列時,使用 add_wait_queue_exclusive 否則則使用 add_wait_queue

至於這兩個函數的用法,可以參考這篇文章Handing wait queues

其中有這樣一段描述

The add_wait_queue( ) function inserts a nonexclusive process in the first position of a wait queue list. The add_wait_queue_exclusive( ) function inserts an exclusive process in the last position of a wait queue list.

好了,我們現在來改一下我們的代碼(內核版本要在 Linux Kernel 4.5)之後

#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define SERVER_ADDRESS "0.0.0.0"
#define SERVER_PORT 10086
#define WORKER_COUNT 4
#define MAXEVENTS 64

static int create_and_bind_socket() {
  int fd = socket(PF_INET, SOCK_STREAM, 0);
  struct sockaddr_in server_address;
  server_address.sin_family = AF_INET;
  inet_pton(AF_INET, SERVER_ADDRESS, &server_address.sin_addr);
  server_address.sin_port = htons(SERVER_PORT);
  bind(fd, (struct sockaddr *)&server_address, sizeof(server_address));
  return fd;
}

static int make_non_blocking_socket(int sfd) {
  int flags, s;
  flags = fcntl(sfd, F_GETFL, 0);
  if (flags == -1) {
    perror("fcntl error");
    return -1;
  }
  flags |= O_NONBLOCK;
  s = fcntl(sfd, F_SETFL, flags);
  if (s == -1) {
    perror("fcntl set error");
    return -1;
  }
  return 0;
}

int worker_process(int listenfd, int epoll_fd, struct epoll_event *events,
                   int k) {
  while (1) {
    int n;
    n = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
    printf("Worker %d pid is %d get value from epoll_wait\n", k, getpid());
    sleep(0.2);
    for (int i = 0; i < n; i++) {
      if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) ||
          (!(events[i].events & EPOLLIN))) {
        printf("%d\n", i);
        fprintf(stderr, "epoll err\n");
        close(events[i].data.fd);
        continue;
      } else if (listenfd == events[i].data.fd) {
        struct sockaddr in_addr;
        socklen_t in_len;
        int in_fd;
        in_len = sizeof(in_addr);
        in_fd = accept(listenfd, &in_addr, &in_len);
        if (in_fd == -1) {
          printf("worker %d accept failed\n", k);
          break;
        }
        printf("worker %d accept success\n", k);
        close(in_fd);
      }
    }
  }

  return 0;
}

int main() {
  int listen_fd, s;
  int epoll_fd;
  struct epoll_event event;
  struct epoll_event *events;
  listen_fd = create_and_bind_socket();
  if (listen_fd == -1) {
    abort();
  }
  s = make_non_blocking_socket(listen_fd);
  if (s == -1) {
    abort();
  }
  s = listen(listen_fd, SOMAXCONN);
  if (s == -1) {
    abort();
  }
  epoll_fd = epoll_create(MAXEVENTS);
  if (epoll_fd == -1) {
    abort();
  }
  event.data.fd = listen_fd;
  // add EPOLLEXCLUSIVE support
  event.events = EPOLLIN | EPOLLEXCLUSIVE;
  s = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event);
  if (s == -1) {
    abort();
  }
  events = calloc(MAXEVENTS, sizeof(event));
  for (int i = 0; i < WORKER_COUNT; i++) {
    printf("create worker %d\n", i);
    int pid = fork();
    if (pid == 0) {
      worker_process(listen_fd, epoll_fd, events, i);
    }
  }
  int status;
  wait(&status);
  free(events);
  close(listen_fd);
  return EXIT_SUCCESS;
}

然後我們來看看效果

image

誒?為什麼還是有兩個進程被喚醒了?原因在於 EPOLLEXCLUSIVE 只保證喚醒的進程數小於等於我們開啟的進程數,而不是直接喚醒所有進程,也不是只保證喚醒一個進程

我們來看看官方的描述

Sets an exclusive wakeup mode for the epoll file descriptor
that is being attached to the target file descriptor, fd.
When a wakeup event occurs and multiple epoll file descriptors
are attached to the same target file using EPOLLEXCLUSIVE, one
or more of the epoll file descriptors will receive an event
with epoll_wait(2). The default in this scenario (when
EPOLLEXCLUSIVE is not set) is for all epoll file descriptors
to receive an event. EPOLLEXCLUSIVE is thus useful for avoid‐
ing thundering herd problems in certain scenarios.

嗯,換句話說,就目前而言,系統並不能嚴格保證驚群問題的解決。很多時候我們還是要依靠應用層自身的設計來解決

應用層解決#

目前而言,應用解決驚群有兩種策略

  1. 這是可以接受的代價,那麼我們暫時不管。這是我們大多數時候的策略

  2. 通過加鎖或其餘的手段來解決這個問題,最典型的例子是 Nginx

我們來看看 Nginx 怎麼解決這樣的問題的

void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
    ngx_uint_t  flags;
    ngx_msec_t  timer, delta;

    if (ngx_timer_resolution) {
        timer = NGX_TIMER_INFINITE;
        flags = 0;

    } else {
        timer = ngx_event_find_timer();
        flags = NGX_UPDATE_TIME;
    }

    if (ngx_use_accept_mutex) {
        if (ngx_accept_disabled > 0) {
            ngx_accept_disabled--;

        } else {
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
                return;
            }

            if (ngx_accept_mutex_held) {
                flags |= NGX_POST_EVENTS;

            } else {
                if (timer == NGX_TIMER_INFINITE
                    || timer > ngx_accept_mutex_delay)
                {
                    timer = ngx_accept_mutex_delay;
                }
            }
        }
    }

    delta = ngx_current_msec;

    (void) ngx_process_events(cycle, timer, flags);

    delta = ngx_current_msec - delta;

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "timer delta: %M", delta);

    ngx_event_process_posted(cycle, &ngx_posted_accept_events);

    if (ngx_accept_mutex_held) {
        ngx_shmtx_unlock(&ngx_accept_mutex);
    }

    if (delta) {
        ngx_event_expire_timers();
    }

    ngx_event_process_posted(cycle, &ngx_posted_events);
}

我們這裡能看到,Nginx 主體的思想是通過鎖的形式來處理這樣問題。我們每個進程在監聽 FD 事件之前,我們先要通過 ngx_trylock_accept_mutex 去獲取一個全局的鎖。如果拿鎖成功,那麼則開始通過
ngx_process_events 嘗試去處理事件。如果拿鎖失敗,則放棄本次操作。所以從某種意義上來講,對於某一個 FD ,Nginx 同時只有一個 Worker 來處理 FD 上的事件。從而避免驚群。

總結#

這篇文章從去年到現在拖了很久了,驚群問題一直是我們日常工作中遇到的問題,我自己覺得,還是有必要寫篇詳細的筆記,記錄下去年到現在的一些學習記錄。差不多就這樣吧,祝各位看的好。

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。