Redis 对象 有序集合 / Sorted Set / zset

有序集合(Sorted Set)是Redis中的一种对象,它本身是集合类型,同时也可以支持集合中的元素带有权重,并按权重排序。

(1) 有序集合(Sorted Set)是什么

/*
 * ZSET 是有序集合,使用两个数据结构来保存相同的元素,以便将 O(log(N)) INSERT 和 REMOVE 操作放入已排序的数据结构中。
 * 
 * 这些元素被添加到将 Redis 对象映射到分数的哈希表中。
 * 同时,将元素添加到将分数映射到 Redis 对象的跳跃列表(因此对象在此“视图”中按分数排序)。
 *
 * 请注意,为了节省内存,表示元素的 SDS 字符串在哈希表和跳表中都是相同的。 
 * 为了更容易地管理共享的 SDS 字符串,我们所做的是仅在 zslFreeNode() 中释放 SDS 字符串。 
 * 字典没有设置无值方法。
 * 所以我们应该总是从字典中删除一个元素,然后再从跳过列表中删除。
 */

(1.1) 有序集合结构

/*
 * 有序集合 zset 结构体
 */
typedef struct zset {
    dict *dict; // 字典
    zskiplist *zsl; // 跳表
} zset;

Sorted Set 同时采用跳表哈希表两个索引结构
利用了跳表高效支持范围查询(如 ZRANGEBYSCORE 操作),以及哈希表高效支持单点查询(如 ZSCORE 操作)的特征
可以在一个数据类型中,同时高效支持范围查询和单点查询,这是单一索引结构比较难达到的效果。

跳表或是哈希表中,各自保存了什么样的数据?
跳表和哈希表保存的数据是如何保持一致的?

(2) 为什么要用有序集合

127.0.0.1:6379> zadd zset_user_1 1 mysql
(integer) 1
127.0.0.1:6379> zadd zset_user_1 2 redis
(integer) 1
127.0.0.1:6379> zadd zset_user_1 3 es
(integer) 1
127.0.0.1:6379> zadd zset_user_1 4 hbase
(integer) 1
127.0.0.1:6379> zadd zset_user_1 5 clickhouse
(integer) 1
127.0.0.1:6379>
127.0.0.1:6379> zrange zset_user_1 0 10
1) "mysql"
2) "redis"
3) "es"
4) "hbase"
5) "clickhouse"
127.0.0.1:6379>
127.0.0.1:6379> zrange zset_user_1 0 10 withscores
 1) "mysql"
 2) "1"
 3) "redis"
 4) "2"
 5) "es"
 6) "3"
 7) "hbase"
 8) "4"
 9) "clickhouse"
10) "5"
127.0.0.1:6379>
127.0.0.1:6379>
127.0.0.1:6379> zrangebyscore zset_user_1 2 4
1) "redis"
2) "es"
3) "hbase"
127.0.0.1:6379>
127.0.0.1:6379> zrangebyscore zset_user_1 2 4 withscores
1) "redis"
2) "2"
3) "es"
4) "3"
5) "hbase"
6) "4"
127.0.0.1:6379>
127.0.0.1:6379>
127.0.0.1:6379> zscore zset_user_1 mysql
"1"
127.0.0.1:6379>
127.0.0.1:6379> zscore zset_user_1 es
"3"
127.0.0.1:6379>
127.0.0.1:6379>

(3) 有序集合原理

为什么 Sorted Set 能同时提供以下两种操作接口,以及它们的复杂度分别是 O(logN)+M 和 O(1) 呢?

ZRANGEBYSCORE:按照元素权重返回一个范围内的元素。
ZSCORE:返回某个元素的权重值。

实际上,这个问题背后的本质是:为什么 Sorted Set 既能支持高效的范围查询,同时还能以 O(1) 复杂度获取元素权重值?

这其实就和 Sorted Set 底层的设计实现有关了。
Sorted Set 能支持范围查询,这是因为它的核心数据结构设计采用了跳表,
而它能以常数复杂度获取元素权重,这是因为它同时采用了哈希表进行索引。

Sorted Set 是如何把这两种数据结构结合在一起的?它们又是如何进行协作的呢?

Sorted Set 采用的双索引的设计思想和实现。

(3.1) Redis有序集合支持的命令

支持的命令见 https://github.com/redis/redis/blob/6.0/src/server.c#L405

(4) 有序集合源码解读

提供的功能

(4.1) 新建有序集合

// file: object.c

/*
 * 创建有序集合对象
 * 底层数据结构 跳表+哈希表
 */
robj *createZsetObject(void) {
    // 为zset结构体分配内存
    zset *zs = zmalloc(sizeof(*zs));
    robj *o;

    // 创建字典
    zs->dict = dictCreate(&zsetDictType,NULL);
    // 创建跳表
    zs->zsl = zslCreate();

    // 创建redisObject对象,类型为有序集合
    o = createObject(OBJ_ZSET,zs);
    // 设置编码方式为 跳表
    o->encoding = OBJ_ENCODING_SKIPLIST;
    return o;
}
// file: object.c

/*
 * 创建有序集合压缩列表对象
 * 底层数据结构 压缩列表
 */
robj *createZsetZiplistObject(void) {
    // 创建压缩列表
    unsigned char *zl = ziplistNew();
    // 创建redisObject对象,类型为有序集合
    robj *o = createObject(OBJ_ZSET,zl);
    // 设置编码方式为 压缩列表 
    o->encoding = OBJ_ENCODING_ZIPLIST;
    return o;
}

(4.2) 往有序集合添加元素-zsetAdd

// file: 

/*
 * 添加新元素或更新排序集中现有元素的分数,无论其编码如何。
 *
 * 标记会更改命令行为。 它们通过整数指针传递,因为该函数将清除标志并用其他标志填充它们以指示不同的条件。
 *
 * 输入标记如下:
 * ZADD_INCR: 将当前元素分数增加'score',而不是更新当前元素分数。 如果该元素不存在,我们假设以前分数为0。
 * ZADD_NX:   仅当元素不存在时才执行操作。
 * ZADD_XX:   仅当元素不存在时才执行操作。
 *
 * 当使用 ZADD_INCR 时,如果 'newscore' 不为 NULL,则元素的新分数存储在 '*newscore' 中。
 *
 * 返回的标记如下:
 * ZADD_NAN:     结果分数不是数字。
 * ZADD_ADDED:   元素已添加(调用前不存在)。 
 * ZADD_UPDATED: 元素分数已更新。
 * ZADD_NOP:     由于 NX 或 XX,未执行任何操作。
 *
 * 返回值:
 * 该函数在成功时返回 1,并设置适当的标志 ADDED 或 UPDATED 以指示操作期间发生的事情
 * (请注意,如果我们使用与以前相同的分数重新添加元素,则不能设置任何标志,或者在这种情况下 使用零增量)。
 *
 * 该函数在出错时返回 0,目前仅当增量产生 NAN 条件时,或者当“score”值自开始以来为 NAN 时。 
 * 
 * 作为添加新元素的副作用的命令可能会将排序集内部编码从 ziplist 转换为 hashtable + skiplist。 
 *
 * 'ele'的内存管理:
 * 该函数不获取“ele”SDS 字符串的所有权,但会在需要时复制它。
 * 
 * 
 * zadd zset_user_1 1 mysql 2 redis 
 * @param *zobj 有序集合对象
 * @param score 分数 对应 1 2
 * @param ele 元素 其实就是 member 对应的是 mysql redis 
 * @param *flags 
 * @param *newscore 
 */
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
    /* 将选项变成简单的检查变量。 */
    int incr = (*flags & ZADD_INCR) != 0;
    int nx = (*flags & ZADD_NX) != 0;
    int xx = (*flags & ZADD_XX) != 0;
    *flags = 0; /* We'll return our response flags. */
    double curscore;

    /* NaN as input is an error regardless of all the other parameters. */
    // 参数校验
    if (isnan(score)) {
        *flags = ZADD_NAN;
        return 0;
    }

    /* 根据编码更新有序集合。 */
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { // 编码是压缩列表
        unsigned char *eptr;

        if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
            /* NX? Return, same element already exists. */
            if (nx) {
                *flags |= ZADD_NOP;
                return 1;
            }

            /* Prepare the score for the increment if needed. */
            if (incr) {
                score += curscore;
                if (isnan(score)) {
                    *flags |= ZADD_NAN;
                    return 0;
                }
                if (newscore) *newscore = score;
            }

            /* Remove and re-insert when score changed. */
            if (score != curscore) {
                zobj->ptr = zzlDelete(zobj->ptr,eptr);
                zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                *flags |= ZADD_UPDATED;
            }
            return 1;
        } else if (!xx) {
            /* check if the element is too large or the list
             * becomes too long *before* executing zzlInsert. */
            if (zzlLength(zobj->ptr)+1 > server.zset_max_ziplist_entries ||
                sdslen(ele) > server.zset_max_ziplist_value ||
                !ziplistSafeToAdd(zobj->ptr, sdslen(ele)))
            {
                zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
            } else {
                zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                if (newscore) *newscore = score;
                *flags |= ZADD_ADDED;
                return 1;
            }
        } else {
            *flags |= ZADD_NOP;
            return 1;
        }
    }

    /* 请注意,上面的块处理ziplist将返回 或 转换键为skiplist。 
     * Note that the above block handling ziplist would have either returned or converted the key to skiplist. 
     */

    if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { // 编码是跳表
        // 有序集合
        zset *zs = zobj->ptr;
        // 跳表头节点
        zskiplistNode *znode;
        // 字典
        dictEntry *de;

        // ele是member  // zscore key member 的时候获取score 用到了这儿
        // 从字典里根据key查到的节点
        de = dictFind(zs->dict,ele);
        if (de != NULL) { // 节点已存在
            /* NX? Return, same element already exists. */
            if (nx) {
                *flags |= ZADD_NOP;
                return 1;
            }
            // 获取节点的值 也就是 分数
            curscore = *(double*)dictGetVal(de);

            /* 如果需要,为增量准备分数。 */
            if (incr) {
                score += curscore;
                if (isnan(score)) {
                    *flags |= ZADD_NAN;
                    return 0;
                }
                // 更新分值
                if (newscore) *newscore = score;
            }

            /* 当分数改变时移除并重新插入。 */
            if (score != curscore) {
                // 
                znode = zslUpdateScore(zs->zsl,curscore,ele,score);
                /* 请注意,我们没有从表示已排序集合的哈希表中删除原始元素,因此我们只更新分数。 */
                dictGetVal(de) = &znode->score; // 更新分数指针
                // 更新标记
                *flags |= ZADD_UPDATED;
            }
            return 1;
        } else if (!xx) {
            ele = sdsdup(ele);
            znode = zslInsert(zs->zsl,score,ele);
            serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
            *flags |= ZADD_ADDED;
            if (newscore) *newscore = score;
            return 1;
        } else {
            *flags |= ZADD_NOP;
            return 1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
    return 0; /* Never reached. */
}

(6) zxxCommand请求源码解析

(6.1) zadd命令源码解析

void zaddCommand(client *c) {
    zaddGenericCommand(c,ZADD_NONE);
}
/*
 * This generic command implements both ZADD and ZINCRBY. 
 * 
 * zadd zset_user_1 1 mysql
 */
void zaddGenericCommand(client *c, int flags) {
    static char *nanerr = "resulting score is not a number (NaN)";
    // 输入的key  
    robj *key = c->argv[1];
    robj *zobj;
    sds ele;
    double score = 0, *scores = NULL;
    int j, elements;
    int scoreidx = 0;
    /* 以下变量用于跟踪命令在执行期间实际执行的操作、回复客户端并触发键空间更改通知。 */
    int added = 0;      /* 添加的新元素个数 */
    int updated = 0;    /* 更新分数的元素个数 */
    int processed = 0;  /* 处理的元素个数, 对于 XX 等选项可能会保持为零。 */

    /* 解析选项。 最后,'scoreidx' 设置为第一个分数元素对的分数的参数位置。 */
    scoreidx = 2;
    // 解析后面的参数
    while(scoreidx < c->argc) {
        char *opt = c->argv[scoreidx]->ptr;
        // 位运算 flag是0 对应二进制是 0000 0000 0000 0000 
        // |= 按位或 
        // 如果包含就 执行 按位或运算   
        if (!strcasecmp(opt,"nx")) flags |= ZADD_NX;
        else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX;
        else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH;
        else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR;
        else break;
        scoreidx++;
    }

    /* 将选项变成简单的检查变量。 */
    // 位运算 
    int incr = (flags & ZADD_INCR) != 0;
    int nx = (flags & ZADD_NX) != 0;
    int xx = (flags & ZADD_XX) != 0;
    int ch = (flags & ZADD_CH) != 0;

    /* 在选项之后,我们希望有偶数个参数,因为我们希望有任意数量的分数元素对。 */
    elements = c->argc-scoreidx;
    if (elements % 2 || !elements) {
        addReply(c,shared.syntaxerr);
        return;
    }
    elements /= 2; /* Now this holds the number of score-element pairs. */

    /* Check for incompatible options. */
    if (nx && xx) {
        addReplyError(c,
            "XX and NX options at the same time are not compatible");
        return;
    }

    if (incr && elements > 1) {
        addReplyError(c,
            "INCR option supports a single increment-element pair");
        return;
    }

    /* Start parsing all the scores, we need to emit any syntax error
     * before executing additions to the sorted set, as the command should
     * either execute fully or nothing at all. */
    scores = zmalloc(sizeof(double)*elements);
    for (j = 0; j < elements; j++) {
        if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
            != C_OK) goto cleanup;
    }

    /* Lookup the key and create the sorted set if does not exist. */
    // 查找键并创建排序集(如果不存在)
    // 根据key查找value
    zobj = lookupKeyWrite(c->db,key);
    // zobj为空,表示有序集合还未创建
    if (zobj == NULL) {
        if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
        // 服务器配置里 zset_max_ziplist_entries==0 或  zset_max_ziplist_value < 每个元素的长度 
        if (server.zset_max_ziplist_entries == 0 ||
            server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
        {
            // 创建 Zset对象,底层数据结构是跳表和哈希表 
            zobj = createZsetObject();
        } else {
            // 也就是 同时满足以下3个条件,会使用 ziplist 作为 有序集合的底层编码方式 
            // 压缩列表为空时,
            // 并且 服务器配置里 有序集合最大实体个数(zset_max_ziplist_entries) > 0 
            // 并且 服务器配置里元素的长度(zset_max_ziplist_value) >= sdslen(c->argv[scoreidx+1]->ptr) 时

            // 创建压缩列表对象, 底层数据结构是ziplist 
            zobj = createZsetZiplistObject();
        }
        dbAdd(c->db,key,zobj);
    } else {
        if (zobj->type != OBJ_ZSET) {
            addReply(c,shared.wrongtypeerr);
            goto cleanup;
        }
    }

    // 往有序集合添加元素逻辑

    for (j = 0; j < elements; j++) {
        double newscore;
        score = scores[j];
        int retflags = flags;

        ele = c->argv[scoreidx+1+j*2]->ptr;
        // 真正往有序集合添加元素添加逻辑
        int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
        if (retval == 0) {
            addReplyError(c,nanerr);
            goto cleanup;
        }
        if (retflags & ZADD_ADDED) added++;
        if (retflags & ZADD_UPDATED) updated++;
        if (!(retflags & ZADD_NOP)) processed++;
        score = newscore;
    }
    server.dirty += (added+updated);

reply_to_client:
    if (incr) { /* ZINCRBY or INCR option. */
        if (processed)
            addReplyDouble(c,score);
        else
            addReplyNull(c);
    } else { /* ZADD. */
        addReplyLongLong(c,ch ? added+updated : added);
    }

cleanup:
    zfree(scores);
    if (added || updated) {
        signalModifiedKey(c,c->db,key);
        notifyKeyspaceEvent(NOTIFY_ZSET,
            incr ? "zincr" : "zadd", key, c->db->id);
    }
}

(7) 总结

1、ZSet 当数据比较少时,采用 ziplist 存储,每个 member/score 元素紧凑排列,节省内存

2、当数据超过阈值(zset-max-ziplist-entries、zset-max-ziplist-value)后,转为 hashtable + skiplist 存储,降低查询的时间复杂度

3、hashtable 存储 member->score 的关系,所以 ZSCORE 的时间复杂度为 O(1)

4、skiplist 是一个「有序链表 + 多层索引」的结构,把查询元素的复杂度降到了 O(logN),服务于 ZRANGE/ZREVRANGE 这类命令

5、skiplist 的多层索引,采用「随机」的方式来构建,也就是说每次添加一个元素进来,要不要对这个元素建立「多层索引」?建立「几层索引」?都要通过「随机数」的方式来决定

6、每次随机一个 0-1 之间的数,如果这个数小于 0.25(25% 概率),那就给这个元素加一层指针,持续随机直到大于 0.25 结束,最终确定这个元素的层数(层数越高,概率越低,且限制最多 64 层,详见 t_zset.c 的 zslRandomLevel 函数)

7、这个预设「概率」决定了一个跳表的内存占用和查询复杂度:概率设置越低,层数越少,元素指针越少,内存占用也就越少,但查询复杂会变高,反之亦然。这也是 skiplist 的一大特点,可通过控制概率,进而控制内存和查询效率

8、skiplist 新插入一个节点,只需修改这一层前后节点的指针,不影响其它节点的层数,降低了操作复杂度(相比平衡二叉树的再平衡,skiplist 插入性能更优)

关于 Redis 的 ZSet 为什么用 skiplist 而不用平衡二叉树实现的问题,原因是:

  • skiplist 更省内存:25% 概率的随机层数,可通过公式计算出 skiplist 平均每个节点的指针数是 1.33 个,平衡二叉树每个节点指针是 2 个(左右子树)
  • skiplist 遍历更友好:skiplist 找到大于目标元素后,向后遍历链表即可,平衡树需要通过中序遍历方式来完成,实现也略复杂
  • skiplist 更易实现和维护:扩展 skiplist 只需要改少量代码即可完成,平衡树维护起来较复杂

课后题:在使用跳表和哈希表相结合的双索引机制时,在获得高效范围查询和单点查询的同时,你能想到有哪些不足之处么?

这种发挥「多个数据结构」的优势,来完成某个功能的场景,最大的特点就是「空间换时间」,所以内存占用多是它的不足。

不过也没办法,想要高效率查询,就得牺牲内存,鱼和熊掌不可兼得。

不过 skiplist 在实现时,Redis 作者应该也考虑到这个问题了,就是上面提到的这个「随机概率」,Redis 后期维护可以通过调整这个概率,进而达到「控制」查询效率和内存平衡的结果。当然,这个预设值是固定写死的,不可配置,应该是 Redis 作者经过测试和权衡后的设定,我们这里只需要知晓原理就好。

Redis只在两个地方用到了跳跃表,一个是实现有序集合键(zset),另一个是在集群节点中用作内部数据结构,除此之外,跳表在Redis里面没有其他用途。

但是为什么用跳表而不用红黑树呢?猜想如下:
1)在做范围查找的时候,平衡树比skiplist操作要复杂。在平衡树上,我们找到指定范围的小值之后,还需要以中序遍历的顺序继续寻找其它不超过大值的节点。如果不对平衡树进行一定的改造,这里的中序遍历并不容易实现。而在skiplist上进行范围查找就非常简单,只需要在找到小值之后,对第1层链表进行若干步的遍历就可以实现。
2)平衡树的插入和删除操作可能引发子树的调整,逻辑复杂,而skiplist的插入和删除只需要修改相邻节点的指针,操作简单又快速。
3)从内存占用上来说,skiplist比平衡树更灵活一些。一般来说,平衡树每个节点包含2个指针(分别指向左右子树),而skiplist每个节点包含的指针数目平均为1/(1-p),具体取决于参数p的大小。如果像Redis里的实现一样,取p=1/4,那么平均每个节点包含1.33个指针,比平衡树更有优势。
4)查找单个key,skiplist和平衡树的时间复杂度都为O(log n),大体相当;而哈希表在保持较低的哈希值冲突概率的前提下,查找时间复杂度接近O(1),性能更高一些。所以我们平常使用的各种Map或dictionary结构,大都是基于哈希表实现的。
5)从算法实现难度上来比较,skiplist比平衡树要简单得多。

参考资料

[1] Redis源码剖析与实战 - 05 | 有序集合为何能同时支持点查询和范围查询?
[2] redis为什么选择了跳跃表而不是红黑树