|| d->ht[0].size == realsize)
return DICT_ERR;
// 创建并初始化新哈希表
n.size = realsize;
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;
// 如果 ht[0] 为空,那么这就是一次创建新哈希表行为
// 将新哈希表设置为 ht[0] ,然后返回
if (d->ht[0].table == NULL) {
d->ht[0] = n;
return DICT_OK;
}
/* Prepare a second hash table for incremental rehashing */
// 如果 ht[0] 不为空,那么这就是一次扩展字典的行为
// 将新哈希表设置为 ht[1] ,并打开 rehash 标识
d->ht[1] = n;
d->rehashidx = 0;
return DICT_OK;
}
字典dict的rehashidx被设置成0后,就表示开始rehash动作,在心跳函数执行的过程,会检查到这个标志,如果需要rehash,就行进行渐进式rehash动作。函数调用的过程为:
serverCron->incrementallyRehash->dictRehashMilliseconds->dictRehash
incrementallyRehash函数代码:
/*
* 在 Redis Cron 中调用,对数据库中第一个遇到的、可以进行 rehash 的哈希表
* 进行 1 毫秒的渐进式 rehash
*/
void incrementallyRehash(void) {
int j;
for (j = 0; j < server.dbnum; j++) {
/* Keys dictionary */
if (dictIsRehashing(server.db[j].dict)) {
dictRehashMilliseconds(server.db[j].dict,1);
break; /* 已经耗尽了指定的CPU毫秒数 */
}
...
}
dictRehashMilliseconds函数是按照指定的CPU运算的毫秒数,执行rehash动作,每次一个100个为单位执行。代码如下:
/*
* 在给定毫秒数内,以 100 步为单位,对字典进行 rehash 。
*/
int dictRehashMilliseconds(dict *d, int ms) {
long long start = timeInMilliseconds();
int rehashes = 0;
while(dictRehash(d,100)) {/*每次100步数据*/
rehashes += 100;
if (timeInMilliseconds()-start > ms) break; /*耗时完毕,暂停rehash*/
}
return rehashes;
}
/*
* 执行 N 步渐进式 rehash 。
*
* 如果执行之后哈希表还有元素需要 rehash ,那么返回 1 。
* 如果哈希表里面所有元素已经迁移完毕,那么返回 0 。
*
* 每步 rehash 都会移动哈希表数组内某个索引上的整个链表节点,
* 所以从 ht[0] 迁移到 ht[1] 的 key 可能不止一个。
*/
int dictRehash(dict *d, int n) {
if (!dictIsRehashing(d)) return 0;
while(n--) {
dictEntry *de, *nextde;
// 如果 ht[0] 已经为空,那么迁移完毕
// 用 ht[1] 代替原来的 ht[0]
if (d->ht[0].used == 0) {
// 释放 ht[0] 的哈希表数组
zfree(d->ht[0].table);
// 将 ht[0] 指向 ht[1]
d->ht[0] = d->ht[1];
// 清空 ht[1] 的指针
_dictReset(&d->ht[1]);
// 关闭 rehash 标识
d->rehashidx = -1;
// 通知调用者, rehash 完毕
return 0;
}
assert(d->ht[0].size > (unsigned)d->rehashidx);
// 移动到数组中首个不为 NULL 链表的索引上
while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++;
// 指向链表头
de = d->ht[0].table[d->rehashidx];
// 将链表内的所有元素从 ht[0] 迁移到 ht[1]
// 因为桶内的元素通常只有一个,或者不多于某个特定比率
// 所以可以将这个操作看作 O(1)
while(de) {
unsigned int h;
nextde = de->next;
/* Get the index in the new hash table */
// 计算元素在 ht[1] 的哈希值
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
// 添加节点到 ht[1] ,调整指针
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
// 更新计数器
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
// 设置指针为 NULL ,方便下次 rehash 时跳过
d->ht[0].table[d->rehashidx] = NULL;
// 前进至下一索引
d->rehashidx++;
}
// 通知调用者,还有元素等待 rehash
return 1;
} 总结,Redis的rehash动作是一个内存管理和数据管理的一个核心操作,由于Redis主要使用单线程做数据管理和消息效应,它的rehash数据迁移过程采用的是渐进式的数据迁移模式,这样做是为了防止rehash过程太长堵塞数据处理线程。并没有采用memcached的多线程迁移模式。关于memcached的rehash过程,以后再做介绍。从redis的rehash过程设计的很巧,也很优雅。在这里值得注意的是,redis在find数据的时候,是同时查找正在迁移的ht[0]和被迁移的ht[1]。防止迁移过程数据命不中的问题。