标题起得很大,但这篇只是简单了解epoll机制和相关API的文章,没有很深入的内容。 简单讲,epoll就是由一个“实例”还有相关注册/注销fd的函数和等待函数组成。
create
既然说是实例,那么就有对应的创建函数,历史原因epoll现在有两个创建函数,分别叫 epoll_create(int size)和epoll_create1(int flags)。
epoll_create(int size)
返回值是epoll实例对应的fd
早期实现中在创建时需要传入一个int值,提示内核预期要监听的fd数量。 在后来的linux版本中(>=2.6.8)这个参数被忽略了,因为新版本使用了动态分配的方案,但为了向后兼容这个值还是必须大于0。
epoll_create1(int flags)
参数flags目前只有两种可能值
0: 与epoll_create行为相同
EPOLL_CLOEXEC: 新创建的epoll文件描述符会被设置为close-on-exec (FD_CLOEXEC)
close-on-exec是内核提供的特性。
背景是,操作系统fork进程时新程序会继承调用进程的大部分属性,包括打开的文件描述符。而close-on-exec标志,会在执行exec调用时自动关闭。
所以这里的目的就是防止epoll的fd暴露给子进程。
相关时间线
epoll_create忽略size参数是在Linux v2.6.8
epoll_create1引入是在v2.6.27
所以也就不会有flags为0时size是多少的问题。出现epoll_create1时epoll早已经变成了使用动态分配内存的方式。
control
epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
通过epfd对指定epoll实例添加/删除监听的fd
op可能的值有: EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL,根据名字也能知道分别对应添加,修改和删除。
event指定要监听的事件类型
struct epoll_event {
uint32_t events; // 成员events指定要监听的事件
epoll_data_t data; // 用户自定义数据
}
events可选值有:
- EPOLLIN(0x001):准备好读取数据, 比如fd有有新的数据到达或有新的连接请求
- EPOLLPRI(0x002):有紧急数据可读
- EPOLLOUT(0x004): 准备好写入数据,比如fd缓冲区有足够空间写入数据
- EPOLLERR(0x008)fd发生错误
- EPOLLHUP(0x010): 发生挂起,比如远程客户端关闭连接
以下4个是更细粒度的读写事件
EPOLLRDNORM(0x040) 普通数据可读
EPOLLRDBAND(0x080): 有优先级数据可以读取
EPOLLWRNORM(0x100): 可以向文件描述符写入普通数据而不会阻塞
EPOLLWRBAND(0x200): 可以写入优先级数据
EPOLLMSG(0x400): EPOLLMSG主要与POSIX消息队列(mqueue)相关。它用于通知应用程序消息队列中有新消息到达
EPOLLRDHUP(0x2000): 对端关闭连接,或关闭了写入半连接
EPOLLEXCLUSIVE(1U«28): 设置独占唤醒模式,防止在多进程/多线程环境中的"惊群效应"
EPOLLWAKEUP(1U«29):设置了这个标志,并且进程有CAP_BLOCK_SUSPEND权限,epoll事件可以唤醒被挂起的系统
EPOLLONESHOT(1U«30): 使得事件仅报告一次,之后需要重新添加才能再次检测
EPOLLET(1U«31) :将epoll设置为边缘触发模式,而不是默认的水平触发模式
两种模式的区别:
水平触发模式(Level Triggered, LT): 在这种模式下,只要文件描述符就绪,每次调用epoll_wait都会返回该事件
边缘触发模式(Edge Triggered, ET): 仅在状态发生变化时才触发事件
observe
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
等待epoll实例上的事件, 监听哪些事件由上面的epoll_ctl设置。
参数: events参数用于接收发生的事件数组 maxevents表示events数组的大小 timeout超时事件,-1表示无限等待,0表示立即返回。
epoll_wait会等待实例上的事件,它会阻塞调用进程,直到发生
- 被监听的fd有事件发生
- 被信号处理程序中断
- 超时时间到
返回值是就绪的fd数量
redis源码示例
ae.c
redis将相关api封装到了ae.c(async-event)中。根据平台不同可以用不同的多路复用技术(epoll, kqueue, select)实现,这里主要看一下epoll。(kqueue的函数好像更精炼)。
创建
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd;
long long timeEventNextId;
aeFileEvent events[AE_SETSIZE]; /* Registered events */
aeFiredEvent fired[AE_SETSIZE]; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
#define AE_SETSIZE (1024*10) /* Max number of fd supported */
typedef struct aeApiState {
int epfd;
struct epoll_event events[AE_SETSIZE];
} aeApiState;
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->epfd = epoll_create(1024); /* 1024 is just an hint for the kernel */
if (state->epfd == -1) return -1;
eventLoop->apidata = state;
return 0;
}
注册fd
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee;
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0; /* avoid valgrind warning */
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee;
int mask = eventLoop->events[fd].mask & (~delmask);
ee.events = 0;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0; /* avoid valgrind warning */
ee.data.fd = fd;
if (mask != AE_NONE) {
epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
} else {
/* Note, Kernel < 2.6.9 requires a non null event pointer even for
* EPOLL_CTL_DEL. */
epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
}
}
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= AE_SETSIZE) return AE_ERR;
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
if (fd >= AE_SETSIZE) return;
aeFileEvent *fe = &eventLoop->events[fd];
if (fe->mask == AE_NONE) return;
fe->mask = fe->mask & (~mask);
if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
/* Update the max fd */
int j;
for (j = eventLoop->maxfd-1; j >= 0; j--)
if (eventLoop->events[j].mask != AE_NONE) break;
eventLoop->maxfd = j;
}
aeApiDelEvent(eventLoop, fd, mask);
}
等待
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,AE_SETSIZE,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
redis.c
上面ae.c只是对epoll的封装,接下来简单看一下具体的使用场景:
在应用main函数中调用initServer初始化了redis服务器相关配置,其中就包括创建服务器socket监听fd,创建成功后通过aeCreateFileEvent监听这个fd。当有可读数据(AE_READABLE)到达时,便会触发 acceptHandler函数。
server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr); if (aeCreateFileEvent(server.el, server.fd, AE_READABLE, acceptHandler, NULL) == AE_ERR) oom("creating file event");acceptHandler内会调用createClient,针对每个客户端链接创建对应client对象。创建完以后同样调用aeCreateFileEvent监听该客户端链接上的数据。当有客户端请求到达时,便触发readQueryFromClient函数。
static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { /*省略*/ cfd = anetAccept(server.neterr, fd, cip, &cport); /*省略*/ if ((c = createClient(cfd)) == NULL) { redisLog(REDIS_WARNING,"Error allocating resoures for the client"); close(cfd); /* May be already closed, just ingore errors */ return; } /*省略*/ } static redisClient *createClient(int fd) { redisClient *c = zmalloc(sizeof(*c)); if(!c) return NULL; c->fd = fd; /*省略*/ if (aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c) == AE_ERR) { freeClient(c); return NULL; } /*省略*/ return c; }readQueryFromClient从客户端连接fd中读取发过来的命令,解析并执行,但执行结果没有立即返回给该fd。而是加入该连接对象的缓冲队列中。
static void addReply(redisClient *c, robj *obj) { if (listLength(c->reply) == 0 && (c->replstate == REDIS_REPL_NONE || c->replstate == REDIS_REPL_ONLINE) && aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c) == AE_ERR) return; if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) { obj = dupStringObject(obj); obj->refcount = 0; /* getDecodedObject() will increment the refcount */ } listAddNodeTail(c->reply,getDecodedObject(obj)); }可以看到,对客户端连接fd创建了一个 AE_WRITABLE的监听,在fd实际变为可写时才将队列 c->reply中的数据写入fd中(sendReplyToClient)。
除此之外还有虚拟内存(redis v1.3.6),serverCron和主从拷贝相关的事件注册,这里就不展开了。
总结
可以看到,redis通过epoll_create(1024)创建了epoll实例。另外在注册fd监听时,也没有采用很复杂的事件类型,只有EPOLLIN和EPOLLOUT。对应AE_READABLE和AE_WRITABLE。
上面没有提到等待函数, 实际上在初始化server时,最后一步会调用aeMain,内部就是调用等待函数aeApiPoll根据返回值更新ae持有的对象状态,并触发函数了。
这篇结束。