Skip to content

Latest commit

 

History

History
330 lines (298 loc) · 22.1 KB

ae.md

File metadata and controls

330 lines (298 loc) · 22.1 KB

Redis中一个基础的事件驱动库

Redis数据库最大的一个特点便是其具有高并发性,可以每秒处理数万次的并发读写操作。除了使用使用单线程以无锁的方式处理核心数据逻辑以避免由于锁争用而带来的性能下降之外,Redis使用了基于I/O多路复用的事件驱动的方式来处理客户端通过网络发送来的请求。而基于I/O多路复用的的事件驱动模式正是一种可以高效处理网络并发请求的编程模式。

我们知道默认被创建套接字文件描述符都是阻塞的,例如acceptconnectreadwrite这些函数在操作阻塞文件描述符时,如果期待事件没有发生,那么函数会将程序阻塞直到有新连接到来、连接建立、有数据可读或者缓冲区现在可写,那么频繁地阻塞程序势必造成程序性能的下降。

通过前面对套接字通用接口一文的介绍,我们可以调用anetNonBlock接口将一个套接字文件描述符设置成非阻塞,然而这样会出现一个问题,当我们针对一个非阻塞套接字文件描述符调用accept等函数时,这些函数总是会立即返回。显然单纯的非阻塞套接字无法提高网络程序的并发性能。

我们期望当有事件就绪的时候在正对非阻塞套接字文件描述符调用对应的函数接口,这样才能够发挥出非阻塞套接字的特性提升系统的并发性能。例如当有连接已经到来的时候,我们在去调用accept函数来接收新连接;当一条与客户端的连接上有数据可读的时候,我们在调用read函数来读取数据。而I/O多路复用正式提供了这样的一种支持,与acceptconnectreadwrite这些函数只能操作单一文件描述符不同,I/O多路复用可以同时监听多个文件描述符,通过selectpollepoll_wait这些调用阻塞等待多个文件描述符,只要其中有一个文件描述符就绪,函数就会返回,我们可以调用对应的事件处理函数来处理不同的事件,这将大大提高服务器的处理效率。

Redis对于基于I/O多路复用的事件驱动的声明与定义在src/ae.hsrc/ae.csrc/ae_epoll.csrc/ae_ecport.csrc/ae_kqueue.csrc/ae_select.c这些文件之中。

事件驱动库的基础数据结构定义与描述

枚举类型描述

事件类型

Redis中,事件驱动需要关注两种事件的处理:

  1. #define AE_FILE_EVENTS 1,文件事件,这类事件主要是用于处理服务器与客户端之间的连接,在Linux系统之中”一切皆文件“,而表示一条TCP连接的套接字实际上也是一个文件描述符。
  2. #define AE_TIME_EVENTS 2,时间事件,这类事件主要用处理服务器的一些需要周期性被执行的操作。例如服务器心跳处理函数serverCron

可触发事件类型

首先在Redis中注册了四种可触发类型:

  1. #define AE_NONE 0,没有事件被注册
  2. #define AE_READABLE 1,当描述符可读时触发,对于服务器的监听套接字文件述符,当有新连接到达时,也就是收到客户端发来的SYN报文时,文件描述符可读;对于服务器与客户端连接的套接字文件描述符,当内核TCP接收缓冲区中有数据时,文件描述符可读。
  3. #define AE_WRITABLE 2,当描述符可写时触发,对于表示连接的套接字文件描述符,当内核TCP发送缓冲区有空余空间时,文件描述符可写;对于客户端调用非阻塞connect函数后的套接字文件描述符,当连接正式建立时,文件描述符可写。
  4. #define AE_BARRIER 4,如果在同一个事件循环迭代中,如果有读事件触发,那么即使可写也不触发该事件。这在我们期望在发送反馈信息前将某些数据持久化到磁盘上时很有用。

数据结构描述

文件事件类型数据结构

/*文件事件结构体*/
typedef struct aeFileEvent {
    int mask;                 //当前时间监听的可触发事件,AE_READABLE|WRITEABLE|BARRIER
    aeFileProc *rfileProc;    //处理可读事件的回调函数函数指针
    aeFileProc *wfileProc;    //处理可写事件的回调函数函数指针
    void *clientData;         //客户端数据
} aeFileEvent;

时间事件类型数据结构

/*时间事件结构体*/
typedef struct aeTimeEvent {
    long long id;                         //时间事件ID
    long when_sec;                        //触发时间秒数
    long when_ms;                         //触发时间毫秒
    aeTimeProc *timeProc;                 //时间事件到期时处理函数的函数指针
    aeEventFinalizerProc *finalizerProc;  //事件最终被删除时处理函数的函数指针
    void *clientData;                     //客户端数据
    struct aeTimeEvent *prev;             //双向链表中前一个时间事件的指针
    struct aeTimeEvent *next;             //双向链表中后一个时间事件的指针
} aeTimeEvent;

触发事件数据结构

/*触发事件结构体,用于表示将要被处理的文件事件*/
typedef struct aeFiredEvent {
    int fd;        //被触发事件的文件描述符
    int mask;      //触发事件的掩码
} aeFiredEvent;

事件循环数据结构类型

对于事件循环的基本数据类型

typedef struct aeEventLoop {
    int maxfd;
    int setsize;
    long long timeEventNextId;
    time_t lastTime;
    aeFileEvent *events;
    aeFiredEvent *fired;
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata;
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;

在上述数据结构之中:

  1. aeEventLoop.maxfd,记录当前事件循环之中文件事件类型最大的文件描述符数值。
  2. aeEventLoop.setsize,用于记录当前事件循环之中被追踪的文件描述符的数量。
  3. aeEventLoop.timeEventNextId,对应于aeTimeEvent.id,用于记录下一个时间事件的ID。
  4. aeEventLoop.lastTime,对应上次处理时间事件的时间戳。
  5. aeEventLoop.events,一个aeFileEvent结构体的指针,指向一个动态分配的数组,其长度为aeEventLoop.setsize,用于存储所有被注册的文件事件。
  6. aeEventLoop.fired,一个aeFiredEvent结构体的指针,指向一个动态分配的数组,其长度与aeEventLoop.events一致,用于存储从I/O多路复用之中返回的已被触发的事件。
  7. aeEventLoop.timeEventHead,用于指向存储时间事件双向链表的头指针。
  8. aeEventLoop.stop,用于标记事件循环是否终止的标记。
  9. aeEventLoop.apidata,用于存储调用底层I/O多路复用接口的数据。
  10. aeEventLoop.beforesleep,每次进入事件循环前调用函数的函数指针。
  11. aeEventLoop.aftersleep,每次退出事件循环后调用函数的函数指针。

IO多路复用接口

Redis根据系统所提供的支持,会使用最优的IO多路复用的实现接口:

  1. src/ae_epoll.c,对于实现了epoll接口的系统,使用epoll_wait作为IO多路复用的接口。
  2. src/ae_kqueue.c,对于Mac OS系统,使用kqueue作为IO多路复用的接口。
  3. src/ae_select.c,对于老的系统,使用select作为IO多路复用的接口。

而在事件循环的数据结构中的aeEventLoop.apidata这个字段会根据系统使用的IO多路复用接口的不同,保存所需要的数据结构。 对于使用epoll接口,apidata这个字段所保存的数据结构为aeApiState,其定义为:

typedef union epoll_data
{
  void *ptr;
  int fd;
  uint32_t u32;
  uint64_t u64;
} epoll_data_t;

struct epoll_event
{
  uint32_t events;   /* Epoll events */
  epoll_data_t data;    /* User data variable */
} __attribute__ ((__packed__));

typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

这其中:

  1. aeApiState.epfd,是调用epoll_create系统调用创建的epoll文件描述符,系统可以通过阻塞监听这个文件描述符来监听其他一组文件描述符的状态变化。
  2. eaApiState.events,对应于aeEventLoop.events这个字段,也是一个动态分配的数组,存储epoll监听事件对应的数据结构。

在上述这些源文件中,定义了一组静态函数,用于封装系统的IO多路复用接口,供Redis事件循环也就是aeEventLoop调用(以epoll系统调用为例):

  1. static int aeApiCreate(aeEventLoop *eventLoop),通过调用epoll_create创建epoll接口的文件描述符,同时为epoll_event事件队列分配空间。
  2. static int aeApiResize(aeEventLoop *eventLoop, int setsize),调整事件循环中epoll_event队列的大小。
  3. static void aeApiFree(aeEventLoop *eventLoop),释放事件循环中,为epoll接口数据分配的相关资源。
  4. static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask),通过调用epoll_ctl系统调用,向内核注册监听一个文件描述符,或者更新一个文件描述符的监听事件。
  5. static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask),通过调用epoll_ctl系统调用,删除一个已监听文件描述符上的事件,或者删除一个已监听的事件描述符。
  6. static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp),该组静态函数的核心,通过调用epoll_wait等待监听的文件描述符上事件触发。

aeApiPoll函数中:

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    //阻塞在epoll_wait,等待事件ready
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;
            //将从内核中返回的就绪事件提取出来
            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            //将内核中返回的epoll_event上的文件描述符以及就绪事件拷贝到事件循环aeFiredEvent队列中
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
}

使用epoll接口实现的aeApiPoll函数其主要流程为:

  1. 通过aeApiAddEvent函数注册所需要监听的文件描述符。
  2. 通过epoll_wait这个系统调会以阻塞的方式等待前面注册的文件描述符上的事件,当有事件就绪,该函数会返回就绪事件的个数。
  3. 通过循环遍历,将内核中返回的就绪事件提取出来,将其复制到Redis事件循环的已触发事件队列aeEventLoop.fired上面。

这样,通过上述的步骤,Redis的事件循环便可以通过epoll_wait接口,从内核中取出一系列的就绪事件,然后对所有的就绪事件进行处理。

事件循环接口函数定义

前面我们在系统调用层面介绍了事件驱动的实现细节,下面我们来看一下Redis如何应用系统调用来实现事件驱动的事件循环的,首先我们先来看一下相关的函数接口:

  1. aeEventLoop *aeCreateEventLoop(int setsize),这个函数用于创建并初始化一个事件循环aeEventLoop的结构体,初始可以监听的文件描述符列表aeEventLoop.events的长度会被设置为setsize
  2. void aeDeleteEventLoop(aeEventLoop *eventLoop),在程序结束时释放整个事件循环结构体。
  3. void aeStop(aeEventLoop *eventLoop),用于设置aeEventLoop.stop停止标记。
  4. int aeGetSetSize(aeEventLoop *eventLoop),获取事件循环中监听队列大小。
  5. int aeResizeSetSize(aeEventLoop *eventLoop, int setsize),调整事件循环的监听队列大小。
  6. void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep),设置事件驱动进入事件循环前需要执行函数接口。
  7. void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep),设置事件驱动结束一次事件循环时需要执行的函数接口。
  8. int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData),向事件循环之中创建一个文件事件,设置其监听的事件掩码mask以及事件处理函数proc;同时会设置客户端数据clientData,这个数据将会被赋值给文件事件结构体aeFileEvent.clientData字段中。
  9. void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask),从事件循环中删除指定文件描述符fd的文件事件上监听的mask事件。
  10. int aeGetFileEvents(aeEventLoop *eventLoop, int fd),根据文件描述符找出对应的文件事件在事件循环之中监听的事件掩码。
  11. long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc),向事件循环中添加一个milliseconds毫秒后被触发的时间事件,设定其处函数proc以及客户端数据clientData,并且可以选择性设定结束处理函数finalizerProcRedis会根据aeEventLoop.timeEventNextId为新的事件事件分配一个序号,记录在aeTimeEvent.id字段上。
  12. int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id),给定一个时间事件序号,将其从事件循环之中删除。
  13. void aeMain(aeEventLoop *eventLoop),事件驱动执行主程序。
  14. int aeProcessEvents(aeEventLoop *eventLoop, int flags),事件循环处理函数。
  15. int aeWait(int fd, int mask, long long milliseconds),在某个文件描述符fd上阻塞等待,直到事件mask被触发,或者阻塞时间超过milliseconds

下面我们来看一下其中几个比较重要的函数接口。

首先是aeMain这个事件驱动的入口函数,其在Redismain函数之中被调用,用以启动事件循环:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        //如果调用aeSetBeforeSleepProc设置了beforesleep回调,那么在启动一次事件循环开启时,会执行beforesleep调用
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        //启动一次事件循环,处理文件事件以及时间事件,同时会尝试调用aftersleep
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

这其中aeProcessEvents函数是处理一次事件循环的主体,处理每个待处理的时间事件,然后处理每个待处理的文件事件(可以通过刚刚处理的时间事件回调来注册)。 没有特殊标志的情况下,该函数将一直休眠,直到引发某些文件事件或下次发生事件(如果有)时为止。

  • 如果flags为0,则该函数不执行任何操作并返回。
  • 如果flags设置了AE_ALL_EVENTS,则处理所有类型的事件。
  • 如果flags设置了AE_FILE_EVENTS,则处理文件事件。
  • 如果flags设置了AE_TIME_EVENTS,则将处理时间事件。
  • 如果flags设置了AE_DONT_WAIT,则该函数将尽快返回直到所有。
  • 如果flags设置了AE_CALL_AFTER_SLEEP,则将调用aftersleep回调,这些事件可能无需等待就可以处理。

这个函数会返回已处理事件的数量。

aeProcessEvents函数的基本流程为:

  1. 通过调用aeSearchNearestTimer这个静态函数来找到最近一个要到期的时间时间,获取其到期时间以确定aeApiPoll等待的timeval,这段逻辑可以保证,在即使没有文件事件触发的情况下,aeApiPoll仍然可以在最近的一次计时器时间事件到来之前返回,而不会错过对后续时间事件的处理。其具体实现代码为:
    //如果设置了AE_TIME_EVENTS标记,同时没有设置AE_DONT_WAIT,搜索最近的时间事件
    if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
        shortest = aeSearchNearestTimer(eventLoop);
    if (shortest) {
        //通过shortest这个时间事件,来计算等待的timeval
        long now_sec, now_ms;
        aeGetTime(&now_sec, &now_ms);
        ...
    } else {
        if (flags & AE_DONT_WAIT) {
            //如果设置了AE_DONT_WAIT,那么设置timeval为0,aeApiPoll立即返回
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        } else {
            //一直等待,知道有事件触发
            tvp = NULL; /* wait forever */
        }            
    }
  1. 调用封装的aeApiPoll这个核心I/O多路复用API,等待文件事件上的时间触发触发返回被触发事件的数量,或者超时进入时间事件的处理流程:
    numevents = aeApiPoll(eventLoop, tvp);
  1. 如果设置了AE_CALL_AFTER_SLEEP标记,同时通过调用aeSetAfterSleepProc设置了aftersleep,那么会在aeApiPoll唤醒返回后,执行aftersleep逻辑:
    if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
        eventLoop->aftersleep(eventLoop);
  1. 由于在aeApiPoll接口中已经将从内核返回的就绪事件拷贝到了事件循环中的fired就绪事件列表中了,后续会循环处理所有的就绪事件,通常情况下,我们会先处理可读事件,然后执行可写事件的处理逻辑,这在类似于执行查询操作后立即反馈查询结果这样的场景中很有用。但是如果在flags掩码中设置了AE_BARRIER标记,那么Redis会要求我们将执行顺序翻转过来,也就是在可读事件之后,绝不触发可写事件的处理。例如当我们需要在响应客户端请求之前,通过调用beforeSleep执行类似同步文件到磁盘的操作时,会很用:
    for (j = 0; j < numevents; j++) {
        aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
        ...
        int invert = fe->mask & AE_BARRIER;
        if (!invert && fe->mask & mask & AE_READABLE) {
            //不需要翻转操作时,调用rfileProc处理文件描述符上的可读事件
            fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            fired++;
        }
        if (fe->mask & mask & AE_WRITABLE) {
            if (!fired || fe->wfileProc != fe->rfileProc) {
                //处理文件描述符上的可写事件
                fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }
        }
        if (invert && fe->mask & mask & AE_READABLE) {
            if (!fired || fe->wfileProc != fe->rfileProc) {
                //需要翻转操作时,在处理完可写事件后,处理读事件
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }
        }
    }
  1. 调用processTimeEvents来处理事件事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

现在我们来看一下处理时间事件的processTimeEvents函数,其基础逻辑为:

  1. 如果系统的时钟曾经被移动到未来,那么在再次处理时间事件时,需要尝试对时钟进行修正。同时当这种情况发生时,Redis将会强制处理所有队列之中时间事件,通过将aeTimeEvent.when_sec设置为0来实现强制提前处理,Redis的作者认为当系统时间出现错位时,提早处理时间事件要比延后处理这些事件更为安全。
    time_t now = time(NULL);
    if (now < eventLoop->lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }
    eventLoop->lastTime = now;
  1. 从双向链表中删除已近被调度过的,同时被设置为AE_DELETED_EVENT_ID的时间事件,如果这个时间时间被设置了finalizerProc回调,那么在释放这个时间事件之前,会调用finalizerProc来处理相关释放逻辑:
    while(te) {
        ...
        /* Remove events scheduled for deletion. */
        if (te->id == AE_DELETED_EVENT_ID) {
            
            ...

            if (te->finalizerProc)
                te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            te = next;
            continue;
        }

        ...
    }
  1. 处理已经到期的时间事件,会调用时间事件timeProc的处理函数,来处理到期触发的计时器时间事件,如果这个时间事件是一个一次性触发的时间事件,那么会返回AE_NOMORE标记,这时将该事件设置为AE_DELETED_EVENT_ID,在下次事件循环中删除这个事件;如果这个时间事件是一个周期触发的时间事件,那么timeProc会返回下次触发的毫秒数,通过aeAddMillisecondsToNow,将新的时间更新到这个时间事件上:
    while(te) {
        ...
        if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;
            id = te->id;
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        te = te->next;
    }

最后我们来看一下aeWait这个函数接口,这个函数会调用poll这个I/O多路复用接口来等待给定的文件描述符若干毫秒,这个函数接口是在aeMain之外的另外一个将程序阻塞的接口,两者的区别在于aeMain函数阻塞等待多个文件描述符,而aeWait函数只能阻塞给定的文件描述符。为何RedisaeMain的基础上还要实现一个aeWait接口呢?这个接口主要用于Redis的一些同步阻塞操作,例如在Master实例与Slave实例之间使用SYNC命令以同步阻塞的方式同步数据。


公众号二维码

喜欢的同学可以扫描二维码,关注我的微信公众号,马基雅维利incoding