Redis 数据结构 整数集合(intset)

Redis的set对象里用到了整数集合(intset)

(1) 整数集合是什么

整数集合(intset)类似于数组,但同时有有序无重复的特点,并且可以根据元素的值,自动选择整数类型来保存元素。

整数集合里的元素 类型一样(整数类型都是int16/int32/int64),里面的值不重复
相比于数组,增加了自动编码转换的功能。

(2) 为什么使用整数集合?

整数集合比较节省内存。

(2.1) 整数集合的特点

  1. 整数集合里存储的都是整形数字 ( -2^63 < val < 2^63 - 1 )
  2. 存储的数字是有序的
  3. 存储的数字是不重复的
  4. 存储的数字可以根据数字大小自动转换整数类型(int16/int32/int64)

(2.2) 为什么Redis不直接使用数组?

  1. 整数集合对数组加了一层封装,并且提供了对应的api,使用起来更便捷。
  2. 整数集合会根据值的大小自动选择编码(整数类型),节省内存空间。

(3) 源码解析

(3.1) 整数集合定义

intset.hintset.c 分别包括了整数集合的定义和实现。

// file: src/inset.h

typedef struct intset {
    uint32_t encoding; // 编码方式 4字节
    uint32_t length;  // 长度/元素个数 4字节
    int8_t contents[]; // 保存元素数组 n*len字节
} intset;

contents 数组的 int8_t 类型声明比较容易让人误解,实际上, intset 并不使用 int8_t 类型来保存任何元素,结构中的这个类型声明只是作为一个占位符使用:在对 contents 中的元素进行读取或者写入时,程序并不是直接使用 contents 来对元素进行索引,而是根据 encoding 的值,对 contents 进行类型转换和指针运算,计算出元素在内存中的正确位置。

在添加新元素,进行内存分配时,分配的空间也是由 encoding 的值决定。

(3.2) 提供的功能

操作 函数 复杂度
创建 intset intsetNew O(1)
删除 intset
intset调整大小 intsetResize O(1)
按索引设置元素 _intsetSet O(1)
按索引获取元素 _intsetGet O(1)
添加新元素 intsetAdd O(N)
添加新元素(升级编码) intsetUpgradeAndAdd O(N)
删除元素 intsetRemove O(N)
查找元素,返回索引 intsetSearch O(lgN)

(3.3) 创建整数集合-intsetNew

// file: inset.c

/* 创建空的整形数组 */
intset *intsetNew(void) {
    // inset接头体分配内存
    intset *is = zmalloc(sizeof(intset));
    // 编码方式  
    is->encoding = intrev32ifbe(INTSET_ENC_INT16);
    // 设置长度
    is->length = 0;
    return is;
}

encoding 的值可以是以下三个常量之一

// file: inset.c

/* 
 * 请注意,这些编码是有序的,因此:
 * INTSET_ENC_INT16 < INTSET_ENC_INT32 < INTSET_ENC_INT64. 
 */
#define INTSET_ENC_INT16 (sizeof(int16_t))  // 2字节
#define INTSET_ENC_INT32 (sizeof(int32_t))  // 4字节
#define INTSET_ENC_INT64 (sizeof(int64_t))  // 8字节
/* 仅当目标主机是大端时才进行实际转换的函数变体 */
#if (BYTE_ORDER == LITTLE_ENDIAN)
#define memrev16ifbe(p) ((void)(0))
#define memrev32ifbe(p) ((void)(0))
#define memrev64ifbe(p) ((void)(0))
#define intrev16ifbe(v) (v)
#define intrev32ifbe(v) (v)  // 使用小端存储
#define intrev64ifbe(v) (v)
#else
#define memrev16ifbe(p) memrev16(p)
#define memrev32ifbe(p) memrev32(p)
#define memrev64ifbe(p) memrev64(p)
#define intrev16ifbe(v) intrev16(v)
#define intrev32ifbe(v) intrev32(v)
#define intrev64ifbe(v) intrev64(v)
#endif

(3.4) 根据值的大小使用不同的数字类型-_intsetValueEncoding

/*
 * 为提供的值返回所需的编码。
 *
 * @param v 
 */
static uint8_t _intsetValueEncoding(int64_t v) {
    if (v < INT32_MIN || v > INT32_MAX)
        return INTSET_ENC_INT64;  // 使用int64类型 一个数字占用8字节 
    else if (v < INT16_MIN || v > INT16_MAX)
        return INTSET_ENC_INT32;  // 使用int32类型 一个数字占用4字节 
    else
        return INTSET_ENC_INT16;  // 使用int16类型 一个数字占用2字节 
}

根据整数值的大小调整整数类型(int16/int32/int64),节省内存。

(3.5) 整数集合调整大小-intsetResize

/*
 * 调整整数集合大小  
 */
static intset *intsetResize(intset *is, uint32_t len) {
    // 大小 = 长度 * 整数类型/编码
    uint64_t size = (uint64_t)len*intrev32ifbe(is->encoding);
    // 确保 内存大小 符合预期
    assert(size <= SIZE_MAX - sizeof(intset));
    // 申请内存
    is = zrealloc(is,sizeof(intset)+size);
    return is;
}

(3.6) 按索引设置元素-_intsetSet

/*  
 * 在指定位置设置值,使用配置的编码
 * 
 * @param *is 整形集合
 * @param pos 位置
 * @param value 值
 */
static void _intsetSet(intset *is, int pos, int64_t value) {
    // 获取编码/整数类型
    uint32_t encoding = intrev32ifbe(is->encoding);

    if (encoding == INTSET_ENC_INT64) {  // int64类型
        // 设置值
        ((int64_t*)is->contents)[pos] = value;
        // 进行大小端转换 使用小端
        memrev64ifbe(((int64_t*)is->contents)+pos);
    } else if (encoding == INTSET_ENC_INT32) {  // int32类型
        // 设置值
        ((int32_t*)is->contents)[pos] = value;
        memrev32ifbe(((int32_t*)is->contents)+pos);
    } else {  // int16类型
        // 设置值
        ((int16_t*)is->contents)[pos] = value;
        memrev16ifbe(((int16_t*)is->contents)+pos);
    }
}

(3.7) 查找指定索引的值

/* 
 * 查找指定索引的值,使用指定编码
 */
static int64_t _intsetGet(intset *is, int pos) {
    return _intsetGetEncoded(is,pos,intrev32ifbe(is->encoding));
}
/*
 * 查找指定索引的值,使用指定编码 
 */
static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) {
    int64_t v64;
    int32_t v32;
    int16_t v16;

    if (enc == INTSET_ENC_INT64) {  // 编码是int64

        // 把指定地址的值复制到 &v64  // 整形集合根据下标获取值的方式和数组一样 
        memcpy(&v64,((int64_t*)is->contents)+pos,sizeof(v64));
        memrev64ifbe(&v64);
        return v64;
    } else if (enc == INTSET_ENC_INT32) {  // 编码是int32
        memcpy(&v32,((int32_t*)is->contents)+pos,sizeof(v32));
        memrev32ifbe(&v32);
        return v32;
    } else {  // 编码是 int16 
        memcpy(&v16,((int16_t*)is->contents)+pos,sizeof(v16));
        memrev16ifbe(&v16);
        return v16;
    }
}

(3.8) 添加新元素-intsetAdd

/*
 * 往整数集合插入一个整形数字
 */
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {

    // 根据值大小获取对应的整数类型 节省内存
    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success = 1;

    /* 
     * 如果必要升级编码 (有可能原来用的 int16,现在值是 2^16,超过int16的范围了,需要升级整数类型为int32)
     * 如果我们需要升级,我们需要知道这个值应该>0或<0 
     * 因为它位于现有值的范围之外。
     */
    if (valenc > intrev32ifbe(is->encoding)) { // 当前数字值对应的类型 和 整形数组编码 不一样
        // 这总是成功的,所以我们不需要 cury 成功。
        // 升级 整数类型 比如从 int16升级成int32
        return intsetUpgradeAndAdd(is,value);
    } else { // 类型一样

        // 如果这个值已经在集合里就放弃
        // 在值不存在时,这个调用将使用正确的位置填充“pos” 插入值

        if (intsetSearch(is,value,&pos)) {  // 值已存在
            if (success) *success = 0;
            return is;
        }

        // 需要插入新元素

        // 重置大小 
        // 这个每添加一个元素就要重置一次大小,成本比较高
        is = intsetResize(is,intrev32ifbe(is->length)+1);
        // 位置 < 整数集合长度,把 pos的 往后移动
        if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1); 
    }

    // 插入值
    _intsetSet(is,pos,value);
    // 更新整数集合长度
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}

从源码可以看到:

  1. 添加元素时可能触发升级操作。
  2. 升级时元素(数字)的大小不会改变。
  3. 整数集合里的编码,由最大的那个值决定。

(3.8.1) 移动元素

/*
 * 移动指定元素
 * 类似于数组拷贝
 */
static void intsetMoveTail(intset *is, uint32_t from, uint32_t to) {
    void *src, *dst;
    // ? 个数
    uint32_t bytes = intrev32ifbe(is->length)-from;
    // 编码/整数类型
    uint32_t encoding = intrev32ifbe(is->encoding);

    if (encoding == INTSET_ENC_INT64) {  // int64类型
        // 计算开始复制位置
        src = (int64_t*)is->contents+from;
        // 计算复制后的位置
        dst = (int64_t*)is->contents+to;
        // ? 要复制的字节数 = 元素个数 * 每个元素占用的内存空间 
        bytes *= sizeof(int64_t);
    } else if (encoding == INTSET_ENC_INT32) {  // int32类型
        src = (int32_t*)is->contents+from;
        dst = (int32_t*)is->contents+to;
        bytes *= sizeof(int32_t);
    } else {  // int16类型
        src = (int16_t*)is->contents+from;
        dst = (int16_t*)is->contents+to;
        bytes *= sizeof(int16_t);
    }
    // 复制
    memmove(dst,src,bytes);
}

(3.9) 添加新元素(升级类型)-intsetUpgradeAndAdd

/* 
 * 整数集合里的数字类型升级 
 * 比如 int32 升级成 int64 
 * 将整数集合升级为更大的编码/整数类型并插入给定的整数。
 */
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
    // 编码/整数类型
    uint8_t curenc = intrev32ifbe(is->encoding);
    // 升级后的整数类型
    uint8_t newenc = _intsetValueEncoding(value);
    // 整数集合长度
    int length = intrev32ifbe(is->length);
    // 判断值是否小于0,用来计算插入位置
    int prepend = value < 0 ? 1 : 0;

    // 首先设置新编码并扩容
    // 设置编码/整数类型
    is->encoding = intrev32ifbe(newenc); 
    // 扩容
    is = intsetResize(is,intrev32ifbe(is->length)+1);  

    // 从后向前升级,这样我们就不会覆盖值。
    // 请注意,"prepend"变量用于确保我们在 intset 的开头或结尾有一个空白空间。 
    while(length--)
        _intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));

    // 在开头或结尾设置值。
    if (prepend)
        _intsetSet(is,0,value);  // 设置值
    else
        _intsetSet(is,intrev32ifbe(is->length),value);  // 设置值

    // 更新整形集合长度    
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}

整数集合的值可能会出现小于0


(3.10) 删除元素-intsetRemove

/* 
 * 从整数集合删除整数
 */
intset *intsetRemove(intset *is, int64_t value, int *success) {

    // 当前元素的整数类型/编码
    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success = 0;

    // 当前元素的整数类型 < 整数集合的整数类型  并且  整数集合里存在这个值时 
    if (valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,&pos)) {
        // 整数集合的长度
        uint32_t len = intrev32ifbe(is->length);

        // 能走到这儿肯定是存在这个元素,可以删除 
        if (success) *success = 1;

        // 移动元素 类似于数组删除元素 
        if (pos < (len-1)) intsetMoveTail(is,pos+1,pos);
        // 调整整数集合大小
        is = intsetResize(is,len-1);
        // 更新整数集合长度
        is->length = intrev32ifbe(len-1);
    }
    return is;
}

从源码可以看到

  1. 删除元素不会触发元素(数字)降级。
  2. 删除元素后会整数集合的大小会重新分配内存

(3.11) 查找元素-intsetSearch

/* 
 * 搜索"value"的位置。 
 * 找到值时返回 1,并将"pos"设置为值在 intset 中的位置。 
 * 当 intset 中不存在该值时返回 0,并将“pos”设置为可以插入“value”的位置。
 */
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
    int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
    int64_t cur = -1;

    // 集合为空时,值永远找不到
    if (intrev32ifbe(is->length) == 0) {  // 集合长度为0
        if (pos) *pos = 0;
        return 0;
    } else {

        // 检查我们知道找不到值但知道插入位置的情况。 
        // value > maxValue || value < 0 
        if (value > _intsetGet(is,max)) {
            if (pos) *pos = intrev32ifbe(is->length);
            return 0;
        } else if (value < _intsetGet(is,0)) {
            if (pos) *pos = 0;
            return 0;
        }
    }

    // 二分查找
    while(max >= min) {
        // 中轴下标
        mid = ((unsigned int)min + (unsigned int)max) >> 1;
        // 中轴下标对应的值
        cur = _intsetGet(is,mid);
        if (value > cur) {
            min = mid+1;
        } else if (value < cur) {
            max = mid-1;
        } else {
            break;
        }
    }

    if (value == cur) {
        if (pos) *pos = mid;
        return 1;
    } else {
        if (pos) *pos = min;
        return 0;
    }
}

参考资料

[1] Redis设计与实现-整数集合
[2] Redis源码剖析与实战 - 04 内存友好的数据结构该如何细化设计?
[3] redis-inset源码-github