目录
2. 结构体aeFileEvent、aeFiredEvent、aeTimeEvent
6. 通过Redis网络部分,学到如何实现reactor模式
上一章节讲解了Redis的网络交互流程,没有对关于网络部分的结构体有具体的讲解,所以该文章就主要讲解Redis网络部分使用的结构体。
1. epoll的封装
epoll有三个函数调用:
- epoll_create:创建epoll实例,返回一个epoll fd,即是用于管理待检测的文件描述符的集合。
- epoll_ctl:管理红黑树实例上的节点,可以进行添加、修改、删除操作。
- epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)。等待就绪的fd,并返回就绪的fd,存储在events中,以及个数。
结构体aeApiStae
- 其内部有变量epfd,即是调用epoll_create创建出来fd
- *events即是就绪的fd存储的位置(数组)
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
创建epoll fd的封装
主要流程:
- 申请内存空间给结构体aeApieState,申请内存空间给成员变量events。
- 使用epoll_create创建出epfd
//现在还没了解到aeEventLoop,可以先不用管aeEventLoop相关的,不影响看代码。
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
anetCloexec(state->epfd);
eventLoop->apidata = state;
return 0;
}
//重置aeApiState的events的大小,即是重置存放就绪fd的空间大小
static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
aeApiState *state = eventLoop->apidata;
state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize);
return 0;
}
//释放结构体aeApiState
static void aeApiFree(aeEventLoop *eventLoop) {
aeApiState *state = eventLoop->apidata;
close(state->epfd);
zfree(state->events);
zfree(state);
}
epoll_ctl的封装
主要是两个:添加和删除。
- 添加的就是使用epoll_ctl(epfd,EPOLL_CTL_ADD ,....)
- 删除时使用epoll_ctl(epfd,EPOLL_CTL_MOD,....)
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
//得到要关注的事件类型mask
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
int mask = eventLoop->events[fd].mask & (~delmask);
ee.events = 0;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (mask != AE_NONE) {
epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
} else {
/* Note, Kernel < 2.6.9 requires a non null event pointer even for
* EPOLL_CTL_DEL. */
epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
}
}
epoll_wait的封装
即是调用epoll_wait(...)。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
//eventLoop->setsize就是state->evnets数组的元素个数
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) { //遍历就绪的fd
int mask = 0; //mask就是该fd就绪的事件
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
//把就绪的fd存储到 eventLoop->fired[j]中
//可以先不用关注eventLoop,这里就是把就绪的fd存储到另一地方嘛
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
2. 结构体aeFileEvent、aeFiredEvent、aeTimeEvent
结构体aeFileEvent
一个客户端建立连接后,会把该客户端的fd添加到epoll上,并监听其关注的事件。假如当前fd需要监听读事件,那该fd读就绪,那对应的读事件函数就会被调用。
所以,可以把fd关注的事件和fd就绪会执行的回调函数封装成一个结构体,Redis叫其是aeFileEvent。
- mask就是fd关注的事件类型
- rfileProc是读就绪会执行的回调函数,wfileProc就是写就绪会执行的回调函数。这两个都需要后续去定义和注册
- clientData就是回调函数中的参数
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
//回调函数的类型
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
//前面的形式可能不好看出,用c++11写的话,如下
//using aeFileProc=std::function<void(aeEventLoop* eventLoop,int fd,void* clientData,int mask)>;
所以,客户端就会有自己的aeFileEvent,即是一个客户端就对应一个aeFileEvent。 当读或写就绪时,就会调用对应的函数。
其实,服务端也是对应一个aeFileEvent,因为也要监听服务端的读事件。当客户端来连接,服务端读事件就绪,就会调用服务端的读事件回调函数进行accept。
所以就会有创建aeFileEvent,每个客户端或者一个服务端都会使用函数aeCreateFileEvent。(后续会详讲,先留个印象)
看到这里读者可能会有疑惑,不是说要对某个fd进行注册监听的吗,怎么该结构体没有fd的呢?
那是因为aeFileEvent是存储在数组内。数组的下标就是该aeFileEvent对应的fd,也即是客户端对应的fd。(后续从代码中可以看出的)
结构体aeFiredEvent
如名字一样,其就是就绪事件。一个就绪事件,就肯定是可以从中知道是哪个fd就绪,还有知道是哪种类型事件就绪。
- fd表示epoll_wait返回的就绪fd
- mask表示epol_wait返回的事件类型
epoll_wait返回的一个就绪fd就对应一个aeFireEvent。
aeApiPoll就是把epoll_wait返回的就绪fd和事件类型 逐个赋值给aeFireEvent。
/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
//把epoll_wait返回的就绪fd和事件赋值给fired[j]
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
//eventLoop->setsize就是state->evnets数组的元素个数
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
if (retval > 0) {
numevents = retval;
for (int j = 0; j < numevents; j++) { //遍历就绪的fd
int mask = 0; //mask就是该fd就绪的事件
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
//把就绪的fd存储到 eventLoop->fired[j]中
//可以先不用关注eventLoop,这里就是把就绪的fd存储到另一地方嘛
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
结构体aeTimeEvent
其是时间事件,可以认为是个定时器,定时会执行给定的函数。
从代码可以看到其是个双向链表。
- 每个时间事件都有一个事件id,aeEventLoop中的timeEventNextId是下一个要注册的时间事件id。
- when_sec、when_ms是时间事件(定时器任务)的发生时间
- timeProc是时间事件的处理回调函数。说明:aeTimeProc 需要返回一个 int 值,代表下次该超时事件触发的时间间隔。如果返回 - 1,则说明超时时间不需要再触发了,标记为删除即可
- finalizerProc是时间事件要删除时的处理函数
//时间事件结构
typedef struct aeTimeEvent {
long long id; //时间事件的唯一标识符,自增
//事件的到达时间(即是执行时间)
long when_sec; /* seconds */
long when_ms; /* milliseconds */
//事件处理函数 (到期执行的回调函数)
aeTimeProc *timeProc;
//事件释放函数 (回调函数)
aeEventFinalizerProc *finalizerProc;
void *clientData; //多路复用库的私有数据
//双向链表
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
int refcount; //以防止计时器事件在递归时间事件调用中释放
} aeTimeEvent;
//时间事件回调函数类型
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
//删除定时事件的回调函数类型
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
3. struct aeEventLoop
EventLoop是个事件循环,其主要功能就是实现 while(1){ epoll_wait(....) }。所以说是个事件循环,这也对epoll再进一步封装。
aeEventLoop是Reactor模型的具体抽象,把网络读写事件和时间事件(定时器任务)可以统一处理。
//ae.h
// 事件循环
typedef struct aeEventLoop {
int maxfd; //已经注册的文件描述符的最大值
int setsize; //setsize是能注册的fd的最大值(epoll_wait函数中的参数maxevents)
long long timeEventNextId; //下一个要注册的时间事件id
time_t lastTime; //最后一次执行时间事件的时间
aeFileEvent *events; //是数组,已注册的文件事件 (就是IO event)
aeFiredEvent *fired; //数组,已就绪的文件事件
aeTimeEvent *timeEventHead; //定时器链表的头结点(头结点不一定是最早超时的任务)
int stop; //eventLoop的开关
void *apidata; //具体的IO多路复用实现,即是前面讲的结构体aeApiState变量。
aeBeforeSleepProc *beforesleep;//在处理事件前要执行的回调函数(即是在执行epoll_wait()之前)
aeBeforeSleepProc *aftersleep;//在处理事件后要执行的回调函数(即是在执行epoll_wait()之后)
int flags; //设置的标识位
} aeEventLoop;
//进入循环等待之前的回调函数类型
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
从上面的注释知道 aeEventLoop的成员变量的用处。所以这里就只重点讲解两个变量*events 和 *fired。
- *event就是个数组,setsize是数组的元素个数。events的元素是aeFileEvent。前面说了一个客户端对应一个aeFileEvent。所以,events就是存储需要被监听的客户端。events数组下标是aeFileEvent中的fd。
- fired数组是读写已就绪的 网络事件 数组。数组元素是记录了就绪的fd及其epoll_wait返回的事件类型。
aeEventLoop相关的函数
1. 创建eventloop
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
//分配内存给eventLoop
eventLoop = zmalloc(sizeof(*eventLoop))
//分配内存给eventLoop中的events 和 fired 数组
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
//设置其一些成员变量
eventLoop->setsize = setsize;
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
eventLoop->flags = 0;
//创建epoll fd,即是调用epoll_create, 内部把创建出来的apiState赋值给eventLoop->apidata
aeApiCreate(eventLoop)
//当前每个aeFileEvent都不关注任何事件
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
//省略了些内存申请失败的处理................
}
2. 创建aeFileEvent
内部会调用aeApiAddEvent。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) { //判断客户端fd是否大于events数组元素个数
errno = ERANGE;
return AE_ERR; //若是大于,返回错误
}
//从events数组中获取一个元素
aeFileEvent *fe = &eventLoop->events[fd]; //从这可以看出events的下标就是客户端的fd
if (aeApiAddEvent(eventLoop, fd, mask) == -1) //添加fd到epoll上,并关注事件mask
return AE_ERR;
fe->mask |= mask; //设置结构体aeFileEvent关注的事件
//设置回调函数
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData; //设置回调函数的参数
//更新已监听的fd的最大值
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
//创建时间事件,即是定时器
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
te->when = getMonotonicUs() + milliseconds * 1000;
//设置定时器的回调函数
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData; //设置回调函数的参数
te->prev = NULL;
te->next = eventLoop->timeEventHead; //从这句代码可以看出其是往头部插入的
te->refcount = 0;
if (te->next)
te->next->prev = te;
eventLoop->timeEventHead = te; //更新链表头部为新插入的节点
return id;
}
3. 删除aeFileEvent
通过传入的fd找到evnets数组中的元素,之后调用aeApiDelEvent进行删除。之后吧该位置的
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
if (fd >= eventLoop->setsize) return;
//通过传入的fd找到events数组中的元素
aeFileEvent *fe = &eventLoop->events[fd];
if (fe->mask == AE_NONE) return;
/* We want to always remove AE_BARRIER if set when AE_WRITABLE
* is removed. */
if (mask & AE_WRITABLE) mask |= AE_BARRIER;
aeApiDelEvent(eventLoop, fd, mask); //进行删除
fe->mask = fe->mask & (~mask); //取消监听mask事件
if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
/* Update the max fd */
int j;
for (j = eventLoop->maxfd-1; j >= 0; j--)
if (eventLoop->events[j].mask != AE_NONE) break;
eventLoop->maxfd = j; //更新已注册的fd的最大值
}
}
4. 开始事件循环
aeMain就是一个while()循环,循环内部是aeProcessEvents函数。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
aeProcessEvents的主要步骤:
回调函数beforsleep和aftersleep可以先省略不关注的。因为目前其还没有用途,先知道有这两个回调函数就行。
- 计算epoll_wait()需要的阻塞时间。
- 执行beforsleep
- epoll_wait等待事件发生。
- 处理发生的IO事件,根据发生的事件类型来调用对应的回调函数:
- 若是AE_READABLE类型调用rfileProc,
- 若是AE_WRITABLE类型调用wfileProc。
- 处理时间事件。若该时间事件是周期性的,执行完后会再添加到时间事件链表的。
//为了能更好理解读懂,该函数只展示了主体流程,有些细节是没有展示
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))){
struct timeval tv, *tvp;
//省略了epoll_wait函数参数timeout的计算过程,其不是框架的重点,后续可以再详细了解
...............................
//执行befroesleep回调函数,这是在初始化server时绑定的,可以先不关注
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
//即是调用epoll_wait, 等待就绪的fd
numevents = aeApiPoll(eventLoop, tvp);
...................
//遍历执行已就绪的fd的回调函数
for (int j = 0; j < numevents; j++) {
//在aeApiPoll中把就绪的fd和事件赋值给了fired[j]
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
//得到就绪的fd和事件类型
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
//下面就执行对应的回调函数
if (!invert && fe->mask & mask & AE_READABLE) { //读就绪,执行读回调函数
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
if (fe->mask & mask & AE_WRITABLE) { //写就绪,执行写回调函数
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
......................
processed++;
}
}
if (flags & AE_TIME_EVENTS) //flags有需要,就执行时间事件,即是执行定时器任务
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
4.什么时候设置回调函数的?
看aeProcessEvents时候可能会有疑惑,调用fe->rfileProc,但是都不知道该函数是什么内容的。
因为这是通过绑定的。就是把一个函数赋值给fe->rfileProc。
从main()入手。
//server.c
int main(int argc, char **argv) {
.......................
initServer();
}
//initServer函数中就有绑定回调函数的
void initServer(void) {
//省略了很多其他部分的内容
//socket(...),bind(...),listen(...),创建服务端的流程....................
............................................
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
/* Create an event handler for accepting new connections in TCP sockets. */
createSocketAcceptHandler(&server.ipfd, acceptTcpHandler)
/* Register before and after sleep handlers (note this needs to be done
* before loading persistence since it is used by processEventsWhileBlocked. */
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
................................
}
1. 绑定SleepProc
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
eventLoop->beforesleep = beforesleep;
}
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) {
eventLoop->aftersleep = aftersleep;
}
在服务器初始化时候就绑定了这两个函数,那么在运行aeProcessEvents时候,就会调用函数beforesleep和aftersleep。
2. 绑定服务端的读事件回调函数
就是在createSocketAcceptHandler,其就是调用aeCreateFileEvent。sfd->fd[j]就是服务端fd,AE_READABLE就是需要监听的事件,即是监听服务端的读事件。回调函数是accept_handler,即是把accept_handler赋值给fe->rfileProc。
从函数aeProcessEvents,会返回就绪的fd和就绪的事件类型。当服务端fd返回的时候,是读事件就绪,就会调用fe->rfileProc。而这时rfileProc就是accpet_handler,即是会调用accpet_handler。
//sfd是个数组,我们看源码的话,目前就当其是一个元素的即可,就不用使用for
//就把这个函数当做就是使用aeCreateFileEvent即可。
int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) {
for (int j = 0; j < sfd->count; j++) {
aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,NULL)
}
//错误处理没有展示
return C_OK;
}
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
................
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
.....................
}
回调函数acceptTcpHandler很明显会调用accpet去对客户建立连接的。
3. 绑定客户端的读事件回调函数
继续跟着函数acceptTcpHandler。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
while(max--) {
//调用accept
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
............................
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
.................
/* Create connection and client */
client *c = createClient(conn))
/* Last chance to keep flags */
c->flags |= flags;
}
client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));
if (conn) {
connSetReadHandler(conn, readQueryFromClient); //设置客户端的读回调函数
connSetPrivateData(conn, c);
}
...................
if (conn) linkClient(c); //把该客户端添加到服务器server.client链表中保存
}
static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
return conn->type->set_read_handler(conn, func);
}
set_read_handler也是个回调函数,其设置成了是函数connSocketSetReadHandler。
这里set_read_handler也是个回调函数是因为有两种类型的connection,Redis是把一个客户端封装成一个connnection(该结构体后续会讲解)。
//当前就认为conn->type->ae_handler是参数func即可,内部有很多兜兜转转
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
if (func == conn->read_handler) return C_OK;
conn->read_handler = func;
if (!conn->read_handler)
aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
else
if (aeCreateFileEvent(server.el,conn->fd,
AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK;
}
这样就把readQueryFromClient设置给了fe->rfileProc。当epoll_wait返回就绪的fd和事件,若该fd是客户端,也是读事件,那就会执行fe->rfileProc,即是执行readQueryFromClient。
4. 绑定客户端的写事件回调函数
这个是在绑定的beforeSleep函数中。
void beforeSleep(struct aeEventLoop *eventLoop) {
..........................
/* Handle writes with pending output buffers. */
handleClientsWithPendingWritesUsingThreads();
.............................
}
int handleClientsWithPendingWritesUsingThreads(void) {
...................
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
//如果缓冲区中还有回复客户端的数据,就需要设置写回调函数,当epoll_wait返回客户端fd的写事件时候,就会执行sendReplyToClient
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
{
}
}
......................
}
static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
return conn->type->set_write_handler(conn, func, 0);
}
和set_read_handler一样,set_write_handler也是后面设置的。该函数是connSocketSetWriteHandler。
static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
if (func == conn->write_handler) return C_OK;
conn->write_handler = func;
if (barrier)
conn->flags |= CONN_FLAG_WRITE_BARRIER;
else
conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
if (!conn->write_handler)
aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
else
if (aeCreateFileEvent(server.el,conn->fd,AE_WRITABLE,
conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK;
}
这样就把 sendReplyToClient设置给了fe->wfileProc。当epoll_wait返回客户端fd的写事件时候,就会调用fe->wfileProc,即是执行sendReplyToClient。
5.绑定时间事件
使用aeCreateTimeEvent来绑定时间事件的回调函数。在aeProcessEvents函数内部会调用processTimeEvents去执行定时器任务。
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
te->when = getMonotonicUs() + milliseconds * 1000;
//设置事件的回调函数
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->prev = NULL;
te->next = eventLoop->timeEventHead;
te->refcount = 0;
if (te->next)
te->next->prev = te;
eventLoop->timeEventHead = te;
return id;
}
5. 有了IO多路复用,为什么还需要reactor模式?
IO多路复用与事件驱动
首先要明确一点,reactor模式就是基于IO多路复用的。事件驱动也是IO多路复用的,不是说使用了reactor模式才是使用了事件驱动。
以事件为连接点,当有IO事件准备就绪时,就会通知用户,并且告知用户返回的是什么类型的事件,进而用户可以执行相对应的任务。这样就不用在IO等待上浪费资源,这便是事件驱动
的核心思想。
比如你点了两份外卖,外卖A,外卖B。之后你无需时刻打电话去问外卖到了没。外卖到的时候,外卖员会打电话通知你。这中途你就可以做自己的事,不用纯纯等待。还有可以知道是外卖A到了还是外卖B到了,外卖员会告知是哪个外卖到的。
这个就是事件驱动。IO事件准备就绪时,会自动通知用户,并会告知其事件类型。
所以应该是,IO多路复用 + 回调机制 构成了 reactor模式。
IO同步与异步的判断
还有reactor模式是同步的。因为其是使用IO多路复用的,而IO多路复用是同步的。
IO操作是同步还是异步,关键看数据在内核空间与用户空间的拷贝过程(数据读写的IO操作)
reactor模式的优点
网上很多说其可以很好处理高并发,但是我觉得IO多路复用也可以处理的。
还有说可扩展性,通过增加Reactor实例个数来充分利用CPU资源;那通过在其他线程再创建epoll也行的。
所以,我觉得一个很大的优势是:
- 应用处理请求的逻辑,与事件分发框架完全分离,即是解耦,有很高的复用性
即写应用处理时,只需关注如何处理业务逻辑。Reactor框架本身与具体事件处理逻辑无关。
假如是把reactor模式做成一个网络库给用户使用。那用户就只需要关注处理请求的逻辑即可。该网络库对外开放setCallback函数。
假设,用户写服务器服务,收到客户端发来的数据(一串数字),想对数字做加法 或者想对数字做乘法都行。只要使用setCallback函数设置好处理请求的逻辑就行。这就是应用处理请求的逻辑,与事件分发框架完全分离,这是很方便的。
假如该框架是不对外开放的(就是用户不能使用setCallback),像Redis一样,为什么还需要使用reactor模式,为什么不可以只用IO多路复用,不用回调机制呢?
我认为,也是解耦的好处。Redis编写人员需要改变处理请求的逻辑时候,就只改变某个函数即可,不需要深入到epoll框架去改变,这就是很方便的。
6. 通过Redis网络部分,学到如何实现reactor模式
- 先对epoll进行封装,方便后续使用
- 需要对fd进行监听,并注册需要关注的事件类型,并注册关注的事件类型就绪时,需要执行的函数。所以,可以把fd,关注的事件类型,事件就绪执行的函数三者封装在一个结构体aeFileEvent内。
- 我们简单使用epoll时候,肯定是会写 while(1){ epoll_wait(....); }。这就是个事件循环,所以可以再封装个结构体EventLoop, 其也是对epoll的再次封装,即是会调用前面封装好的epoll的函数。EventLoop内部应该有存储aeFileEvent的数组。
- 在调用epoll_wait时,会返回就绪的fd和事件类型,把返回的(就绪fd和事件类型)数组 赋值给EventLoop的aeFileEvent数组。之后就是使用该aeFileEvent,根据事件类型执行对应的回调函数
- 提供setCallback函数,设置好对应的回调函数,即是把需要执行的函数 赋值给 aeFileEvent中的 事件就绪执行函数。
Redis的reactor模式没有使用到结构体event_data_t的指针变量ptr。
若是想逐步实现recactor模式,这里推荐下本人写的0.仿造muduo,实现linux服务器开发思路
该文章专栏有详细的逐步实现的reactor模式的代码流程。欢迎查看。