Tergel

Redis源码阅读1

管你有多少并发,总之先上Redis再说

这一篇将主要集中在服务器读取客户端命令并解析执行的逻辑。

在另一篇文章中我提到过:服务器为客户端连接创建对应的client对象时,会同时监听该连接的socket fd;当这个fd上有数据到达时,就会触发readQueryFromClient。

我们就从这里开始。

readQueryFromClient

static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask)

参数签名其实是对应的ae.c中定义的ae事件处理函数声明:

typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);

参数解析

el (aeEventLoop*): 事件循环对象,是redis的ae库对epoll等I/O多路复用机制的封装

fd: 目标fd,在这里就是客户端socket fd

privdata: 注册函数时传递的数据,在这里就是对应 redisClient对象

mask: 标记被触发的事件类型(AE_READABLE或者AE_WRITABLE),在这里是读事件,即AE_READABLE


/* File-event callback invoked when the client socket `fd` has data to read.
 *
 * Reads up to REDIS_IOBUF_LEN bytes, appends them to the client's query
 * buffer and, unless the client is flagged REDIS_BLOCKED, hands the buffer
 * to processInputBuffer().
 *
 * el       - event loop that fired the callback (unused).
 * fd       - descriptor to read from.
 * privdata - the redisClient registered together with the callback.
 * mask     - event mask that fired (unused).
 *
 * The client is freed when the peer closes the connection or when read()
 * fails with anything other than EAGAIN. */
static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *client = (redisClient*) privdata;
    char chunk[REDIS_IOBUF_LEN];
    int got;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    got = read(fd, chunk, REDIS_IOBUF_LEN);

    /* A zero-byte read means the peer closed the connection. */
    if (got == 0) {
        redisLog(REDIS_VERBOSE, "Client closed connection");
        freeClient(client);
        return;
    }

    if (got == -1) {
        /* EAGAIN just means nothing is available right now: keep the
         * client and wait for the next event. Any other error is fatal
         * for this connection. */
        if (errno != EAGAIN) {
            redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(client);
        }
        return;
    }

    /* Append the freshly read bytes and record the interaction time. */
    client->querybuf = sdscatlen(client->querybuf, chunk, got);
    client->lastinteraction = time(NULL);

    /* A blocked client only accumulates input; it is parsed later. */
    if (client->flags & REDIS_BLOCKED) return;
    processInputBuffer(client);
}

处理输入缓冲区

/* Parse the client's query buffer and run every complete command in it.
 *
 * Two modes, selected by c->bulklen:
 *   - c->bulklen == -1: a '\n'-terminated line is expected. The line is
 *     split on single spaces into c->argv and passed to processCommand().
 *   - otherwise: a bulk payload of c->bulklen bytes (payload plus trailing
 *     CRLF) is expected; once fully buffered it becomes the last argument
 *     and processCommand() is called.
 *
 * The function loops (via `again`) while processCommand() returns non-zero
 * and unparsed bytes remain. It returns without doing anything while the
 * client is flagged REDIS_BLOCKED or REDIS_IO_WAIT. */
static void processInputBuffer(redisClient *c) {
again:
    /* Do not parse anything while the client is blocked or waiting on I/O. */
    if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
    if (c->bulklen == -1) {
        /* No bulk payload pending: look for the end of the next line. */
        char *p = strchr(c->querybuf,'\n');
        size_t querylen;

        if (p) {
            sds query, *argv;
            int argc, j;

            /* Detach the current buffer and start a fresh, empty one. */
            query = c->querybuf;
            c->querybuf = sdsempty();
            /* Length of the first line, including its '\n'. */
            querylen = 1+(p-(query));
            if (sdslen(query) > querylen) {
                /* Keep whatever follows the first line for the next pass. */
                c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
            }
            *p = '\0'; /* remove "\n" */
            /* Strip a preceding "\r" if any. Only look at p-1 when the line
             * has at least one byte before the '\n'; otherwise p-1 points
             * before the start of the string data. */
            if (p > query && *(p-1) == '\r') *(p-1) = '\0';
            /* The string was truncated in place, so refresh its length. */
            sdsupdatelen(query);

            /* Split the line into tokens on single spaces. */
            argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
            sdsfree(query);

            /* Room for at most argc arguments. */
            if (c->argv) zfree(c->argv);
            c->argv = zmalloc(sizeof(robj*)*argc);

            /* Wrap every non-empty token in a string object; empty tokens
             * (e.g. produced by consecutive spaces) are discarded. */
            for (j = 0; j < argc; j++) {
                if (sdslen(argv[j])) {
                    c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
                    c->argc++;
                } else {
                    sdsfree(argv[j]);
                }
            }
            zfree(argv);
            if (c->argc) {
                /* Run the command; continue parsing if the client is still
                 * valid and more input is buffered. */
                if (processCommand(c) && sdslen(c->querybuf)) goto again;
            } else {
                /* Blank line: nothing to run, but keep parsing what is left. */
                if (sdslen(c->querybuf)) goto again;
            }
            return;
        } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
            /* No newline seen and the buffer reached the request size
             * limit: treat it as a protocol error and drop the client. */
            redisLog(REDIS_VERBOSE, "Client protocol error");
            freeClient(c);
            return;
        }
    } else {
        /* A bulk payload of c->bulklen bytes (data + CRLF) is pending. */
        int qbl = sdslen(c->querybuf);

        if (c->bulklen <= qbl) {
            /* Copy everything but the final CRLF as final argument */
            c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
            c->argc++;
            c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
            /* Process the command. If the client is still valid after
             * the processing and there is more data in the buffer
             * try to parse it. */
            if (processCommand(c) && sdslen(c->querybuf)) goto again;
            return;
        }
        /* Otherwise the payload is not complete yet: wait for more data. */
    }
}

解析客户端指令, 并执行

/* Interpret the arguments currently held in c->argv and, once a command is
 * complete, execute it (or queue it when the client has REDIS_MULTI set).
 *
 * The function is also the state machine for the two argument-carrying
 * protocols visible here:
 *   - multi bulk: a "*<count>" line stores the count in c->multibulk, each
 *     following "$<len>" line sets c->bulklen, and each received argument is
 *     appended to c->mbargv until the count reaches zero;
 *   - bulk commands (REDIS_CMD_BULK): the last inline argument is the byte
 *     length of a payload that follows the command line.
 *
 * Returns 0 if the client was freed (QUIT), 1 in every other case,
 * including protocol errors and "need more data" situations. */
static int processCommand(redisClient *c) {
    struct redisCommand *cmd;

    /* Free some memory if needed (maxmemory setting) */
    if (server.maxmemory) freeMemoryIfNeeded();

    /* Handle the multi bulk command type. This is an alternative protocol
     * supported by Redis in order to receive commands that are composed of
     * multiple binary-safe "bulk" arguments. The latency of processing is
     * a bit higher but this allows things like multi-sets, so if this
     * protocol is used only for MSET and similar commands this is a big win. */

    if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
        c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
        if (c->multibulk <= 0) {
            resetClient(c);
            return 1;
        } else {
            decrRefCount(c->argv[c->argc-1]);
            c->argc--;
            return 1;
        }
    } else if (c->multibulk) {
        if (c->bulklen == -1) {
            if (((char*)c->argv[0]->ptr)[0] != '$') {
                addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
                resetClient(c);
                return 1;
            } else {
                int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
                decrRefCount(c->argv[0]);
                if (bulklen < 0 || bulklen > 1024*1024*1024) {
                    c->argc--;
                    addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
                    resetClient(c);
                    return 1;
                }
                c->argc--;
                c->bulklen = bulklen+2; /* add two bytes for CR+LF */
                return 1;
            }
        } else {
            c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
            c->mbargv[c->mbargc] = c->argv[0];
            c->mbargc++;
            c->argc--;
            c->multibulk--;
            if (c->multibulk == 0) {
                robj **auxargv;
                int auxargc;

                /* Here we need to swap the multi-bulk argc/argv with the
                 * normal argc/argv of the client structure. */
                auxargv = c->argv;
                c->argv = c->mbargv;
                c->mbargv = auxargv;

                auxargc = c->argc;
                c->argc = c->mbargc;
                c->mbargc = auxargc;

                /* We need to set bulklen to something different than -1
                 * in order for the code below to process the command without
                 * to try to read the last argument of a bulk command as
                 * a special argument. */
                c->bulklen = 0;
                /* continue below and process the command */
            } else {
                c->bulklen = -1;
                return 1;
            }
        }
    }
    /* -- end of multi bulk commands processing -- */

    /* The QUIT command is handled as a special case. Normal command
     * procs are unable to close the client connection safely */
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        freeClient(c);
        return 0;
    }

    /* Look the command up by name (first argument). */
    cmd = lookupCommand(c->argv[0]->ptr);
    if (!cmd) { /* not found */
        /* Report the unknown command to the client. */
        addReplySds(c,
            sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
                (char*)c->argv[0]->ptr));
        /* Reset the client state for the next command. */
        resetClient(c);
        return 1;
    } else if ((cmd->arity > 0 && cmd->arity != c->argc) || /* positive arity: exact argument count required; negative arity: at least -arity arguments required */
               (c->argc < -cmd->arity)) {
        /* Report the arity mismatch to the client. */
        addReplySds(c,
            sdscatprintf(sdsempty(),
                "-ERR wrong number of arguments for '%s' command\r\n",
                cmd->name));
        resetClient(c);
        return 1;
    } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
        /* maxmemory is configured, the command is flagged REDIS_CMD_DENYOOM
         * and used memory is above the limit: refuse the command. */
        addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
        resetClient(c);
        return 1;
    } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) { /* bulk command whose payload has not been read yet */
        /* This is a bulk command, we have to read the last argument yet. */
        int bulklen = atoi(c->argv[c->argc-1]->ptr);

        decrRefCount(c->argv[c->argc-1]);
        if (bulklen < 0 || bulklen > 1024*1024*1024) {
            c->argc--;
            addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
            resetClient(c);
            return 1;
        }
        c->argc--;
        c->bulklen = bulklen+2; /* add two bytes for CR+LF */
        /* It is possible that the bulk read is already in the
         * buffer. Check this condition and handle it accordingly.
         * This is just a fast path, alternative to call processInputBuffer().
         * It's a good idea since the code is small and this condition
         * happens most of the times. */
        if ((signed)sdslen(c->querybuf) >= c->bulklen) {
            c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
            c->argc++;
            c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
        } else {
            /* Otherwise return... there is to read the last argument
             * from the socket. */
            return 1;
        }
    }
    /* Let's try to share objects on the command arguments vector */
    if (server.shareobjects) {
        int j;
        for(j = 1; j < c->argc; j++)
            c->argv[j] = tryObjectSharing(c->argv[j]);
    }
    /* Let's try to encode the bulk object to save space. */
    if (cmd->flags & REDIS_CMD_BULK)
        tryObjectEncoding(c->argv[c->argc-1]);

    /* When a password is required, a client that has not authenticated yet
     * may only run authCommand. */
    if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
        /* Reject the command. */
        addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
        resetClient(c);
        return 1;
    }

    /* Client has REDIS_MULTI set and the command is neither execCommand
     * nor discardCommand: queue it instead of running it. */
    if (c->flags & REDIS_MULTI && cmd->proc != execCommand && cmd->proc != discardCommand) {
        queueMultiCommand(c,cmd); /* append the command to the queue */
        addReply(c,shared.queued); /* reply with the shared "queued" object */
    } else {
        /* With VM enabled and VM threads configured, a non-zero result from
         * blockClientOnSwappedKeys() makes us return here without executing
         * the command or resetting the client. */
        if (server.vm_enabled && server.vm_max_threads > 0 &&
            blockClientOnSwappedKeys(cmd,c)) return 1;
        /* Execute the command. */
        call(c,cmd);
    }

    /* Prepare the client for the next command */
    resetClient(c);
    return 1;
}

static struct redisCommand *lookupCommand(char *name) {
    int j = 0;
    // 遍历找到对应指令对象
    while(cmdTable[j].name != NULL) {
        if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
        j++;
    }
    return NULL;
}

/* Run a command's implementation and propagate it where needed.
 *
 * server.dirty is sampled before invoking cmd->proc; a difference
 * afterwards is what triggers propagation:
 *   - to the append only file, when server.appendonly is set;
 *   - to the slaves, when the slaves list is not empty.
 * The command is passed to the monitors list whenever that list is not
 * empty, whether or not server.dirty changed. Finally the executed-commands
 * counter is incremented. */
static void call(redisClient *c, struct redisCommand *cmd) {
    long long dirty_before = server.dirty;

    /* Invoke the function implementing the command. */
    cmd->proc(c);

    if (server.appendonly && server.dirty != dirty_before) {
        feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
    }
    if (server.dirty != dirty_before && listLength(server.slaves)) {
        replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
    }
    if (listLength(server.monitors)) {
        replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
    }
    server.stat_numcommands++;
}