Redis 数据结构 压缩列表(ziplist)

Redis是内存数据库,为了节省内存,在一些条件下使用了紧凑型数据结构,其中一个就是压缩列表(ziplist)

(1) 压缩列表(ziplist)是什么

ziplist是一种特殊编码的双向链表,旨在提高内存效率。
它存储字符串和整数值,其中整数被编码为实际整数而不是一系列字符。
它允许在O(1)时间内对列表的任一侧进行推送和弹出操作。
但是,由于每个操作都需要重新分配 ziplist 使用的内存,因此实际的复杂性与 ziplist 使用的内存量有关。

(1.1) 压缩列表结构

压缩列表结构


(2) 为什么要使用压缩列表

压缩列表可以节约内存

如果使用链表存储元素,链表节点里需要 val、next,而使用压缩列表,只需要存储val即可。

(2.1) 压缩列表特点

ziplist 的特点:

  1. 连续内存存储:每个元素紧凑排列,内存利用率高
  2. 变长编码:存储数据时,采用变长编码(满足数据长度的前提下,尽可能少分配内存)
    3)寻找元素需遍历:存放太多元素,性能会下降(适合少量数据存储)
  3. 级联更新:更新、删除元素,会引发级联更新(因为内存连续,前面数据膨胀/删除了,后面要跟着一起动)

(3) 压缩列表原理

area        |<---- ziplist header ---->|<----------- entries ------------->|<-end->|

size          4 bytes  4 bytes  2 bytes    ?        ?        ?        ?     1 byte
            +---------+--------+-------+--------+--------+--------+--------+-------+
component   | zlbytes | zltail | zllen | entry1 | entry2 |  ...   | entryN | zlend |
            +---------+--------+-------+--------+--------+--------+--------+-------+
                                       ^                          ^        ^
address                                |                          |        |
                                ZIPLIST_ENTRY_HEAD                |   ZIPLIST_ENTRY_END
                                                                  |
                                                         ZIPLIST_ENTRY_TAIL

压缩列表是一块连续的内存空间。
为了方便查询,在头部记录了压缩列表占用的总字节数、entry尾部的地址 以及 压缩列表的元素个数。

字段 长度 作用
zlbytes 4字节 整个ziplist占用的内存字节数,对ziplist进行内存重分配,或者计算末端时使用。
zltail 4字节 ziplist尾节点的偏移量。通过这个偏移量,可以快速获取尾部节点。
zllen 2字节 快速获取ziplist长度
zlend 1字节 结束符

(3.1) 创建新的ziplist

area        |<---- ziplist header ---->|<-- end -->|

size          4 bytes   4 bytes 2 bytes  1 byte
            +---------+--------+-------+-----------+
component   | zlbytes | zltail | zllen | zlend     |
            |         |        |       |           |
value       |  1011   |  1010  |   0   | 1111 1111 |
            +---------+--------+-------+-----------+
                                       ^
                                       |
                               ZIPLIST_ENTRY_HEAD
                                       &
address                        ZIPLIST_ENTRY_TAIL
                                       &
                               ZIPLIST_ENTRY_END

(3.2) 将节点添加到末端

将新节点添加到 ziplist 的末端需要执行以下几个步骤:

  1. 记录到达 ziplist 末端所需的偏移量(因为之后的内存重分配可能会改变 ziplist 的地址,因此记录偏移量而不是保存指针)
  2. 根据新节点要保存的值,计算出编码这个值所需的空间大小,以及编码它前一个节点的长度所需的空间大小,然后对 ziplist 进行内存重分配。
  3. 设置新节点的各项属性: pre_entry_length 、 encoding 、 length 和 content 。
  4. 更新 ziplist 的各项属性,比如记录空间占用的 zlbytes ,到达表尾节点的偏移量 zltail ,以及记录节点数量的 zllen 。
area        |<---- ziplist header ---->|<--- entries -->|<-- end -->|

size          4 bytes  4 bytes  2 bytes  5 bytes          1 bytes
            +---------+--------+-------+----------------+-----------+
component   | zlbytes | zltail | zllen | entry 1        | zlend     |
            |         |        |       |                |           |
value       |  10000  |  1010  |   1   | ?              | 1111 1111 |
            +---------+--------+-------+----------------+-----------+
                                       ^                ^
                                       |                |
address                         ZIPLIST_ENTRY_HEAD   ZIPLIST_ENTRY_END
                                       &
                                ZIPLIST_ENTRY_TAIL

(4) 压缩列表源码解析

压缩列表

(4.1) 压缩列表结构

压缩列表结构

area        |<---- ziplist header ---->|<----------- entries ------------->|<-end->|

size          4 bytes  4 bytes  2 bytes    ?        ?        ?        ?     1 byte
            +---------+--------+-------+--------+--------+--------+--------+-------+
component   | zlbytes | zltail | zllen | entry1 | entry2 |  ...   | entryN | zlend |
            +---------+--------+-------+--------+--------+--------+--------+-------+
                                       ^                          ^        ^
address                                |                          |        |
                                ZIPLIST_ENTRY_HEAD                |   ZIPLIST_ENTRY_END
                                                                  |
                                                         ZIPLIST_ENTRY_TAIL

如果没有特殊指定,所有字段存储都使用小端存储。

参考ziplistNew()函数

entry节点结构

area        |<------------------- entry -------------------->|

            +------------------+----------+--------+---------+
component   | pre_entry_length | encoding | length | content |
            +------------------+----------+--------+---------+

pre_entry_length 记录了前一个节点的长度,通过这个值,可以进行指针计算,从而跳转到上一个节点。

/* 
 * 我们使用此函数接收有关 ziplist entry的信息。
 * 请注意,这并不是数据的实际编码方式,只是我们为了更容易操作而通过函数填充的内容。 
 */
typedef struct zlentry {
    unsigned int prevrawlensize; // 前一个entry长度 对应的字节数
    unsigned int prevrawlen;     // 前一个entry长度 
    unsigned int lensize;        // 用于编码此entry类型/长度 的字节数。  // int long 对应的字节数
                                 // 例如,字符串有一个 1、2 或 5 字节的标头。 整数总是使用一个字节。
    unsigned int len;            // 用于表示实际entry的长度。
                                 // 对于字符串,这只是字符串长度,
                                 // 而对于整数,它是 1、2、3、4、8 或 0(for 4 bit immediate),具体取决于数字范围。
    unsigned int headersize;     // prevrawlensize + lensize.
    unsigned char encoding;      // 当前节点数据编码  
                                 // 根据entry编码设置为 ZIP_STR_* 或 ZIP_INT_*。 
                                 // 然而,对于 4 bits immediate integers,这可以假定一个值范围并且必须进行范围检查。
    unsigned char *p;            // 指向条目最开始的指针,即 this 指向 previous-entry-in 字段。
} zlentry;

(4.2) 压缩列表提供的功能

函数名 作用 算法复杂度
ziplistNew 创建一个新的 ziplist O(1)
ziplistResize 调整 ziplist 内存大小 O(N)
ziplistPush 将一个包含给定值的新节点插入 ziplist 的表头或者表尾 O(N^2)
zipEntry 取出给定地址上的节点信息 O(1)
ziplistInsert 将一个包含给定值的新节点插入到给定地址 O(N^2)
ziplistDelete 删除给定地址上的节点 O(N^2)
ziplistDeleteRange 在给定索引上,连续进行多次删除 O(N^2)
ziplistFind 在 ziplist 中查找并返回包含给定值的节点 O(N)
ziplistLen 返回 ziplist 保存的节点数量 O(N)
ziplistBlobLen 以字节为单位,返回 ziplist 占用的内存大小 O(1)

(4.3) 创建压缩列表

// file: ziplist.c

/* 创建一个空的压缩列表 */
unsigned char *ziplistNew(void) {
    // 计算长度  
    // ZIPLIST_HEADER_SIZE = 32bits + 32bits + 16bits
    // ZIPLIST_END_SIZE = 8bits
    unsigned int bytes = ZIPLIST_HEADER_SIZE+ZIPLIST_END_SIZE;
    // 初始分配的大小
    unsigned char *zl = zmalloc(bytes);
    // 压缩列表总字节长度
    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
    // 尾部节点字节距离  结尾下标
    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
    // 长度设置为0
    ZIPLIST_LENGTH(zl) = 0;
    // 最后一个字节 设置成 结束符255,标记结束
    zl[bytes-1] = ZIP_END;
    return zl;
}

ziplistNew函数的逻辑很简单,就是创建一块连续的内存空间,大小为 ZIPLIST_HEADER_SIZE(10字节) 和 ZIPLIST_END_SIZE(1字节) 的总和,然后再把该连续空间的最后一个字节赋值为 ZIP_END,表示列表结束。

空压缩列表内存结构

//ziplist的列表头大小,包括2个32 bits整数和1个16bits整数,
// 分别表示压缩列表的总字节数,列表最后一个元素的离列表头的偏移,以及列表中的元素个数
/* 
 * ziplist的列表头大小:2个32 bits整数,用于表示总字节数 和 最后一项偏移量。 
 * 用1个16 bits的整数表示元素个数
 * 
 */
#define ZIPLIST_HEADER_SIZE     (sizeof(uint32_t)*2+sizeof(uint16_t))

/*  
 * ziplist的列表尾大小,只有1字节,表示列表结束。
 */
#define ZIPLIST_END_SIZE        (sizeof(uint8_t))

// ziplist的列表尾字节内容
#define ZIP_END 255         /* 特殊的 "ziplist 结尾" 实体。 */

intrev32ifbe函数为大小端转换函数,统一转换为小端存储。 为什么要进行转换?
因为压缩列表的操作中涉及到的位运算很多,后续的所有位运算都是在小端存储的基础上进行的。

(4.4) 调整压缩列表内存大小

/* 
 * 调整 ziplist 内存大小  
 */
unsigned char *ziplistResize(unsigned char *zl, size_t len) {
    // 校验 确保长度< 2^32
    assert(len < UINT32_MAX);
    // 压缩列表分配内存
    zl = zrealloc(zl,len);
    // 压缩列表总字节长度
    ZIPLIST_BYTES(zl) = intrev32ifbe(len);
    // 最后一个字节 设置成 结束符255,标记结束
    zl[len-1] = ZIP_END;
    return zl;
}

(4.5) 压缩列表新增元素-ziplistPush

1.计算实际插入元素的长度

/*
 * 在压缩列表的entry头部或尾部插入元素 
 *
 * @param *zl 压缩列表
 * @param *s 新增数据
 * @param slen 长度
 * @param where  
 */
unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) {
    unsigned char *p;
    // 判断在entry头部或尾部插入元素
    p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl);
    // 插入元素
    return __ziplistInsert(zl,p,s,slen);
}
/*
 * 在entry指定位置插入元素 
 * 
 * @param *zl 压缩列表
 * @param *p 插入下标的指针
 * @param *s 要插入的数据
 * @param slen 插入数据长度
 */
unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    // 
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
    unsigned int prevlensize, prevlen = 0;
    size_t offset;
    int nextdiff = 0;
    unsigned char encoding = 0;
    long long value = 123456789; /* initialized to avoid warning. Using a value
                                    that is easy to see if for some reason
                                    we use it uninitialized. */
    zlentry tail;

    // 找出插入的entry的 prevlen。 
    if (p[0] != ZIP_END) { // 插入位置不在尾部
        ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
    } else { // 插入位置在尾部
        // 最后一个entry的指针
        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
        if (ptail[0] != ZIP_END) { // 不是结束标识
            // 前一个entry的长度
            prevlen = zipRawEntryLength(ptail);
        }
    }

    // 查看条目是否可以编码成整数
    if (zipTryEncoding(s,slen,&value,&encoding)) {
        // 'encoding' 设置为适当的整数编码
        reqlen = zipIntSize(encoding);
    } else {
        // 'encoding' 未受影响,zipStoreEntryEncoding 将使用字符串长度来确定如何对其进行编码。
        reqlen = slen;
    }

    // 需要前一个entry的长度 和 有效载荷的长度 
    reqlen += zipStorePrevEntryLength(NULL,prevlen);  // 加上前一个entry的长度
    reqlen += zipStoreEntryEncoding(NULL,encoding,slen);  // 加上当前entry编码长度和当前entry数据长度

    // 当插入位置不等于尾部时,我们需要确保下一个entry可以在其 prevlen 字段中保存本entry的长度。 
    int forcelarge = 0;
    // 用来判断插入位置元素的prevlen长度和实际所需的prevlen长度是否相等
    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
    // 这个是为了修复一个bug    ziplist源码确实不太好读呀 😭  详细分析见:https://segmentfault.com/a/1190000018878466
    if (nextdiff == -4 && reqlen < 4) {
        nextdiff = 0;
        forcelarge = 1;
    }

    // 存储下标,因为重新分配内存可能更改ziplist的地址
    offset = p-zl;
    // 调整ziplist内存大小 // 当前长度 + 需要申请的长度 + nextdiff 
    zl = ziplistResize(zl,curlen+reqlen+nextdiff);
    p = zl+offset;

    // 必要时应用内存移动并更新尾部偏移量。
    if (p[0] != ZIP_END) { // 不是结束字符
        // 由于 ZIP_END 字节减一 /* Subtract one because of the ZIP_END bytes */
        // 将p节点后移(没有移动p节点前一节点长度信息),留出当前节点位置
        memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

        /* Encode this entry's raw length in the next entry. */
        // 写入p节点前一节点长度信息(要插入节点的长度)
        if (forcelarge)
            zipStorePrevEntryLengthLarge(p+reqlen,reqlen);
        else
            zipStorePrevEntryLength(p+reqlen,reqlen);

        // 更新尾节点偏移量
        ZIPLIST_TAIL_OFFSET(zl) =
            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

        /* When the tail contains more than one entry, we need to take
         * "nextdiff" in account as well. Otherwise, a change in the
         * size of prevlen doesn't have an effect on the *tail* offset. */
        zipEntry(p+reqlen, &tail);
        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
        }
    } else { // 是结束字符 也就是列表是空的 
        // 空列表插入,只更新尾节点偏移量 
        /* This element will be the new tail. */
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
    }

    // 这个类似于数组前插入元素,整体都要移动,这个复杂度 🍵  
    // 当 nextdiff != 0 时,下一个节点的prevlen需要改变,所以我们需要在整个 ziplist 中级联更新
    if (nextdiff != 0) {
        offset = p-zl;
        // 连锁更新 
        zl = __ziplistCascadeUpdate(zl,p+reqlen);
        p = zl+offset;
    }

    // 写入前一节点长度信息
    p += zipStorePrevEntryLength(p,prevlen);
    // 写入节点编码与长度信息
    p += zipStoreEntryEncoding(p,encoding,slen);
    // 写入数据
    if (ZIP_IS_STR(encoding)) {
        memcpy(p,s,slen);
    } else {
        zipSaveInteger(p,value,encoding);
    }
    // 增加压缩列表长度
    ZIPLIST_INCR_LENGTH(zl,1);
    return zl;
}

可以看到,在新增节点时首先会尝试编码,如果编码成数字使用整形数字编码,否则使用字符串编码
但是不同编码占用的内存空间不同,整数编码占用 1/2/3/4/8 字节,字符串编码占用 slen 字节

当在一个元素 A 前插入一个新的元素 B 时,A 的 prevlen 和 prevlensize 都要根据 B 的长度进行相应的变化。
假设 A 的 prevlen 原本只占用 1 字节(也就是 prevlensize 等于 1),而能记录的前一项长度最大为 253 字节。此时,如果 B 的长度超过了 253 字节,A 的 prevlen 就需要使用 5 个字节来记录,这样就需要申请额外的 4 字节空间了。

连锁更新一旦发生,就会导致 ziplist 占用的内存空间要多次重新分配,这就会直接影响到 ziplist 的访问性能。

(4.5.1) 字符串尝试编码为整数-zipTryEncoding

/*
 * 检查"entry"指向的字符串是否可以编码为整数。 
 * 将整数值存储在“v”中,并将其编码存储在“encoding”中。
 * 
 * @param *entry 字符串
 * @param entrylen 节点长度
 * @param *v 整数值
 * @param 编码
 */
int zipTryEncoding(unsigned char *entry, unsigned int entrylen, long long *v, unsigned char *encoding) {
    // 字符串转换为整数后的值
    long long value;

    if (entrylen >= 32 || entrylen == 0) return 0;
    // 字符转整形数字(8字节)  转换后的值存储在&value 
    if (string2ll((char*)entry,entrylen,&value)) {
        // 可以对字符串进行编码。 检查可以容纳此值的最小编码类型。 
        if (value >= 0 && value <= 12) {
            *encoding = ZIP_INT_IMM_MIN+value;
        } else if (value >= INT8_MIN && value <= INT8_MAX) {
            *encoding = ZIP_INT_8B;
        } else if (value >= INT16_MIN && value <= INT16_MAX) {
            *encoding = ZIP_INT_16B;
        } else if (value >= INT24_MIN && value <= INT24_MAX) {
            *encoding = ZIP_INT_24B;
        } else if (value >= INT32_MIN && value <= INT32_MAX) {
            *encoding = ZIP_INT_32B;
        } else {
            *encoding = ZIP_INT_64B;
        }
        // 设置值
        *v = value;
        return 1;
    }

    // 返回false
    return 0;
}

判断字符串是否可以编码为整数
如果可以编码为整数,编码根据大小设置为 ZIP_INT_8B / ZIP_INT_16B / ZIP_INT_24B / ZIP_INT_32B / ZIP_INT_64B,分表占用 1/2/3/4/8 字节内存空间。


(4.5.2) 压缩整数占用内存空间大小-zipIntSize

/*
 * 返回存储由'encoding'编码的整数所需的字节数。
 */
unsigned int zipIntSize(unsigned char encoding) {
    switch(encoding) {
    case ZIP_INT_8B:  return 1;
    case ZIP_INT_16B: return 2;
    case ZIP_INT_24B: return 3;
    case ZIP_INT_32B: return 4;
    case ZIP_INT_64B: return 8;
    }
    // 
    if (encoding >= ZIP_INT_IMM_MIN && encoding <= ZIP_INT_IMM_MAX)
        return 0; /* 4 bit immediate */
    panic("Invalid integer encoding 0x%02X", encoding);
    return 0;
}

ZIP_INT_8B / ZIP_INT_16B / ZIP_INT_24B / ZIP_INT_32B / ZIP_INT_64B,分表占用 1/2/3/4/8 字节内存空间

#define ZIP_INT_16B (0xc0 | 0<<4)
#define ZIP_INT_32B (0xc0 | 1<<4)
#define ZIP_INT_64B (0xc0 | 2<<4)
#define ZIP_INT_24B (0xc0 | 3<<4)
#define ZIP_INT_8B 0xfe

#define ZIP_INT_IMM_MIN 0xf1    /* 11110001 */
#define ZIP_INT_IMM_MAX 0xfd    /* 11111101 */

(4.5.3) 前一个节点长度占用内存大小-zipStorePrevEntryLength

/*
 * 编码前一个entry的长度并将其写入指针"p"。
 * 如果"p"为 NULL,则返回编码此长度所需的字节数。 
 *
 * @param *p 
 * @param len 长度
 */
unsigned int zipStorePrevEntryLength(unsigned char *p, unsigned int len) {
    if (p == NULL) {
        // ZIP_BIG_PREVLEN = 254 
        // len < 254 返回 1字节,否则返回 int占用4字节+1字节=5字节
        return (len < ZIP_BIG_PREVLEN) ? 1 : sizeof(len)+1;
    } else {
        if (len < ZIP_BIG_PREVLEN) { // len < 254 
            p[0] = len;
            // 返回1字节
            return 1;
        } else {
            // 返回对应的大小
            return zipStorePrevEntryLengthLarge(p,len);
        }
    }
}
/* 
 * 编码前一个entry的长度并将其写入指针"p"。 
 * 这仅使用较大的编码(__ziplistCascadeUpdate 中需要)。
 */
int zipStorePrevEntryLengthLarge(unsigned char *p, unsigned int len) {
    if (p != NULL) {
        // 第一个字节设置成 254 
        p[0] = ZIP_BIG_PREVLEN;
        // len是int类型,占用4个字节
        // 接下来4个字节保存长度len
        memcpy(p+1,&len,sizeof(len));
        // 
        memrev32ifbe(p+1);
    }
    // 返回 1字节+int占用4字节=5字节
    return 1+sizeof(len);
}

根据编码方式的不同, pre_entry_length 域可能占用 1 字节或者 5 字节:
1 字节:如果前一节点的长度小于 254 字节,便使用一个字节保存它的值。
5 字节:如果前一节点的长度大于等于 254 字节,那么将第 1 个字节的值设为 254 ,然后用接下来的4个字节保存实际长度。

253对应的entry

area        |<------------------- entry -------------------->|

size          1 byte             ?          ?        ?
            +------------------+----------+--------+---------+
component   | pre_entry_length | encoding | length | content |
            |                  |          |        |         |
value       | 1111 1101        |          |        |         |
            +------------------+----------+--------+---------+

1024对应的entry

area        |<------------------------------ entry ---------------------------------->|

size          5 bytes                                     ?          ?        ?
            +-------------------------------------------+----------+--------+---------+
component   | pre_entry_length                          | encoding | length | content |
            |                                           |          |        |         |
            | 11111110 00000000000000000000010000000000 | ?        | ?      | ?       |
            +-------------------------------------------+----------+--------+---------+
            |<------->|<------------------------------->|
              1 byte       4 bytes

(4.5.4) 节点编码占用的内存大小-zipStoreEntryEncoding

/* 
 * 在指针'p'中写入entry的编码头。 
 * 如果 p 为 NULL,它只返回编码该长度所需的字节数。 
 * 参数:
 *  'encoding' 用于entry的编码。 对于单字节小立即整数,它可以是 ZIP_INT_* 或 ZIP_STR_* 或介于 ZIP_INT_IMM_MIN 和 ZIP_INT_IMM_MAX 之间。
 *  'rawlen' 仅用于 ZIP_STR_* 编码并且是该条目表示的字符串的长度。
 *
 *  该函数返回存储在指针"p"中的编码/长度标头使用的字节数。
 */
unsigned int zipStoreEntryEncoding(unsigned char *p, unsigned char encoding, unsigned int rawlen) {
    // 长度
    unsigned char len = 1, buf[5];

    if (ZIP_IS_STR(encoding)) { // 字符串编码
        // 虽然给定了编码,但它可能没有为字符串设置,所以我们在这里使用原始长度来确定它。 
        if (rawlen <= 0x3f) {  // 字符串长度小于等于63字节(16进制为0x3f)
            if (!p) return len;
            buf[0] = ZIP_STR_06B | rawlen;
        } else if (rawlen <= 0x3fff) {  // 字符串长度小于等于16383字节(16进制为0x3fff)
            len += 1; // 编码结果是2字节
            if (!p) return len;
            buf[0] = ZIP_STR_14B | ((rawlen >> 8) & 0x3f);
            buf[1] = rawlen & 0xff;
        } else {  // 字符串长度大于16383字节
            len += 4; // 编码结果是5字节
            if (!p) return len;
            buf[0] = ZIP_STR_32B;
            buf[1] = (rawlen >> 24) & 0xff;
            buf[2] = (rawlen >> 16) & 0xff;
            buf[3] = (rawlen >> 8) & 0xff;
            buf[4] = rawlen & 0xff;
        }
    } else {
        // 表示整数编码,因此长度始终为 1。 
        if (!p) return len;
        buf[0] = encoding;
    }

    // 把长度存储到指针p
    memcpy(p,buf,len);
    return len;
}

根据字符串不同的长度,分别占用 1/2/5 字节内存空间。

针对不同长度的数据,使用不同大小的元数据信息(encoding),这种方法可以有效地节省内存开销。

(4.5.5) 前一个节点长度内存空间之差-zipPrevLenByteDiff

/* 
 * 给定一个指针"p"指向 entry 前缀的 prevlen 信息,
 * 如果前一个entry的大小发生变化,此函数将返回编码 prevlen 所需的字节数差异。
 *
 * 如果 A 是现在用于对"prevlen"字段进行编码的字节数。 
 * 如果前一个元素将更新为大小为"len",则 B 是编码"prevlen"所需的字节数。
 * 函数返回 B - A
 *
 * 因此,如果需要更多空间,该函数返回一个正数,如果需要更少空间,则返回负数,如果需要相同空间,则返回零。  
 *
 * @param *p 
 * @param len 占用的内存空间大小/长度
 */
int zipPrevLenByteDiff(unsigned char *p, unsigned int len) {
    unsigned int prevlensize;
    // 前一个节点长度
    ZIP_DECODE_PREVLENSIZE(p, prevlensize);
    // B - A 
    return zipStorePrevEntryLength(NULL, len) - prevlensize;
}

(4.5.6) 前一节点长度占用的内存空间-ZIP_DECODE_PREVLENSIZE

/* 
 * 返回用于对前一个entry的长度进行编码的字节数。 
 * 通过设置 'prevlensize' 返回长度。
 */
#define ZIP_DECODE_PREVLENSIZE(ptr, prevlensize) do {                          \
    if ((ptr)[0] < ZIP_BIG_PREVLEN) {                                          \
        (prevlensize) = 1;                                                     \
    } else {                                                                   \
        (prevlensize) = 5;                                                     \
    }                                                                          \
} while(0)

前一节点长度小于254时,使用1个字节保存前一节点的长度信息。
前一节点长度大于254时,使用5个字节保存前一节点的长度信息。

(4.6) 取指定节点信息

/* 
 * 返回包含有关entry结构的所有信息。
 */
void zipEntry(unsigned char *p, zlentry *e) {
    // 前一哥节点长度信息解析
    ZIP_DECODE_PREVLEN(p, e->prevrawlensize, e->prevrawlen);
    // 当前节点 数据长度解析 与 编码信息解析
    ZIP_DECODE_LENGTH(p + e->prevrawlensize, e->encoding, e->lensize, e->len);
    // 设置header长度
    e->headersize = e->prevrawlensize + e->lensize;
    // 指针指向节点e
    e->p = p;
}

(4.7) 插入数据

/*
 * 在entry指定位置插入元素 
 * 
 * @param *zl 压缩列表
 * @param *p 插入下标的指针
 * @param *s 要插入的数据
 * @param slen 插入数据长度
 */
unsigned char *ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    return __ziplistInsert(zl,p,s,slen);
}

(4.8) 删除数据

/* Delete a single entry from the ziplist, pointed to by *p.
 * Also update *p in place, to be able to iterate over the
 * ziplist, while deleting entries. */
unsigned char *ziplistDelete(unsigned char *zl, unsigned char **p) {
    size_t offset = *p-zl;
    // 
    zl = __ziplistDelete(zl,*p,1);

    /* Store pointer to current element in p, because ziplistDelete will
     * do a realloc which might result in a different "zl"-pointer.
     * When the delete direction is back to front, we might delete the last
     * entry and end up with "p" pointing to ZIP_END, so check this. */
    *p = zl+offset;
    return zl;
}
/* Delete "num" entries, starting at "p". Returns pointer to the ziplist. */
unsigned char *__ziplistDelete(unsigned char *zl, unsigned char *p, unsigned int num) {
    unsigned int i, totlen, deleted = 0;
    size_t offset;
    int nextdiff = 0;
    zlentry first, tail;

    zipEntry(p, &first);
    for (i = 0; p[0] != ZIP_END && i < num; i++) {
        p += zipRawEntryLength(p);
        deleted++;
    }

    totlen = p-first.p; /* Bytes taken by the element(s) to delete. */
    if (totlen > 0) {
        if (p[0] != ZIP_END) {
            /* Storing `prevrawlen` in this entry may increase or decrease the
             * number of bytes required compare to the current `prevrawlen`.
             * There always is room to store this, because it was previously
             * stored by an entry that is now being deleted. */
            nextdiff = zipPrevLenByteDiff(p,first.prevrawlen);

            /* Note that there is always space when p jumps backward: if
             * the new previous entry is large, one of the deleted elements
             * had a 5 bytes prevlen header, so there is for sure at least
             * 5 bytes free and we need just 4. */
            p -= nextdiff;
            zipStorePrevEntryLength(p,first.prevrawlen);

            /* Update offset for tail */
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))-totlen);

            /* When the tail contains more than one entry, we need to take
             * "nextdiff" in account as well. Otherwise, a change in the
             * size of prevlen doesn't have an effect on the *tail* offset. */
            zipEntry(p, &tail);
            if (p[tail.headersize+tail.len] != ZIP_END) {
                ZIPLIST_TAIL_OFFSET(zl) =
                   intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
            }

            /* Move tail to the front of the ziplist */
            memmove(first.p,p,
                intrev32ifbe(ZIPLIST_BYTES(zl))-(p-zl)-1);
        } else {
            /* The entire tail was deleted. No need to move memory. */
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe((first.p-zl)-first.prevrawlen);
        }

        /* Resize and update length */
        offset = first.p-zl;
        zl = ziplistResize(zl, intrev32ifbe(ZIPLIST_BYTES(zl))-totlen+nextdiff);
        ZIPLIST_INCR_LENGTH(zl,-deleted);
        p = zl+offset;

        /* When nextdiff != 0, the raw length of the next entry has changed, so
         * we need to cascade the update throughout the ziplist */
        if (nextdiff != 0)
            zl = __ziplistCascadeUpdate(zl,p);
    }
    return zl;
}

(4.9) 查找-ziplistFind

要查找列表中间的元素时,ziplist 就得从列表头或列表尾遍历才行。

/* 
 * 查找指向等于指定节点的节点的指针。 
 * 在每次比较之间跳过'skip'节点。 
 * 找不到字段时返回 NULL。
 * 
 * @param *p  节点指针
 * @param *vstr 
 * @param vlen
 * @param skip 
 */
unsigned char *ziplistFind(unsigned char *p, unsigned char *vstr, unsigned int vlen, unsigned int skip) {
    int skipcnt = 0;
    unsigned char vencoding = 0;
    long long vll = 0;

    // 遍历压缩列表
    while (p[0] != ZIP_END) {  // 不等于结束符
        unsigned int prevlensize, encoding, lensize, len;
        unsigned char *q;

        ZIP_DECODE_PREVLENSIZE(p, prevlensize);
        ZIP_DECODE_LENGTH(p + prevlensize, encoding, lensize, len);
        q = p + prevlensize + lensize;

        if (skipcnt == 0) {
            /* Compare current entry with specified entry */
            if (ZIP_IS_STR(encoding)) {  // 如果编码为字符串 
                // 判断长度相同 并且 
                if (len == vlen && memcmp(q, vstr, vlen) == 0) {
                    return p;
                }
            } else {
                /* Find out if the searched field can be encoded. Note that
                 * we do it only the first time, once done vencoding is set
                 * to non-zero and vll is set to the integer value. */
                if (vencoding == 0) {
                    // 首次比对时,对传入值进行解码
                    if (!zipTryEncoding(vstr, vlen, &vll, &vencoding)) {
                        /* If the entry can't be encoded we set it to
                         * UCHAR_MAX so that we don't retry again the next
                         * time. */
                        vencoding = UCHAR_MAX;
                    }
                    /* Must be non-zero by now */
                    assert(vencoding);
                }

                /* Compare current entry with specified entry, do it only
                 * if vencoding != UCHAR_MAX because if there is no encoding
                 * possible for the field it can't be a valid integer. */
                if (vencoding != UCHAR_MAX) {
                    long long ll = zipLoadInteger(q, encoding);
                    if (ll == vll) {
                        return p;
                    }
                }
            }

            /* Reset skip count */
            skipcnt = skip;
        } else {
            // 跳过entry
            skipcnt--;
        }

        // 指向下一个entry 
        p = q + len;
    }

    return NULL;
}

当我们往 ziplist 中插入数据时,ziplist 就会根据数据是字符串还是整数,以及它们的大小进行不同的编码。

ziplist 是如何进行编码呢?

ziplist 列表项包括三部分内容,分别是前一项的长度(prevlen)、当前项长度信息的编码结果(encoding),以及当前项的实际数据(data)。
下面的图展示了列表项的结构

(5) 总结

1、ziplist 设计的初衷就是「节省内存」,在存储数据时,把内存利用率发挥到了极致:

  • 数字按「整型」编码存储,比直接当字符串存内存占用少
  • 数据「长度」字段,会根据内容的大小选择最小的长度编码
  • 甚至对于极小的数据,干脆把内容直接放到了「长度」字段中(前几个位表示长度,后几个位存数据)

2、但 ziplist 的劣势也很明显:

  • 寻找元素只能挨个遍历,存储过长数据,查询性能很低
  • 每个元素中保存了「上一个」元素的长度(为了方便反向遍历),这会导致上一个元素内容发生修改,长度超过了原来的编码长度,下一个元素的内容也要跟着变,重新分配内存,进而就有可能再次引起下一级的变化,一级级更新下去,频繁申请内存

3、想要缓解 ziplist 的问题,比较简单直接的方案就是,多个数据项,不再用一个 ziplist 来存,而是分拆到多个 ziplist 中,每个 ziplist 用指针串起来,这样修改其中一个数据项,即便发生级联更新,也只会影响这一个 ziplist,其它 ziplist 不受影响,这种方案就是 quicklist:

qucklist: ziplist1(也叫quicklistNode) <-> ziplist2 <-> ziplist3 <-> …

4、List 数据类型底层实现,就是用的 quicklist,因为它是一个链表,所以 LPUSH/LPOP/RPUSH/RPOP 的复杂度是 O(1)

5、List 中每个 ziplist 节点可以存的元素个数/总大小,可以通过 list-max-ziplist-size 配置:

  • 正数:ziplist 最多包含几个数据项
  • 负数:取值 -1 ~ -5,表示每个 ziplist 存储最大的字节数,默认 -2,每个ziplist 8KB

ziplist 超过上述任一配置,添加新元素就会新建 ziplist 插入到链表中。

6、List 因为更多是两头操作,为了节省内存,还可以把中间的 ziplist「压缩」,具体可看 list-compress-depth 配置项,默认配置不压缩

7、要想彻底解决 ziplist 级联更新问题,本质上要修改 ziplist 的存储结构,也就是不要让每个元素保存「上一个」元素的长度即可,所以才有了 listpack

8、listpack 每个元素项不再保存上一个元素的长度,而是优化元素内字段的顺序,来保证既可以从前也可以向后遍历

9、listpack 是为了替代 ziplist 为设计的,但因为 List/Hash/ZSet 都严重依赖 ziplist,所以这个替换之路很漫长,目前只有 Stream 数据类型用到了 listpack

参考资料

[1] Redis设计与实现-压缩列表
[2] Redis源码剖析与实战 - 04 内存友好的数据结构该如何细化设计?
[3] Redis源码剖析与实战 - 06 | 从ziplist到quicklist,再到listpack的启发
[4] redis-ziplist源码-github