<返回更多

浅谈linux下基于UDP服务的负载均衡方法

2020-12-24    
加入收藏
浅谈linux下基于UDP服务的负载均衡方法

 

UDP在并入TCP/IP族的时候,就是作为IP协议的第四层抽象存在的,IP的协议单元叫做IP数据报,UDP的名字正是来源于此,用户数据报协议,其中“用户”便有了端到端的意思。起初UDP只是作为TCP的补充存在,应用于一些无需维护连接,无需维护状态的场合,然而随着TCP越来越复杂,随着复杂性越来越往上层转移,很多应用程序开始在UDP之上自行处理连接状态,数据序列等,这样便可以自行控制好复杂度,比较典型的是DTLS协议(SSL的UDP版本)以及OpenVPN。
OpenVPN原理上讲不便于使用TCP作为承载协议,使用UDP将会是比较高效的选择,然而这种高效却被其单进程单处理给抵消掉了,特别是当你在8核CPU上运行OpenVPN服务,却看到始终有7个CPU核心处于空闲状态的时候。愤慨之余要着手解决问题。我的random redirect NAT+多OpenVPN进程绑定多端口方式已经可以作为一个解决方案,然而它还是利用了很多外围的东西,最典型的就是它用ip_conntrack为UDP灌入连接的概念,因此它依赖于ip_conntrack!而我希望的是,可以像TCP那样实现一个多处理,比如预分配worker进程的多处理,比如连接到来后再分配进程的多处理,使用UDP,可以做到吗?

1.SO_REUSEADDR绑定相同的地址端口

REUSEADDR没有太多复杂的东西,总之就是要确保四元组的唯一性即可,不管对于TCP还是对于UDP都一样。对于TCP而言,bind的时候所有潜在的可能冲突(此时还不知道对端是谁)的绑定都会被禁止,对于connect的时候,此时本端对端均已经明确,只有明确不会破坏4元组唯一性的connect才会发送SYN包,对于Listen方,无需考虑4元组唯一性,因为connect方已经可以保证4元组唯一了。
UDP socket的4元组唯一性保证手段比较有趣,由于UDP无连接状态,没有办法通过“将新的连接匹配保存下来的已经有的4元组连接”来判断是否4元组冲突,那么办呢?办法就是按照固定的算法查找目标UDP socket,这样每次查到的都是UDP socket链表(或者hash表,具体存储形式无关紧要)固定位置的socket,这样自然而然把其它位置冲突的UDP socket给废掉了!你可以试着在一个进程或者多个进程内创建多个绑定相同端口,相同IP地址的UDP socket,然后在另外的机器向其发送数据,你会发现,只有最后一个创建的socket会接收到数据,其它的都是默默地等待,孤独的等待,像小说《十年》里面的那样...此时如果再创建一个绑定同样地址端口的UDP socket,这最后一个便开始接收数据了。以上说的“最后一个”并不绝对,取决于算法,关键是“固定的位置”,这个UDP socket查找算法很简单,只是基于目标IP地址和目标端口做查找,找到第一个则返回它,因此具体找到哪个取决于UDP socket在创建的时候是按何种顺序插入的。
对于UDP socket的客户端,也一样,如果你创建了两个绑定相同IP地址和相同端口的UDP socket,然后用第一个socket往一个特定的目标sendmsg,可能会是第二个socket接收到数据,到底是哪一个取决于UDP socket在第四层的查找算法。这个原理你可以看一下第四层的UDP socket查找逻辑,其核心在于compute_score函数:

if (inet->daddr) {
    if (inet->daddr != saddr)
        return -1;
    score += 2;
}
if (inet->dport) {
    if (inet->dport != sport)
        return -1;
    score += 2;
}

一般而言,UDP socket如果没有调用connect,其daddr和dport是不会赋值的,因此关于这两个字段的判断相当于没有结果的判断,如果对于客户端socket,且调用了connect,那么就会按照4元组完全匹配的原则来匹配。
因此,创建多个绑定相同IP地址,相同端口的UDP程序,会起到热备份的作用,不会起到负载均衡的作用。另外如果新创建了同样绑定的新UDP socket,会改变原有多个socket在链表(或者别的什么容器)中的位置,数据到来时,具体会匹配到哪个socket,依然取决于固定的算法。

2.SO_REUSEPORT的不同之处

linux 3.9内核支持了BSD系统早已有之的SO_REUSEPORT这个socket选项,有了它就可以支持UDP的负载均衡啦,哈哈!在给出具体做法和实例之前,我们先看一下它是如何做到的,和之前的版本一样,在查找目标socket的时候,compute_score的逻辑几乎没有什么变化,不同的是,在compute_score之后,数据包的源IP和源端口也参与了运算,而不再像老版本那样只是严格按照目标IP地址和目标端口那样作为查找键参与运算,这样即使按照目标IP和目标端口已经查找到了一个匹配的socket,也会按照其源IP和源端口继续下去,以便在多个匹配的socket中选择一个,算法是固定的,因此可以保证:
a.相同4元组的数据包总是匹配到相同的一个socket;
b.不同4元组的数据包可能会被hash到不同的socket。
以上的没有什么神奇之处,其实现和之前的非负载均衡版本唯一不同的是就是在查找目标socket的时候让源IP地址和源端口也参与了进来,而这并不破坏4元组的唯一性,所谓的唯一性,其目的只是要保证同一数据发给特定的目标socket,保证4元组唯一只是实现这一目标的手段而已,目的和手段是万万不能混淆的,意识到这一点之后,绑定相同端口相同IP地址的socket同时接收数据就是理所当然的了,只要保证来自同一IP地址和同一端口的数据总是发送给同一socket即可,而算法本身保证了这一点。
socket查找算法是重要的,查找算法没有使用任何随机因素比如随机数之类的,而是使用固定的计算选择一个目标socket,这足以保证只要是固定的源IP地址和源端口,算出的目标socket位置就是固定的,核心代码如下:

sk_nulls_for_each_rcu(sk, node, &hslot->head) {
    score = compute_score(sk, net, saddr, hnum, sport,
                     daddr, dport, dif);
    if (score > badness) {
        result = sk;
        badness = score;
        reuseport = sk->sk_reuseport;
        if (reuseport) {
            hash = inet_ehashfn(net, daddr, hnum,
                        saddr, htons(sport));
            matches = 1;
        }
    } else if (score == badness && reuseport) {
        matches++;
        if (((u64)hash * matches) >> 32 == 0)
            result = sk;
        hash = next_pseudo_random32(hash);
    }
}

SO_REUSEPORT是一个其它UNIX平台古已有之的选项,Linux终于也支持了,本文没有讨论其对TCP的影响,因为其他人讨论的已经很多了,毕竟TCP的应用要多得多,TCP的进程绑定机制以及其本身的连接绑定机制也使得SO_REUSEPORT对TCP的影响很好理解,Listen状态的多个绑定相同IP地址和相同端口的TCP socket很容易根据syn源IP和端口来为其绑定一个流并且记住它。但是SO_REUSEPORT对于UDP的影响,我没有搜到什么太多的资料,加之我又特别需要UDP服务的负载均衡,因此我决定自己做一个。

需要C/C++ Linux服务器架构师学习资料私信“资料”(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQLredis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享

浅谈linux下基于UDP服务的负载均衡方法

 

3.实现一个多进程的UDP服务

3.1.只为OpenVPN

OpenVPN,一个充满矛盾的存在,它没有多处理,多多少少和UDP有关,它提倡用UDP,多多少少和TCP有关(是一定有关系的,别跟我讲TCP的细节,什么按序,重传,慢启动,滑动窗口之类的,好好看看OpenVPN的man吧...),然而TCP的多处理,可谓汗牛充栋了。
如今,UDP也可以多处理了,好事儿!迫不及待想对它动个小手术,但是手术之前,还有几件事情要确认。

3.2.多进程的UDP

有了SO_REUSEPORT支持之后,第一个反应以及第一个动作,那就是试一下效果。初始设计是极其简单的,那就是创建多个UDP socket,绑定到相同的IP地址和相同的端口,然后poll,poll到谁有POLLIN事件了就recvfrom。此时不断得使用不同的进程往该这些socket绑定的IP地址和端口发送数据,你会发现,和之前不支持SO_REUSEPORT的时候不一样了,数据会发送到不同的UDP socket上,并且可以保证只要源地址和源端口是同一个,那么发送到的socket也是同一个,如果换一下源端口,那么就可能发送到另外的socket了。这个实验的代码及其简单,代码就不贴了。这个实验证明了通过SO_REUSEPORT选项做UDP的负载均衡是可行的。
可是要充分利用多CPU性能,最重要的还是要使用多线程/多进程结构,特别是在处理UDP“长连接”的时候,说起UDP长连接,可能有人会觉得我把概念弄糊涂了,实际上我清楚得很,只不过没有更好的词也不想发明新的词来形容使用UDP协议时的OpenVPN是怎么收发数据的。至于短连接且不太频繁的,多进程意义倒不是很大,反而会引入进程创建,进程切换带来的开销。那么多进程自然是好的,如何实现呢?自然而然想到的就是预先建立多个进程了,每一个进程分配一个UDP socket来处理数据的收发,涉及到细节有很多种:
a.写一个程序,创建一个使用SO_REUSEPORT绑定一个IP和端口的UDP socket,然后启动多个实例,启动脚本可以独立于程序以外;
b.在主进程中预先fork多个进程,每一个进程只处理一个UDP socket;
c.在主进程中预先fork多个进程,每一个进程可以处理全部的UDP socket;
d.试图建立UDP的accept模型

以上的方案中,第一个很明显没有什么意思,它也是我着手OpenVPN负载均衡的第一个方案,很傻瓜,很简单,很实用,没什么技巧可以炫耀,没什么“技术含量”,以前的文章中说的多了,本文就不说了,只不过之前不能绑定相同端口(其实可以,但是绑了也没有用)。另外的3个方案中,每一个都可以讲一段故事。在讲故事之前,首先给出方案b的代码,因为它最纯粹,也最成功,也是唯一可行的方案。代码如下:

#include <sys/types.h>
#include <string.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <sys/socket.h>
#include <unistd.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <sys/uio.h>  

#define NUM_PROCESS     10
#define SO_REUSEPORT    15
#define SERV_PORT       12345
#define MAXSIZE         1024

struct worker_arg {
        int sock_fd;
        int process_num;
};

void DEBUG(char *argv) {
      //TODO  
}
void client_echo(int sock, int process_num)
{
        int n;
        char buff[MAXSIZE];
        struct sockaddr clientfrom;
        socklen_t addrlen = sizeof(clientfrom);
        for(;;) {
                memset(buff, 0, MAXSIZE);
                n = recvfrom(sock, buff, MAXSIZE, 0, &clientfrom, &addrlen);
                printf("%dn", process_num);
                n = sendto(sock, buff, n, 0, &clientfrom, addrlen);
        }
}

int create_udp_sock(const char *str_addr)
{
        int sockfd;
        int optval = 1;
        int fdval = 0;
        struct sockaddr_in servaddr, cliaddr;
        sockfd = socket(AF_INET, SOCK_DGRAM|SOCK_CLOEXEC, 0); /* create a socket */
        setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
       //这是关键,关键中的关键。如果没有这个,此代码在之前内核照样完美运行!完美?完美!
        setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));
        bzero(&servaddr, sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        servaddr.sin_addr.s_addr = inet_addr(str_addr);
        servaddr.sin_port = htons(SERV_PORT);
        if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
                perror("bind error");
                exit(1);
        }
                perror("bind ");
        
        return sockfd;
}

void woker_process(int sock, int num)
{        
        int woker_sock = sock;//recv_fd(sock);
        if(woker_sock < 0){
        }
        client_echo(woker_sock, num);
    
}

pid_t create_work(void *arg)
{
        struct worker_arg *warg = (struct worker_arg*)arg;
        pid_t pid = fork();
        if (pid > 0) {
                return pid;
        } else if (pid == 0){
                woker_process(warg->sock_fd, warg->process_num);
                // 不可能运行到此
        } else {
                exit(1);
        }
        // 不可能运行到此
        return pid;
}

void schedule_process(int socks[2], char *str_addr)
{
        pid_t pids[NUM_PROCESS] = {0};
        int udps[NUM_PROCESS] = {0};
        int i = 0;
        for (i = 0; i < NUM_PROCESS; i++) {
                udps[i] = create_udp_sock(str_addr);
        }
        for (i = 0; i < NUM_PROCESS; i++) {
                struct worker_arg warg;
                warg.sock_fd = udps[i];
                warg.process_num = i;
                pid_t pid = create_work(&warg);
                pids[i] = pid;
        }
        while (1) {
                int stat;
                int i;
                pid_t pid = waitpid(-1, &stat, 0);
                for (i = 0; i < NUM_PROCESS; i++) {
                        if (pid == pids[i]) {
                                // 最最关键的,那就是”把特定的套接字传递到特定的进程中“
                                struct worker_arg warg;
                                warg.sock_fd = udps[i];
                                warg.process_num = i;
                                pid_t pid = create_work(&warg);
                                pids[i] = pid;
                                break;
                        }
                }
        }
}

int main(int argc, char** argv)
{
        int unix_socks[2];
        char *str_addr = argv[1];
        if(socketpair(AF_UNIX, SOCK_STREAM, 0, unix_socks) == -1){
                exit(1);
        }
        schedule_process(unix_socks, str_addr);
        return 0;
}

最最重要的是上面的那个wait逻辑,你要保证,一个进程挂掉以后,要将其迅速拉起来,因为如果不这样,所有命中(通过第四层的socket查找算法)该socket的数据都将得不到处理。

本来,我的代码使用的是进程间传递文件描述符的方式,也就是经典著作里面讲述的那种方式,但是我觉得它太UNIX了,就没有用,于是使用了fork这种更加UNIX的方式!以上的代码写的不好,但是那是可以体现UDP多进程负载均衡的最好方式了,什么东西都在代码本身!不过这代码可真够烂的...

4.问题1:中间有个socket死了会弄乱其它活着socket的位置

我们知道,SO_REUSEPORT选项设定了以后,会根据数据包的源地址和源端口来计算一个值,该值对应所有相同目标IP地址相同目标端口的socket中的一个socket的位置,内核便把这个位置的socket作为接收数据的socket,然而,如果这个socket死掉了的话,即使重新创建了一个新的socket凑足了数,也会使整个socket链表重新排序,除非死掉的是最后一个。由于socket查找算法是不变的,计算位置的数据也不变,诸多socket的顺序改变意味着数据源和目标处理进程的对应关系会发生改变,从而本应该处理该数据的socket没有收到数据,反而被别的socket接收,这也是上述多进程方案中a方案的唯一弊端,你必须在外部监控哪个进程挂掉了,进程挂掉意味着socket被关闭,你又没有办法(没有这样的API)再创建一个socket插入到挂掉进程使用的那个socket所在的位置,唯一的方法就是全部杀掉,重新开始,但是这显然不是最好的解决方案,最好的方案...
解决:保证UDP不死,即它不被关闭
怎么保证呢?很简单,那就是socket的创建和关闭全部由主进程来统一管理,工作子进程们只处理网络IO,不关闭socket,这就要求socket在创建的时候要带有CLOEXEC标志。这也是方案b和方案c被提出的原因(同时也是结果)。

5.UDP失败的accept模型-按需建立UDP处理进程

多么想为UDP建立一个accept模型,有了SO_REUSEPORT选项以后,貌似有了一点希望,那就着手写代码了。代码是写出来了,也能用,看样子还不错,可是总是有解决不了的小尾巴,最终由于UDP和4元组没有必然的对应关系(这也是UDP的本质,否则它就成了有连接协议了,详见问题3)推论出UDP的accept几乎是没有希望的,之所以还可以用SO_REUSEPORT选项来做UDP的多进程负载均衡,其实完全依赖了Linux内核处理UDP socket查找时的一个算法,该算法只是一种实现而已,并不能保证其它的系统或者未来的Linux内核不会改变算法的行为,因此这种UDP的多进程负载均衡并非标准方案,它的使用有赖于你对系统内核行为的熟悉以及特定版本特定算法的副作用的理解。

不管怎么样,还是给出一个很XX的实现吧,毕竟目标不是为了展示“瞧啊,我实现了一个UDP的accept”,这无异于对别人说,我有更麻烦的解法。醉翁之意不在酒,在于OpenVPN?NO,是为了引出一个排他唤醒的问题。看代码:

#define _GNU_SOURCE
#include <sys/types.h>
#include <string.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <sys/socket.h>
#include <unistd.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <sys/uio.h>  
#include <poll.h>

#include <pthread.h>

#include "list.h"

#define NUM_PROCESS     10
#define SO_REUSEPORT    15
#define SERV_PORT       12345
#define MAXSIZE         1024

struct client_socket {
        struct list_head list;
        pid_t pid;
        int sock_fd;
        int process_num;
        struct sockaddr *src_addr;
        char init_data[MAXSIZE]; 
};


void DEBUG(char *argv) {
        
}

void client_echo(int sock, int process_num)
{
        int n;
        char buff[MAXSIZE];
        struct sockaddr clientfrom;
        socklen_t addrlen = sizeof(clientfrom);

                struct pollfd pfds[1];
                int pfd[1] = {-1};
                pfds[0].fd = sock;
                pfds[0].events = POLLIN|POLLOUT;
                while (1) {
                        int efds = -1;
                //以下注释掉的代码如果不放开,就会有问题!具体还请看我下面的关于”排他唤醒“的分析
                //      if ((efds = poll(pfds,1,-1)) < 0) {
                //              return -1;
                //      }
                //      if(pfds[0].revents & POLLIN) {
                                memset(buff, 0, MAXSIZE);
                                n = recvfrom(sock, buff, MAXSIZE, 0, &clientfrom, &addrlen);
                                printf("recv data:%s   size:%d   num:%d  pid:%dn", buff, n, process_num, getpid());
                                n = sendto(sock, buff, n, 0, &clientfrom, addrlen);
                //      }
                }
}

int create_udp_socks(const char *str_addr)
{
        int sockfd;
        int optval = 1;
        int fdval = 0;
        struct sockaddr_in servaddr, cliaddr;
        sockfd = socket(AF_INET, SOCK_DGRAM|SOCK_CLOEXEC, 0); /* create a socket */
        setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
        setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));
        bzero(&servaddr, sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        servaddr.sin_addr.s_addr = inet_addr(str_addr);
        servaddr.sin_port = htons(SERV_PORT);
        if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
                perror("bind error");
                exit(1);
        }
        
        return sockfd;
}

void woker_process(int sock, int num, char *init_data, pid_t pid)
{        
        int woker_sock = sock;
        client_echo(woker_sock, num);
    
}

pid_t create_worker(void *arg)
{
        struct client_socket *pcs = (struct client_socket*)arg;
        pid_t pid = fork();
        if (pid > 0) {
                pcs->pid = pid;
                return pid;
        } else if (pid == 0){
                woker_process(pcs->sock_fd, pcs->process_num, pcs->init_data, pcs->pid);
                // 不可能运行到此
        } else {
                exit(1);
        }
        // 不可能运行到此
        return pid;
}

struct client_socket* udp_accept(struct pollfd *pfds)
{
        int ret_fd = -1;
        int efds = -1;
        static LIST_HEAD(worker_list);
        int i;
        struct client_socket *pcs = NULL;
        if ((efds = poll(pfds, NUM_PROCESS, -1)) < 0) {
                printf("poll errorn");
                return NULL;
        }
        for (i = 0; i < NUM_PROCESS; i++) {
                // 此处的循环只会处理没有hash到既有worker的UDP套接字
                if(pfds[i].revents & POLLIN) {
                        struct sockaddr_in *pclientfrom = (struct sockaddr_in *)calloc(1, sizeof(struct sockaddr_in));
                        socklen_t addrlen = sizeof(struct sockaddr_in);
                        char pesud_buf[MAXSIZE];
                        int n = 0;
                        struct list_head *pos;
                        struct client_socket *ocs;
                        n = recvfrom(pfds[i].fd, pesud_buf, MAXSIZE, MSG_PEEK, pclientfrom, &addrlen);
                        printf("accept n:%d   port:%dn", n, ntohs(pclientfrom->sin_port));
                        int flag = 0;
                        // 此处的循环为了防止为同一个“连接”(即相同的五元组)创建两个或者多个worker进程
                        list_for_each(pos, &worker_list) {
                                ocs = list_entry(pos, struct client_socket, list);
                                if (!memcmp(pclientfrom, ocs->src_addr, sizeof(struct sockaddr_in))) {
                                        flag = 1;
                                        break;
                                }
                        }
                        // 如果该“连接”已经被hash到一个既有的worker,忽略它,该worker自会处理!
                        if (flag) {
                                free (pclientfrom);
                                continue;
                        }
                        ret_fd = pfds[i].fd;
                        pcs = (struct client_socket*)calloc(1, sizeof(struct client_socket));
                        pcs->sock_fd = ret_fd;
                        pcs->process_num = i;
                        pcs->src_addr = pclientfrom;
                        INIT_LIST_HEAD(&pcs->list);
                        list_add(&pcs->list, &worker_list);
                        break;
                }
        }
        return pcs;
}

void schedule_process(int socks[2], char *str_addr, struct pollfd *pfds)
{
        pid_t pids[NUM_PROCESS] = {0};
        int udps[NUM_PROCESS] = {0};
        int i = 0;
        for (i = 0; i < NUM_PROCESS; i++) {
                udps[i] = create_udp_socks(str_addr);
                pfds[i].fd = udps[i];
                pfds[i].events = POLLIN;
        }
        while (1) {
                pid_t pid;
                struct client_sock *pcs = udp_accept(&pfds[0]);
                printf("accept :%pn", pcs);
                if (pcs != NULL) {
                        pid = create_worker(pcs);
                }
        }
}

void *wait_thread(void *arg)
{
        struct pollfd *pfds = (struct pollfd*)arg;
        while (1) {
                int stat;
                int i;
                pid_t pid = waitpid(-1, &stat, 0);
                // 一直以来,我总觉得应该wait一点什么,但是...
        }       
}

int main(int argc, char** argv)
{
        int unix_socks[2];
        char *str_addr = argv[1];
        struct pollfd pfds[NUM_PROCESS] = {0};
        pthread_t tid;
        int ret = -1;
        if(socketpair(AF_UNIX, SOCK_STREAM, 0, unix_socks) == -1){
                exit(1);
        }
        ret = pthread_create(&tid, NULL, wait_thread, &pfds[0]);
        if (ret) {
               exit(1);
        }
        schedule_process(unix_socks, str_addr, &pfds[0]);
        return 0;
}

以上的代码看似体现了UDP的accept逻辑,但是却不是!它真实地体现了UDP是不可能实现accept的,除非你使用类似conntrack的附加因素!我们还是尊重UDP的原本的意义吧,如果你非要想为UDP添加一个第四层的conntrack逻辑,何必不直接使用TCP呢?也许你会回答,TCP太低效,太复杂,哦,是的,你不喜欢TCP,是的,我也不喜欢,但是你可以实现一个用户态的逻辑而不要纠结于第四层,毕竟UDP本身就是无连接的。见过有谁为IP建立过连接机制吗?是的,太多了,ip_conntrack,MPLS,有状态Firewall...不一而足,,,这到底是协议设计之初的缺陷呢,还是人们得寸进尺的证明?如果你想做点什么,以上都是例子,另外,还有更好的,那就是DTLS和OpenVPN!

6.问题2:两个进程带PEEK以及不带PEEK同时recv一个socket-排他唤醒问题

说实话,之所以写下这个失败的UDP accept模型并不是为了想讲UDP怎么怎么地,而是想说一下Linux的排他唤醒问题,甚至更往前的,也就是UDP多进程处理的方案c的提出,即所有进程处理所有的socket,也是想说一下这个排他唤醒的问题。真正和UDP相关的问题,是本小节的子问题。
所谓的排他唤醒机制,简单的说就是:如果多个进程都睡眠在等待一个事件上,当该事件发生的时候,只能唤醒其中一个。socket在阻塞recv的时候,如果没有数据,就会睡眠等待,此时如果有数据来,会唤醒socket关联的进程,该唤醒就是排他唤醒,也就是说,如果有两个进程都在等待同一个socket上进行阻塞recv,有数据来的时候,只有一个进程会得到数据并返回,另一个进程继续睡眠等待。具体可以参见内核函数__wake_up_common:

static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
            int nr_exclusive, int wake_flags, void *key)
{
    wait_queue_t *curr, *next;

    list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
        unsigned flags = curr->flags;

        if (curr->func(curr, mode, wake_flags, key) &&
                (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
            break;
    }
}

知道了这一点后,我们来分析一下UDP accept模型中的一段代码,在主进程中:

while (1) {
    poll
    if (POLLIN) {
        recvfrom(with PEEK)
    }
}

在子进程中:

while (1) {
    recvfrom(...)
}

带有PEEK标志的recvfrom是不会把数据从接收缓冲去移走的,我们假设数据到来,会有唤醒动作,如果主进程poll时排在子进程recvfrom之前睡眠,那么会先唤醒主进程,poll返回,由于poll被唤醒不是排他的,所以子进程接着也会被唤醒,而此时主进程会去带有PEEK的读数据,但是子进程赶在前面把数据读走了,recvfrom返回,此时主进程的recvfrom扑了个空,进而主进程的recvfrom阻塞,等待数据,此时子进程又一次调用recvfrom,到此为止,两个进程都阻塞在recvfrom上睡眠,且主进程排在前面(它先睡眠),此时又来了一个数据,主进程被排他唤醒,子进程继续睡眠等待,主进程的recvfrom返回,由于带有PEEK标志,数据继续留在接收队列里面,下一轮的poll直接返回,进而继续调用PEEK版的recvfrom也会直接返回,如果再也没有新的数据或者信号到来的话,子进程将不会被唤醒,它将永远也得不到那个一直被主进程PEEK的数据...这就要求,子进程必须要用poll+recvfrom,因为poll保证真正有数据的时候才会返回,让进程继而去recvfrom,而主进程只是PEEK数据,所以就不会出现上述死循环,另外,本人觉得,带有PEEK标志的recv系列函数在被唤醒的时候,不应该是排他唤醒的!
是的,是的, PEEK唤醒不应该是排他的,因为之所以recv是排他的,是为了避免“承诺给一个进程”的数据被其他进程取走了,这里的关键词是“取走了”,然而PEEK版的recv是不可能取走任何东西的!但是,不管怎样,我不想改内核代码,何况我又不会真的去用这个UDP accept,因此也就用不到PEEK,除了PEEK方式的recv,其他真正的recv是有必要是排他的,否则到底谁执行数据处理呢?鉴于此,UDP多处理方案c被彻底否决。

6.1.子问题:UDP socket和4元组以及处理进程的对应关系

UDP无连接,它无法“记住“对端,因此也就不知道一个数据包的前置和后续,也就是说UDP不知道它的前置是否曾经来过,也不晓得它的后继还会不会来以及如果还会来的话,还会来多久,因此就无法为其预先分配资源,而数据处理资源的容器是什么?是进程(爱较真儿的又要说还有线程且线程更好了...)或者线程。无法预先分配资源是多处理的一大障碍!对于TCP而言,这很简单,一个syn包到来,理论上就可以为其分配资源了,典型得就是创建一个进程,最后一个fin或者reset之类的到来就知道该结束进程了,只是说系统没有这么实现罢了,而是把满三次握手成功后最为资源分配点,最先分配的是一个socket,也就是accept返回的那个,然后使其实际情况,是要fork呢,要继续select/poll呢,还是要event poll呢,由调用者决定。对于UDP,很难,一个UDP包的到来,它可能就是唯一的一个,比如DNS查询,也可能是一次会话的开始,比如DTLS或者OpenVPN连接,但是仅凭第四层,你很难区分。你几乎不可能为单独的UDP 4元组分配一个单独的进程,几乎不可能。
那么怎么办?只能当靶子守株待兔了,事先创建多个进程,一个进程处理一个socket,然后静等,等待有数据包来命中,至于撞到了哪一个socket,由第四层的socket查找算法决定!

7.问题3:连接关闭的问题以及何时结束进程的问题-UDP多路负载均衡方案并不通用

本文的讨论,最终选择了方案b,但是还有一个问题,那就是你无法在外面得知进程什么时候该退出了的消息,因为UDP不像TCP那样有一系列的诸如FIN,RST之类的标志表示连接断开,即使像OpenVPN那样的”有连接的UDP“,也无法在协议层面上得知,它的有连接是第七层的连接,也许你不知道,我可以告诉你,OpenVPN有一个叫做exit-notify的参数,可以代表连接的断开...即使像底层的ip_conntrack那样,也无法更好地去对待UDP的”连接状态“,虽然它真的追踪了UDP的4元组信息,俨然它已经成了有连接的协议(这难道不是它的主要工作吗?否则愧对于自己的名字啊)。除了超时这种机制之外,没有更好的了,毕竟当所有的裁决手段都失效的时候,一切都要靠时间来冲淡。然而即使是时间,也对第七层的逻辑无能为力,在这个意义上,你要么狠一点,它可能很不准,要么准一点,但可能很不狠,狠和准,看你怎么选,对于OpenVPN,我不选,...我在等待官方新版本...
UDP无法对应一个4元组,但是由于Linux内核在第四层查找socket有明确的算法可以保证相同的4元组始终对应一个socket,那么就可以保证相同的4元组总是对应一个进程,只要保证该进程始终处理该socket即可!但是这并不是绝对的,因为按照UDP协议的原始含义,它就是数据报协议,协议假设任意两个数据报都是没有任何关联的,因此协议栈第四层的socket查找逻辑不应该总是保证相同的4元组对应一个socket!然而UDP除了保持其数据报的语义外,还是一种真真切切的传输层协议,是一种IP协议的端到端映射,这就意味着你可以在其上面建立任何的”流“,虽然这不是协议栈的职责,但是Linux的socket查找算法帮我们做到了最重要的事,如果在__udp4_lib_lookup调用的inet_ehashfn中:

static inline unsigned int inet_ehashfn(struct net *net,
                    const __be32 laddr, const __u16 lport,
                    const __be32 faddr, const __be16 fport)
{
    return jhash_3words((__force __u32) laddr,
                (__force __u32) faddr,
                ((__u32) lport) << 16 | (__force __u32)fport,
                inet_ehash_secret + net_hash_mix(net));
}

稍微改变了一下方略,引入了一个random,那么以上的所有负载均衡方案全部报废!

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>