Redis 数据结构 快速列表(quicklist)

Redis3.2版本使用 quicklist 代替了 ziplistlinkedlist.

(1) 快速列表(quicklist)是什么

快速列表(quicklist)是 (压缩列表)ziplist 的 双向链表。

A generic doubly linked quicklist implementation .

快速列表

快速列表(quicklist)是把压缩列表(ziplist)和双向链表结合起来。
每个双链表节点中保存一个ziplist,然后每个ziplist中存一批list中的数据(具体ziplist大小可配置),这样既可以避免大量链表指针带来的内存消耗,也可以避免ziplist更新导致的大量性能损耗。

(2) 为什么要用快速列表(quicklist)

如果一个列表非常大且更新频繁,那就会给redis带来非常大的负担。如何既保留ziplist的空间高效性,又能不让其更新复杂度过高?

ziplist 设计出的紧凑型数据块可以有效利用内存,但在更新上,由于每一个 entry 都保留了前一个 entry 的 prevlen 长度,因此在插入或者更新时可能会出现连锁更新,这是一个影响效率的大问题。
因此,接着又设计出 「链表 + ziplist」组成的 quicklist 结构来避免单个 ziplist 过大,可以有效降低连锁更新的影响面。

(2.1) 快速列表的特点

  1. 节省内存。
  2. 相比压缩列表,在节点数比较多时操作不会对内存压力太大。 (分而治之)

quicklist 结构在 ziplist 基础上,使用链表将 ziplist 串联起来,链表的每个元素就是一个 ziplist。这种设计减少了数据插入时内存空间的重新分配,以及内存数据的拷贝。同时,quicklist 限制了每个节点上 ziplist 的大小,一旦一个 ziplist 过大,就会采用新增 quicklist 节点的方法。

从 ziplist 到 quicklist,再到 listpack 结构;可以看到,初衷都是设计一款数据结构能够高效的使用内存。


(3) 快速列表的原理

quicklist 的设计,其实是结合了链表和 ziplist 各自的优势。

为什么不全部节点都压缩,而是流出compress这个可配置的口子呢?
其实从统计数据来看,列表两端的数据变更最为频繁,像lpush,rpush,lpop,rpop等命令都是在两端操作,如果频繁压缩或解压缩会代码不必要的性能损耗。

可以看出,redis其实并不是一味追求性能,也在内存占用性能之间做取舍。


(4) 源码解读

源码: https://github.com/redis/redis/blob/6.0/src/quicklist.c

(4.1) 快速列表(quicklist)结构定义

// file: quicklist.h

/* 
 *  快速列表是一个40字节的结构体(在64位系统) 
 * 'count' 是总entry数。
 * 'len' 是快速链表的个数。 
 * 'compress' : 0:禁用压缩,否则是在quicklist末尾未压缩的quicklistNodes的数量。
 * 'fill' 是用户请求的(或默认的)填充因子。
 * 'bookmakrs 是 realloc 这个结构使用的可选功能,这样它们在不使用时就不会消耗内存。
 */
typedef struct quicklist {
    quicklistNode *head;  // 头结点
    quicklistNode *tail;  // 尾结点
    unsigned long count;        // 所有ziplist中的总元素个数 /* total count of all entries in all ziplists */
    unsigned long len;          // quicklistNodes个数
    int fill : QL_FILL_BITS;    // 单个节点的填充因子
    unsigned int compress : QL_COMP_BITS;  // 0:不压缩  /* depth of end nodes not to compress;0=off */
    unsigned int bookmark_count: QL_BM_BITS;  //  
    quicklistBookmark bookmarks[];
} quicklist;

quicklist 作为一个链表结构,在它的数据结构中,是定义了整个 quicklist 的头、尾指针,
可以快速定位到 quicklist 的链表头和链表尾。

quicklist 中还定义了 quicklistNode 的个数、所有 ziplist 的总元素个数等属性。

fill字段,它的含义是每个quicknode的节点最大容量,不同的数值有不同的含义,默认是-2,当然也可以配置为其他数值,具体数值含义如下:
-1: 每个quicklistNode节点的ziplist所占字节数不能超过4kb。(建议配置)
-2: 每个quicklistNode节点的ziplist所占字节数不能超过8kb。(默认配置&建议配置)
-3: 每个quicklistNode节点的ziplist所占字节数不能超过16kb。
-4: 每个quicklistNode节点的ziplist所占字节数不能超过32kb。
-5: 每个quicklistNode节点的ziplist所占字节数不能超过64kb。
任意正数: 表示:ziplist结构所最多包含的entry个数,最大为215215。

// file: quicklist.h

/* 
 * quicklistNode 是一个32 字节的结构体,描述了包含ziplist的quicklist。
 * 我们使用位字段将 quicklistNode 保持在 32 字节。
 * count: 16 bits,最大 65536 (最大 zl 字节为 65k,因此最大计数实际上 < 32k)。
 * encoding: 2 bits,RAW=1,LZF=2。
 * container: 2 bits,NONE=1,ZIPLIST=2。
 * recompress: 1 bit,bool,如果节点临时解压缩以供使用,则为 true。
 * attempted_compress: 1 bit,boolean,用于测试时验证。
 * extra: 10 bits,供将来使用; 填充 32 位的剩余部分
 */
typedef struct quicklistNode {
    struct quicklistNode *prev;  // 前一个quicklistNode
    struct quicklistNode *next;  // 后一个quicklistNode
    unsigned char *zl;           // 未压缩时quicklistNode->zl指向ziplist,压缩时quicklistNode->zl指向quicklistLZF
    unsigned int sz;             // ziplist的字节大小
    unsigned int count : 16;     // ziplist中的元素个数 
    unsigned int encoding : 2;   // 编码格式,原生字节数组或压缩存储  RAW==1 or LZF==2  
    unsigned int container : 2;  // 存储方式  NONE==1 or ZIPLIST==2 
    unsigned int recompress : 1; // 数据是否被压缩  
    unsigned int attempted_compress : 1;  // 数据能否被压缩 /* node can't compress; too small */
    unsigned int extra : 10;     // 预留的bit位
} quicklistNode;
/*
 * 书签的作用是用于加速查询速度?
 *  
 * 书签在 quicklist 结构的末尾用 realloc 填充。
 * 如果数千个节点的额外内存使用可以忽略不计,那么它们应该只用于非常大的列表,并且确实需要分段迭代它们。
 * 当不使用时,它们不会增加任何内存开销,但是当使用然后删除时,一些开销仍然存在(以避免共振)。
 * 使用的书签数量应保持在最低限度,因为它还会增加节点删除的开销(搜索要更新的书签)。
 */
typedef struct quicklistBookmark {
    quicklistNode *node;
    char *name;
} quicklistBookmark;

书签的作用是用于比较大(上千)的列表,方便分段处理。

/* 
 * quicklistLZF 是一个 4+N 字节的结构,包含“sz”,后跟“compressed”。
 * 'sz' 是 'compressed' 字段的字节长度。
 * 'compressed' 是总(压缩)长度为 'sz' 的 LZF 数据
 * 注意:未压缩的长度存储在 quicklistNode->sz 中。
 * quicklistNode->zl压缩时,node->zl指向一个quicklistLZF
 */
typedef struct quicklistLZF {
    unsigned int sz;  // LZF的字节大小
    char compressed[];  // 压缩后的字节数组
} quicklistLZF;

也正因为 quicklist 采用了链表结构,所以当插入一个新的元素时,quicklist 首先就会检查插入位置的 ziplist 是否能容纳该元素,这是通过 _quicklistNodeAllowInsert 函数来完成判断的。

_quicklistNodeAllowInsert 函数会计算新插入元素后的大小(new_sz),这个大小等于 quicklistNode 的当前大小(node->sz)、插入元素的大小(sz),以及插入元素后 ziplist 的 prevlen 占用大小。

在计算完大小之后,_quicklistNodeAllowInsert 函数会依次判断新插入的数据大小(sz)是否满足要求,即单个 ziplist 是否不超过 8KB,或是单个 ziplist 里的元素个数是否满足要求。

只要这里面的一个条件能满足,quicklist 就可以在当前的 quicklistNode 中插入新元素,否则 quicklist 就会新建一个 quicklistNode,以此来保存新插入的元素。

(4.2) 功能列表

函数 功能 备注
quicklistCreate 创建快速链表 时间复杂度O(1)
quicklistNew 新建快速链表(带参数) 时间复杂度O(1)
quicklistPushHead 新增节点 时间复杂度O(1)
quicklistPushTail 新增节点 时间复杂度O(1)
quicklistDelEntry 删除节点
quicklistSetCompressDepth 设置压缩深度
quicklistSetFill 设置
quicklistSetOptions 设置参数
quicklistRelease
quicklistPush

(4.3) 新建快速列表

// file: src/quicklist.c

/* 
 * 创建一个快速列表
 * 使用 quicklistRelease() 释放内存
 */
quicklist *quicklistCreate(void) {
    
    struct quicklist *quicklist;

    // 分配内存
    quicklist = zmalloc(sizeof(*quicklist));
    // 头结点 尾结点 设置默认值null
    quicklist->head = quicklist->tail = NULL;
    // 设置默认长度0
    quicklist->len = 0;
    // 设置默认元素个数0
    quicklist->count = 0;
    // 设置默认
    quicklist->compress = 0;
    // 
    quicklist->fill = -2;
    //
    quicklist->bookmark_count = 0;
    return quicklist;
}

新建快速列表带参数

// file: src/quicklist.c

/*
 * 创建快速列表带默认参数
 */
quicklist *quicklistNew(int fill, int compress) {

    // 创建快速链表
    quicklist *quicklist = quicklistCreate();
    // 设置参数
    quicklistSetOptions(quicklist, fill, compress);
    return quicklist;
}

(4.4) 快速链表头部添加节点-quicklistPushHead

// file: src/quicklist.c

/* 
 * 快速列表头部添加节点
 *
 * 返回0时 使用已存在头节点 
 * 返回1时 使用新创建的头结点 
 *
 * @param *quicklist
 * @param *value
 * @param sz
 */
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
    // 原来的头结点
    quicklistNode *orig_head = quicklist->head;
    // 确保
    assert(sz < UINT32_MAX);  // TODO 添加对sds编码(未压缩)快速列表节点的支持 
    // 
    if (likely(
            _quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
        // 头节点(quicklistNode)里的压缩列表添加新元素
        quicklist->head->zl =
            ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
        // 更新头结点元素个数    
        quicklistNodeUpdateSz(quicklist->head);
    } else {
        // 创建一个新节点(quicklistNode)
        quicklistNode *node = quicklistCreateNode();
        // 新创建一个压缩列表,添加值,然后node->zl指向压缩列表
        node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);

        // 更新节点元素个数
        quicklistNodeUpdateSz(node);
        // 
        _quicklistInsertNodeBefore(quicklist, quicklist->head, node);
    }
    // 所有节点个数+1
    quicklist->count++;
    // 头结点(quicklistNode)里的压缩列表(ziplist)元素个数+1 
    quicklist->head->count++;
    return (orig_head != quicklist->head);
}

(4.5) 快速列表尾部插入节点

// file: src/quicklist.c

/* 
 * 快速列表尾部添加节点
 *
 * 返回0时 使用已存在头节点 
 * 返回1时 使用新创建的头结点 
 *
 * @param *quicklist
 * @param *value
 * @param sz
 */
int quicklistPushTail(quicklist *quicklist, void *value, size_t sz) {
    // 尾结点
    quicklistNode *orig_tail = quicklist->tail;
    // 校验
    assert(sz < UINT32_MAX); /* TODO: add support for quicklist nodes that are sds encoded (not zipped) */

    if (likely(
            _quicklistNodeAllowInsert(quicklist->tail, quicklist->fill, sz))) {
        // 头节点(quicklistNode)里的压缩列表添加新元素      
        quicklist->tail->zl =
            ziplistPush(quicklist->tail->zl, value, sz, ZIPLIST_TAIL);
        // 更新头结点元素个数 
        quicklistNodeUpdateSz(quicklist->tail);
    } else {

        // 新建快速列表节点
        quicklistNode *node = quicklistCreateNode();
        // 新创建一个压缩列表,添加值,然后node->zl指向压缩列表
        node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_TAIL);
        // 更新节点元素个数
        quicklistNodeUpdateSz(node);
        // 
        _quicklistInsertNodeAfter(quicklist, quicklist->tail, node);
    }

    // 所有节点个数+1
    quicklist->count++;
    // 尾结点(quicklistNode)里的压缩列表(ziplist)元素个数+1 
    quicklist->tail->count++;
    return (orig_tail != quicklist->tail);
}

(4.6) 插入节点

/* Insert a new entry before or after existing entry 'entry'.
 *
 * If after==1, the new value is inserted after 'entry', otherwise
 * the new value is inserted before 'entry'. */
REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
                                   void *value, const size_t sz, int after) {
    int full = 0, at_tail = 0, at_head = 0, full_next = 0, full_prev = 0;
    int fill = quicklist->fill;
    quicklistNode *node = entry->node;
    quicklistNode *new_node = NULL;
    assert(sz < UINT32_MAX); /* TODO: add support for quicklist nodes that are sds encoded (not zipped) */

    if (!node) {
        /* we have no reference node, so let's create only node in the list */
        D("No node given!");
        new_node = quicklistCreateNode();
        new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
        __quicklistInsertNode(quicklist, NULL, new_node, after);
        new_node->count++;
        quicklist->count++;
        return;
    }

    /* Populate accounting flags for easier boolean checks later */
    if (!_quicklistNodeAllowInsert(node, fill, sz)) {
        D("Current node is full with count %d with requested fill %lu",
          node->count, fill);
        full = 1;
    }

    if (after && (entry->offset == node->count)) {
        D("At Tail of current ziplist");
        at_tail = 1;
        if (!_quicklistNodeAllowInsert(node->next, fill, sz)) {
            D("Next node is full too.");
            full_next = 1;
        }
    }

    if (!after && (entry->offset == 0)) {
        D("At Head");
        at_head = 1;
        if (!_quicklistNodeAllowInsert(node->prev, fill, sz)) {
            D("Prev node is full too.");
            full_prev = 1;
        }
    }

    /* Now determine where and how to insert the new element */
    if (!full && after) {
        D("Not full, inserting after current position.");
        quicklistDecompressNodeForUse(node);
        unsigned char *next = ziplistNext(node->zl, entry->zi);
        if (next == NULL) {
            node->zl = ziplistPush(node->zl, value, sz, ZIPLIST_TAIL);
        } else {
            node->zl = ziplistInsert(node->zl, next, value, sz);
        }
        node->count++;
        quicklistNodeUpdateSz(node);
        quicklistRecompressOnly(quicklist, node);
    } else if (!full && !after) {
        D("Not full, inserting before current position.");
        quicklistDecompressNodeForUse(node);
        node->zl = ziplistInsert(node->zl, entry->zi, value, sz);
        node->count++;
        quicklistNodeUpdateSz(node);
        quicklistRecompressOnly(quicklist, node);
    } else if (full && at_tail && node->next && !full_next && after) {
        /* If we are: at tail, next has free space, and inserting after:
         *   - insert entry at head of next node. */
        D("Full and tail, but next isn't full; inserting next node head");
        new_node = node->next;
        quicklistDecompressNodeForUse(new_node);
        new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_HEAD);
        new_node->count++;
        quicklistNodeUpdateSz(new_node);
        quicklistRecompressOnly(quicklist, new_node);
    } else if (full && at_head && node->prev && !full_prev && !after) {
        /* If we are: at head, previous has free space, and inserting before:
         *   - insert entry at tail of previous node. */
        D("Full and head, but prev isn't full, inserting prev node tail");
        new_node = node->prev;
        quicklistDecompressNodeForUse(new_node);
        new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_TAIL);
        new_node->count++;
        quicklistNodeUpdateSz(new_node);
        quicklistRecompressOnly(quicklist, new_node);
    } else if (full && ((at_tail && node->next && full_next && after) ||
                        (at_head && node->prev && full_prev && !after))) {
        /* If we are: full, and our prev/next is full, then:
         *   - create new node and attach to quicklist */
        D("\tprovisioning new node...");
        new_node = quicklistCreateNode();
        new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
        new_node->count++;
        quicklistNodeUpdateSz(new_node);
        __quicklistInsertNode(quicklist, node, new_node, after);
    } else if (full) {
        /* else, node is full we need to split it. */
        /* covers both after and !after cases */
        D("\tsplitting node...");
        quicklistDecompressNodeForUse(node);
        new_node = _quicklistSplitNode(node, entry->offset, after);
        new_node->zl = ziplistPush(new_node->zl, value, sz,
                                   after ? ZIPLIST_HEAD : ZIPLIST_TAIL);
        new_node->count++;
        quicklistNodeUpdateSz(new_node);
        __quicklistInsertNode(quicklist, node, new_node, after);
        _quicklistMergeNodes(quicklist, node);
    }

    quicklist->count++;
}

(4.7) 删除节点-quicklistDelEntry

/* Delete one element represented by 'entry'
 *
 * 'entry' stores enough metadata to delete the proper position in
 * the correct ziplist in the correct quicklist node. */
void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry) {
    quicklistNode *prev = entry->node->prev;
    quicklistNode *next = entry->node->next;
    int deleted_node = quicklistDelIndex((quicklist *)entry->quicklist,
                                         entry->node, &entry->zi);

    /* after delete, the zi is now invalid for any future usage. */
    iter->zi = NULL;

    /* If current node is deleted, we must update iterator node and offset. */
    if (deleted_node) {
        if (iter->direction == AL_START_HEAD) {
            iter->current = next;
            iter->offset = 0;
        } else if (iter->direction == AL_START_TAIL) {
            iter->current = prev;
            iter->offset = -1;
        }
    }
    /* else if (!deleted_node), no changes needed.
     * we already reset iter->zi above, and the existing iter->offset
     * doesn't move again because:
     *   - [1, 2, 3] => delete offset 1 => [1, 3]: next element still offset 1
     *   - [1, 2, 3] => delete offset 0 => [2, 3]: next element still offset 0
     *  if we deleted the last element at offet N and now
     *  length of this ziplist is N-1, the next call into
     *  quicklistNext() will jump to the next node. */
}

(4.8) 快速链表范围删除-quicklistDelRange

// file: src/quicklist.c

/*  
 * 从快速列表中删除一系列元素。
 * 
 * 元素可能跨越多个 quicklistNodes,所以我们必须小心跟踪我们的开始和结束位置。
 * 
 * 如果条目被删除则返回 1,如果没有删除则返回 0。
 *
 * @param *quicklist  快速列表指针
 * @param start  开始
 * param count  个数
 */
int quicklistDelRange(quicklist *quicklist, const long start,
                      const long count) {
    if (count <= 0)
        return 0;

    // 要删除的范围 / 要删除的个数  // 范围包括起始位置
    unsigned long extent = count;

    // 删除范围大小处理
    if (start >= 0 && extent > (quicklist->count - start)) {
        // 如果请求删除的元素多于存在的元素,则限制删除范围最大为列表大小。
        extent = quicklist->count - start;
    } else if (start < 0 && extent > (unsigned long)(-start)) {
        // 否则,如果处于负偏移量,则将最大大小限制为列表的其余部分。  
        // 删除范围 从 -start(start<0) 到 链表结尾
        extent = -start; /* c.f. LREM -29 29; just delete until end. */
    }

    // 
    quicklistEntry entry;
    if (!quicklistIndex(quicklist, start, &entry))
        return 0;

    D("Quicklist delete request for start %ld, count %ld, extent: %ld", start,
      count, extent);

    // 节点  
    quicklistNode *node = entry.node;

    // 遍历下一个节点,直到删除所有内容。
    // 遍历链表
    while (extent) {  // 当event>0
        // 链表下一个节点
        quicklistNode *next = node->next;

        // 删除的压缩列表节的点个数
        unsigned long del;
        // 是否删除整个(quicklistNode)节点  0:否  1:是
        int delete_entire_node = 0;
        if (entry.offset == 0 && extent >= node->count) {
            // 如果我们要删除的数量超过这个节点的数量,我们可以删除整个(quicklistNode)节点而不用计算。 
            delete_entire_node = 1;
            del = node->count;
        } else if (entry.offset >= 0 && extent >= node->count) {
            // 如果在此之后删除更多节点,则根据当前节点的大小计算删除。
            del = node->count - entry.offset;
        } else if (entry.offset < 0) {
            // 如果 offset 为负,则我们处于此循环的第一次运行中,我们将删除从该起始偏移量到列表末尾的整个范围。 
            // 由于Negative offset是到链表尾部的元素个数,直接作为删除计数即可。 
            del = -entry.offset;

            // 如果正偏移量大于剩余范围,我们只删除剩余范围,而不是整个偏移量。 
            if (del > extent)
                del = extent;
        } else {
            // 否则,我们将删除小于该节点的范围,因此直接使用范围。 
            del = extent;
        }

        D("[%ld]: asking to del: %ld because offset: %d; (ENTIRE NODE: %d), "
          "node count: %u",
          extent, del, entry.offset, delete_entire_node, node->count);

        if (delete_entire_node) {  // 删除了整个快速链表节点 
            __quicklistDelNode(quicklist, node);  // 删除(quicklistNode)节点
        } else {
            quicklistDecompressNodeForUse(node);
            // 压缩列表范围删除
            node->zl = ziplistDeleteRange(node->zl, entry.offset, del);
            // 快速列表节点更新大小
            quicklistNodeUpdateSz(node);
            // quicklistNode更新压缩列表节点个数
            node->count -= del;
            // quicklist更新压缩列表节点个数
            quicklist->count -= del;
            // 删除空节点
            quicklistDeleteIfEmpty(quicklist, node);
            if (node)
                quicklistRecompressOnly(quicklist, node);  // 压缩
        }

        // 更新剩余要删除的个数
        extent -= del;

        // 更新下一个节点
        node = next;

        entry.offset = 0;
    }
    return 1;
}

(4.x) 快速链表操作

(4.x.1) quicklistSetCompressDepth

// file: src/quicklist.c

#define COMPRESS_MAX ((1 << QL_COMP_BITS)-1)

/*
 * 
 */
void quicklistSetCompressDepth(quicklist *quicklist, int compress) {
    //
    if (compress > COMPRESS_MAX) {
        compress = COMPRESS_MAX;
    } else if (compress < 0) {
        compress = 0;
    }
    // 设置
    quicklist->compress = compress;
}

(4.x.2) quicklistSetFill

#define FILL_MAX ((1 << (QL_FILL_BITS-1))-1)

void quicklistSetFill(quicklist *quicklist, int fill) {
    if (fill > FILL_MAX) {
        fill = FILL_MAX;
    } else if (fill < -5) {
        fill = -5;
    }
    quicklist->fill = fill;
}

(4.x.3) quicklistSetOptions

/*
 * 
 */
void quicklistSetOptions(quicklist *quicklist, int fill, int depth) {
    quicklistSetFill(quicklist, fill);
    quicklistSetCompressDepth(quicklist, depth);
}

(4.x.4) 快速链表释放-quicklistRelease

// file: src/quicklist.c

/* Free entire quicklist. */
void quicklistRelease(quicklist *quicklist) {
    unsigned long len;
    quicklistNode *current, *next;

    // 
    current = quicklist->head;
    len = quicklist->len;
    // 遍历快速链表
    while (len--) {
        // 下一个节点
        next = current->next;

        // 释放quicklistNode指向的ziplist
        zfree(current->zl);
        // 更新entry个数
        quicklist->count -= current->count;

        // 释放当前节点
        zfree(current);

        // 更新链表长度
        quicklist->len--;

        // 更新下一个阶段
        current = next;
    }

    // 
    quicklistBookmarksClear(quicklist);

    // 释放快速链表内存
    zfree(quicklist);
}

(4.x.5) 快速列表压缩-__quicklistCompress

/* Force 'quicklist' to meet compression guidelines set by compress depth.
 * The only way to guarantee interior nodes get compressed is to iterate
 * to our "interior" compress depth then compress the next node we find.
 * If compress depth is larger than the entire list, we return immediately. */
REDIS_STATIC void __quicklistCompress(const quicklist *quicklist,
                                      quicklistNode *node) {
    /* If length is less than our compress depth (from both sides),
     * we can't compress anything. */
    if (!quicklistAllowsCompression(quicklist) ||
        quicklist->len < (unsigned int)(quicklist->compress * 2))
        return;

#if 0
    /* Optimized cases for small depth counts */
    if (quicklist->compress == 1) {
        quicklistNode *h = quicklist->head, *t = quicklist->tail;
        quicklistDecompressNode(h);
        quicklistDecompressNode(t);
        if (h != node && t != node)
            quicklistCompressNode(node);
        return;
    } else if (quicklist->compress == 2) {
        quicklistNode *h = quicklist->head, *hn = h->next, *hnn = hn->next;
        quicklistNode *t = quicklist->tail, *tp = t->prev, *tpp = tp->prev;
        quicklistDecompressNode(h);
        quicklistDecompressNode(hn);
        quicklistDecompressNode(t);
        quicklistDecompressNode(tp);
        if (h != node && hn != node && t != node && tp != node) {
            quicklistCompressNode(node);
        }
        if (hnn != t) {
            quicklistCompressNode(hnn);
        }
        if (tpp != h) {
            quicklistCompressNode(tpp);
        }
        return;
    }
#endif

    /* Iterate until we reach compress depth for both sides of the list.a
     * Note: because we do length checks at the *top* of this function,
     *       we can skip explicit null checks below. Everything exists. */
    quicklistNode *forward = quicklist->head;
    quicklistNode *reverse = quicklist->tail;
    int depth = 0;
    int in_depth = 0;
    while (depth++ < quicklist->compress) {
        quicklistDecompressNode(forward);
        quicklistDecompressNode(reverse);

        if (forward == node || reverse == node)
            in_depth = 1;

        if (forward == reverse)
            return;

        forward = forward->next;
        reverse = reverse->prev;
    }

    if (!in_depth)
        quicklistCompressNode(node);

    if (depth > 2) {
        /* At this point, forward and reverse are one node beyond depth */
        quicklistCompressNode(forward);
        quicklistCompressNode(reverse);
    }
}

(4.y) 快速链表节点(quicklistNode)操作

(4.y.1) 创建节点-quicklistCreateNode

// file: src/quicklist.c

/*
 * 创建节点
 */
REDIS_STATIC quicklistNode *quicklistCreateNode(void) {
    quicklistNode *node;
    node = zmalloc(sizeof(*node));
    // 
    node->zl = NULL;
    node->count = 0;
    // 压缩列表占用字节大小为0
    node->sz = 0; 
    node->next = node->prev = NULL;
    // 编码使用原生字节数组
    node->encoding = QUICKLIST_NODE_ENCODING_RAW;
    // 容器格式使用压缩列表
    node->container = QUICKLIST_NODE_CONTAINER_ZIPLIST;
    // 压缩
    node->recompress = 0;
    return node;
}

(4.y.2) _quicklistNodeAllowInsert

/*
 * 
 * @param *node 快速链表节点
 * @param fill 
 * @param sz  新插入的数据大小 
 */
REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node,
                                           const int fill, const size_t sz) {
    if (unlikely(!node))
        return 0;

    // 
    int ziplist_overhead;
  
    // 前一个节点的长度 占用的内存大小
    // size of previous offset 
    if (sz < 254)
        ziplist_overhead = 1;  // 1字节
    else
        ziplist_overhead = 5;  // 5字节

    // 压缩列表节点编码占用内存大小 1/2/5字节
    // size of forward offset
    if (sz < 64)
        ziplist_overhead += 1;
    else if (likely(sz < 16384))  // 2^14 = 16384
        ziplist_overhead += 2;
    else
        ziplist_overhead += 5;

    // 需要新增的内存 sz(压缩列表节点占用内存大小) + ziplist_overhead(压缩列表)
    // 如果"sz"编码为整数类型,new_sz 会高估
    unsigned int new_sz = node->sz + sz + ziplist_overhead;
    if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill)))
        return 1; // 当返回 1 时,我们知道限制是大小限制 < 8192
    else if (!sizeMeetsSafetyLimit(new_sz))
        return 0;
    else if ((int)node->count < fill)
        return 1;
    else
        return 0;
}

#define sizeMeetsSafetyLimit(sz) ((sz) <= SIZE_SAFETY_LIMIT)


/* 
 * ziplist 的最大大小(以字节为单位)。
 * 较大的值将存在于它们自己独立的 ziplist 中。
 * 仅当我们受记录数限制时才使用。 当我们受大小限制时,最大限制更大,但仍然安全。 
 * 8k 是推荐/默认大小限制
 */
#define SIZE_SAFETY_LIMIT 8192

(4.y.3) 快速列表所有entry个数-quicklistCount

/* Return cached quicklist count */
unsigned long quicklistCount(const quicklist *ql) { return ql->count; }

(4.y.4) 快速链表压缩节点-quicklistCompressNode

/* 
 * 仅压缩未压缩的节点。
 * 
 * 节点的编码是 QUICKLIST_NODE_ENCODING_RAW 格式的,压缩节点
 */
#define quicklistCompressNode(_node)                                           \
    do {                                                                       \
        if ((_node) && (_node)->encoding == QUICKLIST_NODE_ENCODING_RAW) {     \
            __quicklistCompressNode((_node));                                  \
        }                                                                      \
    } while (0)
/* 
 * 压缩"节点"中的 ziplist 并更新编码细节。
 * 如果 ziplist 压缩成功则返回 1。
 * 如果压缩失败或 ziplist 太小而无法压缩,则返回 0。
 */
REDIS_STATIC int __quicklistCompressNode(quicklistNode *node) {
#ifdef REDIS_TEST
    node->attempted_compress = 1;
#endif

    // 如果压缩列表占用内存大小 < 48字节
    if (node->sz < MIN_COMPRESS_BYTES)
        return 0;

    // ziplist压缩后的格式
    quicklistLZF *lzf = zmalloc(sizeof(*lzf) + node->sz);

    /* Cancel if compression fails or doesn't compress small enough */
    // 如果压缩失败或压缩不够小则取消
    if (((lzf->sz = lzf_compress(node->zl, node->sz, lzf->compressed,
                                 node->sz)) == 0) ||
        lzf->sz + MIN_COMPRESS_IMPROVE >= node->sz) {
        // 如果值不可压缩,lzf_compress 中止/拒绝压缩。
        zfree(lzf);
        return 0;
    }

    // 重新申请内存
    lzf = zrealloc(lzf, sizeof(*lzf) + lzf->sz);
    // 释放 node->zl对应的ziplist内存
    zfree(node->zl);
    // node->zl指向 压缩格式的 quicklistLZF
    node->zl = (unsigned char *)lzf;
    // 更新节点编码为 ENCODING_LZF
    node->encoding = QUICKLIST_NODE_ENCODING_LZF;
    // 重新压缩 = false  
    node->recompress = 0;
    return 1;
}

(4.y.5) 解压缩-__quicklistDecompressNode

/* 
 * 解压缩ziplist 并且 更新编码细节
 * 解压成功返回1  失败返回0
 */
REDIS_STATIC int __quicklistDecompressNode(quicklistNode *node) {
#ifdef REDIS_TEST
    node->attempted_compress = 0;
#endif

    // 解压缩后的ziplist
    void *decompressed = zmalloc(node->sz);
    quicklistLZF *lzf = (quicklistLZF *)node->zl;

    // 解压缩失败
    if (lzf_decompress(lzf->compressed, lzf->sz, decompressed, node->sz) == 0) {
        // 解压失败,释放申请的内存
        zfree(decompressed);
        return 0;
    }

    // 释放压缩节点lzf
    zfree(lzf);

    // node->zl指向压缩列表
    node->zl = decompressed;
    // 编码格式使用原生字节
    node->encoding = QUICKLIST_NODE_ENCODING_RAW;
    return 1;
}

按索引删除节点-quicklistDelIndex

// file: src/quicklist.c

/* 
 * 删除快速列表(quicklist)给定的压缩列表节点(quicklistNode)里指向的压缩列表(ziplist)里的一个值
 * quicklistNode = quicklist->head->next (某个next) 
 * ziplist = quicklistNode->zl
 * 
 * 注意:quicklistDelIndex() *需要*未压缩的节点,因为您已经必须从某个地方的未压缩节点获取 *p。
 *
 * 如果整个节点被删除则返回 1,如果节点仍然存在则返回 0。
 * 还使用 ziplist 中的下一个偏移量更新输入/输出参数"p"。
 */
REDIS_STATIC int quicklistDelIndex(quicklist *quicklist, quicklistNode *node,
                                   unsigned char **p) {
    int gone = 0;

    // 删除压缩列表里的指定值
    node->zl = ziplistDelete(node->zl, p);
    // 更新压缩列表节点里entry个数
    node->count--;

    if (node->count == 0) { // 节点个数=0
        gone = 1;
        // 快速列表(quicklist)删除节点(quicklistNode)
        __quicklistDelNode(quicklist, node);
    } else {
        // 快速列表 更新大小
        quicklistNodeUpdateSz(node);
    }
    // 更新entry个数
    quicklist->count--;
    
    return gone ? 1 : 0;
}

() 快速链表删除节点-__quicklistDelNode

和链表删除节点类似

// file: src/quicklist.c

/*
 * 快速链表删除节点
 */
REDIS_STATIC void __quicklistDelNode(quicklist *quicklist,
                                     quicklistNode *node) {
    /* Update the bookmark if any */
    // 
    quicklistBookmark *bm = _quicklistBookmarkFindByNode(quicklist, node);
    if (bm) {
        // 更新
        bm->node = node->next;
        /* if the bookmark was to the last node, delete it. */
        // 
        if (!bm->node)
            _quicklistBookmarkDelete(quicklist, bm);
    }

    // 当前节点断开链接
    if (node->next)
        node->next->prev = node->prev; // 下一个节点的前驱节点 = 当前节点的前驱节点
    if (node->prev)
        node->prev->next = node->next; // 前一个节点的后继节点 = 当前阶段的后继节点

    if (node == quicklist->tail) {
        // 更新尾结点
        quicklist->tail = node->prev;
    }

    if (node == quicklist->head) {
        // 更新头结点
        quicklist->head = node->next;
    }

    // 如果我们在压缩深度内删除了一个节点,我们现在有需要解压缩的压缩节点。
    __quicklistCompress(quicklist, NULL);

    // 更新快速链表节点个数
    quicklist->count -= node->count;

    // 释放ziplist
    zfree(node->zl);
    // 释放quicklistNode
    zfree(node);
    // 更新快去链表里的quicklistNode个数
    quicklist->len--;
}

(4.z) 快速列表里的书签

bookmarks[] 的作用是?

(4.z.1) quicklistBookmarkCreate

/* Create or update a bookmark in the list which will be updated to the next node
 * automatically when the one referenced gets deleted.
 * Returns 1 on success (creation of new bookmark or override of an existing one).
 * Returns 0 on failure (reached the maximum supported number of bookmarks).
 * NOTE: use short simple names, so that string compare on find is quick.
 * NOTE: bookmakrk creation may re-allocate the quicklist, so the input pointer
         may change and it's the caller responsibilty to update the reference.
 */
int quicklistBookmarkCreate(quicklist **ql_ref, const char *name, quicklistNode *node) {
    quicklist *ql = *ql_ref;
    if (ql->bookmark_count >= QL_MAX_BM)
        return 0;
    quicklistBookmark *bm = _quicklistBookmarkFindByName(ql, name);
    if (bm) {
        bm->node = node;
        return 1;
    }
    ql = zrealloc(ql, sizeof(quicklist) + (ql->bookmark_count+1) * sizeof(quicklistBookmark));
    *ql_ref = ql;
    ql->bookmarks[ql->bookmark_count].node = node;
    ql->bookmarks[ql->bookmark_count].name = zstrdup(name);
    ql->bookmark_count++;
    return 1;
}

(4.z.2) 快速列表书签查找节点-_quicklistBookmarkFindByNode

// file: src/quicklist.c

/*
 * 
 */
quicklistBookmark *_quicklistBookmarkFindByNode(quicklist *ql, quicklistNode *node) {
    unsigned i;
    // 遍历
    for (i=0; i<ql->bookmark_count; i++) {
        // 节点相同
        if (ql->bookmarks[i].node == node) {
            // 返回节点
            return &ql->bookmarks[i];
        }
    }
    return NULL;
}

参考资料

[1] redis-quicklist-github
[2] Redis源码剖析之快速列表(quicklist)