Manjusaka

Discussing the Thundering Herd Problem in Network Events

I started paying attention to the thundering herd problem last year, and then submitted a patch to CPython addressing the selector thundering herd problem (BPO-35517). Now let's talk about the thundering herd problem itself.

The Past of the Thundering Herd Problem

What is the Thundering Herd Problem?

The thundering herd problem, also known as the thundering herd effect, occurs when multiple processes or threads are waiting for the same event. When the event occurs, all threads and processes are awakened by the kernel. After waking up, usually only one process successfully acquires the event and processes it, while the other processes find that they failed to acquire the event and continue to wait, which reduces system performance to some extent.
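The same pattern can be reproduced in user space. The sketch below is only an analogy, not the kernel mechanism: it assumes POSIX threads, and the names herd_t, herd_worker and count_wakeups are made up for illustration. Four threads wait on a condition variable for a single item; the driver reports how many of them were woken when the item was published with pthread_cond_broadcast versus pthread_cond_signal.

```c
#include <pthread.h>
#include <stdbool.h>
#include <string.h>

#define WAITERS 4

typedef struct {
  pthread_mutex_t mu;
  pthread_cond_t event;    /* workers sleep here, waiting for work */
  pthread_cond_t progress; /* workers report state changes to the driver */
  int parked;              /* workers that have started waiting */
  int items;               /* pending units of work ("connections") */
  int wakeups;             /* wake-ups that happened after the event fired */
  int winners;             /* workers that actually obtained an item */
  bool fired;
  bool shutdown;
} herd_t;

static void *herd_worker(void *arg) {
  herd_t *h = arg;
  pthread_mutex_lock(&h->mu);
  h->parked++;
  pthread_cond_signal(&h->progress);
  while (!h->shutdown) {
    pthread_cond_wait(&h->event, &h->mu);
    if (h->shutdown || !h->fired) continue; /* ignore early or final wake-ups */
    h->wakeups++;
    if (h->items > 0) { /* only the first woken worker finds work */
      h->items--;
      h->winners++;
    }
    pthread_cond_signal(&h->progress);
  }
  pthread_mutex_unlock(&h->mu);
  return NULL;
}

/* Publishes one item to WAITERS sleeping threads and returns how many of them
   were woken. Error handling for pthread_create is omitted for brevity. */
int count_wakeups(bool use_broadcast, int *winners) {
  herd_t h;
  pthread_t tid[WAITERS];
  memset(&h, 0, sizeof(h));
  pthread_mutex_init(&h.mu, NULL);
  pthread_cond_init(&h.event, NULL);
  pthread_cond_init(&h.progress, NULL);
  for (int i = 0; i < WAITERS; i++)
    pthread_create(&tid[i], NULL, herd_worker, &h);

  pthread_mutex_lock(&h.mu);
  while (h.parked < WAITERS) /* wait until every worker is asleep */
    pthread_cond_wait(&h.progress, &h.mu);

  h.items = 1;
  h.fired = true;
  if (use_broadcast)
    pthread_cond_broadcast(&h.event); /* wakes the whole herd */
  else
    pthread_cond_signal(&h.event);    /* wakes at least one waiter */

  int expected = use_broadcast ? WAITERS : 1;
  while (h.wakeups < expected)
    pthread_cond_wait(&h.progress, &h.mu);
  int wakeups = h.wakeups;
  *winners = h.winners;

  h.shutdown = true; /* release the remaining sleepers so they can exit */
  pthread_cond_broadcast(&h.event);
  pthread_mutex_unlock(&h.mu);
  for (int i = 0; i < WAITERS; i++)
    pthread_join(tid[i], NULL);
  pthread_cond_destroy(&h.progress);
  pthread_cond_destroy(&h.event);
  pthread_mutex_destroy(&h.mu);
  return wakeups;
}
```

Calling count_wakeups(true, &w) should report all four threads woken for a single winner, while count_wakeups(false, &w) typically reports one. Compile with -pthread.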

Many people might wonder why the thundering herd effect consumes system resources and reduces system performance.

  1. Waking multiple processes/threads forces context switches. On every switch the kernel has to save and restore register state and move tasks on and off the run queue. In extreme cases, more time is spent scheduling processes/threads than executing them.

Next, let's discuss the common thundering herd problems we encounter in network programming.

Common Thundering Herd Problems

In Linux, we commonly see the thundering herd effect when we use accept and system-provided APIs like select, poll, or epoll to handle our network connections.

Thundering Herd with Accept

First, let's review our traditional usage of accept with a flowchart.

image

In this model, when a request arrives, every process/thread blocked in accept is expected to wake up and try to accept it, but ultimately only one succeeds. Let's write some code to see.

#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define SERVER_ADDRESS "0.0.0.0"
#define SERVER_PORT 10086
#define WORKER_COUNT 4

int worker_process(int listenfd, int i) {
  while (1) {
    printf("I am work %d, my pid is %d, begin to accept connections \n", i,
           getpid());
    struct sockaddr_in client_info;
    socklen_t client_info_len = sizeof(client_info);
    int connection =
        accept(listenfd, (struct sockaddr *)&client_info, &client_info_len);
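    if (connection != -1) {
      printf("worker %d accept success\n", i);
      printf("ip :%s\t", inet_ntoa(client_info.sin_addr));
      /* sin_port is in network byte order; convert before printing */
      printf("port: %d \n", ntohs(client_info.sin_port));
      close(connection);
    } else {
      /* nothing to close here: accept returned -1 */
      printf("worker %d accept failed\n", i);
    }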
  }

  return 0;
}

int main() {
  int i = 0;
  struct sockaddr_in address;
  memset(&address, 0, sizeof(address));
  address.sin_family = AF_INET;
  inet_pton(AF_INET, SERVER_ADDRESS, &address.sin_addr);
  address.sin_port = htons(SERVER_PORT);
  int listenfd = socket(PF_INET, SOCK_STREAM, 0);
  int ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
  ret = listen(listenfd, 5);
  for (i = 0; i < WORKER_COUNT; i++) {
    printf("Create worker %d\n", i + 1);
    pid_t pid = fork();
    /*child process */
    if (pid == 0) {
      worker_process(listenfd, i);
    }
    if (pid < 0) {
      printf("fork error");
    }
  }

  /*wait child process*/
  int status;
  wait(&status);
  return 0;
}

Let's look at the results of running this code.

image

Huh? What's going on? Why didn't we see the phenomenon we expected (one process accepts successfully, while the other three fail)? The reason is that since Linux 2.6 the kernel handles the thundering herd problem for accept itself: when a connection arrives, it wakes only one of the processes blocked in accept on the same socket.

Alright, let's continue.

Thundering Herd with Select/Poll/Epoll

Taking epoll as an example, let's look at the traditional working mode.

image

Now, let's look at some code.

#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define SERVER_ADDRESS "0.0.0.0"
#define SERVER_PORT 10087
#define WORKER_COUNT 4
#define MAXEVENTS 64

static int create_and_bind_socket() {
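  int fd = socket(PF_INET, SOCK_STREAM, 0);
  if (fd == -1) {
    return -1;
  }
  struct sockaddr_in server_address;
  /* zero the struct so padding and unused fields are well defined */
  memset(&server_address, 0, sizeof(server_address));
  server_address.sin_family = AF_INET;
  inet_pton(AF_INET, SERVER_ADDRESS, &server_address.sin_addr);
  server_address.sin_port = htons(SERVER_PORT);
  if (bind(fd, (struct sockaddr *)&server_address, sizeof(server_address)) ==
      -1) {
    close(fd);
    return -1;
  }
  return fd;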
}

static int make_non_blocking_socket(int sfd) {
  int flags, s;
  flags = fcntl(sfd, F_GETFL, 0);
  if (flags == -1) {
    perror("fcntl error");
    return -1;
  }
  flags |= O_NONBLOCK;
  s = fcntl(sfd, F_SETFL, flags);
  if (s == -1) {
    perror("fcntl set error");
    return -1;
  }
  return 0;
}

int worker_process(int listenfd, int epoll_fd, struct epoll_event *events,
                   int k) {
  while (1) {
    int n;
    n = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
    printf("Worker %d pid is %d get value from epoll_wait\n", k, getpid());
    for (int i = 0; i < n; i++) {
      if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) ||
          (!(events[i].events & EPOLLIN))) {
        printf("%d\n", i);
        fprintf(stderr, "epoll err\n");
        close(events[i].data.fd);
        continue;
      } else if (listenfd == events[i].data.fd) {
        struct sockaddr in_addr;
        socklen_t in_len;
        int in_fd;
        in_len = sizeof(in_addr);
        in_fd = accept(listenfd, &in_addr, &in_len);
        if (in_fd == -1) {
          printf("worker %d accept failed\n", k);
          break;
        }
        printf("worker %d accept success\n", k);
        close(in_fd);
      }
    }
  }

  return 0;
}

int main() {
  int listen_fd, s;
  int epoll_fd;
  struct epoll_event event;
  struct epoll_event *events;
  listen_fd = create_and_bind_socket();
  if (listen_fd == -1) {
    abort();
  }
  s = make_non_blocking_socket(listen_fd);
  if (s == -1) {
    abort();
  }
  s = listen(listen_fd, SOMAXCONN);
  if (s == -1) {
    abort();
  }
  epoll_fd = epoll_create(MAXEVENTS);
  if (epoll_fd == -1) {
    abort();
  }
  event.data.fd = listen_fd;
  event.events = EPOLLIN;
  s = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event);
  if (s == -1) {
    abort();
  }
  events = calloc(MAXEVENTS, sizeof(event));
  for (int i = 0; i < WORKER_COUNT; i++) {
    printf("create worker %d\n", i);
    int pid = fork();
    if (pid == 0) {
      worker_process(listen_fd, epoll_fd, events, i);
    }
  }
  int status;
  wait(&status);
  free(events);
  close(listen_fd);
  return EXIT_SUCCESS;
}

Then, we send a TCP request using telnet to see the effect, and we get the following result.

image

We can see that when a request arrives, all four processes are awakened. To see this more clearly, let's trace the processes with strace.

image

We can still see that all four processes are awakened, but only Worker 3 accepts successfully, while the other processes get an EAGAIN error from accept.

The Linux documentation for accept describes EAGAIN (or EWOULDBLOCK) as:

The socket is marked nonblocking and no connections are present to be accepted. POSIX.1-2001 and POSIX.1-2008 allow either error to be returned for this case, and do not require these constants to have the same value, so a portable application should check for both possibilities.
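Following that advice, a worker that loses the race should treat both error codes as "nothing to accept" rather than as a failure. The helpers below are a minimal sketch of that check. The names accept_would_block and drain_accept_queue are hypothetical, and the listening socket is assumed to be nonblocking.

```c
#include <errno.h>
#include <stdbool.h>
#include <sys/socket.h>
#include <unistd.h>

/* True when accept() failed only because no connection was pending. */
static bool accept_would_block(int err) {
  return err == EAGAIN || err == EWOULDBLOCK;
}

/* Accepts every pending connection on a nonblocking listening socket and
   closes it right away, as the demo workers do. Returns the number of
   connections accepted, or -1 on a real error. */
static int drain_accept_queue(int listenfd) {
  int accepted = 0;
  for (;;) {
    int fd = accept(listenfd, NULL, NULL);
    if (fd >= 0) {
      accepted++;
      close(fd);
      continue;
    }
    if (errno == EINTR) {
      continue; /* interrupted by a signal, retry */
    }
    if (accept_would_block(errno)) {
      return accepted; /* another worker won, or the queue is empty */
    }
    return -1;
  }
}
```

A worker woken by epoll_wait could call drain_accept_queue(listenfd) and simply go back to waiting when it returns 0.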

Now, do we have an intuitive understanding of the thundering herd problem with EPOLL? So how do we solve the thundering herd problem?

The Present of the Thundering Herd Problem

Solving the Thundering Herd Problem from the Kernel

As mentioned earlier, the thundering herd problem with accept has been solved at the kernel level since Linux 2.6. But what about epoll? In January 2016, a patch by Jason Baron that adds an EPOLLEXCLUSIVE flag was merged into Linus Torvalds's mainline kernel tree.

See epoll: add EPOLLEXCLUSIVE flag

The key code is:

		if (epi->event.events & EPOLLEXCLUSIVE)
			add_wait_queue_exclusive(whead, &pwq->wait);
		else
			add_wait_queue(whead, &pwq->wait);

In short, the patch adds an EPOLLEXCLUSIVE flag. If the user sets EPOLLEXCLUSIVE, the epoll entry is added to the kernel wait queue with add_wait_queue_exclusive; otherwise, add_wait_queue is used.

For the usage of these two functions, you can refer to this article Handling wait queues.

There is a description that states:

The add_wait_queue( ) function inserts a nonexclusive process in the first position of a wait queue list. The add_wait_queue_exclusive( ) function inserts an exclusive process in the last position of a wait queue list.
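To make the difference concrete, here is a toy user-space model of that behavior. It is not kernel code: the toy_ names and the fixed-size array are assumptions made for illustration, and unlike the kernel it does not remove woken waiters from the queue. Nonexclusive waiters go to the head, exclusive waiters go to the tail, and a wake-up walks the queue from the head and stops after it has woken the requested number of exclusive waiters.

```c
#include <stdbool.h>
#include <stddef.h>

#define MAX_WAITERS 16

typedef struct {
  bool exclusive[MAX_WAITERS]; /* one flag per queued waiter, head first */
  size_t len;
} toy_wait_queue;

/* Nonexclusive waiters are inserted at the head of the queue. */
static bool toy_add_wait_queue(toy_wait_queue *q) {
  if (q->len == MAX_WAITERS) return false;
  for (size_t i = q->len; i > 0; i--) q->exclusive[i] = q->exclusive[i - 1];
  q->exclusive[0] = false;
  q->len++;
  return true;
}

/* Exclusive waiters are appended at the tail of the queue. */
static bool toy_add_wait_queue_exclusive(toy_wait_queue *q) {
  if (q->len == MAX_WAITERS) return false;
  q->exclusive[q->len++] = true;
  return true;
}

/* Walks the queue from the head and wakes every waiter it visits, stopping
   once nr_exclusive exclusive waiters have been woken. Passing 0 wakes
   everyone. Returns the number of waiters woken. */
static size_t toy_wake_up(const toy_wait_queue *q, int nr_exclusive) {
  size_t woken = 0;
  for (size_t i = 0; i < q->len; i++) {
    woken++;
    if (q->exclusive[i] && --nr_exclusive == 0) break;
  }
  return woken;
}
```

With four exclusive waiters, toy_wake_up(&q, 1) wakes one of them. With four nonexclusive waiters it wakes all four, which is the herd.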

Now, let's modify our code (the kernel version must be Linux Kernel 4.5 or later).

#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define SERVER_ADDRESS "0.0.0.0"
#define SERVER_PORT 10086
#define WORKER_COUNT 4
#define MAXEVENTS 64

static int create_and_bind_socket() {
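  int fd = socket(PF_INET, SOCK_STREAM, 0);
  if (fd == -1) {
    return -1;
  }
  struct sockaddr_in server_address;
  /* zero the struct so padding and unused fields are well defined */
  memset(&server_address, 0, sizeof(server_address));
  server_address.sin_family = AF_INET;
  inet_pton(AF_INET, SERVER_ADDRESS, &server_address.sin_addr);
  server_address.sin_port = htons(SERVER_PORT);
  if (bind(fd, (struct sockaddr *)&server_address, sizeof(server_address)) ==
      -1) {
    close(fd);
    return -1;
  }
  return fd;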
}

static int make_non_blocking_socket(int sfd) {
  int flags, s;
  flags = fcntl(sfd, F_GETFL, 0);
  if (flags == -1) {
    perror("fcntl error");
    return -1;
  }
  flags |= O_NONBLOCK;
  s = fcntl(sfd, F_SETFL, flags);
  if (s == -1) {
    perror("fcntl set error");
    return -1;
  }
  return 0;
}

int worker_process(int listenfd, int epoll_fd, struct epoll_event *events,
                   int k) {
  while (1) {
    int n;
    n = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
    printf("Worker %d pid is %d get value from epoll_wait\n", k, getpid());
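    /* NOTE: sleep() takes whole seconds, so 0.2 is truncated to 0 and this
       call returns immediately; usleep(200000) would give a real 200 ms
       pause. */
    sleep(0.2);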
    for (int i = 0; i < n; i++) {
      if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) ||
          (!(events[i].events & EPOLLIN))) {
        printf("%d\n", i);
        fprintf(stderr, "epoll err\n");
        close(events[i].data.fd);
        continue;
      } else if (listenfd == events[i].data.fd) {
        struct sockaddr in_addr;
        socklen_t in_len;
        int in_fd;
        in_len = sizeof(in_addr);
        in_fd = accept(listenfd, &in_addr, &in_len);
        if (in_fd == -1) {
          printf("worker %d accept failed\n", k);
          break;
        }
        printf("worker %d accept success\n", k);
        close(in_fd);
      }
    }
  }

  return 0;
}

int main() {
  int listen_fd, s;
  int epoll_fd;
  struct epoll_event event;
  struct epoll_event *events;
  listen_fd = create_and_bind_socket();
  if (listen_fd == -1) {
    abort();
  }
  s = make_non_blocking_socket(listen_fd);
  if (s == -1) {
    abort();
  }
  s = listen(listen_fd, SOMAXCONN);
  if (s == -1) {
    abort();
  }
  epoll_fd = epoll_create(MAXEVENTS);
  if (epoll_fd == -1) {
    abort();
  }
  event.data.fd = listen_fd;
  // add EPOLLEXCLUSIVE support
  event.events = EPOLLIN | EPOLLEXCLUSIVE;
  s = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event);
  if (s == -1) {
    abort();
  }
  events = calloc(MAXEVENTS, sizeof(event));
  for (int i = 0; i < WORKER_COUNT; i++) {
    printf("create worker %d\n", i);
    int pid = fork();
    if (pid == 0) {
      worker_process(listen_fd, epoll_fd, events, i);
    }
  }
  int status;
  wait(&status);
  free(events);
  close(listen_fd);
  return EXIT_SUCCESS;
}

Now, let's look at the effect.

image

Huh? Why are there still two processes awakened? The reason is that EPOLLEXCLUSIVE only avoids waking every waiter. It guarantees that at least one waiter is awakened, so the number of awakened processes can be anywhere between one and the number of processes we started. It does not guarantee that exactly one process is awakened.

Let's look at the official description:

Sets an exclusive wakeup mode for the epoll file descriptor that is being attached to the target file descriptor, fd. When a wakeup event occurs and multiple epoll file descriptors are attached to the same target file using EPOLLEXCLUSIVE, one or more of the epoll file descriptors will receive an event with epoll_wait(2). The default in this scenario (when EPOLLEXCLUSIVE is not set) is for all epoll file descriptors to receive an event. EPOLLEXCLUSIVE is thus useful for avoiding thundering herd problems in certain scenarios.

In other words, as it stands, the system cannot strictly guarantee the resolution of the thundering herd problem. Many times we still need to rely on the design of the application layer itself to solve it.
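Note that the description speaks of multiple epoll file descriptors attached to the same target file. A minimal sketch of that layout is shown below, on the assumption that each worker creates its own epoll instance after fork instead of sharing one created in the parent. The helper names make_worker_epoll and worker_accept_once are hypothetical, and EPOLLEXCLUSIVE needs Linux 4.5 or later.

```c
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Creates a private epoll instance for one worker and attaches listen_fd to
   it in exclusive wake-up mode. Returns the epoll fd, or -1 on error.
   EPOLLEXCLUSIVE is only accepted together with EPOLL_CTL_ADD. */
static int make_worker_epoll(int listen_fd) {
  int epfd = epoll_create1(0);
  if (epfd == -1) {
    return -1;
  }
  struct epoll_event ev = {0};
  ev.events = EPOLLIN | EPOLLEXCLUSIVE;
  ev.data.fd = listen_fd;
  if (epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev) == -1) {
    close(epfd);
    return -1;
  }
  return epfd;
}

/* Waits up to timeout_ms for one readiness event, then tries to accept.
   Returns the connected fd, or -1 on timeout, on error, or when another
   worker took the connection first. */
static int worker_accept_once(int epfd, int listen_fd, int timeout_ms) {
  struct epoll_event ev;
  int n = epoll_wait(epfd, &ev, 1, timeout_ms);
  if (n <= 0) {
    return -1;
  }
  return accept(listen_fd, NULL, NULL);
}
```

Each child would call make_worker_epoll(listen_fd) once after fork and then loop on worker_accept_once. Since one or more instances may still receive the event, the accept call can fail with EAGAIN and has to be tolerated.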

Application Layer Solutions

Currently, there are two strategies for applications to solve the thundering herd problem:

  1. If the cost is acceptable, then we temporarily ignore it. This is the strategy we use most of the time.

  2. Solve the problem through locking or other means, the most typical example being Nginx.

Let's see how Nginx solves this problem.

void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
    ngx_uint_t  flags;
    ngx_msec_t  timer, delta;

    if (ngx_timer_resolution) {
        timer = NGX_TIMER_INFINITE;
        flags = 0;

    } else {
        timer = ngx_event_find_timer();
        flags = NGX_UPDATE_TIME;
    }

    if (ngx_use_accept_mutex) {
        if (ngx_accept_disabled > 0) {
            ngx_accept_disabled--;

        } else {
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
                return;
            }

            if (ngx_accept_mutex_held) {
                flags |= NGX_POST_EVENTS;

            } else {
                if (timer == NGX_TIMER_INFINITE
                    || timer > ngx_accept_mutex_delay)
                {
                    timer = ngx_accept_mutex_delay;
                }
            }
        }
    }

    delta = ngx_current_msec;

    (void) ngx_process_events(cycle, timer, flags);

    delta = ngx_current_msec - delta;

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "timer delta: %M", delta);

    ngx_event_process_posted(cycle, &ngx_posted_accept_events);

    if (ngx_accept_mutex_held) {
        ngx_shmtx_unlock(&ngx_accept_mutex);
    }

    if (delta) {
        ngx_event_expire_timers();
    }

    ngx_event_process_posted(cycle, &ngx_posted_events);
}

Here we can see that the main idea of Nginx is to handle this problem through locking. Before each round of event processing, a worker first tries to acquire a global lock through ngx_trylock_accept_mutex. If it gets the lock (ngx_accept_mutex_held is set), it processes events through ngx_process_events with NGX_POST_EVENTS, handles the posted accept events, and then releases the lock. If it does not get the lock, it does not accept new connections in this round; it still processes events for the connections it already owns, but its wait is capped at ngx_accept_mutex_delay so that it retries the lock soon. Thus, only one worker accepts connections on the listening FD at a time, thereby avoiding the thundering herd problem.
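The try-lock idea itself can be sketched in a few lines. This is not Nginx's ngx_shmtx implementation, only an illustration under stated assumptions: the accept_lock_ names are invented, the lock is a single atomic integer in anonymous shared memory created before fork, and a crashed lock holder is not handled.

```c
#define _DEFAULT_SOURCE
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <unistd.h>

typedef struct {
  atomic_int owner; /* 0 when free, otherwise the pid of the holder */
} accept_lock_t;

/* Allocates the lock in shared memory so that children created by fork()
   all see the same lock word. Returns NULL on failure. */
static accept_lock_t *accept_lock_create(void) {
  void *mem = mmap(NULL, sizeof(accept_lock_t), PROT_READ | PROT_WRITE,
                   MAP_SHARED | MAP_ANONYMOUS, -1, 0);
  if (mem == MAP_FAILED) {
    return NULL;
  }
  accept_lock_t *lock = mem;
  atomic_init(&lock->owner, 0);
  return lock;
}

/* Non-blocking attempt: succeeds only if nobody holds the lock. */
static bool accept_lock_try(accept_lock_t *lock) {
  int expected = 0;
  return atomic_compare_exchange_strong(&lock->owner, &expected,
                                        (int)getpid());
}

/* Releases the lock if, and only if, this process holds it. */
static void accept_lock_release(accept_lock_t *lock) {
  int me = (int)getpid();
  atomic_compare_exchange_strong(&lock->owner, &me, 0);
}
```

In each loop iteration a worker would call accept_lock_try. On success it waits on the listening socket, accepts, and calls accept_lock_release. On failure it skips the listening socket for that round and only serves its existing connections.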

Conclusion

This article has been delayed for a long time since last year. The thundering herd problem has always been an issue we encounter in our daily work. I feel it is necessary to write a detailed note to record some of my learning experiences from last year to now. That's about it, and I hope you enjoy reading it.
