这一篇将主要集中在服务器读取客户端命令并解析执行的逻辑。
之前在另一篇中我提到过,服务器为客户端连接创建对应的client对象时,会同时为该连接的fd注册可读事件监听。当有数据到达时,就会触发readQueryFromClient。
我们就从这里开始。
readQueryFromClient
static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask)
参数签名其实是对应的ae.c中定义的ae事件处理函数声明:
/* Function type for ae file-event handlers: receives the event loop, the fd
 * the event fired on, the client data supplied at registration, and the
 * event mask. */
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
参数解析
aeEventLoop: redis中ae事件循环对象,ae是对epoll等I/O多路复用机制的封装
fd: 目标fd,在这里就是客户端socket fd
privdata: 注册函数时传递的数据,在这里就是对应 redisClient对象
mask: 标记触发的事件类型,取值为AE_READABLE或者AE_WRITABLE;这里监听的是数据到达,即AE_READABLE
/* File-event handler invoked when the client socket has data to read.
 *
 * el       - event loop (unused here)
 * fd       - client socket descriptor to read from
 * privdata - the redisClient registered together with the handler
 * mask     - event mask (unused here)
 *
 * Reads at most REDIS_IOBUF_LEN bytes, appends them to the client's query
 * buffer and, unless the client is flagged REDIS_BLOCKED, hands the buffer
 * over to processInputBuffer(). On a read error other than EAGAIN, or when
 * read() returns 0, the client is freed. */
static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *client = (redisClient*) privdata;
    char chunk[REDIS_IOBUF_LEN];
    int nbytes;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    nbytes = read(fd, chunk, REDIS_IOBUF_LEN);

    if (nbytes == -1) {
        /* EAGAIN: nothing available right now, simply wait for the next event. */
        if (errno == EAGAIN) return;
        /* Any other error: log it and drop the client. */
        redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
        freeClient(client);
        return;
    }

    if (nbytes == 0) {
        /* read() returned 0: treated as the peer having closed the connection. */
        redisLog(REDIS_VERBOSE, "Client closed connection");
        freeClient(client);
        return;
    }

    /* Append the freshly read bytes to the pending query and refresh the
     * last-interaction timestamp. */
    client->querybuf = sdscatlen(client->querybuf, chunk, nbytes);
    client->lastinteraction = time(NULL);

    /* A blocked client keeps its data buffered; parsing is skipped for now. */
    if (client->flags & REDIS_BLOCKED) return;

    processInputBuffer(client);
}
处理输入缓冲区
/* Parse as much of the client's query buffer as possible and dispatch each
 * complete request to processCommand().
 *
 * Two modes, selected by c->bulklen:
 *   - c->bulklen == -1: no bulk payload is pending. The buffer is scanned
 *     for a '\n'; the line before it is split on single spaces into
 *     c->argv / c->argc.
 *   - otherwise: a payload of c->bulklen bytes (including the trailing
 *     CRLF) is expected. Once the buffer holds that many bytes, everything
 *     but the final two bytes becomes the last argument.
 *
 * Parsing stops when the client is flagged REDIS_BLOCKED or REDIS_IO_WAIT,
 * when processCommand() returns 0, or when no complete request is buffered. */
static void processInputBuffer(redisClient *c) {
again:
    /* Do not parse anything while the client is blocked or waiting on I/O. */
    if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;

    if (c->bulklen == -1) {
        /* Look for the first newline in the buffered data. */
        char *p = strchr(c->querybuf,'\n');
        size_t querylen;

        if (p) {
            sds query, *argv;
            int argc, j;

            /* Detach the current buffer as `query` and start a fresh,
             * empty query buffer for the client. */
            query = c->querybuf;
            c->querybuf = sdsempty();
            /* Length of the first line, newline included. */
            querylen = 1+(p-(query));
            if (sdslen(query) > querylen) {
                /* Keep whatever follows the first line for later parsing. */
                c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
            }
            *p = '\0'; /* remove "\n" */
            /* Only look at the preceding byte when there is one: if the line
             * starts with '\n', p-1 would point before the string data. */
            if (p > query && *(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
            /* The string was truncated in place, so refresh its stored length. */
            sdsupdatelen(query);

            /* Split the line on single spaces. */
            argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
            sdsfree(query);

            /* Replace the client's argument vector with one sized for argc tokens. */
            if (c->argv) zfree(c->argv);
            c->argv = zmalloc(sizeof(robj*)*argc);

            /* Wrap every non-empty token in a string object; empty tokens
             * are discarded.
             * NOTE(review): arguments are stored starting at index c->argc,
             * so c->argc is presumably 0 on entry — confirm against
             * resetClient(). */
            for (j = 0; j < argc; j++) {
                if (sdslen(argv[j])) {
                    c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
                    c->argc++;
                } else {
                    sdsfree(argv[j]);
                }
            }
            zfree(argv);

            if (c->argc) {
                /* At least one argument: run the command, then keep parsing
                 * if processCommand() returned non-zero and data remains. */
                if (processCommand(c) && sdslen(c->querybuf)) goto again;
            } else {
                /* Empty line: try again if more input is buffered. */
                if (sdslen(c->querybuf)) goto again;
            }
            return;
        } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
            /* No newline yet and the buffer reached the request size limit:
             * treat it as a protocol error and drop the client. */
            redisLog(REDIS_VERBOSE, "Client protocol error");
            freeClient(c);
            return;
        }
    } else {
        /* A bulk payload of c->bulklen bytes is expected. */
        int qbl = sdslen(c->querybuf);

        if (c->bulklen <= qbl) {
            /* Copy everything but the final CRLF as final argument */
            c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
            c->argc++;
            c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
            /* Process the command. If the client is still valid after
             * the processing and there is more data in the buffer
             * try to parse it. */
            if (processCommand(c) && sdslen(c->querybuf)) goto again;
            return;
        }
    }
}
解析客户端指令, 并执行
/* Interpret the arguments currently held in c->argv and, once a request is
 * complete, execute it (or queue it when the client is inside MULTI).
 *
 * Steps, in order:
 *   1. If server.maxmemory is set, call freeMemoryIfNeeded().
 *   2. Multi bulk protocol: a single argument starting with '*' sets
 *      c->multibulk; while c->multibulk is non-zero, a '$' argument sets
 *      c->bulklen and each completed argument is appended to c->mbargv.
 *      When the count reaches zero, mbargv/mbargc are swapped with argv/argc
 *      and processing continues below.
 *   3. QUIT frees the client directly.
 *   4. Look up the command; reject unknown commands, wrong arity, and
 *      REDIS_CMD_DENYOOM commands when used memory exceeds server.maxmemory.
 *   5. For REDIS_CMD_BULK commands whose payload has not been read yet, take
 *      the last argument as the payload length and either consume the
 *      payload from c->querybuf or return to wait for more data.
 *   6. Optional object sharing / encoding, authentication check, then queue
 *      (MULTI) or call() the command and reset the client.
 *
 * Returns 0 only on the QUIT path, where the client has been freed; every
 * other path returns 1. */
static int processCommand(redisClient *c) {
struct redisCommand *cmd;
/* Free some memory if needed (maxmemory setting) */
if (server.maxmemory) freeMemoryIfNeeded();
/* Handle the multi bulk command type. This is an alternative protocol
 * supported by Redis in order to receive commands that are composed of
 * multiple binary-safe "bulk" arguments. The latency of processing is
 * a bit higher but this allows things like multi-sets, so if this
 * protocol is used only for MSET and similar commands this is a big win. */
if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
/* "*<count>" header: the number after '*' becomes c->multibulk. */
c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
if (c->multibulk <= 0) {
resetClient(c);
return 1;
} else {
/* Drop the header argument; the following lines carry the real arguments. */
decrRefCount(c->argv[c->argc-1]);
c->argc--;
return 1;
}
} else if (c->multibulk) {
if (c->bulklen == -1) {
/* No payload length known yet: the argument must be a "$<len>" line. */
if (((char*)c->argv[0]->ptr)[0] != '$') {
addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
resetClient(c);
return 1;
} else {
int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
decrRefCount(c->argv[0]);
if (bulklen < 0 || bulklen > 1024*1024*1024) {
c->argc--;
addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
resetClient(c);
return 1;
}
c->argc--;
c->bulklen = bulklen+2; /* add two bytes for CR+LF */
return 1;
}
} else {
/* A payload was just read into c->argv[0]: append it to the multi bulk
 * argument vector and decrement the count of arguments still expected. */
c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
c->mbargv[c->mbargc] = c->argv[0];
c->mbargc++;
c->argc--;
c->multibulk--;
if (c->multibulk == 0) {
robj **auxargv;
int auxargc;
/* Here we need to swap the multi-bulk argc/argv with the
 * normal argc/argv of the client structure. */
auxargv = c->argv;
c->argv = c->mbargv;
c->mbargv = auxargv;
auxargc = c->argc;
c->argc = c->mbargc;
c->mbargc = auxargc;
/* We need to set bulklen to something different than -1
 * in order for the code below to process the command without
 * to try to read the last argument of a bulk command as
 * a special argument. */
c->bulklen = 0;
/* continue below and process the command */
} else {
/* More arguments expected: go back to waiting for the next "$<len>" line. */
c->bulklen = -1;
return 1;
}
}
}
/* -- end of multi bulk commands processing -- */
/* The QUIT command is handled as a special case. Normal command
 * procs are unable to close the client connection safely */
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
freeClient(c);
return 0;
}
// Look up the command by its name
cmd = lookupCommand(c->argv[0]->ptr);
if (!cmd) { // not found
// Reply with an error to the client
addReplySds(c,
sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
(char*)c->argv[0]->ptr));
// Reset the client state
resetClient(c);
return 1;
} else if ((cmd->arity > 0 && cmd->arity != c->argc) || // positive arity: exact count required; negative arity: at least -arity arguments
(c->argc < -cmd->arity)) {
// Reply with an error to the client
addReplySds(c,
sdscatprintf(sdsempty(),
"-ERR wrong number of arguments for '%s' command\r\n",
cmd->name));
resetClient(c);
return 1;
} else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
// With server.maxmemory set, reject the command when used memory exceeds the limit
addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
resetClient(c);
return 1;
} else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) { // bulk command whose payload has not been read yet
/* This is a bulk command, we have to read the last argument yet. */
int bulklen = atoi(c->argv[c->argc-1]->ptr);
decrRefCount(c->argv[c->argc-1]);
if (bulklen < 0 || bulklen > 1024*1024*1024) {
c->argc--;
addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
resetClient(c);
return 1;
}
c->argc--;
c->bulklen = bulklen+2; /* add two bytes for CR+LF */
/* It is possible that the bulk read is already in the
 * buffer. Check this condition and handle it accordingly.
 * This is just a fast path, alternative to call processInputBuffer().
 * It's a good idea since the code is small and this condition
 * happens most of the times. */
if ((signed)sdslen(c->querybuf) >= c->bulklen) {
c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
c->argc++;
c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
} else {
/* Otherwise return... there is to read the last argument
 * from the socket. */
return 1;
}
}
/* Let's try to share objects on the command arguments vector */
if (server.shareobjects) {
int j;
for(j = 1; j < c->argc; j++)
c->argv[j] = tryObjectSharing(c->argv[j]);
}
/* Let's try to encode the bulk object to save space. */
if (cmd->flags & REDIS_CMD_BULK)
tryObjectEncoding(c->argv[c->argc-1]);
// If a password is required, check that the client has authenticated
if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
// Reply with an error
addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
resetClient(c);
return 1;
}
// Inside a transaction, and the command is neither execCommand nor discardCommand
if (c->flags & REDIS_MULTI && cmd->proc != execCommand && cmd->proc != discardCommand) {
queueMultiCommand(c,cmd); // queue the command
addReply(c,shared.queued); // reply with shared.queued
} else {
/* Returns without resetting the client when blockClientOnSwappedKeys()
 * reports non-zero. */
if (server.vm_enabled && server.vm_max_threads > 0 &&
blockClientOnSwappedKeys(cmd,c)) return 1;
// Execute the command
call(c,cmd);
}
/* Prepare the client for the next command */
resetClient(c);
return 1;
}
static struct redisCommand *lookupCommand(char *name) {
int j = 0;
// 遍历找到对应指令对象
while(cmdTable[j].name != NULL) {
if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
j++;
}
return NULL;
}
/* Run a command's handler for client `c`, then propagate it.
 *
 * server.dirty is sampled before the handler runs. Afterwards:
 *   - if append-only mode is on and server.dirty changed, the command is
 *     passed to feedAppendOnlyFile();
 *   - if server.dirty changed and there are slaves, the command is passed
 *     to replicationFeedSlaves() for the slaves list;
 *   - if there are monitors, the command is always passed to
 *     replicationFeedSlaves() for the monitors list.
 * Finally the executed-commands counter is incremented. */
static void call(redisClient *c, struct redisCommand *cmd) {
    long long dirty_before = server.dirty;

    /* Invoke the handler bound to this command. */
    cmd->proc(c);

    if (server.appendonly && server.dirty != dirty_before) {
        feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
    }
    if (server.dirty != dirty_before && listLength(server.slaves)) {
        replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
    }
    if (listLength(server.monitors)) {
        replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
    }
    server.stat_numcommands++;
}