0%

Thundering Herd(惊群现象,accept, epoll)

什么是“惊群现象”(thundering herd)

引用自wiki

In computer science, the thundering herd problem occurs when a large number of processes waiting for an event are awoken when that event occurs, but only one process is able to proceed at a time. After the processes wake up, they all demand the resource and a decision must be made as to which process can continue. After the decision is made, the remaining processes are put back to sleep, only to all wake up again to request access to the resource.
This occurs repeatedly, until there are no more processes to be woken up. Because all the processes use system resources upon waking, it is more efficient if only one process is woken up at a time.
This may render the computer unusable, but it can also be used as a technique if there is no other way to decide which process should continue (for example when programming with semaphores).

概括一下:
多个进程(或线程)监听同一个事件(或端口)。当事件发生时,所有监听进程都将被从sleep态唤醒,但事件只会被分配给一个进程,其他进程又将返回sleep态。这种多余的进程状态的切换是一种资源消耗。

常见的惊群现象发生在 accept/epoll 方法处。

accept 惊群

进程版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <unistd.h>

#define PORT 6666
#define BACKLOG 100
#define FORKNUM 5


int main(int argc, char** argv)
{
int sock, new, i;
struct sockaddr_in addr, client;
size_t size;
pid_t pid;

sock = socket(PF_INET, SOCK_STREAM, 0);
if (-1 == sock)
{
perror("socket");
exit(EXIT_FAILURE);
}

addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
addr.sin_addr.s_addr = htonl(INADDR_ANY);

if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0)
{
perror("socket");
exit(EXIT_FAILURE);
}

if (listen(sock, BACKLOG) < 0)
{
perror("listen");
exit(EXIT_FAILURE);
}

for (i = 0; i < FORKNUM; ++i)
{
pid = fork();
if (-1 == pid)
{
perror("fork");
exit(EXIT_FAILURE);
}

if (0 == pid)
{
new = accept(sock, (struct sockaddr*)&client, (socklen_t *)&size);
if (new < 0)
{
perror("accept");
exit(EXIT_FAILURE);
}

printf("pid: %d\n", getpid());
close(new);

break;
}
}

close(sock);

return EXIT_SUCCESS;
}
  • 代码具体流程:

    1. 父进程调用 socket 一系列操作,socket,bind,listen
    2. 父进程 fork 出多个子进程,子进程调用 accept 阻塞在 socket 处
    3. 子进程 accept 成功,返回新句柄,打印日志,直接 close 新句柄。
  • 测试:

    1
    2
    # 执行多次
    nc 127.0.0.1 6666
  • 测试结果 每次只有一个进程打印日志,证明只有一个进程被唤醒。

线程版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#include <stdio.h>                                               
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>

#define PORT 6666
#define BACKLOG 100
#define PTHREADNUM 5


void *pthread_fun(void *ptr)
{
int new;
struct sockaddr_in client;
size_t size;
int *sock = (int *)ptr;

new = accept(*sock, (struct sockaddr*)&client, (socklen_t *)&size);
if (new < 0)
{
perror("accept");
exit(EXIT_FAILURE);
}

printf("thread id: %lu\n", pthread_self());
close(new);

return ptr;
}


int main(int argc, char** argv)
{
int sock, i;
struct sockaddr_in addr;
pthread_t threads[PTHREADNUM];

sock = socket(PF_INET, SOCK_STREAM, 0);
if (-1 == sock)
{
perror("socket");
exit(EXIT_FAILURE);
}

addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
addr.sin_addr.s_addr = htonl(INADDR_ANY);


if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0)
{
perror("socket");
exit(EXIT_FAILURE);
}

if (listen(sock, BACKLOG) < 0)
{
perror("listen");
exit(EXIT_FAILURE);
}


for (i = 0; i < PTHREADNUM; ++i)
{
if (pthread_create(&threads[i], NULL, pthread_fun, (void*)&sock) < 0)
{
perror("pthread_create");
exit(EXIT_FAILURE);
}
}

for (i = 0; i < PTHREADNUM; ++i )
{
if (0 != pthread_join(threads[i], NULL))
{
perror("pthread_join");
exit(EXIT_FAILURE);
}
}

close(sock);

return EXIT_SUCCESS;
}
  • 流程大致与“进程版”相似,测试结果也一致,一个请求到来时,只有一个线程被唤醒

结论

每次请求都只有一个进程(线程)被唤醒, accept方法不存在“惊群问题”。
Linux内核已修复accept处的“惊群问题”。

epoll 惊群

进程版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <fcntl.h>

#define PORT 6666
#define BACKLOG 100
#define PROCESSNUM 5
#define MAXEVENT 1024

int main(int argc, char** argv)
{
int sock, new, i, fd, ret;
struct sockaddr_in addr, client;
size_t size;
pid_t pid;
struct epoll_event ev = {0}, ee[MAXEVENT];

sock = socket(PF_INET, SOCK_STREAM, 0);
if (-1 == sock)
{
perror("socket");
exit(EXIT_FAILURE);
}

addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
addr.sin_addr.s_addr = htonl(INADDR_ANY);

if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0)
{
perror("socket");
exit(EXIT_FAILURE);

}

if (listen(sock, BACKLOG) < 0)
{
perror("listen");
exit(EXIT_FAILURE);
}

// 将 sock 设置为非阻塞,这样程序不会阻塞在accept处,误被唤醒的进程会立即返回
if (fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
{
perror("fcntl");
exit(EXIT_FAILURE);
}

fd = epoll_create(10);
if (-1 == fd)
{
perror("epoll_create");
exit(EXIT_FAILURE);
}

ev.events |= EPOLLIN;
ev.data.fd = sock;

if (epoll_ctl(fd, EPOLL_CTL_ADD, sock, &ev) < 0)
{
perror("epoll_ctl");
exit(EXIT_FAILURE);
}

for (i = 0; i < PROCESSNUM; ++i)
{
pid = fork();
if (-1 == pid)
{
perror("fork");
exit(EXIT_FAILURE);
}

if (0 == pid)
{
while (1)
{
ret = epoll_wait(fd, (struct epoll_event*)&ee, MAXEVENT, -1);
for (i = 0; i < ret; ++i)
{
if (ee[i].data.fd == sock)
{
printf("epoll_wait: %d\n", getpid());
sleep(1);
new = accept(sock, (struct sockaddr*)&client, (socklen_t *)&size);

if (new < 0)
{
break;
}

printf("accept: %d\n", getpid());
close(new);

goto back;
}
}
}

break;
}
}

back:
close(fd);

return EXIT_SUCCESS;
}
  • 代码具体流程

    1. 父进程调用 socket 一系列操作,socket,bind,listen
    2. 父进程调用 epoll_create, epoll_ctl, 将需要监听端口加入 epoll 的句柄中
    3. 父进程 fork 出多个子进程,子进程调用 epoll_wait ,子进程阻塞
    4. 子进程从 epoll_wait 中返回,调用 accept。accept 成功,返回新句柄,打印日志,直接 close 新句柄。
  • 测试:

    1
    2
    # 执行多次
    nc 127.0.0.1 6666
  • 测试结果:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    epoll_wait: 27849   
    epoll_wait: 27848
    epoll_wait: 27846
    epoll_wait: 27847
    epoll_wait: 27845
    accept: 27849
    #
    epoll_wait: 27847
    epoll_wait: 27845
    epoll_wait: 27846
    epoll_wait: 27848
    accept: 27847
    #
    epoll_wait: 27846
    epoll_wait: 27845
    epoll_wait: 27848
    accept: 27846
    #
    epoll_wait: 27845
    epoll_wait: 27848
    accept: 27845
    #
    epoll_wait: 27848
    accept: 27848
  • 分析结果:
    当一个请求到来,epoll_wait 在该 socket 的所有进程都被唤醒,但只有一个进程的 accept 能拿到新句柄,其他进程将继续阻塞在 accept。
    (本例已将句柄设置为非阻塞状态,不会在accept处阻塞,即使拿不到新句柄,也会立即返回,并重新阻塞在 epoll_wait 处)

  • 伪代码标示如何将误被唤醒的进程,重新返回 epoll_wait 处阻塞

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    // 将 sock 设置为非阻塞,这样程序不会阻塞在accept处,误被唤醒的进程会立即返回
    if (fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
    {
    perror("fcntl");
    exit(EXIT_FAILURE);
    }
    // accept 直接返回,
    while (1)
    {
    epoll_wait();
    for (...)
    {
    new = accept();
    if (new < 0)
    {
    break;
    }
    }
    }

线程版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <fcntl.h>

#define PORT 6666
#define BACKLOG 100
#define PTHREADNUM 5
#define MAXEVENT 1024

struct epoll_struct {
int fd;
int sock;
};

void *pthread_fun(void *ptr)
{
int new, ret, i;
struct sockaddr_in client;
size_t size;
struct epoll_struct *pes = (struct epoll_struct*)ptr;
struct epoll_event ee[MAXEVENT];

while (1)
{
ret = epoll_wait(pes->fd, (struct epoll_event*)&ee, MAXEVENT, -1);
for (i = 0; i < ret; ++i)
{
if (ee[i].data.fd == pes->sock)
{
printf("accept, thread id: %lu\n", pthread_self());
sleep(1);
new = accept(pes->sock, (struct sockaddr*)&client, (socklen_t *)&size);

if (new < 0)
{
break;
}

printf("thread id: %lu\n", pthread_self());
close(new);

goto back;
}
}
}

back:
return ptr;
}


int main(int argc, char** argv)
{
int sock, i, fd;
struct sockaddr_in addr;
pthread_t threads[PTHREADNUM];
struct epoll_struct es;
struct epoll_event ev;

sock = socket(PF_INET, SOCK_STREAM, 0);
if (-1 == sock)
{
perror("socket");
exit(EXIT_FAILURE);
}

addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
addr.sin_addr.s_addr = htonl(INADDR_ANY);


if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0)
{
perror("socket");
exit(EXIT_FAILURE);
}

if (listen(sock, BACKLOG) < 0)
{
perror("listen");
exit(EXIT_FAILURE);
}

if (fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
{
perror("fcntl");
exit(EXIT_FAILURE);
}

fd = epoll_create(10);
if (-1 == fd)
{
perror("epoll_create");
exit(EXIT_FAILURE);
}

ev.events |= EPOLLIN;
ev.data.fd = sock;
if (epoll_ctl(fd, EPOLL_CTL_ADD, sock, &ev) < 0)
{
perror("epoll_ctl");
exit(EXIT_FAILURE);
}

es.fd = fd;
es.sock = sock;

for (i = 0; i < PTHREADNUM; ++i)
{
if (pthread_create(&threads[i], NULL, pthread_fun, (void*)&es) < 0)
{
perror("pthread_create");
exit(EXIT_FAILURE);
}
}

for (i = 0; i < PTHREADNUM; ++i )
{
if (0 != pthread_join(threads[i], NULL))
{
perror("pthread_join");
exit(EXIT_FAILURE);
}
}

close(sock);

return EXIT_SUCCESS;
}
  • 流程大致与“进程版”一致
  • 测试结果:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    accept, thread id: 139800247498496
    accept, thread id: 139800505693952
    accept, thread id: 139800763889408
    accept, thread id: 139801022084864
    accept, thread id: 139801280280320
    thread id: 139800247498496
    #
    accept, thread id: 139801280280320
    accept, thread id: 139801022084864
    accept, thread id: 139800763889408
    accept, thread id: 139800505693952
    thread id: 139801280280320
    #
    accept, thread id: 139800505693952
    accept, thread id: 139800763889408
    accept, thread id: 139801022084864
    thread id: 139800505693952
    #
    accept, thread id: 139801022084864
    accept, thread id: 139800763889408
    thread id: 139801022084864
    #
    accept, thread id: 139800763889408
    thread id: 139800763889408

结论

epoll_wait 方法处存在惊群现象,无论是在多进程还是在多线程的情况下。

Reference & Thanks