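The Redis Serialization Protocol (RESP)
(1) What is RESP
The Redis Serialization Protocol (RESP) is the communication protocol used between Redis clients and servers.
It is a simple, efficient text protocol that is easy to implement and parse.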
(1.1) Protocol definition
Below is a simple RESP example: the data that redis-client sends to redis-server.
// querybuf = "*6\r\n$3\r\nset\r\n$19\r\n9223372036854775806\r\n$1\r\n1\r\n$2\r\nex\r\n$5\r\n86400\r\n$2\r\nnx\r\n"
*6
$3
set
$19
9223372036854775806
$1
1
$2
ex
$5
86400
$2
nx
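This example corresponds to the Redis command set 9223372036854775806 1 ex 86400 nx
It is a SET command that sets the key 9223372036854775806 to the value 1, with an expiry of 86400 seconds (ex), and only if the key does not already exist (nx).
RESP uses special markers to denote data types and delimiters: * introduces an array and is followed by its element count, $ introduces a bulk string and is followed by its length in bytes, and \r\n (CRLF) terminates every line. In the example, *6 announces six arguments and $19 announces the 19-byte key.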
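To make the framing concrete, here is a minimal sketch in C of how a client might serialize a command into this format. The helper name resp_encode_command is hypothetical and is not part of Redis or of any client library. It treats every argument as a NUL-terminated C string, so unlike real RESP it is not binary safe.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper (not a Redis API): encode argc arguments as a RESP
 * array of bulk strings into out. Returns the number of bytes written
 * (excluding the terminating NUL), or -1 if out is too small. */
int resp_encode_command(char *out, size_t cap, int argc, const char **argv) {
    assert(out != NULL && argv != NULL);
    size_t pos = 0;

    /* Array header: "*<number of arguments>\r\n" */
    int n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    pos += (size_t)n;

    for (int i = 0; i < argc; i++) {
        /* Each argument: "$<byte length>\r\n<payload>\r\n" */
        size_t len = strlen(argv[i]);
        n = snprintf(out + pos, cap - pos, "$%zu\r\n%s\r\n", len, argv[i]);
        if (n < 0 || (size_t)n >= cap - pos) return -1;
        pos += (size_t)n;
    }
    return (int)pos;
}
```

Called with the six arguments set, 9223372036854775806, 1, ex, 86400 and nx, this helper should produce exactly the querybuf string shown in the example above.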
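(2) Source code walkthrough
In the Redis source code, the RESP implementation is found mainly in the following files:
src/networking.c: contains the functions for network communication between the server and its clients, such as handling client connections and reading and writing data. It also contains the RESP parsing and formatting functions, such as processInputBuffer (processes the commands sent by a client) and the addReply* family (sends replies to the client). Their prototypes are declared in src/server.h.
src/sds.h and src/sds.c: the Simple Dynamic String (SDS) is the string type that the RESP code is built on; the client query buffer c->querybuf is an sds. SDS provides efficient string operations such as appending and range trimming.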
TODO: overall processing flow
(2.1) Processing the input buffer: processInputBuffer()
// filepath: redis/src/networking.c
/* This function is called every time, in the client structure 'c', there is
 * more query buffer to process, because we read more data from the socket
 * or because a client was blocked and later reactivated, so there could be
 * pending query buffer, already representing a full command, to process. */
void processInputBuffer(client *c) {
    /* Keep processing while there is something in the input buffer */
    // Loop until the whole input buffer has been consumed
    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        if (!(c->flags & CLIENT_SLAVE) &&
            !(c->flags & CLIENT_PENDING_READ) &&
            clientsArePaused()) break;
        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & CLIENT_BLOCKED) break;
        /* Don't process more buffers from clients that have already pending
         * commands to execute in c->argv. */
        if (c->flags & CLIENT_PENDING_COMMAND) break;
        /* Don't process input from the master while there is a busy script
         * condition on the slave. We want just to accumulate the replication
         * stream (instead of replying -BUSY like we do with other clients) and
         * later resume the processing. */
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands).
         *
         * The same applies for clients we want to terminate ASAP. */
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
        /* Determine request type when unknown. */
        if (!c->reqtype) {
            if (c->querybuf[c->qb_pos] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }
        if (c->reqtype == PROTO_REQ_INLINE) { // inline command: a plain text line, e.g. typed via telnet
            if (processInlineBuffer(c) != C_OK) break;
            /* If the Gopher mode and we got zero or one argument, process
             * the request in Gopher mode. To avoid data race, Redis won't
             * support Gopher if enable io threads to read queries. */
            if (server.gopher_enabled && !server.io_threads_do_reads &&
                ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
                  c->argc == 0))
            {
                processGopherRequest(c);
                resetClient(c);
                c->flags |= CLIENT_CLOSE_AFTER_REPLY;
                break;
            }
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) { // multibulk: a RESP array of bulk strings
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }
        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* If we are in the context of an I/O thread, we can't really
             * execute the command here. All we can do is to flag the client
             * as one that needs to process the command. */
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }
            /* We are finally ready to execute the command. */
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */
                return;
            }
        }
    }
    /* Trim to pos */
    if (c->qb_pos) {
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }
}
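Two steps of the listing above are easy to isolate: choosing the request type from the first unread byte, and discarding the consumed prefix once the loop ends. The following is a simplified sketch on a plain char buffer rather than an sds. The names req_type, detect_request_type and trim_consumed are hypothetical, and memmove stands in for what sdsrange(c->querybuf,c->qb_pos,-1) achieves in Redis.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Illustrative request types; the names are assumptions of this sketch. */
enum req_type { REQ_UNKNOWN = 0, REQ_INLINE = 1, REQ_MULTIBULK = 2 };

/* A request whose first byte is '*' is a RESP array (multibulk);
 * anything else is treated as an inline command. */
enum req_type detect_request_type(const char *buf, size_t len) {
    assert(buf != NULL);
    if (len == 0) return REQ_UNKNOWN;
    return buf[0] == '*' ? REQ_MULTIBULK : REQ_INLINE;
}

/* Drop the first *pos bytes (already parsed) and reset the read offset,
 * so that unparsed data starts at index 0 again. */
void trim_consumed(char *buf, size_t *len, size_t *pos) {
    assert(*pos <= *len);
    if (*pos == 0) return;
    memmove(buf, buf + *pos, *len - *pos);
    *len -= *pos;
    *pos = 0;
}
```

Keeping a read offset and trimming once after the loop avoids shifting the buffer after every single command when several commands arrive in one read.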
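(2.2) Protocol parsing: processMultibulkBuffer()
// filepath: redis/src/networking.c
/*
 * Process the query buffer of client 'c' and set up the client's argument
 * vector for command execution. Returns C_OK if, after running this function,
 * the client holds a well-formed command that is ready to be processed;
 * returns C_ERR if more of the buffer must still be read to obtain a complete
 * command (the input is incomplete). C_ERR is also returned on a protocol
 * error: in that case the client structure is set up to reply with an error
 * and close the connection.
 *
 * processInputBuffer() calls this function when it detects that the next
 * command is in RESP format, i.e. its first byte is '*' (note: in RESP, '*'
 * opens an array, and each command is sent as exactly one array, so '*'
 * delimits commands much as a period delimits sentences). Otherwise it calls
 * processInlineBuffer() to handle an inline command.
 */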
int processMultibulkBuffer(client *c) {
    char *newline = NULL;
    int ok;
    long long ll;
    if (c->multibulklen == 0) {
        /* The client should have been reset */
        serverAssertWithInfo(c,NULL,c->argc == 0);
        /* Multi bulk length cannot be read without a \r\n */
        newline = strchr(c->querybuf+c->qb_pos,'\r');
        if (newline == NULL) {
            // No line terminator yet: reject the request if the pending data
            // already exceeds the protocol limit PROTO_INLINE_MAX_SIZE
            if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                addReplyError(c,"Protocol error: too big mbulk count string");
                setProtocolError("too big mbulk count string",c);
            }
            return C_ERR;
        }
        /* Buffer should also contain \n */
        if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
            return C_ERR;
        /* We know for sure there is a whole line since newline != NULL,
         * so go ahead and find out the multi bulk length. */
        // Assert that the line starts with '*', then parse the number of
        // arguments; it is stored in c->multibulklen further down
        serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*');
        // Convert the digits after '*' to a long long
        ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
        if (!ok || ll > 1024*1024) {
            addReplyError(c,"Protocol error: invalid multibulk length");
            setProtocolError("invalid mbulk count",c);
            return C_ERR;
        } else if (ll > 10 && authRequired(c)) {
            addReplyError(c, "Protocol error: unauthenticated multibulk length");
            setProtocolError("unauth mbulk count", c);
            return C_ERR;
        }
        // Advance qb_pos past the "*<count>\r\n" header line
        c->qb_pos = (newline-c->querybuf)+2;
        if (ll <= 0) return C_OK;
        c->multibulklen = ll;
        /* Setup argv array on client structure */
        if (c->argv) zfree(c->argv);
        // Allocate storage for the argument pointers
        c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
        c->argv_len_sum = 0;
    }
    serverAssertWithInfo(c,NULL,c->multibulklen > 0);
    // Parse the arguments one by one
    while(c->multibulklen) {
        /* Read bulk length if unknown */
        if (c->bulklen == -1) {
            // Locate the end of the "$<length>" line
            newline = strchr(c->querybuf+c->qb_pos,'\r');
            if (newline == NULL) {
                if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
                    addReplyError(c,
                        "Protocol error: too big bulk count string");
                    setProtocolError("too big bulk count string",c);
                    return C_ERR;
                }
                break;
            }
            /* Buffer should also contain \n */
            // Make sure the '\n' after the '\r' has arrived as well
            if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
                break;
            // Check that the bulk header starts with '$'
            // querybuf = "*6\r\n$3\r\nset\r\n$19\r\n9223372036854775806\r\n$1\r\n1\r\n$2\r\nex\r\n$5\r\n86400\r\n$2\r\nnx\r\n"
            // c->qb_pos = 4
            if (c->querybuf[c->qb_pos] != '$') {
                addReplyErrorFormat(c,
                    "Protocol error: expected '$', got '%c'",
                    c->querybuf[c->qb_pos]);
                setProtocolError("expected $ but got something else",c);
                return C_ERR;
            }
            // querybuf = "*6\r\n$3\r\nset\r\n$19\r\n9223372036854775806\r\n$1\r\n1\r\n$2\r\nex\r\n$5\r\n86400\r\n$2\r\nnx\r\n"
            // c->qb_pos = 4
            // c->querybuf+c->qb_pos+1 = "3\r\nset\r\n$19\r\n9223372036854775806\r\n$1\r\n1\r\n$2\r\nex\r\n$5\r\n86400\r\n$2\r\nnx\r\n"
            // newline = "\r\nset\r\n$19\r\n9223372036854775806\r\n$1\r\n1\r\n$2\r\nex\r\n$5\r\n86400\r\n$2\r\nnx\r\n"
            // so ll is parsed as 3
            //
            // Pointer arithmetic in c->querybuf + c->qb_pos + 1:
            // c->querybuf is the start of the input string
            // c->qb_pos is the current read offset
            // +1 skips the leading '$'
            ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
            if (!ok || ll < 0 ||
                (!(c->flags & CLIENT_MASTER) && ll > server.proto_max_bulk_len)) {
                addReplyError(c,"Protocol error: invalid bulk length");
                setProtocolError("invalid bulk length",c);
                return C_ERR;
            } else if (ll > 16384 && authRequired(c)) {
                addReplyError(c, "Protocol error: unauthenticated bulk length");
                setProtocolError("unauth bulk length", c);
                return C_ERR;
            }
            // newline points at "\r\nset\r\n$19\r\n..." (offset 6), so
            // c->qb_pos = 8, the first byte of "set"
            c->qb_pos = newline-c->querybuf+2;
            if (ll >= PROTO_MBULK_BIG_ARG) {
                /* If we are going to read a large object from network
                 * try to make it likely that it will start at c->querybuf
                 * boundary so that we can optimize object creation
                 * avoiding a large copy of data.
                 *
                 * But only when the data we have not parsed is less than
                 * or equal to ll+2. If the data length is greater than
                 * ll+2, trimming querybuf is just a waste of time, because
                 * at this time the querybuf contains not only our bulk. */
                if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) {
                    sdsrange(c->querybuf,c->qb_pos,-1);
                    c->qb_pos = 0;
                    /* Hint the sds library about the amount of bytes this string is
                     * going to contain. */
                    c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-sdslen(c->querybuf));
                }
            }
            // Byte length of the current argument; for "$3" ll = 3
            c->bulklen = ll;
        }
        /* Read bulk argument */
        if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
            /* Not enough data (+2 == trailing \r\n) */
            break;
        } else {
            /* Optimization: if the buffer contains JUST our bulk element
             * instead of creating a new object by *copying* the sds we
             * just use the current sds string. */
            if (c->qb_pos == 0 &&
                c->bulklen >= PROTO_MBULK_BIG_ARG &&
                sdslen(c->querybuf) == (size_t)(c->bulklen+2))
            {
                c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
                c->argv_len_sum += c->bulklen;
                sdsIncrLen(c->querybuf,-2); /* remove CRLF */
                /* Assume that if we saw a fat argument we'll see another one
                 * likely... */
                c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
                sdsclear(c->querybuf);
            } else {
                // Store the argument, i.e. the payload of one bulk string:
                // for "$5\r\n86400\r\n" the bytes 86400 are copied into a redisObject
                c->argv[c->argc++] =
                    createStringObject(c->querybuf+c->qb_pos,c->bulklen);
                c->argv_len_sum += c->bulklen;
                c->qb_pos += c->bulklen+2;
            }
            c->bulklen = -1;
            c->multibulklen--;
        }
    }
    /* We're done when c->multibulk == 0 */
    if (c->multibulklen == 0) return C_OK;
    /* Still not ready to process the command */
    return C_ERR;
}
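The key property of the function above is that it is resumable: multibulklen, bulklen and qb_pos live in the client, so parsing can stop when data runs out and continue after the next read. The following is a stripped-down sketch of that state machine on a plain char buffer. Every name in it (struct parser, parse_multibulk, MAX_ARGS and so on) is hypothetical. It uses memchr instead of strchr, records arguments as offsets instead of creating objects, returns 1/0/-1 instead of C_OK/C_ERR, and omits the authentication checks and the large-argument optimization.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_ARGS 16                 /* arbitrary limit for this sketch */

struct arg { size_t off, len; };    /* one argument: offset and length in buf */

struct parser {
    long multibulklen;   /* arguments still to read; 0 = array header not parsed */
    long bulklen;        /* byte length of the current argument; -1 = unknown */
    size_t pos;          /* read offset, plays the role of c->qb_pos */
    int argc;
    struct arg argv[MAX_ARGS];
};

void parser_init(struct parser *p) {
    memset(p, 0, sizeof(*p));
    p->bulklen = -1;
}

/* Parse "<prefix><digits>\r\n" at buf[p->pos]. Returns 1 and advances pos on
 * success, 0 if the line is not complete yet, -1 if it is malformed. */
static int read_header(struct parser *p, const char *buf, size_t len,
                       char prefix, long *out) {
    if (p->pos >= len) return 0;
    if (buf[p->pos] != prefix) return -1;
    const char *cr = memchr(buf + p->pos, '\r', len - p->pos);
    if (cr == NULL || (size_t)(cr - buf) + 1 >= len) return 0; /* need "\r\n" */
    if (cr[1] != '\n') return -1;
    const char *s = buf + p->pos + 1;          /* +1 skips the prefix byte */
    if (s == cr) return -1;                    /* no digits at all */
    long v = 0;
    for (; s < cr; s++) {
        if (*s < '0' || *s > '9') return -1;
        v = v * 10 + (*s - '0');
        if (v > 1024 * 1024) return -1;        /* sketch-level sanity limit */
    }
    *out = v;
    p->pos = (size_t)(cr - buf) + 2;           /* skip past "\r\n" */
    return 1;
}

/* Returns 1 when a full command is in p->argv, 0 when more data is needed,
 * -1 on a protocol error. Safe to call again with a longer buffer. */
int parse_multibulk(struct parser *p, const char *buf, size_t len) {
    long n;
    int rc;
    assert(p->pos <= len);
    if (p->multibulklen == 0) {
        rc = read_header(p, buf, len, '*', &n);
        if (rc <= 0) return rc;
        if (n == 0) return 1;                  /* empty command */
        if (n > MAX_ARGS) return -1;
        p->multibulklen = n;
    }
    while (p->multibulklen > 0) {
        if (p->bulklen == -1) {
            rc = read_header(p, buf, len, '$', &n);
            if (rc <= 0) return rc;
            p->bulklen = n;
        }
        /* The payload and its trailing "\r\n" must both be buffered. */
        if (len - p->pos < (size_t)p->bulklen + 2) return 0;
        p->argv[p->argc].off = p->pos;
        p->argv[p->argc].len = (size_t)p->bulklen;
        p->argc++;
        p->pos += (size_t)p->bulklen + 2;
        p->bulklen = -1;
        p->multibulklen--;
    }
    return 1;
}
```

Fed only the first 10 bytes of the example request, parse_multibulk reads the *6 and $3 headers, leaves pos at 8 and asks for more data. Called again with the complete buffer, it resumes at the payload of the first argument and finishes with argc equal to 6.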