Redis 序列化协议 RESP

(1) RESP协议是什么

Redis Serialization ProtocolRESP)是Redis客户端服务器之间使用的通信协议。
它是一种简单、高效的文本协议,易于实现和解析。

(1.1) 协议定义

以下是一个简单的RESP协议示例:

redis-client 发给 redis-server 的数据。

// querybuf =  "*6\r\n$3\r\nset\r\n$19\r\n9223372036854775806\r\n$1\r\n1\r\n$2\r\nex\r\n$5\r\n86400\r\n$2\r\nnx\r\n"
*6
$3
set
$19
9223372036854775806
$1
1
$2
ex
$5
86400
$2
nx

这个示例对应的redis命令 set 9223372036854775806 1 ex 86400 nx

这个示例表示一个SET命令,用于将键 9223372036854775806 的值设置为 1 。
RESP协议使用特殊的符号(如*、$、\r\n)来表示不同类型的数据和分隔符。


(2) 源码解读

在Redis源码中,RESP的实现主要分布在以下几个文件中:

src/networking.hsrc/networking.c:这些文件包含了Redis服务器与客户端之间的网络通信相关的函数,如创建服务器套接字、处理客户端连接、读取和写入数据等。

src/protocol.c:这个文件包含了RESP协议的解析和格式化函数,如processInputBuffer(处理客户端发送的命令)、addReply*系列函数(向客户端发送响应)等。

src/sds.hsrc/sds.c:简单动态字符串(SDS,Simple Dynamic String)是RESP的基础数据结构。SDS提供了对字符串的高效操作,如追加、截取等。

todo 整体处理流程

(2.1) 处理输入缓冲-processInputBuffer()

// filepath: redis/src/networking.c

/* This function is called every time, in the client structure 'c', there is
 * more query buffer to process, because we read more data from the socket
 * or because a client was blocked and later reactivated, so there could be
 * pending query buffer, already representing a full command, to process. */
void processInputBuffer(client *c) {
    /* Keep processing while there is something in the input buffer */
    // 循环遍历输入缓冲区
    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        if (!(c->flags & CLIENT_SLAVE) && 
            !(c->flags & CLIENT_PENDING_READ) && 
            clientsArePaused()) break;

        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & CLIENT_BLOCKED) break;

        /* Don't process more buffers from clients that have already pending
         * commands to execute in c->argv. */
        if (c->flags & CLIENT_PENDING_COMMAND) break;

        /* Don't process input from the master while there is a busy script
         * condition on the slave. We want just to accumulate the replication
         * stream (instead of replying -BUSY like we do with other clients) and
         * later resume the processing. */
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;

        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands).
         *
         * The same applies for clients we want to terminate ASAP. */
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        /* Determine request type when unknown. */
        if (!c->reqtype) {
            if (c->querybuf[c->qb_pos] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        if (c->reqtype == PROTO_REQ_INLINE) { // 单个命令
            if (processInlineBuffer(c) != C_OK) break;
            /* If the Gopher mode and we got zero or one argument, process
             * the request in Gopher mode. To avoid data race, Redis won't
             * support Gopher if enable io threads to read queries. */
            if (server.gopher_enabled && !server.io_threads_do_reads &&
                ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
                  c->argc == 0))
            {
                processGopherRequest(c);
                resetClient(c);
                c->flags |= CLIENT_CLOSE_AFTER_REPLY;
                break;
            }
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {  // 批量
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* If we are in the context of an I/O thread, we can't really
             * execute the command here. All we can do is to flag the client
             * as one that needs to process the command. */
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            /* We are finally ready to execute the command. */
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */
                return;
            }
        }
    }

    /* Trim to pos */
    if (c->qb_pos) {
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }
}

(2.2) 协议解析-processMultibulkBuffer()

// filepath: redis/src/networking.c

/* 
 * 处理客户端 client *c 的查询缓冲区,设置用于命令执行的客户端参数向量。 
 * 如果运行该函数后客户端有一个格式良好的准备好处理的命令,则返回 C_OK,
 * 否则如果仍需要读取更多缓冲区来获取完整命令,则返回 C_ERR。 (输入数据不完整 input ill)
 * 当存在协议错误时,该函数还会返回 C_ERR:在这种情况下,客户端结构设置为回复错误并关闭连接。
 *
 * 如果 processInputBuffer() 检测到下一个命名是 RESP 格式 则调用此函数,
 * 命令里的第一个字节是'*'   (备注: 在RESP协议里 '*' 类似于作文里的 '。' 代表一句话/一条命令)
 * 否则调用内联命令 processInlineBuffer()  
 */
int processMultibulkBuffer(client *c) {
    char *newline = NULL;
    int ok;
    long long ll;

    if (c->multibulklen == 0) {
        /* The client should have been reset */
        serverAssertWithInfo(c,NULL,c->argc == 0);

        /* Multi bulk length cannot be read without a \r\n */
        newline = strchr(c->querybuf+c->qb_pos,'\r');
        if (newline == NULL) {
            // 保证输入长度 不大于 协议规定的最大长度 PROTO_INLINE_MAX_SIZE
            if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                addReplyError(c,"Protocol error: too big mbulk count string");
                setProtocolError("too big mbulk count string",c);
            }
            return C_ERR;
        }

        /* Buffer should also contain \n */
        if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
            return C_ERR;

        /* We know for sure there is a whole line since newline != NULL,
         * so go ahead and find out the multi bulk length. */
        // 确保输入里有'*'
        // 解析命令请求参数数目,并存储在客户端对象的c->multibulklen字段
        serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*');

        // 尝试把string转成long
        ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
        if (!ok || ll > 1024*1024) {
            addReplyError(c,"Protocol error: invalid multibulk length");
            setProtocolError("invalid mbulk count",c);
            return C_ERR;
        } else if (ll > 10 && authRequired(c)) {
            addReplyError(c, "Protocol error: unauthenticated multibulk length");
            setProtocolError("unauth mbulk count", c);
            return C_ERR;
        }

        // 记录已解析命令的请求长度resp的长度
        c->qb_pos = (newline-c->querybuf)+2;

        if (ll <= 0) return C_OK;

        c->multibulklen = ll;

        /* Setup argv array on client structure */
        if (c->argv) zfree(c->argv);
        // 分配请求参数存储空间
        c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
        c->argv_len_sum = 0;
    }

    serverAssertWithInfo(c,NULL,c->multibulklen > 0);

    // 循环解析每个请求参数
    while(c->multibulklen) {
        /* Read bulk length if unknown */
        if (c->bulklen == -1) {
            // 定位到行尾
            newline = strchr(c->querybuf+c->qb_pos,'\r');
            if (newline == NULL) {
                if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                    addReplyError(c,
                        "Protocol error: too big bulk count string");
                    setProtocolError("too big bulk count string",c);
                    return C_ERR;
                }
                break;
            }

            /* Buffer should also contain \n */
            // 确保包含 '\n'
            if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
                break;

            // 校验命令是否以$开头
            // querybuf =  "*6\r\n$3\r\nset\r\n$19\r\n9223372036854775806\r\n$1\r\n1\r\n$2\r\nex\r\n$5\r\n86400\r\n$2\r\nnx\r\n"
            // c->qb_pos = 4
            if (c->querybuf[c->qb_pos] != '$') {
                addReplyErrorFormat(c,
                    "Protocol error: expected '$', got '%c'",
                    c->querybuf[c->qb_pos]);
                setProtocolError("expected $ but got something else",c);
                return C_ERR;
            }
            
            // querybuf =  "*6\r\n$3\r\nset\r\n$19\r\n9223372036854775806\r\n$1\r\n1\r\n$2\r\nex\r\n$5\r\n86400\r\n$2\r\nnx\r\n"
            // c->qb_pos = 4
            // newline = "3\r\nset\r\n$19\r\n9223372036854775806\r\n$1\r\n1\r\n$2\r\nex\r\n$5\r\n86400\r\n$2\r\nnx\r\n"
            // ll结果是3
            // 
            // 内存地址操作  c->querybuf + c->qb_pos + 1 
            // c->querybuf 是入参的字符串
            // c->qb_pos 是 当前指向的位置 
            // 1 是 $ 对应的位置
            ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
            if (!ok || ll < 0 ||
                (!(c->flags & CLIENT_MASTER) && ll > server.proto_max_bulk_len)) {
                addReplyError(c,"Protocol error: invalid bulk length");
                setProtocolError("invalid bulk length",c);
                return C_ERR;
            } else if (ll > 16384 && authRequired(c)) {
                addReplyError(c, "Protocol error: unauthenticated bulk length");
                setProtocolError("unauth bulk length", c);
                return C_ERR;
            }
            
            // "\r\nset\r\n$19\r\n9223372036854775806\r\n$1\r\n1\r\n$2\r\nex\r\n$5\r\n86400\r\n$2\r\nnx\r\n"
            // c->qb_pos = 8
            c->qb_pos = newline-c->querybuf+2;
            if (ll >= PROTO_MBULK_BIG_ARG) {
                /* If we are going to read a large object from network
                 * try to make it likely that it will start at c->querybuf
                 * boundary so that we can optimize object creation
                 * avoiding a large copy of data.
                 *
                 * But only when the data we have not parsed is less than
                 * or equal to ll+2. If the data length is greater than
                 * ll+2, trimming querybuf is just a waste of time, because
                 * at this time the querybuf contains not only our bulk. */
                if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) {
                    sdsrange(c->querybuf,c->qb_pos,-1);
                    c->qb_pos = 0;
                    /* Hint the sds library about the amount of bytes this string is
                     * going to contain. */
                    c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-sdslen(c->querybuf));
                }
            }
            // 命令行数为3  ll=3
            c->bulklen = ll;
        }

        /* Read bulk argument */
        if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
            /* Not enough data (+2 == trailing \r\n) */
            break;
        } else {
            /* Optimization: if the buffer contains JUST our bulk element
             * instead of creating a new object by *copying* the sds we
             * just use the current sds string. */
            if (c->qb_pos == 0 &&
                c->bulklen >= PROTO_MBULK_BIG_ARG &&
                sdslen(c->querybuf) == (size_t)(c->bulklen+2))
            {
                c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
                c->argv_len_sum += c->bulklen;
                sdsIncrLen(c->querybuf,-2); /* remove CRLF */
                /* Assume that if we saw a fat argument we'll see another one
                 * likely... */
                c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
                sdsclear(c->querybuf);
            } else {
                // 把命令存起来
                // 对应一行的值 比如 $5 86400 这里是把86400存成redisObject
                c->argv[c->argc++] =
                    createStringObject(c->querybuf+c->qb_pos,c->bulklen);
                c->argv_len_sum += c->bulklen;
                c->qb_pos += c->bulklen+2;
            }
            c->bulklen = -1;
            c->multibulklen--;
        }
    }

    /* We're done when c->multibulk == 0 */
    if (c->multibulklen == 0) return C_OK;

    /* Still not ready to process the command */
    return C_ERR;
}

参考

[1] Redis序列化协议规范-redis官网
[2] github
[3] antirez/resp3-github
[4] 为什么 Redis 6 只支持 RESP3 ?
[5] Redis核心技术与实战-蒋德钧-Redis客户端如何与服务器端交换命令和数据?