<返回更多

Linux两种处理模式reactor模式proactor模式

2020-12-24    
加入收藏

前言

一、Reactor模式

Linux两种处理模式reactor模式proactor模式

 

Linux两种处理模式reactor模式proactor模式

 


Linux两种处理模式reactor模式proactor模式

 

需要C/C++ Linux服务器架构师学习资料后台私信“1”免费获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQLredis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享

多线程Reactor模式多线程Reactor模式特点:它要求主线程(I/O处理单元)只负责监听文件描述符上是否有事件发生,有的话就立即将时间通知工作线程(逻辑单元)。除此之外,主线程不做任何其他实质性的工作读写数据,接受新的连接,以及处理客户请求均在工作线程中完成工作流程:①主线程往epoll内核事件表中注册socket上有数据可读②主线程调用epoll_wait等待socket上有数据可读③当socket上有数据可读时,epoll_wait通知主线程。主线程则将socket可读事件放入请求队列④睡眠在请求请求队列上的某个工作线程被唤醒,它从socket读取数据,并处理客户请求,然后往epoll内核事件表中注册该socket上的写就绪时间⑤主线程调用epoll_wait等到socket可写⑥当socket可写时,epoll_wait通知主线程。主线程将socket可写事件放入请求队列⑦睡眠在请求队列上的某个工作线程被唤醒,它向socket上写入服务器处理客户请求的结果

单线程Reactor模式单线程Reactor模式与多线程Reactor模式原理相同。但是工作都是在同一个线程中完成的单线程优缺点:优点:Reactor模型开发效率上比起直接使用IO复用要高,它通常是单线程的,设计目标是希望单线程使用一颗 CPU 的全部资源。优点为每个事件处理中很多时候可以 不考虑共享资源的互斥访问缺点:可是缺点也是明显的,现在的硬件发展,已经不再遵循摩尔定 律,CPU 的频率受制于材料的限制不再有大的提升,而改为是从核数的增加上提升能力单线程Reactor使用多核:如果程序业务很简单,例如只是简单的访问一些提供了并发访问的服务,就可以直接开启多个反应堆(Reactor),每个反应堆对应一颗CPU核心这些反应堆上跑的请求互不相关,这是完全可以利用多核的。例如Nginx这样的http静态服务器下面是单线程Reactor模式的实现代码,下载下来之后可以直接编译运行:

 

// reactor.c

// 源码链接: https://github.com/dongyusheng/csdn-code/blob/master/server-client/reactor.c

// gcc -o reactor reactor.c

#include <stdio.h>

#include <stdlib.h>

#include <unistd.h>

#include <string.h>

#include <arpa/inet.h>

#include <netinet/in.h>

#include <sys/socket.h>

#include <sys/types.h>

#include <sys/epoll.h>

#include <errno.h>

#include <time.h>

#include <libgen.h>

#include <fcntl.h>

 

#define MAX_EPOLL_EVENTS    1024

#define MAX_BUFFER_SIZE     4096

 

typedef int NCALLBACK(int, int, void*);

 

// 事件结构体, 每个套接字都会被封装为一个事件

struct ntyevent {

 int fd;           // 事件对应的fd

 int events;       // 事件类型(  本代码中我们只处理EPOLL_IN和EPOLL_OUT)

 

 void *arg;        // 事件回调函数的参数3, 实际传入的是一个struct ntyreactor结构体指针

 int (*callback)(int fd, int events, void *arg); //事件回调函数

 

 int status;       // 当前事件是否位于epoll集合中: 1表示在, 0表示不在

 

 char buffer[MAX_BUFFER_SIZE]; // 读写缓冲区

 int length;       //缓冲区数据的长度

 

 long last_active; // 最后一次活跃的时间

};

 

 

// Reactor主体

struct ntyreactor {

 int epoll_fd;             // epoll套接字

 struct ntyevent *events; // reactor当前处理的事件集

};

 

// 创建一个Tcp Server

int init_server(char *ip, short port);

// 向reactor中添加一个服务器监听事件

int ntyreactor_addlistener(struct ntyreactor *reactor, int fd, NCALLBACK callback);

 

 

/***下面这3个函数是用来对reactor操作的***/

// 初始化reactor

struct ntyreactor *ntyreactor_init();

// 销毁reactor

int ntyreactor_destroy(struct ntyreactor *reactor);

// reactor运行函数

int ntyreactor_run(struct ntyreactor *reactor);

 

 

 

/***下面这3个函数是用来对ntyevent事件结构操作的***/

// 将一个fd封装到事件结构中

int nty_event_set(struct ntyevent *ev, int fd, int event, int length, int status, NCALLBACK callback, void *arg);

// 将一个事件添加/更新到epoll的事件表中

int nty_event_add(int epoll_fd, struct ntyevent* ev);

// 将一个事件移出epoll事件表

int nty_event_del(int epoll_fd, struct ntyevent* event);

 

 

/***下面这3个函数是ntyevent事件可以使用的回调函数***/

int accept_callback(int fd, int events, void *arg);

int recv_callback(int fd, int events, void *arg);

int send_callback(int fd, int events, void *arg);

 

 

 

int main(int argc, char *argv[])

{

 if(argc != 3)

    {

 printf("usage: ./%s [ip] [port]n", basename(argv[0]));

 exit(EXIT_FAILURE);

    }

 

 char *ip = argv[1];

 short port = atoi(argv[2]);

 

 int sock_fd;

 

 // 1.初始化一个Tcp Server

    sock_fd = init_server(ip, port);

 

 // 2.初始化reactor

 struct ntyreactor *reactor = ntyreactor_init();

 if( reactor == NULL)

    {

 printf("Error in %s(), ntyreactor_init: create reactor errorn", __func__);

 exit(EXIT_FAILURE);

    }

 

 // 3.将Tcp Server添加到reactor事件集中

    ntyreactor_addlistener(reactor, sock_fd, accept_callback);

 

 // 4.运行reactor

    ntyreactor_run(reactor);

 

 // 5.销毁

    ntyreactor_destroy(reactor);

 

    close(sock_fd);

 

 return 0;

}

 

int init_server(char *ip, short port)

{

 // 1.创建套接字

 int sock_fd = socket(AF_INET, SOCK_STREAM, 0);

 if(sock_fd == -1)

    {

 printf("Error in %s(), socket: %sn", __func__, strerror(errno));

 return -1;

    }

 

 // 2.初始化服务器地址

 struct sockaddr_in server_addr;

 memset(&server_addr, 0, sizeof(server_addr));

    server_addr.sin_family = AF_INET;

 if(inet_pton(AF_INET, ip, (void*)&server_addr.sin_addr.s_addr) == -1)

    {

 printf("Error in %s(), inet_pton: %sn", __func__, strerror(errno));

 return -1;

    }

    server_addr.sin_port = htons(port);

 

 // 3.绑定服务器地址

 if(bind(sock_fd, (const struct sockaddr*)&server_addr, sizeof(server_addr)) == -1)

    {

 printf("Error in %s(), bind: %sn", __func__, strerror(errno));

 return -1;

    }

 

 // 3.监听

 if(listen(sock_fd, 20) == -1)

    {

 printf("Error in %s(), listen: %sn", __func__, strerror(errno));

 return -1;

    }

 

 printf("Listen start [%s:%d]...n", inet_ntoa(server_addr.sin_addr), ntohs(server_addr.sin_port));

 

 return sock_fd;

}

 

struct ntyreactor *ntyreactor_init()

{

 // 1.创建一个reactor

 struct ntyreactor *reactor = (struct ntyreactor*)malloc(sizeof(struct ntyreactor));

 if(reactor == NULL)

 return NULL;

 memset(reactor, 0, sizeof(struct ntyreactor));

 

 // 2.创建reacotr的epoll_fd

    reactor->epoll_fd = epoll_create(1);

 if(reactor->epoll_fd == -1)

    {

 printf("Error in %s(), epoll_create: %sn", __func__, strerror(errno));

 free(reactor);

 return NULL;

    }

 

 // 3.创建reactor的事件集

    reactor->events = (struct ntyevent*)malloc(sizeof(struct ntyevent) * MAX_EPOLL_EVENTS);

 if(reactor->events == NULL)

    {

 printf("Error in %s(), malloc: %sn", __func__, strerror(errno));

        close(reactor->epoll_fd);

 free(reactor);

 return NULL;

    }

 

 return reactor;

}

 

int ntyreactor_destroy(struct ntyreactor *reactor)

{

 if(reactor == NULL)

    {

 printf("Error in %s(): %sn", __func__, "reactor arg is NULL");

 return -1;

    }

 

 // 关闭epoll_fd、销毁事件集、释放结构

    close(reactor->epoll_fd);

 free(reactor->events);

 

 free(reactor);

 

 return 0;

}

 

int ntyreactor_run(struct ntyreactor *reactor)

{

 // 1.判断参数

 if(reactor == NULL || reactor->epoll_fd < 0 || reactor->events == NULL)

    {

 printf("Error in %s(): %sn", __func__, "reactor arg is error");

 return -1;

    }

 

 

 struct epoll_event ep_events[MAX_EPOLL_EVENTS + 1];

 

 // 2.进行epoll_wait()

 int nready;

 while(1)

    {

 // 超时检测

 /*

        int checkpos = 0, i;

        long now = time(NULL);

		for (i = 0; i < MAX_EPOLL_EVENTS; i++, checkpos ++) {

			if (checkpos == MAX_EPOLL_EVENTS) {

				checkpos = 0;

			}

            // 如果当前索引处的事件status为0, 则不检测, 进行下一个

			if (reactor->events[checkpos].status != 1) {

				continue;

			}



            // 如果超过60秒, 那么就认定为超时, 超时后关闭移除

			long duration = now - reactor->events[checkpos].last_active;

			if (duration >= 60) {

				close(reactor->events[checkpos].fd);

				printf("[fd=%d] timeoutn", reactor->events[checkpos].fd);

				nty_event_del(reactor->epfd, &reactor->events[checkpos]);

			}

		}*/

 

        nready = epoll_wait(reactor->epoll_fd, ep_events, MAX_EPOLL_EVENTS, 1000);

 // 3.函数出错

 if(nready == -1)

        {

 // 如果函数在阻塞过程中接收到信号, 那么继续进行epoll_wait()

 if(errno == EAGAIN || errno == EWOULDBLOCK)

 continue;

 printf("Error in %s(), epoll_wait: %sn", __func__, strerror(errno));

 return -1;

        }

 // 4.函数超时

 else if(nready == 0)

 continue;

 // 5.有事件准备好

 else

        {

 // 遍历处理已就绪的事件

 int i;

 for(i = 0; i < nready; ++i)

            {

 // 获取事件结构体, 保存在struct epoll_event结构的data.ptr中

 struct ntyevent* ev = (struct ntyevent*)ep_events[i].data.ptr;

 

 // 如果事件可读

 if((ep_events[i].events & EPOLLIN) && (ev->events & EPOLLIN))

                    ev->callback(ev->fd, ev->events, ev->arg);

 

 // 如果事件可写

 if((ep_events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT))

                    ev->callback(ev->fd, ev->events, ev->arg);

            }

        }

    }

 

 return 0;

}

 

int ntyreactor_addlistener(struct ntyreactor *reactor, int fd, NCALLBACK callback)

{

 if(reactor == NULL || fd <0 || callback == NULL)

    {

 printf("Error in %s(): %sn", __func__, "arg error");

 return -1;

    }

 

 // 初始化ntyevent事件结构, 然后添加到reactor的epoll事件表中即可

    nty_event_set(&reactor->events[fd], fd, EPOLLIN, 0, 0, callback, reactor);

    nty_event_add(reactor->epoll_fd, &reactor->events[fd]);

 

 return 0;

}

 

int nty_event_set(struct ntyevent *ev, int fd, int event, int length, int status, NCALLBACK callback, void *arg)

{

 if(ev == NULL || fd <0 || event <0 || length < 0 || callback == NULL || arg == NULL || status < 0)

    {

 printf("Error in %s(): %sn", __func__, "arg error");

 return -1;

    }

 

 // 初始化ntyevent结构的相关内容即可

    ev->fd = fd;

    ev->events = event;

    ev->arg = arg;

    ev->callback = callback;

    ev->status = status;

    ev->length = length;

    ev->last_active = time(NULL);

 

 return 0;

}

 

int nty_event_add(int epoll_fd, struct ntyevent* ev)

{

 if(epoll_fd <0 || ev == NULL)

    {

 printf("Error in %s(): %sn", __func__, "arg error");

 return -1;

    }

 

 // 1.创建一个epoll事件结构

 struct epoll_event ep_event;

 memset(&ep_event, 0, sizeof(ep_event));

    ep_event.events = ev->events;

    ep_event.data.ptr = ev;

 //ep_event.data.fd = ev->fd; data成员是一个联合体, 不能同时使用fd和ptr成员

 

 // 2.如果当前ev已经在epoll事件表中, 那么就修改; 否则就把ev加入到epoll事件表中

 int op;

 if(ev->status == 0)

    {

        op = EPOLL_CTL_ADD;

        ev->status = 1;

    } 

 else

        op = EPOLL_CTL_MOD;

 

 // 3.添加/更新 

 if(epoll_ctl(epoll_fd, op, ev->fd, &ep_event) == -1)

    {

 printf("Error in %s(), epoll_ctl: %sn", __func__, strerror(errno));

 return -1;

    }

 

 return 0;

}

 

int nty_event_del(int epoll_fd, struct ntyevent* ev)

{

 if(epoll_fd < 0 || ev == NULL || ev->status != 1)

    {

 printf("Error in %s(): %sn", __func__, "ev arg is error");

 return -1;

    }

 

 // 初始要删除的epoll事件结构

 struct epoll_event ep_event;

 memset(&ep_event, 0, sizeof(ep_event));

    ep_event.data.ptr = ev;

 //ep_event.data.fd = ev->fd; data成员是一个枚举, 不能同时使用ptr和fd成员

    ev->status = 0;

 

 // 从epoll事件表中删除epoll事件

 if(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, ev->fd, &ep_event) == -1)

    {

 printf("Error in %s(), epoll_ctl: %sn", __func__, strerror(errno));

 return -1;

    }

 

 return 0;

}

 

int accept_callback(int fd, int events, void *arg)

{

 // 1.获得reactor结构

 struct ntyreactor *reactor = (struct ntyreactor*)arg;

 // 2.获取该fd对应的事件结构

 struct ntyevent *ev = reactor->events + fd;

 

 // 3.初始化客户端地址结构

 struct sockaddr_in cli_addr;

 memset(&cli_addr, 0 , sizeof(cli_addr));

 socklen_t len = sizeof(cli_addr);

 

 // 4.接收客户端

 int cli_fd;

    cli_fd = accept(ev->fd, (struct sockaddr*)&cli_addr, &len);

 if(cli_fd == -1)

    {

 printf("Error in %s(), accept: %sn", __func__, strerror(errno));

 return -1;

    }

 

 int i;

 do {

 // 5.在reactor事件表中找到第一个空位置, 用i表示新事件存放的位置, 也是其套接字的值

 // reactor->events的0、1、2、3、4都被占用了, 客户端第一个可以使用的套接字为5, 因此此处从5开始遍历

 for(i = 5; i< MAX_EPOLL_EVENTS; ++i)

        {

 if(reactor->events[i].status == 0)

 break;

        }

 

 // 6.如果满了, 就退出

 if(i == MAX_EPOLL_EVENTS)

        {

 printf("Error in %s(): max connect limit[%d]n", __func__, MAX_EPOLL_EVENTS);

 return -1;

        }

 

 // 7.将套接字设置为非阻塞

 int flag = 0;

 if ((flag = fcntl(cli_fd, F_SETFL, O_NONBLOCK)) < 0) {

 printf("Error in %s(), fcntl: %sn", __func__, strerror(errno));

 return -1;

		}

 

 // 8.将新事件添加到reactor事件表中

 // 此处我们将新客户端的回调函数首先设置为recv_callback, 事件类型为EPOLLIN, 因为一般都是客户端向服务器发送数据的

        nty_event_set(&reactor->events[cli_fd], cli_fd, EPOLLIN, 0, 0, recv_callback, reactor);

        nty_event_add(reactor->epoll_fd, &reactor->events[cli_fd]);

    } while(0);

 

 printf("New connect: [%s:%d], [time:%ld], pos[%d]n", 

        inet_ntoa(cli_addr.sin_addr), ntohs(cli_addr.sin_port), reactor->events[cli_fd].last_active, i);

 

 return 0;

}

 

int recv_callback(int fd, int events, void *arg)

{

 // 1.获得reactor结构

 struct ntyreactor *reactor =(struct ntyreactor*)arg;

 // 2.获取该fd对应的事件结构

 struct ntyevent *ev = reactor->events + fd;

 

 // 3.先将事件从epoll事件集移除

    nty_event_del(reactor->epoll_fd, ev);

 

 // 3.接收数据

 int rc = recv(ev->fd, ev->buffer, MAX_BUFFER_SIZE, 0);

 if(rc < 0)        //recv出错

    {

 //if(errno == EAGAIN || errno == EWOULDBLOCK)

 //    return rc;

 

 printf("Error in %s(), recv: %sn", __func__, strerror(errno));

 

 // 此处我们不再需要将该nty_event从epoll事件集移除, 因为上面我们已经移除了

        close(ev->fd);

    }

 else if(rc == 0)  //对方关闭了

    {

 printf("Client closed the connection, fd = %dn", ev->fd);

 

 // 此处我们也当做错误处理

 // 此处我们不再需要将该nty_event从epoll事件集移除, 因为上面我们已经移除了

        close(ev->fd);

    } 

 else //接收到数据

    {

        ev->buffer[rc] = '';

 printf("Recv[fd = %d]: %sn", ev->fd, ev->buffer);

 

 // 将事件变为可读, 然后加入到epoll事件表中

        nty_event_set(ev, ev->fd, EPOLLOUT, rc, 0, send_callback, reactor);

        nty_event_add(reactor->epoll_fd, ev);

    }

 

 return rc;

}

 

int send_callback(int fd, int events, void *arg)

{

 // 1.获得reactor结构

 struct ntyreactor *reactor =(struct ntyreactor*)arg;

 // 2.获取该fd对应的事件结构

 struct ntyevent *ev = reactor->events + fd;

 

 // 3.此处我们把接收的内容再回送给对象, 因此使用的是ev->buffer

 int rc = send(ev->fd, ev->buffer, ev->length, 0);

 if(rc > 0) //send成功

    {

 printf("Send[fd = %d]: %sn", ev->fd, ev->buffer);

 

 // 移除、添加: 将其变为可读

        nty_event_del(reactor->epoll_fd, ev);

        nty_event_set(ev, ev->fd, EPOLLIN, 0, 0, recv_callback, reactor);

        nty_event_add(reactor->epoll_fd, ev);

    }

 else //send失败

    {

 printf("Error in %s(), send: %sn", __func__, strerror(errno));

 

 // 关闭、移除

        close(ev->fd);

        nty_event_del(reactor->epoll_fd, ev);

    }

 

 return rc;

}

二、Proactor模式

Proactor模式特点与Reactor不同,Proactor模式将所有的I/O操作都交给主线程和内核来处理,工作线程仅仅负责业务逻辑

Proactor模式的工作流程①主线程调用aio_read函数向内核注册socket上读完成事件,并告诉内核用户读缓冲区的位置,以及读操作完成时如何通知应用程序(这里以信号为例)②主线程继续处理其他逻辑③当socket上的数据被读入用户缓冲区后,内核将向应用程序发送一个信号,以通知应用程序数据已经可用④应用程序预先定义好的信号处理函数选择一个工作线程来处理客户请求。工作线程处理完客户请求之后,调用aio_write函数向内核注册socket上的写完成事件,并告诉内核用户写缓冲区的位置,以及写操作完成时如何通知应用程序(这里以信号为例)⑤主线程继续处理其他逻辑⑥当用户缓冲区的数据被写入socket之后,内核将向应用程序发送一个信号,以通知应用程序数据已经发送完毕⑦应用程序预先定义好的信号处理函数选择一个工作线程来做善后处理,比如决定是否关闭socket在上图中,连接socket上的读写事件是通过aio_read/aio_write向内核注册的,因此内核将通过信号来向应用程序报告连接socket上的读写事件。所以,主线程的epoll_wait调用仅能用来检测监听socket上的连接请求事件,而不能用来检测连接socket的读写事件

三、使用同步I/O模拟Proactor模式

原理:主线程执行数据读写操作,读写完成之后,主线程向工作线程通知这一“完成事件”。那么从工作线程的角度来看,它们就直接获得了数据读写的结果,接下来要做的只是对读写的结果进行逻辑处理

工作流程:①主线程往epoll内核事件表中注册socket上的读就绪事件②主线程调用epoll_wait等待socket上有数据可读③当socket上有数据可读时,epoll_wait通知主线程。主线程从socket循环读取数据,直到没有更多数据可读,然后将读取到的数据封装成一个请求对象并插入请求队列④睡眠在请求队列上的某个工作线程被唤醒,它获得请求对象并处理客户请求,然后往epoll内核事件表中注册socket上的写就绪事件⑤主线程调用epoll_wait等到socket可写⑥当socket可写时,epoll_wait通知主线程。主线程往socket上写入服务器处理客户请求的结果

四、几种开源库

优先级libevent:激活的事件组织在优先级队列中,各类事件默认的优先级是相同的,可以通过设置 事件的优先级使其优先被处理libev:也是通过优先级队列来管理激活的时间,也可以设置事件优先级libuv:也是通过优先级队列来管理激活的时间,也可以设置事件优先级

事件循环libevent:event_base 用于管理事件libev:激活的事件组织在优先级队列中,各类事件默认的优先级是相同的,libuv:可以通 过设置事件的优先级 使其优先被处理

线程安全event_base 和 loop 都不是线程安全的,一个 event_base 或 loop 实例只能在用户的一个线程 内访问(一般是主线程),注册到 event_base 或者 loop 的 event 都是串行访问的,即每个执 行过程中,会按照优先级顺序访问已经激活的事件,执行其回调函数。所以在仅使用一个 event_base 或 loop 的情况下,回调函数的执行不存在并行关系

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>