Tergel

I/O多路复用

如果理解了epoll, 那么就不需要理解epoll

标题起得很大,但这篇只是简单了解epoll机制和相关API的文章,没有很深入的内容。 简单讲,epoll就是由一个“实例”还有相关注册/注销fd的函数和等待函数组成。

create

既然说是实例,那么就有对应的创建函数,历史原因epoll现在有两个创建函数,分别叫 epoll_create(int size)epoll_create1(int flags)

epoll_create(int size)

返回值是epoll实例对应的fd

早期实现中在创建时需要传入一个int值,提示内核预期要监听的fd数量。 在后来的linux版本中(>=2.6.8)这个参数被忽略了,因为新版本使用了动态分配的方案,但为了向后兼容这个值还是必须大于0。

epoll_create1(int flags)

参数flags目前只有两种可能值

0: 与epoll_create行为相同

EPOLL_CLOEXEC: 新创建的epoll文件描述符会被设置为close-on-exec (FD_CLOEXEC)

close-on-exec是内核提供的特性。

背景是,操作系统fork进程时新程序会继承调用进程的大部分属性,包括打开的文件描述符。而close-on-exec标志,会在执行exec调用时自动关闭。

所以这里的目的就是防止epoll的fd暴露给子进程。

相关时间线

epoll_create忽略size参数是在Linux v2.6.8

epoll_create1引入是在v2.6.27

所以也就不会有flags为0时size是多少的问题。出现epoll_create1时epoll早已经变成了使用动态分配内存的方式。

control

epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

通过epfd对指定epoll实例添加/删除监听的fd

op可能的值有: EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL,根据名字也能知道分别对应添加,修改和删除。

event指定要监听的事件类型

struct epoll_event {
	uint32_t events; // 成员events指定要监听的事件
	epoll_data_t data; // 用户自定义数据
}

events可选值有:

  • EPOLLIN(0x001):准备好读取数据, 比如fd有有新的数据到达或有新的连接请求
  • EPOLLPRI(0x002):有紧急数据可读
  • EPOLLOUT(0x004): 准备好写入数据,比如fd缓冲区有足够空间写入数据
  • EPOLLERR(0x008)fd发生错误
  • EPOLLHUP(0x010): 发生挂起,比如远程客户端关闭连接

以下4个是更细粒度的读写事件

  • EPOLLRDNORM(0x040) 普通数据可读

  • EPOLLRDBAND(0x080): 有优先级数据可以读取

  • EPOLLWRNORM(0x100): 可以向文件描述符写入普通数据而不会阻塞

  • EPOLLWRBAND(0x200): 可以写入优先级数据

  • EPOLLMSG(0x400): EPOLLMSG主要与POSIX消息队列(mqueue)相关。它用于通知应用程序消息队列中有新消息到达

  • EPOLLRDHUP(0x2000): 对端关闭连接,或关闭了写入半连接

  • EPOLLEXCLUSIVE(1U«28): 设置独占唤醒模式,防止在多进程/多线程环境中的"惊群效应"

  • EPOLLWAKEUP(1U«29):设置了这个标志,并且进程有CAP_BLOCK_SUSPEND权限,epoll事件可以唤醒被挂起的系统

  • EPOLLONESHOT(1U«30): 使得事件仅报告一次,之后需要重新添加才能再次检测

  • EPOLLET(1U«31) :将epoll设置为边缘触发模式,而不是默认的水平触发模式

两种模式的区别:

水平触发模式(Level Triggered, LT): 在这种模式下,只要文件描述符就绪,每次调用epoll_wait都会返回该事件

边缘触发模式(Edge Triggered, ET): 仅在状态发生变化时才触发事件

observe

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

等待epoll实例上的事件, 监听哪些事件由上面的epoll_ctl设置。

参数: events参数用于接收发生的事件数组 maxevents表示events数组的大小 timeout超时事件,-1表示无限等待,0表示立即返回。

epoll_wait会等待实例上的事件,它会阻塞调用进程,直到发生

  • 被监听的fd有事件发生
  • 被信号处理程序中断
  • 超时时间到

返回值是就绪的fd数量

redis源码示例

ae.c

redis将相关api封装到了ae.c(async-event)中。根据平台不同可以用不同的多路复用技术(epoll, kqueue, select)实现,这里主要看一下epoll。(kqueue的函数好像更精炼)。

创建

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;
    long long timeEventNextId;
    aeFileEvent events[AE_SETSIZE]; /* Registered events */
    aeFiredEvent fired[AE_SETSIZE]; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;

#define AE_SETSIZE (1024*10)    /* Max number of fd supported */
typedef struct aeApiState {
    int epfd;
    struct epoll_event events[AE_SETSIZE];
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->epfd = epoll_create(1024); /* 1024 is just an hint for the kernel */
    if (state->epfd == -1) return -1;
    eventLoop->apidata = state;
    return 0;
}
  

注册fd

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee;
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.u64 = 0; /* avoid valgrind warning */
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}


static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee;
    int mask = eventLoop->events[fd].mask & (~delmask);

    ee.events = 0;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.u64 = 0; /* avoid valgrind warning */
    ee.data.fd = fd;
    if (mask != AE_NONE) {
        epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
    } else {
        /* Note, Kernel < 2.6.9 requires a non null event pointer even for
         * EPOLL_CTL_DEL. */
        epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
    }
}
  
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= AE_SETSIZE) return AE_ERR;
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
    if (fd >= AE_SETSIZE) return;
    aeFileEvent *fe = &eventLoop->events[fd];

    if (fe->mask == AE_NONE) return;
    fe->mask = fe->mask & (~mask);
    if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
        /* Update the max fd */
        int j;

        for (j = eventLoop->maxfd-1; j >= 0; j--)
            if (eventLoop->events[j].mask != AE_NONE) break;
        eventLoop->maxfd = j;
    }
    aeApiDelEvent(eventLoop, fd, mask);
}

等待

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,AE_SETSIZE,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

redis.c

上面ae.c只是对epoll的封装,接下来简单看一下具体的使用场景:

  1. 在应用main函数中调用initServer初始化了redis服务器相关配置,其中就包括创建服务器socket监听fd,创建成功后通过aeCreateFileEvent监听这个fd。当有可读数据(AE_READABLE)到达时,便会触发 acceptHandler函数。

    server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
    
    if (aeCreateFileEvent(server.el, server.fd, AE_READABLE, acceptHandler, NULL) == AE_ERR) 
      oom("creating file event");
    
  2. acceptHandler内会调用createClient,针对每个客户端链接创建对应client对象。创建完以后同样调用aeCreateFileEvent监听该客户端链接上的数据。当有客户端请求到达时,便触发readQueryFromClient函数。

    static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
        /*省略*/
        cfd = anetAccept(server.neterr, fd, cip, &cport);
        /*省略*/
        if ((c = createClient(cfd)) == NULL) {
          redisLog(REDIS_WARNING,"Error allocating resoures for the client");
            close(cfd); /* May be already closed, just ingore errors */
            return;
        }
      	/*省略*/
    }
    static redisClient *createClient(int fd) {
        redisClient *c = zmalloc(sizeof(*c));
      	if(!c) return NULL;
        c->fd = fd;
    		/*省略*/
        if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
            readQueryFromClient, c) == AE_ERR) {
            freeClient(c);
            return NULL;
        }
    		/*省略*/
        return c;
    }
    
  3. readQueryFromClient从客户端连接fd中读取发过来的命令,解析并执行,但执行结果没有立即返回给该fd。而是加入该连接对象的缓冲队列中。

    static void addReply(redisClient *c, robj *obj) {
        if (listLength(c->reply) == 0 &&
            (c->replstate == REDIS_REPL_NONE ||
             c->replstate == REDIS_REPL_ONLINE) &&
            aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
            sendReplyToClient, c) == AE_ERR) return;
    
        if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
            obj = dupStringObject(obj);
            obj->refcount = 0; /* getDecodedObject() will increment the refcount */
        }
        listAddNodeTail(c->reply,getDecodedObject(obj));
    }
    

    可以看到,对客户端连接fd创建了一个 AE_WRITABLE的监听,在fd实际变为可写时才将队列 c->reply中的数据写入fd中(sendReplyToClient)。

  4. 除此之外还有虚拟内存(redis v1.3.6),serverCron和主从拷贝相关的事件注册,这里就不展开了。

总结

可以看到,redis通过epoll_create(1024)创建了epoll实例。另外在注册fd监听时,也没有采用很复杂的事件类型,只有EPOLLIN和EPOLLOUT。对应AE_READABLE和AE_WRITABLE。

上面没有提到等待函数, 实际上在初始化server时,最后一步会调用aeMain,内部就是调用等待函数aeApiPoll根据返回值更新ae持有的对象状态,并触发函数了。

这篇结束。