你要如何衡量你的人生

坚持,努力,让好事发生

redis里有 redis.log(程序运行日志)、slowlog(慢查询日志) 等

参考资料

[1] slowlog

  在原来coding pages服务和github pages服务都可以免费试用时,在域名解析时配置 国内访问coding pages部署的博客静态文件,国外访问github pages部署的静态文件。国内国外访问速度都基本都在200ms内。

网站网络设计

  自从 coding.net 停止免费的pages服务后,只有github pages可以免费试用,国内访问博客只能访问github pages服务,导致访问速度很慢。被人吐槽了一次,所以想优化一下。

优化思路比较简单,
1、国内ip访问国内的博客服务 比如gitee pages服务
2、通过缓存加快访问速度 比如CDN缓存

阅读全文 »

一个shard就对应了一个lucene的library
Elasticsearch索引使用的存储内容主要取决于lucene中的数据存储

(1) elasticsearch存储简介

Elasticsearch中的数据是如何存储

Elasticsearch对外提供的是index的概念,可以类比为DB,
用户查询是在index上完成的,每个index由若干个shard组成,以此来达到分布式可扩展的能力。

shard是Elasticsearch数据存储的最小单位,index的存储容量为所有shard的存储容量之和。
Elasticsearch集群的存储容量则为所有index存储容量之和。

一个shard就对应了一个lucene的library。对于一个shard,Elasticsearch增加了translog的功能,类似于MySQL的WAL(Write Ahead Log)日志,是数据写入过程中的中间数据,其余的数据都在lucene库中管理的。

elasticsearch MySQL Excel
Node/Cluster (分片) Cluster 文件夹/Excel集合
index(索引) Database(数据库) 一个Excel
Type Table(一个表) Excel里的一个Sheet
Document (文档) Row(表里一行数据) Sheet里的一行
field (字段) field(一个字段) Sheet里的一个单元格

(2) lucene简介

lucene基本概念
segment : lucene内部的数据是由一个个segment组成的,写入lucene的数据并不直接落盘,而是先写在内存中,经过了refresh间隔,lucene才将该时间段写入的全部数据refresh成一个segment,segment多了之后会进行merge成更大的segment。lucene查询时会遍历每个segment完成。由于lucene* 写入的数据是在内存中完成,所以写入效率非常高。但是也存在丢失数据的风险,所以Elasticsearch基于此现象实现了translog,只有在segment数据落盘后,Elasticsearch才会删除对应的translog。
doc : doc表示lucene中的一条记录
field :field表示记录中的字段概念,一个doc由若干个field组成。
term :term是lucene中索引的最小单位,某个field对应的内容如果是全文检索类型,会将内容进行分词,分词的结果就是由term组成的。如果是不分词的字段,那么该字段的内容就是一个term。
倒排索引(inverted index): lucene索引的通用叫法,即实现了term到doc list的映射。
正排数据:搜索引擎的通用叫法,即原始数据,可以理解为一个doc list。
docvalues :Elasticsearch中的列式存储的名称,Elasticsearch除了存储原始存储、倒排索引,还存储了一份docvalues,用作分析和排序。

(3) 索引原理

(3.1) 新增数据

(3.2) 修改数据

(3.3) 删除数据

(3.4) 查询数据

根据主键id查询
根据关键字查询

(3.5) 倒排索引合并

Lucene系列七:倒排数组合并

MySQL里经常会有 SELECT * FROM table_user WHERE city='北京' and user_name = 'lisi'
elasticsearch里也会有类似的查询

单个词匹配可以得到这个term的倒排链(docId列表)
lucene的优势在哪呢?
问题其实就变成了求有序集合求并集的问题

多个有序集合求并集

时间序列数据库的秘密 (2)——索引

elasticsearch 利用 Skip List 合并

把多个 posting list 用 AND 的关系合并,得出 posting list 的交集

每个 list 需要指出 Advance 这个操作,快速移动指向的位置。什么样的 list 可以这样 Advance 往前做蛙跳?skip list

利用 bitset 合并

(1) 多层for循环

最简单的办法是多层for循环,这样的时间复杂度是 O(n^x),这个明显是不能接受的。
两层是 O(n^2),三层是O(n^3)

(2) 拉链法

有序集合1 {1,3,5,7,8,9} 和 有序集合2 {2,3,4,5,6,7},两个指针指向首元素,比较元素的大小(如果相同,放入结果集,随意移动一个指针,否则,移动值较小的一个指针,直到队尾),

优势(集合中的元素最多被比较一次,时间复杂度为O(n),多个有序集合可以同时进行,这适用于多个分词的item求url_id交集)
这个方法就像一条拉链的两边齿轮,一一比对就像拉链,故称为拉链法

(3) 水平分桶

水平分桶,多线程并行

有序集合1{1,3,5,7,8,9, 10,30,50,70,80,90} 和 有序集合2{2,3,4,5,6,7, 20,30,40,50,60,70},先进行分桶拆分【桶1的范围为[1, 9]、桶2的范围为[10, 100]、桶3的范围为[101, max_int]】,
于是拆分成集合1【a={1,3,5,7,8,9}、b={10,30,50,70,80,90}、c={}】和 集合2【d={2,3,4,5,6,7}、e={20,30,40,50,60,70}、e={}】,
利用多线程对桶1(a和d)、桶2(b和e)、桶3(c和e)分别求交集,最后求并集。

每个区间利用多线程并行求交集,各个线程结果集的并集,作为最终的结果集,能够大大的减少执行时间

(4) bitmap

bitmap,大大提高运算并行度,时间复杂度O(n)

假设list1{1,3,5,7,8,9} 和 list2{2,3,4,5,6,7} 的所有元素都在桶值[1, 16]的范围之内,可以用16个bit来描述这两个集合,原集合中的元素x,在这个16bitmap中的第x个bit为1,此时两个bitmap求交集,只需要将两个bitmap进行”与”操作,结果集bitmap的3,5,7位是1,表明原集合的交集为{3,5,7}

水平分桶(每个桶内的数据一定处于一个范围之内),使用bitmap来表示集合,能极大提高求交集的效率,但时间复杂度仍旧是O(n),但bitmap需要大量连续空间,占用内存较大。

从Lucene 5 开始采用了一种改进的位图方式,即roaring bitmaps ( 官网http://roaringbitmap.org/ ),它是一个压缩性能比bitmap更好的位图实现。

(5) 跳表
跳表,时间复杂度为O(log(n))

集合1 {1,2,3,4,20,21,22,23,50,60,70} 和 集合2{50,70} 求交集,如果用拉链法,会发现1,2,3,4,20,21,22,23都要被无效遍历一次,每个元素都要被比对,时间复杂度为O(n),能不能每次比对“跳过一些元素”呢?集合1{1,2,3,4,20,21,22,23,50,60,70}建立跳表时,一级只有{1,20,50}三个元素,二级与普通链表相同,集合2{50,70}由于元素较少,只建立了一级普通链表。这样在进行拉链法时,复杂度由O(n)降至O(log(n))

(4) lucene索引实现

Lucene简介、索引原理、Lucene索引实现

Lucene现使用的倒排表结构叫Frame of reference,它主要有两个特点:
  1. 数据压缩,可以看下图怎么将6个数字从原先的24bytes压缩到7bytes。
  2. 跳跃表加速合并,因为布尔查询时,and 和or 操作都需要合并倒排表,这时就需要快速定位相同文档号,所以利用跳跃表来进行相同文档号查找。

ElasticSearch倒排表 https://www.elastic.co/cn/blog/frame-of-reference-and-roaring-bitmaps

参考资料

[1] Lucene底层实现原理,它的索引结构
[2] Elasticsearch 存储原理-lucene底层数据结构
[3] 简析ES/Lucene索引的基本设计原理
[4] Elasticsearch 如何做到快速检索 - 倒排索引的秘密
[5]
[] 时间序列数据库的秘密 (2)——索引

[] Frame of Reference and Roaring Bitmaps
[]
[]

[10] Lucene 查询原理及解析
[11] lucene字典实现原理——FST
[12] Lucene查询分析
[13] Lucene 查询原理
[14] Lucene系列七:倒排数组合并
[15] Elasticsearch 存储原理-lucene底层数据结构

(1) 缓存淘汰是什么

(2) 为什么要缓存淘汰

(3) 缓存淘汰算法/页面置换算法原理

(3.1) LRU

LRU 算法背后的想法非常朴素:它认为刚刚被访问的数据,肯定还会被再次访问。
选择最近最久未被使用的数据进行淘汰。

优点:

不足:
可能造成缓存污染。

缓存污染:在一些场景下,有些数据被访问的次数非常少,甚至只会被访问一次。当这些数据服务完访问请求后,如果还继续留存在缓存中的话,就只会白白占用缓存空间。

典型场景:全表扫描,对所有数据进行一次读取,每个数据都被读取到了,

(3.2) LFU

记录数据被访问的频率,选择在最近使用最少的数据进行淘汰。

LFU算法是根据数据访问的频率来选择被淘汰数据的,所以LFU算法会记录每个数据的访问次数。当一个数据被再次访问时,就会增加该数据的访问次数。

不过,访问次数和访问频率还不能完全等同。访问频率是指在一定时间内的访问次数,也就是说,在计算访问频率时,我们不仅需要记录访问次数,还要记录这些访问是在多长时间内执行的。


(4) Redis里缓存有哪些淘汰策略

内存淘汰策略 解释 备注
noeviction 不进行数据淘汰
allkeys-random 在所有key里随机筛选数据
allkeys-lru 在所有key里筛选最近最久未使用的数据
allkeys-lfu 在所有key里筛选最近最少使用的数据 Redis 4.0 新增
volatile-ttl 在有过期时间key里根据过期时间的先后筛选
volatile-random 在有过期时间key里随机筛选数据
volatile-lru 在有过期时间key里筛选最近最久未使用的数据
volatile-lfu 在有过期时间key里筛选最近最少使用的数据 Redis 4.0 新增

lru (Least Recently Used) 最近最久未使用
lfu (Least Frequently Used) 最近最少使用

在redis3.0之前,默认淘汰策略是volatile-lru;在redis3.0及之后(包括3.0),默认淘汰策略是noeviction

在3.0及之后的版本,Redis 在使用的内存空间超过 maxmemory 值时,并不会淘汰数据。

对应到 Redis 缓存,也就是指,一旦缓存被写满了,再有写请求来时,Redis 不再提供服务,而是直接返回错误。

(4.1) Redis内存淘汰机制如何启用

Redis 的内存淘汰机制是如何启用近似 LRU 算法的

和Redis配置文件redis.conf中的两个配置参数有关:

maxmemory,该配置项设定了 Redis server 可以使用的最大内存容量,一旦 server 使用的实际内存量超出该阈值时,server 就会根据 maxmemory-policy 配置项定义的策略,执行内存淘汰操作;

maxmemory-policy,该配置项设定了 Redis server 的内存淘汰策略,主要包括近似 LRU 算法、LFU 算法、按 TTL 值淘汰和随机淘汰等几种算法。


(5) Redis里缓存淘汰算法原理

(5.1) Redis-LRU

LRU 算法在实际实现时,需要用链表管理所有的缓存数据,这会带来额外的空间开销。

而且,当有数据被访问时,需要在链表上把该数据移动到 MRU 端,如果有大量数据被访问,就会带来很多链表移动操作,会很耗时,进而会降低 Redis 性能。

在 Redis 中,LRU 算法被做了简化,以减轻数据淘汰对缓存性能的影响。

Redis 并没有为所有的数据维护一个全局的链表,而是通过随机采样方式,选取一定数量(例如 100 个)的数据放入候选集合,后续在候选集合中根据 lru 字段值的大小进行筛选。

(3.2) Redis-LFU

LFU 缓存策略是在 LRU 策略基础上,为每个数据增加了一个计数器,来统计这个数据的访问次数。

当使用 LFU 策略筛选淘汰数据时,首先会根据数据的访问次数进行筛选,把访问次数最低的数据淘汰出缓存。
如果两个数据的访问次数相同,LFU 策略再比较这两个数据的访问时效性,把距离上一次访问时间更久的数据淘汰出缓存。

Redis 在实现 LFU 策略的时候,只是把原来 24bit 大小的 lru 字段,又进一步拆分成了两部分。
ldt 值:lru 字段的前 16bit,表示数据的访问时间戳;
counter 值:lru 字段的后 8bit,表示数据的访问次数。

在实现 LFU 策略时,Redis 并没有采用数据每被访问一次,就给对应的 counter 值加 1 的计数规则,而是采用了一个更优化的计数规则。

LFU 策略实现的计数规则是:每当数据被访问一次时,首先,用计数器当前的值乘以配置项 lfu_log_factor 再加 1,再取其倒数,得到一个 p 值;然后,把这个 p 值和一个取值范围在(0,1)间的随机数 r 值比大小,只有 p 值大于 r 值时,计数器才加 1。

double r = (double)rand()/RAND_MAX;
...
double p = 1.0/(baseval*server.lfu_log_factor+1);
if (r < p) counter++;   

(4) LRU源码解读

(4.1) 全局LRU时钟值的计算

LRU算法需要知道数据的最近一次访问时间。因此,Redis设计了LRU时钟来记录数据每次访问的时间戳。

// file: src/server.h 

/*
 * redis对象
 */
typedef struct redisObject {
    unsigned type:4;  // 数据类型 (string/list/hash/set/zset等)
    unsigned encoding:4;  // 编码方式 
    unsigned lru:LRU_BITS;  // LRU时间(相对于全局 lru_clock) 
                            // 或 LFU数据(低8位保存频率 和 高16位保存访问时间)。  
                            // LRU_BITS为24个bits
    int refcount;  // 引用计数  4字节
    void *ptr;  // 指针 指向对象的值  8字节
} robj;
// file: src/server.c

void initServerConfig(void) {

    // 计算全局LRU时钟值
    server.lruclock = getLRUClock();

}
// file: src/evict.c

/* 
 * 根据时钟分辨率返回 LRU 时钟。 
 * 这是一个减少位格式的时间,可用于设置和检查 redisObject 结构的 object->lru 字段。
 */
unsigned int getLRUClock(void) {
    // mstime()是毫秒时间戳  // mstime()/1000=秒级时间戳
    // 与运算 保证值 <= LRU_CLOCK_MAX
    return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX;
}

从代码可以看出,LRU时钟精度是1000毫秒,也就是1秒。

#define LRU_BITS 24

// obj->lru的最大值 // LRU_CLOCK_MAX = 1^24 - 1
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of obj->lru */

// LRU 时钟分辨率(毫秒)
#define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */
// file: src/server.c

/* 
 * 返回UNIX毫秒时间戳
 * Return the UNIX time in milliseconds 
 */
mstime_t mstime(void) {
    return ustime()/1000;
}
// file: src/server.c

/*
 * 返回UNIX微秒时间戳 
 * Return the UNIX time in microseconds 
 */
long long ustime(void) {
    struct timeval tv;
    long long ust;

    gettimeofday(&tv, NULL);
    ust = ((long long)tv.tv_sec)*1000000;
    ust += tv.tv_usec;
    return ust;
}

(4.2) 在运行过程中LRU时钟值是如何更新的

和 Redis server 在事件驱动框架中,定期运行的时间事件所对应的 serverCron 函数有关。

serverCron 函数作为时间事件的回调函数,本身会按照一定的频率周期性执行,其频率值是由 Redis 配置文件 redis.conf 中的 hz 配置项决定的。

hz 配置项的默认值是 10,这表示 serverCron 函数会每 100 毫秒(1秒 / 10 = 100 毫秒)运行一次。

// file: src/server.c

/* This is our timer interrupt, called server.hz times per second.
 * Here is where we do a number of things that need to be done asynchronously.
 * For instance:
 *
 * - Active expired keys collection (it is also performed in a lazy way on
 *   lookup).
 * - Software watchdog.
 * - Update some statistic.
 * - Incremental rehashing of the DBs hash tables.
 * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
 * - Clients timeout of different kinds.
 * - Replication reconnection.
 * - Many more...
 *
 * Everything directly called here will be called server.hz times per second,
 * so in order to throttle execution of things we want to do less frequently
 * a macro is used: run_with_period(milliseconds) { .... }
 */

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {

    /* We have just LRU_BITS bits per object for LRU information.
     * So we use an (eventually wrapping) LRU clock.
     *
     * Note that even if the counter wraps it's not a big problem,
     * everything will still work but some object will appear younger
     * to Redis. However for this to happen a given object should never be
     * touched for all the time needed to the counter to wrap, which is
     * not likely.
     *
     * Note that you can change the resolution altering the
     * LRU_CLOCK_RESOLUTION define. */
    // 默认情况下,每100毫秒调用getLRUClock函数更新一次全局LRU时钟值 
    server.lruclock = getLRUClock();

}

这样一来,每个键值对就可以从全局 LRU 时钟获取最新的访问时间戳了。

(4.3) key-value-LRU时钟值的初始化与更新

(4.3.1) key-LRU时钟初始化

对于key-value来说,它的 LRU 时钟值最初是在这个键值对被创建的时候,进行初始化设置的,这个初始化操作是在 createObject 函数中调用的。

// file: src/object.c

/*
 * 创建一个redisObject对象
 *
 * @param type redisObject的类型
 * @param *ptr 值的指针
 */
robj *createObject(int type, void *ptr) {
    // 为redisObject结构体分配内存空间
    robj *o = zmalloc(sizeof(*o));
  
    // 省略部分代码 

    // 将lru字段设置为当前的 lruclock(分钟分辨率),或者 LFU 计数器。 
    // 判断内存过期策略
    if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
        // 对应lfu 
        // LFU_INIT_VAL=5 对应二进制是 0101 
        // 或运算 
        o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
    } else {
        // 对应lru 
        o->lru = LRU_CLOCK();
    }
    return o;
}

(4.3.2) key-LRU时钟更新

只要一个key被访问了,它的 LRU 时钟值就会被更新。而当一个键值对被访问时,访问操作最终都会调用 lookupKey 函数。

// file: src/db.c

/* 
 * 低级key查找API
 * 实际上并没有直接从应该依赖lookupKeyRead()、lookupKeyWrite()和lookupKeyReadWithFlags()的命令实现中调用。
 */
robj *lookupKey(redisDb *db, robj *key, int flags) {
    dictEntry *de = dictFind(db->dict,key->ptr);
    // 如果节点存在
    if (de) {
        // 从节点里获取redisObject
        robj *val = dictGetVal(de);

        /* 
         * 更新老化算法的访问时间。
         * 如果我们有一个正在保存的子进程,请不要这样做,因为这会触发疯狂写入副本。
         */
        // 没有活跃子进程 并且  
        if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){
            if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
                // 更新lfu
                updateLFU(val);
            } else {
                // 更新lru时间
                val->lru = LRU_CLOCK();
            }
        }
        return val;
    } else {
        return NULL;
    }
}

(4.4) 近似LRU算法的实际执行

Redis 之所以实现近似 LRU 算法的目的,是为了减少内存资源和操作时间上的开销。
何时触发算法执行?
算法具体如何执行?

(4.4.1) 触发时机

近似 LRU 算法的主要逻辑是在 freeMemoryIfNeeded 函数中实现的

processCommand -> freeMemoryIfNeededAndSafe -> freeMemoryIfNeeded

(4.4.2) 近似LRU算法执行

主要分3大步

  1. 判断当前内存使用情况-getMaxmemoryState
  2. 更新待淘汰的候选键值对集合-evictionPoolPopulate
  3. 选择被淘汰的键值对并删除-freeMemoryIfNeeded
// file: src/evict.c

/* This function is periodically called to see if there is memory to free
 * according to the current "maxmemory" settings. In case we are over the
 * memory limit, the function will try to free some memory to return back
 * under the limit.
 *
 * The function returns C_OK if we are under the memory limit or if we
 * were over the limit, but the attempt to free memory was successful.
 * Otherwise if we are over the memory limit, but not enough memory
 * was freed to return back under the limit, the function returns C_ERR. 
 */
int freeMemoryIfNeeded(void) {
    int keys_freed = 0;
    /* By default replicas should ignore maxmemory
     * and just be masters exact copies. */
    if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;

    size_t mem_reported, mem_tofree, mem_freed;
    mstime_t latency, eviction_latency, lazyfree_latency;
    long long delta;
    int slaves = listLength(server.slaves);
    int result = C_ERR;

    /* When clients are paused the dataset should be static not just from the
     * POV of clients not being able to write, but also from the POV of
     * expires and evictions of keys not being performed. */
    if (clientsArePaused()) return C_OK;

    // 如果当前内存使用量没有超过 maxmemory,返回
    if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
        return C_OK;

    // 已经释放的内存大小
    mem_freed = 0;

    latencyStartMonitor(latency);
    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
        goto cant_free; /* We need to free memory, but policy forbids. */

    // 已经释放的内存大小 < 计划要释放的内存大小
    while (mem_freed < mem_tofree) {
        int j, k, i;
        static unsigned int next_db = 0;
        sds bestkey = NULL;
        int bestdbid;
        redisDb *db;
        dict *dict;
        dictEntry *de;

        if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
            server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
        {
            // 淘汰池 / 采样key集合
            struct evictionPoolEntry *pool = EvictionPoolLRU;

            while(bestkey == NULL) {
                unsigned long total_keys = 0, keys;

                // 在keys过期时我们不想创建本地数据库去选择(哪些key删除),
                // 因此开始在每个数据库中填充采样key的淘汰池。 
                for (i = 0; i < server.dbnum; i++) {
                    db = server.db+i;
                    dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
                            db->dict : db->expires;
                    if ((keys = dictSize(dict)) != 0) {
                        evictionPoolPopulate(i, dict, db->dict, pool);
                        total_keys += keys;
                    }
                }
                if (!total_keys) break; /* No keys to evict. */

                /* Go backward from best to worst element to evict. */
                for (k = EVPOOL_SIZE-1; k >= 0; k--) {
                    if (pool[k].key == NULL) continue;
                    bestdbid = pool[k].dbid;

                    if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
                        de = dictFind(server.db[pool[k].dbid].dict,
                            pool[k].key);
                    } else {
                        de = dictFind(server.db[pool[k].dbid].expires,
                            pool[k].key);
                    }

                    /* Remove the entry from the pool. */
                    if (pool[k].key != pool[k].cached)
                        sdsfree(pool[k].key);
                    pool[k].key = NULL;
                    pool[k].idle = 0;

                    /* If the key exists, is our pick. Otherwise it is
                     * a ghost and we need to try the next element. */
                    if (de) {
                        bestkey = dictGetKey(de);
                        break;
                    } else {
                        /* Ghost... Iterate again. */
                    }
                }
            }
        }

        /* volatile-random and allkeys-random policy */
        else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
        {
            /* When evicting a random key, we try to evict a key for
             * each DB, so we use the static 'next_db' variable to
             * incrementally visit all DBs. */
            for (i = 0; i < server.dbnum; i++) {
                j = (++next_db) % server.dbnum;
                db = server.db+j;
                dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
                        db->dict : db->expires;
                if (dictSize(dict) != 0) {
                    de = dictGetRandomKey(dict);
                    bestkey = dictGetKey(de);
                    bestdbid = j;
                    break;
                }
            }
        }

        /* Finally remove the selected key. */
        if (bestkey) {
            db = server.db+bestdbid;
            robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
            propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
            /* We compute the amount of memory freed by db*Delete() alone.
             * It is possible that actually the memory needed to propagate
             * the DEL in AOF and replication link is greater than the one
             * we are freeing removing the key, but we can't account for
             * that otherwise we would never exit the loop.
             *
             * Same for CSC invalidation messages generated by signalModifiedKey.
             *
             * AOF and Output buffer memory will be freed eventually so
             * we only care about memory used by the key space. */
            delta = (long long) zmalloc_used_memory();
            latencyStartMonitor(eviction_latency);
            if (server.lazyfree_lazy_eviction)
                dbAsyncDelete(db,keyobj);
            else
                dbSyncDelete(db,keyobj);
            latencyEndMonitor(eviction_latency);
            latencyAddSampleIfNeeded("eviction-del",eviction_latency);
            delta -= (long long) zmalloc_used_memory();
            mem_freed += delta;
            server.stat_evictedkeys++;
            signalModifiedKey(NULL,db,keyobj);
            notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
                keyobj, db->id);
            decrRefCount(keyobj);
            keys_freed++;

            /* When the memory to free starts to be big enough, we may
             * start spending so much time here that is impossible to
             * deliver data to the slaves fast enough, so we force the
             * transmission here inside the loop. */
            if (slaves) flushSlavesOutputBuffers();

            /* Normally our stop condition is the ability to release
             * a fixed, pre-computed amount of memory. However when we
             * are deleting objects in another thread, it's better to
             * check, from time to time, if we already reached our target
             * memory, since the "mem_freed" amount is computed only
             * across the dbAsyncDelete() call, while the thread can
             * release the memory all the time. */
            if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) {
                if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                    /* Let's satisfy our stop condition. */
                    mem_freed = mem_tofree;
                }
            }
        } else {
            goto cant_free; /* nothing to free... */
        }
    }
    result = C_OK;

cant_free:
    /* We are here if we are not able to reclaim memory. There is only one
     * last thing we can try: check if the lazyfree thread has jobs in queue
     * and wait... */
    if (result != C_OK) {
        latencyStartMonitor(lazyfree_latency);
        while(bioPendingJobsOfType(BIO_LAZY_FREE)) {
            if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                result = C_OK;
                break;
            }
            usleep(1000);
        }
        latencyEndMonitor(lazyfree_latency);
        latencyAddSampleIfNeeded("eviction-lazyfree",lazyfree_latency);
    }
    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("eviction-cycle",latency);
    return result;
}

(4.4.2.1) 判断当前内存使用情况-getMaxmemoryState

// file: src/evict.c

/* Get the memory status from the point of view of the maxmemory directive:
 * if the memory used is under the maxmemory setting then C_OK is returned.
 * Otherwise, if we are over the memory limit, the function returns
 * C_ERR.
 *
 * The function may return additional info via reference, only if the
 * pointers to the respective arguments is not NULL. Certain fields are
 * populated only when C_ERR is returned:
 *
 *  'total'     total amount of bytes used.
 *              (Populated both for C_ERR and C_OK)
 *
 *  'logical'   the amount of memory used minus the slaves/AOF buffers.
 *              (Populated when C_ERR is returned)
 *
 *  'tofree'    the amount of memory that should be released
 *              in order to return back into the memory limits.
 *              (Populated when C_ERR is returned)
 *
 *  'level'     this usually ranges from 0 to 1, and reports the amount of
 *              memory currently used. May be > 1 if we are over the memory
 *              limit.
 *              (Populated both for C_ERR and C_OK)
 */
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level) {
    size_t mem_reported, mem_used, mem_tofree;

    /* Check if we are over the memory usage limit. If we are not, no need
     * to subtract the slaves output buffers. We can just return ASAP. */
    mem_reported = zmalloc_used_memory();
    if (total) *total = mem_reported;

    /* We may return ASAP if there is no need to compute the level. */
    int return_ok_asap = !server.maxmemory || mem_reported <= server.maxmemory;
    if (return_ok_asap && !level) return C_OK;

    /* Remove the size of slaves output buffers and AOF buffer from the
     * count of used memory. */
    mem_used = mem_reported;
    size_t overhead = freeMemoryGetNotCountedMemory();
    mem_used = (mem_used > overhead) ? mem_used-overhead : 0;

    /* Compute the ratio of memory usage. */
    if (level) {
        if (!server.maxmemory) {
            *level = 0;
        } else {
            *level = (float)mem_used / (float)server.maxmemory;
        }
    }

    if (return_ok_asap) return C_OK;

    /* Check if we are still over the memory limit. */
    if (mem_used <= server.maxmemory) return C_OK;

    /* Compute how much memory we need to free. */
    mem_tofree = mem_used - server.maxmemory;

    if (logical) *logical = mem_used;
    if (tofree) *tofree = mem_tofree;

    return C_ERR;
}

(4.4.2.2) 更新待淘汰的候选键值对集合-evictionPoolPopulate

// file: src/evict.c

/* This is an helper function for freeMemoryIfNeeded(), it is used in order
 * to populate the evictionPool with a few entries every time we want to
 * expire a key. Keys with idle time smaller than one of the current
 * keys are added. Keys are always added if there are free entries.
 *
 * We insert keys on place in ascending order, so keys with the smaller
 * idle time are on the left, and keys with the higher idle time on the
 * right. */

void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
    int j, k, count;
    dictEntry *samples[server.maxmemory_samples];

    count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
    for (j = 0; j < count; j++) {
        unsigned long long idle;
        sds key;
        robj *o;
        dictEntry *de;

        de = samples[j];
        key = dictGetKey(de);

        /* If the dictionary we are sampling from is not the main
         * dictionary (but the expires one) we need to lookup the key
         * again in the key dictionary to obtain the value object. */
        if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
            if (sampledict != keydict) de = dictFind(keydict, key);
            o = dictGetVal(de);
        }

        /* Calculate the idle time according to the policy. This is called
         * idle just because the code initially handled LRU, but is in fact
         * just a score where an higher score means better candidate. */
        if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
            idle = estimateObjectIdleTime(o);
        } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
            /* When we use an LRU policy, we sort the keys by idle time
             * so that we expire keys starting from greater idle time.
             * However when the policy is an LFU one, we have a frequency
             * estimation, and we want to evict keys with lower frequency
             * first. So inside the pool we put objects using the inverted
             * frequency subtracting the actual frequency to the maximum
             * frequency of 255. */
            idle = 255-LFUDecrAndReturn(o);
        } else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
            /* In this case the sooner the expire the better. */
            idle = ULLONG_MAX - (long)dictGetVal(de);
        } else {
            serverPanic("Unknown eviction policy in evictionPoolPopulate()");
        }

        /* Insert the element inside the pool.
         * First, find the first empty bucket or the first populated
         * bucket that has an idle time smaller than our idle time. */
        k = 0;
        while (k < EVPOOL_SIZE &&
               pool[k].key &&
               pool[k].idle < idle) k++;
        if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
            /* Can't insert if the element is < the worst element we have
             * and there are no empty buckets. */
            continue;
        } else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
            /* Inserting into empty position. No setup needed before insert. */
        } else {
            /* Inserting in the middle. Now k points to the first element
             * greater than the element to insert.  */
            if (pool[EVPOOL_SIZE-1].key == NULL) {
                /* Free space on the right? Insert at k shifting
                 * all the elements from k to end to the right. */

                /* Save SDS before overwriting. */
                sds cached = pool[EVPOOL_SIZE-1].cached;
                memmove(pool+k+1,pool+k,
                    sizeof(pool[0])*(EVPOOL_SIZE-k-1));
                pool[k].cached = cached;
            } else {
                /* No free space on right? Insert at k-1 */
                k--;
                /* Shift all elements on the left of k (included) to the
                 * left, so we discard the element with smaller idle time. */
                sds cached = pool[0].cached; /* Save SDS before overwriting. */
                if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
                memmove(pool,pool+1,sizeof(pool[0])*k);
                pool[k].cached = cached;
            }
        }

        /* Try to reuse the cached SDS string allocated in the pool entry,
         * because allocating and deallocating this object is costly
         * (according to the profiler, not my fantasy. Remember:
         * premature optimization bla bla bla. */
        int klen = sdslen(key);
        if (klen > EVPOOL_CACHED_SDS_SIZE) {
            pool[k].key = sdsdup(key);
        } else {
            memcpy(pool[k].cached,key,klen+1);
            sdssetlen(pool[k].cached,klen);
            pool[k].key = pool[k].cached;
        }
        pool[k].idle = idle;
        pool[k].dbid = dbid;
    }
}

(4.4.2.3) 选择被淘汰的键值对并删除-freeMemoryIfNeeded

// file: src/evict.c

/* This function is periodically called to see if there is memory to free
 * according to the current "maxmemory" settings. In case we are over the
 * memory limit, the function will try to free some memory to return back
 * under the limit.
 *
 * The function returns C_OK if we are under the memory limit or if we
 * were over the limit, but the attempt to free memory was successful.
 * Otherwise if we are over the memory limit, but not enough memory
 * was freed to return back under the limit, the function returns C_ERR. */
int freeMemoryIfNeeded(void) {
    int keys_freed = 0;
    /* By default replicas should ignore maxmemory
     * and just be masters exact copies. */
    if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;

    size_t mem_reported, mem_tofree, mem_freed;
    mstime_t latency, eviction_latency, lazyfree_latency;
    long long delta;
    int slaves = listLength(server.slaves);
    int result = C_ERR;

    /* When clients are paused the dataset should be static not just from the
     * POV of clients not being able to write, but also from the POV of
     * expires and evictions of keys not being performed. */
    if (clientsArePaused()) return C_OK;
    if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
        return C_OK;

    mem_freed = 0;

    latencyStartMonitor(latency);
    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
        goto cant_free; /* We need to free memory, but policy forbids. */

    while (mem_freed < mem_tofree) {
        int j, k, i;
        static unsigned int next_db = 0;
        sds bestkey = NULL;
        int bestdbid;
        redisDb *db;
        dict *dict;
        dictEntry *de;

        if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
            server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
        {
            struct evictionPoolEntry *pool = EvictionPoolLRU;

            while(bestkey == NULL) {
                unsigned long total_keys = 0, keys;

                /* We don't want to make local-db choices when expiring keys,
                 * so to start populate the eviction pool sampling keys from
                 * every DB. */
                for (i = 0; i < server.dbnum; i++) {
                    db = server.db+i;
                    dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
                            db->dict : db->expires;
                    if ((keys = dictSize(dict)) != 0) {
                        evictionPoolPopulate(i, dict, db->dict, pool);
                        total_keys += keys;
                    }
                }
                if (!total_keys) break; /* No keys to evict. */

                /* Go backward from best to worst element to evict. */
                for (k = EVPOOL_SIZE-1; k >= 0; k--) {
                    if (pool[k].key == NULL) continue;
                    bestdbid = pool[k].dbid;

                    if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
                        de = dictFind(server.db[pool[k].dbid].dict,
                            pool[k].key);
                    } else {
                        de = dictFind(server.db[pool[k].dbid].expires,
                            pool[k].key);
                    }

                    /* Remove the entry from the pool. */
                    if (pool[k].key != pool[k].cached)
                        sdsfree(pool[k].key);
                    pool[k].key = NULL;
                    pool[k].idle = 0;

                    /* If the key exists, is our pick. Otherwise it is
                     * a ghost and we need to try the next element. */
                    if (de) {
                        bestkey = dictGetKey(de);
                        break;
                    } else {
                        /* Ghost... Iterate again. */
                    }
                }
            }
        }

        /* volatile-random and allkeys-random policy */
        else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
        {
            /* When evicting a random key, we try to evict a key for
             * each DB, so we use the static 'next_db' variable to
             * incrementally visit all DBs. */
            for (i = 0; i < server.dbnum; i++) {
                j = (++next_db) % server.dbnum;
                db = server.db+j;
                dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
                        db->dict : db->expires;
                if (dictSize(dict) != 0) {
                    de = dictGetRandomKey(dict);
                    bestkey = dictGetKey(de);
                    bestdbid = j;
                    break;
                }
            }
        }

        /* Finally remove the selected key. */
        if (bestkey) {
            db = server.db+bestdbid;
            robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
            propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
            /* We compute the amount of memory freed by db*Delete() alone.
             * It is possible that actually the memory needed to propagate
             * the DEL in AOF and replication link is greater than the one
             * we are freeing removing the key, but we can't account for
             * that otherwise we would never exit the loop.
             *
             * Same for CSC invalidation messages generated by signalModifiedKey.
             *
             * AOF and Output buffer memory will be freed eventually so
             * we only care about memory used by the key space. */
            delta = (long long) zmalloc_used_memory();
            latencyStartMonitor(eviction_latency);
            if (server.lazyfree_lazy_eviction)
                dbAsyncDelete(db,keyobj);
            else
                dbSyncDelete(db,keyobj);
            latencyEndMonitor(eviction_latency);
            latencyAddSampleIfNeeded("eviction-del",eviction_latency);
            delta -= (long long) zmalloc_used_memory();
            mem_freed += delta;
            server.stat_evictedkeys++;
            signalModifiedKey(NULL,db,keyobj);
            notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
                keyobj, db->id);
            decrRefCount(keyobj);
            keys_freed++;

            /* When the memory to free starts to be big enough, we may
             * start spending so much time here that is impossible to
             * deliver data to the slaves fast enough, so we force the
             * transmission here inside the loop. */
            if (slaves) flushSlavesOutputBuffers();

            /* Normally our stop condition is the ability to release
             * a fixed, pre-computed amount of memory. However when we
             * are deleting objects in another thread, it's better to
             * check, from time to time, if we already reached our target
             * memory, since the "mem_freed" amount is computed only
             * across the dbAsyncDelete() call, while the thread can
             * release the memory all the time. */
            if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) {
                if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                    /* Let's satisfy our stop condition. */
                    mem_freed = mem_tofree;
                }
            }
        } else {
            goto cant_free; /* nothing to free... */
        }
    }
    result = C_OK;

cant_free:
    /* We are here if we are not able to reclaim memory. There is only one
     * last thing we can try: check if the lazyfree thread has jobs in queue
     * and wait... */
    if (result != C_OK) {
        latencyStartMonitor(lazyfree_latency);
        while(bioPendingJobsOfType(BIO_LAZY_FREE)) {
            if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                result = C_OK;
                break;
            }
            usleep(1000);
        }
        latencyEndMonitor(lazyfree_latency);
        latencyAddSampleIfNeeded("eviction-lazyfree",lazyfree_latency);
    }
    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("eviction-cycle",latency);
    return result;
}

(5) LFU源码解读

LFU 算法的启用,是通过设置 Redis 配置文件 redis.conf 中的 maxmemory 和 maxmemory-policy。

LFU 算法的实现可以分成三部分内容,分别是键值对访问频率记录、键值对访问频率初始化和更新,以及LFU算法淘汰数据。

(5.1) 键值对访问频率记录

每个键值对的值都对应了一个 redisObject 结构体,其中有一个 24 bits 的 lru 变量。

LRU 算法和 LFU 算法并不会同时使用。为了节省内存开销,Redis 源码就复用了 lru 变量来记录 LFU 算法所需的访问频率信息。

记录LFU算法的所需信息时,它会用24 bits中的低8 bits作为计数器,来记录键值对的访问次数,同时它会用24 bits中的高16 bits,记录访问的时间戳。

|<---访问时间戳--->|< 计数器 >| 

     16 bits      8 bits
+----------------+--------+
+ Last decr time | LOG_C  |
+----------------+--------+            

(5.2) 键值对访问频率初始化和更新

(5.2.1) 初始化

键值对 lru变量初始化是在 创建redisObject调用 createObject 函数时完成的。

主要分2步:
第一部是 lru 变量的高16位,是以1分钟为精度的 UNIX 时间戳。(LFUGetTimeInMinutes)
第二部是 lru 变量的低8位,被设置为宏定义 LFU_INIT_VAL,默认值为 5。

源码如下

// file: src/object.c

/*
 * 创建一个redisObject对象
 *
 * @param type redisObject的类型
 * @param *ptr 值的指针
 */
robj *createObject(int type, void *ptr) {
    // 为redisObject结构体分配内存空间
    robj *o = zmalloc(sizeof(*o));
  
    // 省略部分代码 

    // 将lru字段设置为当前的 lruclock(分钟分辨率),或者 LFU 计数器。 
    // 判断内存过期策略
    if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
        // 对应lfu 
        // LFU_INIT_VAL=5 对应二进制是 0101 
        // 或运算  高16位是时间,低8位是次数, LFU_INIT_VAL = 5
        o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
    } else {
        // 对应lru 
        o->lru = LRU_CLOCK();
    }
    return o;
}

counter会被初始化为LFU_INIT_VAL,默认5。

// file: src/evict.c

/* ----------------------------------------------------------------------------
 * LFU (Least Frequently Used) implementation.
 * 
 * 为了实现 LFU(最不常用)驱逐策略,我们在每个对象中总共有 24 位空间,因为我们为此目的重新使用了 LRU 字段。
 *
 * 我们将 24 位分成两个字段:
 *
 *          16 bits      8 bits
 *     +----------------+--------+
 *     + Last decr time | LOG_C  |
 *     +----------------+--------+
 *
 * LOG_C 是提供访问频率指示的对数计数器。 
 * 然而,这个字段也必须递减,否则过去经常访问的键将永远保持这样的排名,而我们希望算法适应访问模式的变化。
 *
 * 因此,剩余的 16 位用于存储“递减时间”,
 * 这是一个精度降低的 Unix 时间(我们将 16 位时间转换为分钟,因为我们不关心回绕),
 * 其中 LOG_C 计数器减半 如果它具有高值,或者如果它具有低值则只是递减。
 *
 * 新key不会从零开始,以便能够在被淘汰之前收集一些访问,因此它们从 COUNTER_INIT_VAL 开始。
 * COUNTER_INIT_VAL = 5
 * 因此从5(或具有较小值)开始的键在访问时递增的可能性非常高。
 *
 * 在递减期间,如果对数计数器的当前值大于5的两倍,则对数计数器的值减半,否则它只减一。
 * 
 * --------------------------------------------------------------------------*/

/* 
 * 以分钟为单位返回当前时间,只取最低有效16位。 
 * 返回的时间适合存储为 LFU 实现的 LDT(最后递减时间)。
 */
unsigned long LFUGetTimeInMinutes(void) {
    // 65535 = 2^16 - 1 对应二进制是 1111 1111 1111 1111
    // (server.unixtime/60) & 1111 1111 1111 1111
    return (server.unixtime/60) & 65535;
}

(5.2.2) 更新LFU值

当一个键值对被访问时,Redis 会调用 lookupKey 函数进行查找。lookupKey 函数会调用 updateLFU 函数来更新键值对的访问频率。

// file: src/db.c

/* 
 * 访问对象时更新 LFU。
 * 首先,如果达到递减时间,则递减计数器。
 * 然后以对数方式递增计数器,并更新访问时间。
 */
void updateLFU(robj *val) {
    // 获取计数器
    unsigned long counter = LFUDecrAndReturn(val);
    // 更新计数器
    counter = LFULogIncr(counter);
    val->lru = (LFUGetTimeInMinutes()<<8) | counter;
}

(5.2.2.1) 递减计数器-LFUDecrAndReturn

/*
 * 如果达到对象递减时间,则 递减LFU计数器 但 不更新对象的LFU字段,
 * 我们在真正访问对象时以显式方式更新访问时间和计数器。
 * 
 * 并且我们将根据 经过的时间/server.lfu_decay_time 将计数器减半。
 * 返回对象频率计数器。
 * redis.conf配置文件里 lfu-decay-time 默认是 1
 * And we will times halve the counter according to the times of
 * elapsed time than server.lfu_decay_time.
 * 
 * 此函数用于扫描数据集以获得最佳对象
 *  适合:当我们检查候选对象时,如果需要,我们会递减扫描对象的计数器。
 */
unsigned long LFUDecrAndReturn(robj *o) {

    // 高16位存的是 上次访问时间(分钟级的) Last decr time 
    unsigned long ldt = o->lru >> 8;

    // 255 对应二进制 1111 1111 
    // o->lru & 1111 1111 相当于取低8位的值
    // 获取计数器
    unsigned long counter = o->lru & 255;

    // 0 <= LFUTimeElapsed(ldt) <  65535
    // 过了的分钟数 / server.lfu_decay_time
    // num_periods 是过了 n轮 衰减时间(lfu_decay_time)
    unsigned long num_periods = server.lfu_decay_time ? LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;

    // 如果经过的轮数不为0 (超过1分钟了)
    if (num_periods) 
        // 如果 n轮衰减 > 访问次数,counter设置为0,相当于重新开始计算
        // 否则,n轮衰减 <= 访问次数,counter设置为 counter - num_periods,相当于每过1轮衰减时间(lfu_decay_time),减1
        counter = (num_periods > counter) ? 0 : counter - num_periods;

    // 如果没有超过1分钟,num_periods=0,直接返回counter
    // 如果超过1分钟,num_periods!=0,至少过了1轮衰减时间(lfu_decay_time)了,更新counter后返回
    return counter;
}

LFUDecrAndReturn 得到的计数结果

  1. 如果在当前分钟时间戳内,counter不变
  2. 如果不在当前分钟时间戳内,每过1轮衰减时间(lfu_decay_time),counter减1 (代码里是过了num_periods轮,减num_periods)
/* 
 * 计算过了多少分钟
 * 
 * 给定对象的上次访问时间,计算自上次访问以来经过的最小分钟数。 
 * 处理溢出(ldt 大于当前 16 位分钟时间),将时间视为正好回绕一次。
 * 
 * @param ldt 上一次访问的时间(分钟级)
 */
unsigned long LFUTimeElapsed(unsigned long ldt) {
    // 获取分钟级时间戳
    unsigned long now = LFUGetTimeInMinutes();
    // 计算过了多少分钟
    if (now >= ldt) return now-ldt;

    // 实际上now永远是在ldt(上一次访问时间之后)
    // 但是现在 now < ldt,不符合预期 
    // ldt是 (server.unixtime/60) & 1111 1111 1111 1111 得到的,相当于取余,也就是至少过了1轮了 
    // 假设 ldt = 65534  now = 1,其实过了2分钟
    return 65535-ldt+now;
}

(5.2.2.2) 更新LFU计数器-LFULogIncr

/* 
 * 以对数方式递增计数器。 当前计数器值越大,它真正实现的可能性就越小。 在255时饱和。
 *
 * Logarithmically increment a counter. 
 * The greater is the current counter value
 * the less likely is that it gets really implemented. 
 * Saturate it at 255. 
 */
uint8_t LFULogIncr(uint8_t counter) {
    // 最大255
    if (counter == 255) return 255;

    // 获取一个随机数
    double r = (double)rand()/RAND_MAX;

    // 基础值 = counter - 5
    double baseval = counter - LFU_INIT_VAL;
    // 最小=0
    if (baseval < 0) baseval = 0;

    // 取对数 
    double p = 1.0/(baseval*server.lfu_log_factor+1);

    // 随机数 < 对数时,计数器+1
    if (r < p) counter++;

    return counter;
}

counter并不是简单的访问一次就+1,而是采用了一个0-1之间的p因子控制增长。

对数

取一个0-1之间的随机数r与p比较,当r < p时,才增加counter
p取决于当前counter值与lfu_log_factor因子,counter值与lfu_log_factor因子越大,p越小,r<p的概率也越小,counter增长的概率也就越小。

增长情况如下

+--------+------------+------------+------------+------------+------------+
| factor | 100 hits   | 1000 hits  | 100K hits  | 1M hits    | 10M hits   |
+--------+------------+------------+------------+------------+------------+
| 0      | 104        | 255        | 255        | 255        | 255        |
+--------+------------+------------+------------+------------+------------+
| 1      | 18         | 49         | 255        | 255        | 255        |
+--------+------------+------------+------------+------------+------------+
| 10     | 10         | 18         | 142        | 255        | 255        |
+--------+------------+------------+------------+------------+------------+
| 100    | 8          | 11         | 49         | 143        | 255        |
+--------+------------+------------+------------+------------+------------+

(5.3) LFU算法淘汰数据

主要有三步
第一步,调用 getMaxmemoryState 函数计算待释放的内存空间;
第二步,调用 evictionPoolPopulate 函数随机采样键值对,并插入到待淘汰集合 EvictionPoolLRU 中;
第三步,遍历待淘汰集合 EvictionPoolLRU,选择实际被淘汰数据,并删除。

(5.3.1) 判断当前内存使用情况-getMaxmemoryState

// file: src/evict.c

/* Get the memory status from the point of view of the maxmemory directive:
 * if the memory used is under the maxmemory setting then C_OK is returned.
 * Otherwise, if we are over the memory limit, the function returns
 * C_ERR.
 *
 * The function may return additional info via reference, only if the
 * pointers to the respective arguments is not NULL. Certain fields are
 * populated only when C_ERR is returned:
 *
 *  'total'     total amount of bytes used.
 *              (Populated both for C_ERR and C_OK)
 *
 *  'logical'   the amount of memory used minus the slaves/AOF buffers.
 *              (Populated when C_ERR is returned)
 *
 *  'tofree'    the amount of memory that should be released
 *              in order to return back into the memory limits.
 *              (Populated when C_ERR is returned)
 *
 *  'level'     this usually ranges from 0 to 1, and reports the amount of
 *              memory currently used. May be > 1 if we are over the memory
 *              limit.
 *              (Populated both for C_ERR and C_OK)
 */
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level) {
    size_t mem_reported, mem_used, mem_tofree;

    /* Check if we are over the memory usage limit. If we are not, no need
     * to subtract the slaves output buffers. We can just return ASAP. */
    mem_reported = zmalloc_used_memory();
    if (total) *total = mem_reported;

    /* We may return ASAP if there is no need to compute the level. */
    int return_ok_asap = !server.maxmemory || mem_reported <= server.maxmemory;
    if (return_ok_asap && !level) return C_OK;

    /* Remove the size of slaves output buffers and AOF buffer from the
     * count of used memory. */
    mem_used = mem_reported;
    size_t overhead = freeMemoryGetNotCountedMemory();
    mem_used = (mem_used > overhead) ? mem_used-overhead : 0;

    /* Compute the ratio of memory usage. */
    if (level) {
        if (!server.maxmemory) {
            *level = 0;
        } else {
            *level = (float)mem_used / (float)server.maxmemory;
        }
    }

    if (return_ok_asap) return C_OK;

    /* Check if we are still over the memory limit. */
    if (mem_used <= server.maxmemory) return C_OK;

    /* Compute how much memory we need to free. */
    mem_tofree = mem_used - server.maxmemory;

    if (logical) *logical = mem_used;
    if (tofree) *tofree = mem_tofree;

    return C_ERR;
}

(5.3.2) 更新待淘汰的候选键值对集合-evictionPoolPopulate

// file: src/evict.c

/* This is an helper function for freeMemoryIfNeeded(), it is used in order
 * to populate the evictionPool with a few entries every time we want to
 * expire a key. Keys with idle time smaller than one of the current
 * keys are added. Keys are always added if there are free entries.
 *
 * We insert keys on place in ascending order, so keys with the smaller
 * idle time are on the left, and keys with the higher idle time on the
 * right. */

void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
    int j, k, count;
    dictEntry *samples[server.maxmemory_samples];

    count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
    for (j = 0; j < count; j++) {
        unsigned long long idle;
        sds key;
        robj *o;
        dictEntry *de;

        de = samples[j];
        key = dictGetKey(de);

        /* If the dictionary we are sampling from is not the main
         * dictionary (but the expires one) we need to lookup the key
         * again in the key dictionary to obtain the value object. */
        if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
            if (sampledict != keydict) de = dictFind(keydict, key);
            o = dictGetVal(de);
        }

        /* Calculate the idle time according to the policy. This is called
         * idle just because the code initially handled LRU, but is in fact
         * just a score where an higher score means better candidate. */
        if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
            idle = estimateObjectIdleTime(o);
        } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
            /* When we use an LRU policy, we sort the keys by idle time
             * so that we expire keys starting from greater idle time.
             * However when the policy is an LFU one, we have a frequency
             * estimation, and we want to evict keys with lower frequency
             * first. So inside the pool we put objects using the inverted
             * frequency subtracting the actual frequency to the maximum
             * frequency of 255. */
            idle = 255-LFUDecrAndReturn(o);
        } else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
            /* In this case the sooner the expire the better. */
            idle = ULLONG_MAX - (long)dictGetVal(de);
        } else {
            serverPanic("Unknown eviction policy in evictionPoolPopulate()");
        }

        /* Insert the element inside the pool.
         * First, find the first empty bucket or the first populated
         * bucket that has an idle time smaller than our idle time. */
        k = 0;
        while (k < EVPOOL_SIZE &&
               pool[k].key &&
               pool[k].idle < idle) k++;
        if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
            /* Can't insert if the element is < the worst element we have
             * and there are no empty buckets. */
            continue;
        } else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
            /* Inserting into empty position. No setup needed before insert. */
        } else {
            /* Inserting in the middle. Now k points to the first element
             * greater than the element to insert.  */
            if (pool[EVPOOL_SIZE-1].key == NULL) {
                /* Free space on the right? Insert at k shifting
                 * all the elements from k to end to the right. */

                /* Save SDS before overwriting. */
                sds cached = pool[EVPOOL_SIZE-1].cached;
                memmove(pool+k+1,pool+k,
                    sizeof(pool[0])*(EVPOOL_SIZE-k-1));
                pool[k].cached = cached;
            } else {
                /* No free space on right? Insert at k-1 */
                k--;
                /* Shift all elements on the left of k (included) to the
                 * left, so we discard the element with smaller idle time. */
                sds cached = pool[0].cached; /* Save SDS before overwriting. */
                if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
                memmove(pool,pool+1,sizeof(pool[0])*k);
                pool[k].cached = cached;
            }
        }

        /* Try to reuse the cached SDS string allocated in the pool entry,
         * because allocating and deallocating this object is costly
         * (according to the profiler, not my fantasy. Remember:
         * premature optimization bla bla bla. */
        int klen = sdslen(key);
        if (klen > EVPOOL_CACHED_SDS_SIZE) {
            pool[k].key = sdsdup(key);
        } else {
            memcpy(pool[k].cached,key,klen+1);
            sdssetlen(pool[k].cached,klen);
            pool[k].key = pool[k].cached;
        }
        pool[k].idle = idle;
        pool[k].dbid = dbid;
    }
}

(5.3.3) 选择被淘汰的键值对并删除-freeMemoryIfNeeded

// file: src/evict.c

/* This function is periodically called to see if there is memory to free
 * according to the current "maxmemory" settings. In case we are over the
 * memory limit, the function will try to free some memory to return back
 * under the limit.
 *
 * The function returns C_OK if we are under the memory limit or if we
 * were over the limit, but the attempt to free memory was successful.
 * Otherwise if we are over the memory limit, but not enough memory
 * was freed to return back under the limit, the function returns C_ERR. */
int freeMemoryIfNeeded(void) {
    int keys_freed = 0;
    /* By default replicas should ignore maxmemory
     * and just be masters exact copies. */
    if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;

    size_t mem_reported, mem_tofree, mem_freed;
    mstime_t latency, eviction_latency, lazyfree_latency;
    long long delta;
    int slaves = listLength(server.slaves);
    int result = C_ERR;

    /* When clients are paused the dataset should be static not just from the
     * POV of clients not being able to write, but also from the POV of
     * expires and evictions of keys not being performed. */
    if (clientsArePaused()) return C_OK;
    if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
        return C_OK;

    mem_freed = 0;

    latencyStartMonitor(latency);
    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
        goto cant_free; /* We need to free memory, but policy forbids. */

    while (mem_freed < mem_tofree) {
        int j, k, i;
        static unsigned int next_db = 0;
        sds bestkey = NULL;
        int bestdbid;
        redisDb *db;
        dict *dict;
        dictEntry *de;

        if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
            server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
        {
            struct evictionPoolEntry *pool = EvictionPoolLRU;

            while(bestkey == NULL) {
                unsigned long total_keys = 0, keys;

                /* We don't want to make local-db choices when expiring keys,
                 * so to start populate the eviction pool sampling keys from
                 * every DB. */
                for (i = 0; i < server.dbnum; i++) {
                    db = server.db+i;
                    dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
                            db->dict : db->expires;
                    if ((keys = dictSize(dict)) != 0) {
                        evictionPoolPopulate(i, dict, db->dict, pool);
                        total_keys += keys;
                    }
                }
                if (!total_keys) break; /* No keys to evict. */

                /* Go backward from best to worst element to evict. */
                for (k = EVPOOL_SIZE-1; k >= 0; k--) {
                    if (pool[k].key == NULL) continue;
                    bestdbid = pool[k].dbid;

                    if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
                        de = dictFind(server.db[pool[k].dbid].dict,
                            pool[k].key);
                    } else {
                        de = dictFind(server.db[pool[k].dbid].expires,
                            pool[k].key);
                    }

                    /* Remove the entry from the pool. */
                    if (pool[k].key != pool[k].cached)
                        sdsfree(pool[k].key);
                    pool[k].key = NULL;
                    pool[k].idle = 0;

                    /* If the key exists, is our pick. Otherwise it is
                     * a ghost and we need to try the next element. */
                    if (de) {
                        bestkey = dictGetKey(de);
                        break;
                    } else {
                        /* Ghost... Iterate again. */
                    }
                }
            }
        }

        /* volatile-random and allkeys-random policy */
        else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
        {
            /* When evicting a random key, we try to evict a key for
             * each DB, so we use the static 'next_db' variable to
             * incrementally visit all DBs. */
            for (i = 0; i < server.dbnum; i++) {
                j = (++next_db) % server.dbnum;
                db = server.db+j;
                dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
                        db->dict : db->expires;
                if (dictSize(dict) != 0) {
                    de = dictGetRandomKey(dict);
                    bestkey = dictGetKey(de);
                    bestdbid = j;
                    break;
                }
            }
        }

        /* Finally remove the selected key. */
        if (bestkey) {
            db = server.db+bestdbid;
            robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
            propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
            /* We compute the amount of memory freed by db*Delete() alone.
             * It is possible that actually the memory needed to propagate
             * the DEL in AOF and replication link is greater than the one
             * we are freeing removing the key, but we can't account for
             * that otherwise we would never exit the loop.
             *
             * Same for CSC invalidation messages generated by signalModifiedKey.
             *
             * AOF and Output buffer memory will be freed eventually so
             * we only care about memory used by the key space. */
            delta = (long long) zmalloc_used_memory();
            latencyStartMonitor(eviction_latency);
            if (server.lazyfree_lazy_eviction)
                dbAsyncDelete(db,keyobj);
            else
                dbSyncDelete(db,keyobj);
            latencyEndMonitor(eviction_latency);
            latencyAddSampleIfNeeded("eviction-del",eviction_latency);
            delta -= (long long) zmalloc_used_memory();
            mem_freed += delta;
            server.stat_evictedkeys++;
            signalModifiedKey(NULL,db,keyobj);
            notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
                keyobj, db->id);
            decrRefCount(keyobj);
            keys_freed++;

            /* When the memory to free starts to be big enough, we may
             * start spending so much time here that is impossible to
             * deliver data to the slaves fast enough, so we force the
             * transmission here inside the loop. */
            if (slaves) flushSlavesOutputBuffers();

            /* Normally our stop condition is the ability to release
             * a fixed, pre-computed amount of memory. However when we
             * are deleting objects in another thread, it's better to
             * check, from time to time, if we already reached our target
             * memory, since the "mem_freed" amount is computed only
             * across the dbAsyncDelete() call, while the thread can
             * release the memory all the time. */
            if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) {
                if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                    /* Let's satisfy our stop condition. */
                    mem_freed = mem_tofree;
                }
            }
        } else {
            goto cant_free; /* nothing to free... */
        }
    }
    result = C_OK;

cant_free:
    /* We are here if we are not able to reclaim memory. There is only one
     * last thing we can try: check if the lazyfree thread has jobs in queue
     * and wait... */
    if (result != C_OK) {
        latencyStartMonitor(lazyfree_latency);
        while(bioPendingJobsOfType(BIO_LAZY_FREE)) {
            if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                result = C_OK;
                break;
            }
            usleep(1000);
        }
        latencyEndMonitor(lazyfree_latency);
        latencyAddSampleIfNeeded("eviction-lazyfree",lazyfree_latency);
    }
    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("eviction-cycle",latency);
    return result;
}

参考资料

[1] Redis 核心技术与实战 - 24 | 替换策略:缓存满了怎么办?
[2] Redis 核心技术与实战 - 27 | 缓存被污染了,该怎么办?
[3] Redis 源码剖析与实战 - 15 | 为什么LRU算法原理和代码实现不一样?
[4] Redis 源码剖析与实战 - 16 | LFU算法和其他算法相比有优势吗?
[5] Redis中的LFU算法

redis的set命令很简单,大家经常用。
有没有想过一个问题, SET 9223372036854775806 1 EX 86400 NX 是怎么执行的?

127.0.0.1:6379>  SET 9223372036854775806 1 EX 86400 NX
OK
127.0.0.1:6379>
阅读全文 »

看了微博在2012年 微博计数器的设计 后,现在在想假设自己要设计一个有点赞数收藏数评论数转发数 的计数系统,会怎么做?

微博计数器的设计 文章链接 https://blog.cydu.net/weidesign/2012/09/09/weibo-counter-service-design-2/


(1) 背景

以抖音、微博、微信 为例,推荐的视频、微博、朋友圈 会有点赞数收藏数评论数转发数,假如让你设计,你会怎么做?

微博的比较经典,这里以微博为例,原理都一样。
考虑因素如下:
1、预估用户 14亿
2、预估活跃用户数 0.01 ~ 6亿。
3、假设每个用户每天发13个微博, 每天大概有 0.01亿 ~ 18亿条微博。 每条微博上都有点赞数收藏数评论数转发数
3
5年大概有 0.013653 ~ 183653亿条微博,也就是 10.95亿 ~ 1 9710亿条微博。



(2) 信息Id设计

如果只考虑用户信息的ID查询,只要保证动态Id唯一即可;考虑到要按照用户维度去展示用户的信息列表,信息ID里最好包含用户ID,这样方便查询某个用户发布的所有的信息(动态)。
类似于电商场景查询某个用户的所有订单。

信息有个唯一表示,用rec_id表示
根据推荐信息唯一标识rec_id获取点赞数收藏数评论数转发数 是一个简单查询。

(2.1) 用数字表示信息ID

int类型占用4字节,32位,最多可以表示 2^32个数字,考虑上符号,以及0,正数最多有 2^31-1 = 21 4748 3647 个,大概21亿多。

long类型占用8字节,64位,最多可以表示 2^64个数字,考虑到id都是正数,而且一般不用0,所以最多有 2^63-1 = 922 3372 0368 5477 5807 个,大概922亿亿个。

可以用8字节的Long类型表示信息ID(msg_id),msg_id的设计可以参考雪花算法的思想, 64位可以按照 符号位(1位)、城市(6位)、用户(8-10位)、时间戳()、随机字符()等来设计。


(3) 计数器设计方案

(3.1) 数据库方案

在刚开始阶段,用户较少并且用户发布的信息/动态较少时,可以使用数据库来存储。

CREATE TABLE msg_count (
    `id` bigint(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键id',
    `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
    `user_id` bigint(11) NOT NULL COMMENT '用户id',    
    `msg_id` bigint(11) NOT NULL COMMENT '信息id',
    `like_count` int(11) NOT NULL DEFAULT '0' COMMENT '点赞数',
    `favor_count` int(11) NOT NULL DEFAULT '0' COMMENT '收藏数',
    `repost_count` int(11) NOT NULL DEFAULT '0' COMMENT '转发数',
    `comment_count` int(11) NOT NULL DEFAULT '0' COMMENT '评论数',
    PRIMARY KEY (`id`),
    UNIQUE KEY `uq_idx_msg_id` (`msg_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='信息(动态)计数表';

(3.1.1) 实现业务功能

查询点赞数 UPDATE msg_count SET like_num = like_num + 1 WHERE msg_id = xxx ;
更新点赞数 SELECT msg_count WHERE msg_id = xxx ;

(3.1.2) 存储

按照上面的表设计,一条计数数据占用64字节,MySQL一页可以存储 16K/64=256条数据
MySQL一页可以存储 16K / (主键ID 8字节 + 指针 6字节) = 1170 条数据

假设MySQL查一条数据需要二次IO,读取二页,那么MySQL单表 可以存储 1170 * 256 = 29 9520 条数据。
假设MySQL查一条数据需要三次IO,读取三页,那么MySQL单表 可以存储 1170 * 1170 * 256 = 3 5043 8400 条数据。
从数据存储角度看,MySQL单表存储上亿条数据没问题。

假设一个表存一亿条数据,需要一千多个表就可以存储千亿条数据。
从维护角度看,可以让单表数据量500万,这样单表 数据备份,改表等时间会短一点。

未来扩缩容

按信息ID(msg_id)取模,把数据拆分到N个表(N是2的幂次方)。
如果要扩容,把N个表拆成 N * 2^x,迁移数据时只需要把 1个表拆数据到 2^x个表即可。

(3.1.3) 性能

从查询角度看,百万的查询量,分摊到一千个每个表,每个表查询QPS大概是100左右,从单表查询QPS来看是没什么问题的,但是从 CPU、内存、IO角度看是有资源的问题
每秒百万请求量,那么MySQL需要把数据读取到内存,会有页数据对应的内存淘汰,假设有一半的数据在内存,msg_count表单条计数数据64字节 * 每秒100 0000访问量 / 2 = 6400 0000字节,也就是MySQL至少需要64G内存,

从业务角度看,也是会有问题的,单表平均QPS大概100,但实际上往往一个热点微博可能导致所在表的QPS上万。

(3.1.4) 存储数据量过大怎么办-分库分表

对于一亿甚至几亿以下的数据规模来说,拆表能够解决很多问题。

类似于电商交易场景的订单分库分表,按照uidorder_id分库分表
技术系统可以按照uid或者msg_id分库分表

(3.1.4.1) 按照uid分库分表

  1. 写入时一个用户的所有动态(微博)在一个表里。
  2. 用户在查看自己的动态(微博)列表时,可以批量获取点赞数,评论数、转发数。
  3. 如果用户是一个热点用户,发的动态(微博)比较多,查询压力会在同一个表,这张表可能会成为热点表。
    可以通过mq或日志计数,如果计数结果>设置阈值,把热点动态的计数放到缓存里。
    不过如果一个动态(微博)是热点,可能mq计数还没把数据放到缓存里数据库就挂了。
  4. 如果直接根据msg_id来查,会找不到对应的库表,需要根据uid和msg_id来查。 或者msg_id里可以记录uid的一些信息。

(3.1.4.2) 按照msg_id分库分表

  1. 写入时一个用户的所有动态(微博)在多个表里。
  2. 用户在查看自己的动态(微博)列表时,获取点赞数,评论数、转发数 需要进行多次查询。
  3. 如果用户是一个热点用户,发的动态(微博)比较多,

(3.1.4.3) 按照create_time分库分表

这个一般是数据归档使用
如果按照 create_time 分库分表,在C端高并发场景,同一天的热点数据在同一个表,有热点时单表肯定扛不住。


(3.1.5) 访问量太大怎么办?

使用缓存+数据库,先查缓存,缓存查不到再查数据库。

优点
只需要改少量代码。
方案成熟, 数据复制,管理,修复方案都很成熟。

问题:
1、空数据也得Cache(有一半以上的微博是没有转发也没有评论的,但是依然有大量的访问会查询数据库);
2、Cache频繁失效(由于计数更新非常快,所以经常需要失效Cache再重新缓存,还会导致数据不一致);

更好的硬件解决。 上FusionIO + HandleSocket + 大内存 优化。

总的来说,MySQL分库分表 + Cache加速的方案 对于数据规模和访问量不是特别巨大的情况下,是非常不错的解决方案。


(3.2) Redis缓存方案

Redis作为一个简单的内存数据库,提供了多种数据类型,可以使用string类型的 incr 来计数。
具体命令参考 https://redis.io/commands/incr/

简单的来估算一下数据存储量,按照Redis 6.0的实现,在64位系统,指针为8字节来估算

假设 key 为8字节,value为 4字节,通过incr存储的话

key value 会存储在 dictEntry里,详细的结构: redisServer -> db0 -> dict -> ht[0] -> dictht -> dictEntry

redis索引模型

从后往前

  1. 存储key为8字节数字(64位 8字节 long类型/int64,Eg:9223372036854775807 ),但通过 sdsdup 以字符串的形式存储,至少需要 8(struct sdshdr) + 19(数字转字符串后长度是19) + 1(结束字符) = 28字节;
  2. 存储val为4字节数字(32位 4字节 int类型/int32,Eg: 2147483647 ),通过 createStringObjectFromLongLong 创建一个robj,由于val > REDIS_SHARED_INTEGERS (默认1000), 所以存储时用的 encoding为 REDIS_ENCODING_INT,占用8字节,加上redisObject,一共占用16字节 (这里redis有个优化,把redisObject的指针ptr直接存储int类型,节省了一个指针的开销,所以不是占用 16+8字节 而是16字节)
  3. 为了存储到Redis的dict里,需要一个dictEntry对象,占用 3*8字节=24字节
  4. 放到 db->dict->ht[0]->table 中存储dictEntry的指针,需要8个字节;
  5. 存储一个64位key,32位value的计数,Redis也至少需要耗费: 16 + 28 + 24 + 8 = 76 字节。
  6. 1000亿个key全内存的话,就至少需要 ( 100 * 1000 * 1000 * 1000 ) * 76 = 100G * 76 = 7.6TB 的内存

从技术角度讲
只算value,有效的业务数据其实是 1000亿*32位 = 100G * 4字节 = 400GB,需要7.6TB来存储,内存的有效利用率约为: 400GB/7600GB = 5.3%
key和value全算上,有效业务数据是 1000亿*(64位+32位) = 100G * 12字节 = 1200GB,需要7.6TB存储,内存有效利用率约为:1200GB/7600GB = 15.7%


(3.3) 计数服务-优化Redis源码

计数器是一个普通的基础服务,但是因为数据量太大了,从而量变引发了质变。
所以做Counter时的一个思路就是: 牺牲部分的通用性,针对微博转发和评论的大数据量和高并发访问的特点来进行定点优化。

(3.3.1) 优化Redis结构体

方案二中Redis对内存使用的分析,我们发现是非常”奢侈”的, 大量的重复存储着指针和预留的字段,而且造成大量的碎片内存的使用,当然Redis主要是出于通用性的考虑。
针对这种情况,设计了一个更轻量更简单的数据结构,能更好的利用内存,核心思路:

改动点

  1. 定制化数据结构。
  2. hash冲突时把链地址法优化开放寻址法
/* 
 * 哈希表节点/Entry实体 
 */
typedef struct updateDictEntry {
    int64_t msg_id;  // 信息ID 对应微博ID  
    int repost_num;  // 转发数
    int comment_num; // 评论数
} dictEntry;

优化前后对比

优化前,存储一条微博评论数 消耗 76 字节
优化前内存占用

优化后,存储一条微博转发数+评论数 消耗 16字节
优化后内存占用

  1. Value的长度短于指针长度
  2. 开放寻址Hash表(双重散列)
  3. 以节省指针存储

优化后一条数据占用内存
业务数据 有效内存利用率为

(3.3.1) 业务维度优化

  1. 大量微博(一半以上)没有转发或没有评论,甚至是都没有
    针对这种情况的优化:

抛弃 存储+Cache 的思路.因为这些为0的数据,也必须进到Cache中(无论是旁路还是穿透).
查询量并不小,这对于Cache的利用率影响非常非常的大(有一半的数据是空的。) 而我们采用类似 存储即Cache(存储本身就在内存中) 时,这一类的数据是可以不存储的,当查不到的时候,就返回0。
通过这种优化:

1000亿个数字,我们可以减少3/5,即最多只需要存 400亿个数字。这算是最经典的稀疏数组的优化存储方式了。

  1. 微博的评论数和转发数的关联度非常的高。
    他们都有相同的主Key, 有大量转发的微博一般也有评论,有大量评论的一般转发量也不小。 而且访问量最大的Feed页基本上取评论数的时候,也会取转发数。。。

针对这种情况的优化:

我们将评论数和转发数 可以考虑存储在一起,这样的话,可以节省大量key的存储空间。 由 微博ID+评论数; 微博ID+转发数 变为: 微博ID+评论数+转发数的结构。
PS: 这个优化和上一个优化是有一些小冲突的,部分有转发没有评论的微博,需要多存一个0; 但是经过数据评估,我们发现这个优化还是相当必要的:
key存储的空间比评论数还要长,一个8字节,一个4字节;
对于应用层来说,批量请求可以减少一次访问,能够降请求的压力,同时提升响应的时间;
(具体的数字不方便透露,但是这个结论大家可以随机抽取一批公开的微博来验证)

(3.4) KV存储

(4) 系统安全问题

(4.1) 系统怎么应对爬虫?

网关+风控处理

参考资料

[1] [WeiDesign]微博计数器的设计(上)
[2] [WeiDesign]微博计数器的设计(下)
[3] [WeiDesign]微博计数器的设计 velocity
[4] Velocity分享_微架构设计之微博计数器服务_杜传赢_20121205.pdf
[5] 微博计数:从关系服务到访问计数,Redis持续优化支撑万亿级访问(含PPT)
[6] 微博每日数十亿级业务下的计数器如何扩展Redis?
[7] 高并发系统设计40问 - 37计数系统设计(一):面对海量数据的计数器要如何做?
[8] Storing hundreds of millions of simple key-value pairs in Redis

  同事在实现一个弹窗需求时遇到一个问题,找大家讨论方案,产品的需求是小程序首页各种弹窗太多了,用户一进来需要点多个弹窗,要求服务端限制在首页一个用户一天只能弹一次。当时考虑了数据库和缓存,从实现简易程度、扩展性、速度综合考虑最后选了redis缓存。

Redis数据类型

五种数据形式的底层实现
string:简单动态字符串
list:双向链表,压缩列表
hash:压缩列表,哈希表
Sorted Set:压缩列表,跳表
set:哈希表,整数数组

(3) redis常用数据类型/对象

上面介绍了 6 种底层数据结构,Redis 并没有直接使用这些数据结构来实现键值数据库,而是基于这些数据结构创建了一个对象系统,这个系统包含字符串对象、列表对象、哈希对象、集合对象和有序集合这五种类型的对象,每个对象都使用到了至少一种前边讲的底层数据结构。

redis常用的数据对象有以下几种

/* The actual Redis Object */
#define OBJ_STRING 0    /* String object. */
#define OBJ_LIST 1      /* List object. */
#define OBJ_SET 2       /* Set object. */
#define OBJ_ZSET 3      /* Sorted set object. */
#define OBJ_HASH 4      /* Hash object. */

#define OBJ_MODULE 5    /* Module object. */
#define OBJ_STREAM 6    /* Stream object. */

(3.1) 字符串对象 OBJ_STRING

(3.2) 列表对象 OBJ_LIST

(3.3) 集合对象 OBJ_SET

(3.4) 有序集合对象 OBJ_ZSET

Sorted Set内部实现数据结构是跳表和压缩列表。
跳表主要服务范围操作,提供O(logN)的复杂度。
zset有个ZSCORE的操作,用于返回单个集合member的分数,它的操作复杂度是O(1),这就是收益于你这看到的hash table。这个hash table保存了集合元素和相应的分数,所以做ZSCORE操作时,直接查这个表就可以,复杂度就降为O(1)了。

(3.5) 哈希对象 OBJ_HASH

(2) redis常用数据结构

/* Objects encoding. Some kind of objects like Strings and Hashes can be
 * internally represented in multiple ways. The 'encoding' field of the object
 * is set to one of this fields for this object. */
#define OBJ_ENCODING_RAW 0     /* Raw representation */
#define OBJ_ENCODING_INT 1     /* Encoded as integer */
#define OBJ_ENCODING_HT 2      /* Encoded as hash table */
#define OBJ_ENCODING_ZIPMAP 3  /* Encoded as zipmap */
#define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old list encoding. */
#define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define OBJ_ENCODING_INTSET 6  /* Encoded as intset */
#define OBJ_ENCODING_SKIPLIST 7  /* Encoded as skiplist */
#define OBJ_ENCODING_EMBSTR 8  /* Embedded sds string encoding */
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */
#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks */

(2.1) 简单动态字符串SDS

  代码见
  SDS包括 RAW INT EMDSTR

(2.2) 链表

(2.3) 字典

(2.4) 跳跃表

(2.5) 整数集合

(2.6) 压缩列表

思考

Redis为什么会把整数数组和压缩列表作为底层数据结构?

整数数组和压缩列表在查找时间复杂度方面并没有很大的优势,那为什么 Redis 还会把它们作为底层数据结构呢?

1、内存利用率,数组和压缩列表都是非常紧凑的数据结构,它比链表占用的内存要更少。
Redis是内存数据库,大量数据存到内存中,此时需要做尽可能的优化,提高内存的利用率。

2、数组对CPU高速缓存支持更友好,所以Redis在设计时,集合数据元素较少情况下,默认采用内存紧凑排列的方式存储,同时利用CPU高速缓存不会降低访问速度。当数据元素超过设定阈值后,避免查询时间复杂度太高,转为哈希和跳表数据结构存储,保证查询效率。

数组对cpu缓存友好的原因是: cpu预读取一个cache line大小的数据, 数组数据排列紧凑、相同大小空间保存的元素更多, 访问下一个元素时、恰好已经在cpu缓存了. 如果是随机访问、就不能充分利用cpu缓存了, 拿int元素举例: 一个元素4byte, CacheLine 假设64byte, 可以预读取 16个挨着的元素, 如果下次随机访问的元素不在这16个元素里、就需要重新从内存读取了.

(4) 手动测试过程

 1、flushall 清空所有数据
 2、测试用string类型存 key=1000000,value=1
 3、插入100万uid用了55M左右(实际占用操作系统64M)
 4、插入1000万用了587M(实际占用从操作系统622M)

 感觉不太对,key是long类型 8字节,100万*8=8M, 100万key占8M 100万value占8M 多余的55-8-8=39M去哪了?

(4.1) 测试代码

@Slf4j
public class RedisMemoryTest {

    public Jedis jedis = JedisPoolUtil.getJedis();

    /**
     * 1000万 18位用户id
     */
    @Test
    public void testMemory() {
        // 18位用户id
        long start = 123456789012345678L;
        long end = start + 10000000;
        for (long i = 123456789012345678L; i < end; i++) {
            String res = jedis.set("u" + i, "1");
        }
    }

}

(4.2) redis flushall后 info memory信息

# Memory
used_memory:1180064
used_memory_human:1.13M
used_memory_rss:954368
used_memory_rss_human:932.00K
used_memory_peak:1219088
used_memory_peak_human:1.16M
used_memory_peak_perc:96.80%
used_memory_overhead:1132016
used_memory_startup:1079584
used_memory_dataset:48048
used_memory_dataset_perc:47.82%

(4.3) redis 插入100万数据后的info memory信息

 


127.0.0.1:6379> info memory
# Memory
used_memory:57558992
used_memory_human:54.89M
used_memory_rss:66695168
used_memory_rss_human:63.61M
used_memory_peak:58020704
used_memory_peak_human:55.33M
used_memory_peak_perc:99.20%
used_memory_overhead:49520624
used_memory_startup:1079584
used_memory_dataset:8038368
used_memory_dataset_perc:14.23%

(4.4) redis 插入100万数据后的info memory信息

 


# Memory
used_memory:615390352
used_memory_human:586.88M
used_memory_rss:421154816
used_memory_rss_human:401.64M
used_memory_peak:653409248
used_memory_peak_human:623.14M
used_memory_peak_perc:94.18%
used_memory_overhead:535349840
used_memory_startup:1079584
used_memory_dataset:80040512
used_memory_dataset_perc:13.03%

参考资料

[1] redis源码 - github
[2] 十二张图详解Redis的数据结构和对象系统
[3] “万金油”的String,为什么不好用了

  经常会遇到的一个问题是数据库如何保证不丢数据? 同样的假如把Redis当数据库用,如何保证不丢数据?

 MySQL里有 redo log、bin log、undo log,MySQL通过binlog全量备份+增量备份保证数据不丢。通过redo log和bin log保证数据一致性。

 Redis里有没有类似的功能呢?

 Redis包含 rdb logaof log,可以通过RDB全量备份+aof增量备份保证数据几乎不丢。

阅读全文 »

经常会被问到一个问题,假如Redis宕机,内存中的数据全部丢失,怎么恢复数据?

Redis 分别提供了 RDB 和 AOF 两种持久化机制:
RDB将数据库的快照(snapshot)以二进制的方式保存到磁盘中。类似于MySQL全量备份。
AOF则以协议文本的方式,将所有对数据库进行过写入的命令(及其参数)记录到AOF文件,以此达到记录数据库状态的目的。 类似于MySQL binlog 增量更新。

(1) AOF日志是什么

Redis AOF(Append-Only File)是一种持久化机制,它记录Redis接收到的每个写操作。它通过将每个写操作追加到磁盘上的文件中来实现。这个文件称为AOF文件。

AOF文件以Redis特定的格式编写,以实现最佳性能。它也是只追加的,这意味着一旦数据被写入文件,就不能被修改或删除。这使得它成为一种可靠和安全的数据持久化方式。

和我们常见的WAL日志不同,WAL(Write Ahead Log)是写前日志,在实际写数据前,先把修改的数据记到日志文件中,再去执行命令,这个就要求数据库需要额外的检查命令是否正确。

AOF(Append Only File)日志是一种写后日志,在Redis先执行命令,把数据写入内存后,然后才记录日志,日志会追加到文件末尾,所以叫AOF日志。

阅读全文 »

有一个常见的问题,如果Redis挂了,在启动Redis的时候怎么快速恢复数据?

如果Redis挂了,在启动的时候可以通过AOF或RDB恢复数据。

(1) 什么是RDB

RDB是Redis DataBase 的缩写,中文名为快照/内存快照。
RDB持久化是把当前进程数据生成快照保存到磁盘上的过程,可以抽象为全量备份

MySQL的全量备份通过mysqldump命令触发,Redis的RDB通过什么命令触发?
RDB可以通过savebgsave主从复制触发。


(2) 为什么要用RDB

Redis是个基于内存的数据库。那服务一旦宕机,内存中的数据将全部丢失。为了保存数据,需要有一个持久化方式,RDB是全量备份的方法。

主从复制是分布式数据系统保证可靠性的一个重要机制。RDB为Redis主从提供了数据同步机制。

(2.1) RDB优缺点

优点
RDB文件是某个时间节点的快照,默认使用LZF算法进行压缩,压缩后的文件体积远远小于内存大小,适用于备份、全量复制等场景;
Redis加载RDB文件恢复数据要远远快于AOF方式;

缺点
RDB方式实时性不够,无法做到秒级的持久化;
每次调用bgsave都需要fork子进程,fork子进程属于重量级操作,频繁执行成本较高;
RDB文件是二进制的,没有可读性,AOF文件在了解其结构的情况下可以手动修改或者补全;
版本兼容RDB文件问题;


(3) RDB的原理

Redis RDB数据格式


(4) 源码解读

(4.1) RDB创建的入口函数和触发时机

RDB 文件创建的三个时机,分别是 save命令执行bgsave命令执行 以及 主从复制
对应源码rdb.c文件中的3个函数

函数 作用 备注
rdbSave Redis Server在本地磁盘创建RDB文件。 对应save命令
rdbSaveBackground Redis Server 使用后台子进程方式,在本地磁盘创建 RDB文件 对应bgsaveCommand
rdbSaveToSlavesSockets Redis Server 在采用不落盘方式传输RDB文件进行主从复制时,创建RDB文件 对应了Redis Server 执行主从复制命令,以及周期性检测主从复制状态时触发RDB生成

(4.1.1) 同步阻塞保存-rdbSave

/*
 * 把数据库的数据保存到磁盘上
 *
 * 失败时返回 C_ERR
 * 成功时返回 C_OK 
 *
 * @param *filename 文件名
 * @param *rsi 
 */
int rdbSave(char *filename, rdbSaveInfo *rsi) {
    char tmpfile[256];
    char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
    FILE *fp = NULL;
    rio rdb;
    int error = 0;

    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());

    // 打开文件
    fp = fopen(tmpfile,"w");
    // 文件不存在,为空
    if (!fp) {
        //  
        char *cwdp = getcwd(cwd,MAXPATHLEN);

        // 提示 打开文件失败
        serverLog(LL_WARNING,
            "Failed opening the RDB file %s (in server root dir %s) "
            "for saving: %s",
            filename,
            cwdp ? cwdp : "unknown",
            strerror(errno));
        return C_ERR;
    }

    // rio初始化
    rioInitWithFile(&rdb,fp);

    // 标记开始保存
    startSaving(RDBFLAGS_NONE);

    // fsync 在 rdb 保存时递增 
    if (server.rdb_save_incremental_fsync)
        // todo 自动同步?
        rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);

    // 真正把Redis数据库里的数据保存到文件 
    if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
        errno = error;
        goto werr;
    }

    // 确保数据不会保留在操作系统的输出缓冲区中

    // fflush 把缓冲区的数据写到文件里
    if (fflush(fp)) goto werr;

    // fsync 确保一直到写磁盘操作结束才会返回
    if (fsync(fileno(fp))) goto werr;

    // 关闭文件
    if (fclose(fp)) { fp = NULL; goto werr; }
    fp = NULL;
    

    // 使用 文件重命名 确保仅当生成的数据库文件正常时才自动更改数据库文件。 

    // 把缓存文件重命名正式文件时,可能没权限,会重命名失败  
    if (rename(tmpfile,filename) == -1) {  // 重命名失败
        char *cwdp = getcwd(cwd,MAXPATHLEN);

        // 打印日志 
        serverLog(LL_WARNING,
            "Error moving temp DB file %s on the final "
            "destination %s (in server root dir %s): %s",
            tmpfile,
            filename,
            cwdp ? cwdp : "unknown",
            strerror(errno));
        unlink(tmpfile);
        stopSaving(0);
        return C_ERR;
    }

    // 打印日志  数据库数据已经保存到磁盘 
    serverLog(LL_NOTICE,"DB saved on disk");
    // 更新
    server.dirty = 0;
    // 更新上次保存时间
    server.lastsave = time(NULL);
    // 更新上次保存状态
    server.lastbgsave_status = C_OK;
    // 停止保存 发布事件
    stopSaving(1);
    return C_OK;

werr:
    serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
    // 关闭文件
    if (fp) fclose(fp);
    // 删除缓存文件
    unlink(tmpfile);
    // 停止保存 发布事件
    stopSaving(0);
    return C_ERR;
}

Redis保存RDB文件的代码确实不错,在使用资源前校验资源的合法性,使用后确保数据都保存到磁盘,还通过重命名确认保存成功,并且在完后后货失败后释放对应资源。

虽然很细节很简单,但是很健壮。


(4.1.2) 异步保存-rdbSaveBackground

/*
 * 
 * @param *filename 文件名
 * @param *rsi 
 */
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
    pid_t childpid;

    // 有活跃的子进程 
    if (hasActiveChildProcess()) return C_ERR;

    //  
    server.dirty_before_bgsave = server.dirty;
    //
    server.lastbgsave_try = time(NULL);
    // 打开子
    openChildInfoPipe();

    // 复制进程
    if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {  // 
        int retval;

        // 子进程设置调用名称
        redisSetProcTitle("redis-rdb-bgsave");
        // 设置 bgsave子进程的cpu关联列表
        redisSetCpuAffinity(server.bgsave_cpulist);
        // 在子进程里调用 rdbSave() 同步保存 
        retval = rdbSave(filename,rsi);
        if (retval == C_OK) {
            // 发送子进程写时复制(Copy On Write)信息 
            sendChildCOWInfo(CHILD_TYPE_RDB, "RDB");
        }
        // 退出子进程
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        /* Parent */
        if (childpid == -1) {
            // 关闭 
            closeChildInfoPipe();
            // 后台保存状态错误
            server.lastbgsave_status = C_ERR;
            // 不能后台保存  子进程复制出错
            serverLog(LL_WARNING,"Can't save in background: fork: %s",
                strerror(errno));
            return C_ERR;
        }

        // 
        serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
        server.rdb_save_time_start = time(NULL);
        server.rdb_child_pid = childpid;
        server.rdb_child_type = RDB_CHILD_TYPE_DISK;
        updateDictResizePolicy();
        return C_OK;
    }
    return C_OK; /* unreached */
}

(4.1.3) 主从同步生成RDB-rdbSaveToSlavesSockets

/* 
 * 生成一个 RDB 子进程,它将 RDB 写入当前处于 SLAVE_STATE_WAIT_BGSAVE_START 状态的从属 sockets。
 */
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {

    listNode *ln;
    listIter li;
    pid_t childpid;
    int pipefds[2], rdb_pipe_write, safe_to_exit_pipe;

    // 有活跃子进程 
    if (hasActiveChildProcess()) return C_ERR;

    // 即使之前的fork子进程退出了,在我们排干管道之前不要开一个新的。
    if (server.rdb_pipe_conns) return C_ERR;

    /* Before to fork, create a pipe that is used to transfer the rdb bytes to
     * the parent, we can't let it write directly to the sockets, since in case
     * of TLS we must let the parent handle a continuous TLS state when the
     * child terminates and parent takes over. */
    if (pipe(pipefds) == -1) return C_ERR;
    // 
    server.rdb_pipe_read = pipefds[0]; /* read end */
    // 
    rdb_pipe_write = pipefds[1]; /* write end */
    // 
    anetNonBlock(NULL, server.rdb_pipe_read);

    // 创建另一个管道,父进程使用该管道向子进程发出可以退出的信号。 
    if (pipe(pipefds) == -1) {
        // 关闭rdb管道写操作 
        close(rdb_pipe_write);
        // 关闭rdb管道读
        close(server.rdb_pipe_read);
        return C_ERR;
    }
    safe_to_exit_pipe = pipefds[0]; /* read end */
    server.rdb_child_exit_pipe = pipefds[1]; /* write end */

    // 收集我们要将 RDB 传输到的 从节点 的连接,这些从节点处于 WAIT_BGSAVE_START 状态。 
    // 创建 connection数组  长度=从节点个数
    server.rdb_pipe_conns = zmalloc(sizeof(connection *)*listLength(server.slaves));
    server.rdb_pipe_numconns = 0;
    server.rdb_pipe_numconns_writing = 0;
    // 链表迭代器 
    listRewind(server.slaves,&li);
    // 遍历链表
    while((ln = listNext(&li))) {
        // 从节点 
        client *slave = ln->value;
        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { // 
            // 连接赋值
            server.rdb_pipe_conns[server.rdb_pipe_numconns++] = slave->conn;
            // 发送 FULLRESYNC 回复
            replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());
        }
    }

    // 创建父子进程通信管道
    openChildInfoPipe();
    // 创建子进程
    if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
        /* Child */
        int retval, dummy;
        rio rdb;

        rioInitWithFd(&rdb,rdb_pipe_write);

        // 设置进程名称  
        redisSetProcTitle("redis-rdb-to-slaves");
        // 设置 子进程的cpu关联列表
        redisSetCpuAffinity(server.bgsave_cpulist);

        // 写RDB数据
        retval = rdbSaveRioWithEOFMark(&rdb,NULL,rsi);
        if (retval == C_OK && rioFlush(&rdb) == 0)
            retval = C_ERR;

        if (retval == C_OK) {
            // 发送子进程写时复制信息 
            sendChildCOWInfo(CHILD_TYPE_RDB, "RDB");
        }

        //   
        rioFreeFd(&rdb);
        // 关闭rdb写管道    // 唤醒读取着,告诉他们我们已经完成 
        close(rdb_pipe_write);
        // 关闭rdb子进程退出管道    // 关闭写结束,以便我们可以检测到父级的关闭。
        close(server.rdb_child_exit_pipe); 
        // 等待退出直到父进程告诉我们它是安全的。 我们不期望读取任何内容,只是在管道关闭时收到错误。 
        dummy = read(safe_to_exit_pipe, pipefds, 1);
        // 
        UNUSED(dummy);
        // 退出子进程
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        /* Parent */
        close(safe_to_exit_pipe);
        if (childpid == -1) {
            // 打日志  不能后台保存 fork错误 
            serverLog(LL_WARNING,"Can't save in background: fork: %s",
                strerror(errno));

            /* Undo the state change. The caller will perform cleanup on
             * all the slaves in BGSAVE_START state, but an early call to
             * replicationSetupSlaveForFullResync() turned it into BGSAVE_END */
            //  
            listRewind(server.slaves,&li);
            while((ln = listNext(&li))) {
                client *slave = ln->value;
                if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
                    slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
                }
            }
            // 关闭 rdb写管道
            close(rdb_pipe_write);
            // 
            close(server.rdb_pipe_read);
            // 释放内存
            zfree(server.rdb_pipe_conns);
            server.rdb_pipe_conns = NULL;
            server.rdb_pipe_numconns = 0;
            server.rdb_pipe_numconns_writing = 0;
            // 
            closeChildInfoPipe();
        } else {
            serverLog(LL_NOTICE,"Background RDB transfer started by pid %d",
                childpid);
            server.rdb_save_time_start = time(NULL);
            server.rdb_child_pid = childpid;
            server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
            updateDictResizePolicy();
            // 在父进程关闭rdb写管道,以便它可以检测到孩子的关闭。
            close(rdb_pipe_write);
            // 创建文件事件  设置回调函数 rdbPipeReadHandler
            if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
                serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
            }
        }
        return (childpid == -1) ? C_ERR : C_OK;
    }
    return C_OK; /* Unreached. */
}

(4.2) RDB文件如何生成

生成RDB文件的主要逻辑在rdbSaveRio函数

Redis RDB数据格式

主要步骤如下:

  1. 保存元数据信息,比如魔数属性信息 (类似RPC序列化里的)
  2. 保存所有数据库字典里的键值对(包括内存淘汰策略、过期时间)
  3. 保存结束符校验和 等。
/* 
 * 生成 RDB 格式的数据库转储,将其发送到指定的 Redis I/O 通道。 
 * 成功时返回 C_OK,否则返回 C_ERR,并且由于I/O错误可能会丢失部分输出或全部输出。
 *
 * 当函数返回 C_ERR 并且如果 'error' 不为 NULL 时,
 * 'error' 指向的整数将设置为紧接在 I/O 错误之后的 errno 的值。
 *
 * @param *rdb
 * @param *error
 * @param rdbflags
 * @param *rsi
 */
int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
    dictIterator *di = NULL;
    dictEntry *de;
    char magic[10];
    int j;
    uint64_t cksum;
    size_t processed = 0;

    if (server.rdb_checksum)
        rdb->update_cksum = rioGenericUpdateChecksum;

    // 生成魔数magic    
    snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
    // 将magic写入RDB文件
    if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;

    // 写入属性信息
    if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;

    // 写入 信息
    if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;

    //  
    for (j = 0; j < server.dbnum; j++) {
        // 获取redisDb 
        redisDb *db = server.db+j;
        // 字典
        dict *d = db->dict;
        // 字典为空 跳过
        if (dictSize(d) == 0) continue;
        // 迭代器
        di = dictGetSafeIterator(d);


        // 写 SELECT DB 操作符  // 解决数据一致性问题  避免从节点写到其它库里
        if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
        // 保存编码长度 
        if (rdbSaveLen(rdb,j) == -1) goto werr;

        /* Write the RESIZE DB opcode. */
        uint64_t db_size, expires_size;
        // 数据个数
        db_size = dictSize(db->dict);
        // 过期数据个数
        expires_size = dictSize(db->expires);
        // 
        if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
        // 
        if (rdbSaveLen(rdb,db_size) == -1) goto werr;
        // 
        if (rdbSaveLen(rdb,expires_size) == -1) goto werr;

        // 遍历DB里的每个entry 
        while((de = dictNext(di)) != NULL) {
            // 获取key  二进制安全的key
            sds keystr = dictGetKey(de);
            // 
            robj key, *o = dictGetVal(de);
            long long expire;

            // 初始化key的状态
            initStaticStringObject(key,keystr);
            // 获取key的过期时间
            expire = getExpire(db,&key);
            // 保存键值对
            if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;


            // 当此RDB作为AOF重写的一部分生成时,在重写时将累积的差异从父级移动到子级,以便最终写入更小。 

            if (rdbflags & RDBFLAGS_AOF_PREAMBLE &&
                rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
            {
                //  
                processed = rdb->processed_bytes;
                // 
                aofReadDiffFromParent();
            }
        }
        // 
        dictReleaseIterator(di);
        di = NULL;  // 这样我们就不会因为错误而再次发布它。 
    }

    /* If we are storing the replication information on disk, persist
     * the script cache as well: on successful PSYNC after a restart, we need
     * to be able to process any EVALSHA inside the replication backlog the
     * master will send us. */
    if (rsi && dictSize(server.lua_scripts)) {
        // 
        di = dictGetIterator(server.lua_scripts);
        // 遍历
        while((de = dictNext(di)) != NULL) {
            //  
            robj *body = dictGetVal(de);
            // 
            if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
                goto werr;
        }
        // 
        dictReleaseIterator(di);
        di = NULL;  // 这样我们就不会因为错误而再次发布它。 
    }

    //  
    if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;

    // 保存RDB文件结束符
    if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;

    // CRC64 校验和。 如果禁用校验和计算,它将为零,在这种情况下加载代码会跳过检查。
    cksum = rdb->cksum;
    // 小端编码
    memrev64ifbe(&cksum);
    // 写入校验和
    if (rioWrite(rdb,&cksum,8) == 0) goto werr;
    return C_OK;

werr:
    if (error) *error = errno;
    if (di) dictReleaseIterator(di);
    return C_ERR;
}
// file: src/rdb.h

/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
#define RDB_OPCODE_MODULE_AUX 247   /* Module auxiliary data. */
#define RDB_OPCODE_IDLE       248   /* LRU idle time. */
#define RDB_OPCODE_FREQ       249   /* LFU frequency. */
#define RDB_OPCODE_AUX        250   /* RDB aux field. */
#define RDB_OPCODE_RESIZEDB   251   /* Hash table resize hint. */
#define RDB_OPCODE_EXPIRETIME_MS 252    /* Expire time in milliseconds. */
#define RDB_OPCODE_EXPIRETIME 253       /* Old expire time in seconds. */
#define RDB_OPCODE_SELECTDB   254   /* DB number of the following keys. */
#define RDB_OPCODE_EOF        255   /* End of the RDB file. */
/* 
 * 保存一些默认的 AUX 字段,其中包含有关生成的 RDB 的信息。
 */
int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
    //  
    int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
    // 
    int aof_preamble = (rdbflags & RDBFLAGS_AOF_PREAMBLE) != 0;

    // 在创建 RDB 时添加一些关于状态的字段。

    // 保存 Redis版本信息
    if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
    // 保存 Redis运行平台的架构信息 (32位还是64位) 
    if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1;
    // 保存 RDB文件的创建时间
    if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1;
    // 保存 Redis Server已使用的内存空间大小
    if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1;

    /* Handle saving options that generate aux fields. */
    if (rsi) {
        // 保存  
        if (rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db)
            == -1) return -1;

        // 保存 复制id = Redis Server 的 server.replid
        if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid)
            == -1) return -1;

        // 保存 复制偏移量 = 主节点的复制偏移量  
        if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset)
            == -1) return -1;
    }

    // 保存 aof-preamble
    if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
    return 1;
}
// file: src/rdb.c

/* 
 * 保存键值对
 * 
 * 保存键值对,带过期时间、类型、键、值 
 * 错误时返回-1
 * 如果key实际保存成功,则返回1,否则返回0(key已过期)。
 * 
 * @param *rdb 
 * @param *key 
 * @param *val
 * @param expiretime
 */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
    // 内存淘汰策略
    int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
    int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;

    // 过期时间
    if (expiretime != -1) {  // key是会过期的
        // 保存过期时间类型(ms)
        if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
        // 保存过期时间(ms级)
        if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
    }

    // 保存LRU信息
    if (savelru) {  // 如果使用lru
        // 对象空闲时间  ms
        uint64_t idletime = estimateObjectIdleTime(val);
        idletime /= 1000; // 使用秒就足够了,而且需要的空间更少。 
        // 保存类型为LRU空闲时间
        if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
        // 保存对象空间时间 (秒级)
        if (rdbSaveLen(rdb,idletime) == -1) return -1;
    }

    // 保存LFU信息
    if (savelfu) {
        uint8_t buf[1];
        // 
        buf[0] = LFUDecrAndReturn(val);

        // 我们可以用两个字节对其进行编码:操作码和一个8位计数器,因为频率是 0-255 范围内的对数。
        // 请注意,我们不存储减半时间,因为在加载时将其重置一次不会对频率产生太大影响。  

        // 保存类型为LFU
        if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
        // 保存LFU信息
        if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
    }


    // 保存类型
    if (rdbSaveObjectType(rdb,val) == -1) return -1;
    // 保存key
    if (rdbSaveStringObject(rdb,key) == -1) return -1;
    // 保存value
    if (rdbSaveObject(rdb,val,key) == -1) return -1;

    // 如果需要延迟返回(用于测试)
    if (server.rdb_key_save_delay)
        usleep(server.rdb_key_save_delay);

    return 1;
}

(4.2.1) 写入数据-rdbWriteRaw

static ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len) {
    if (rdb && rioWrite(rdb,p,len) == 0)
        return -1;
    return len;
}
// file: src/io.h

/*
 * 
 * @param *r 
 * @param *buf 要写入的数据
 * @param len 数据长度
 */
static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
    //  
    if (r->flags & RIO_FLAG_WRITE_ERROR) return 0;
    // 长度大于-0
    while (len) {
        // 要写入的字节数 
        size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
        // 更新校验和
        if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
        // 写入数据
        if (r->write(r,buf,bytes_to_write) == 0) {
            r->flags |= RIO_FLAG_WRITE_ERROR;
            return 0;
        }
        //
        buf = (char*)buf + bytes_to_write;
        // 更新要写入的长度
        len -= bytes_to_write;
        // 
        r->processed_bytes += bytes_to_write;
    }
    return 1;
}

参考资料

[1] Redis 源码剖析与实战 - 18 | 如何生成和解读RDB文件?
[2] redis-rdb.c - github

0%