LucienXian's Blog


  • 首页

  • 归档

  • 标签

Atomicity All-or-Nothing and Before-or-After——MIT6.824

发表于 2019-08-18

Atomicity: All-or-Nothing and Before-or-After——MIT6.824

《Principles Computer System Design Introduction》

Before-or-After Atomicity: Coordinating Concurrent Threads

并发操作中经常会出现data race condition,从程序员的角度看,有两种不同的并发协调做法:序列协调和原子性(sequence coordina­tion and before-or-after atomicity)。序列协调指的是约束"动作W必须在动作X之前发生",而原子性则是一种更普遍的约束,即同时对同一个数据进行操作的若干动作不会相互干扰。我们对Before-or-after atomicity的定义是:

Concurrent actions have the before-or-after property if their effect from the point of view of their invokers is the same as if the actions occurred either completely before or completely after one another.

与序列协调不同,before-or-after原子性对于程序员来说无法知道共享变量的所有其他动作的ID。程序员需要的是一种自动的隐式机制,可确保正确处理每个共享变量。举个例子,在操作系统中,几个并发线程可能决定在某个时间使用共享打印机。而且,哪个线程首先使用打印机并不重要,重要的是是打印机的一次使用在下一次开始之前必须要完成。

否则,多个线程交叉发生很可能引起最终的结果不一致,如图,如果两个线程对这个操作交叉执行,就会产生不一致的行为,原子性要保证的就是两个线程对B的操作必须是原子操作。

img
img

Correctness and Serialization

我们的目标是对before-or-after原子性进行正确性的验证,而不会涉及使用该机制的应用程序是否正确的问题。此正确性标准意味着如果并发操作的结果是通过某些纯串行应用的相同操作获得的结果,则算正确协调并发操作。

因此我们对before-or-after原子性的定义就是每个before-or-after的行为都表现得是执行之前或者完全执行之后的效果。

Simple Locking

simple locking有两个规则,首先是每个事务必须在执行任何实际读取和写入之前为其操作的所有共享对象获取锁;其次是必须事务完成上次更新并提交或者重新加载数据并终止后才会释放锁。

simple locking能有效地协调并发事务,在该规则下一个事务必须在前一个事务完成之后进行、在后一个事务开始之前进行,并且进行中的事务不会拥有共同的数据。并发事务产生的结果就像是按照序列化顺序进行的一样。

但是simple locking也会因为性能问题影响其并发过程,因为它要求事务获取它将要读取或写入的每个共享对象的锁,如果其需要读取的锁数量大于当前的数量,那么就会在一定程度上影响其并发性能。

Two-Phase Locking

两阶段锁协议整个过程分为两个阶段:一是加锁,二是释放锁。加锁过程中事务只能加锁或者操作数据,在其通过某个锁定点之前都不能释放锁。而释放锁的过程中事务只能解锁或者操作数据,而不能再重新上锁了。

虽然两阶段锁协议比简单的锁定具有更好的并发性能,例如假设事务T1读取X,接着写Y,而事务T2只执行写入Y。在两阶段锁协议下,T2只能在T1两个动作之前或者之后发生。但事实上T2在T1两个动作中间进行与T2完全在T1之前进行的效果是一样的。允许所有可能的并发性同时确保的before-or-after atomicity规则很难设计。

锁与日志之间的关系有两点是需要考虑的:单个的中止事务和系统恢复。对于前者,协议要求中止事务在释放任何锁之前将其更改的数据对象恢复为原始值,因此不需要对中止的事务采取特殊的计算。至于后者,锁不是在非易失性存储中,因此在系统的恢复过程中必须将锁捕获以释放锁。然而我们还需要考虑的是,基于日志的恢复算法是否构建了正确的系统状态,因为系统崩溃可能是由于在崩溃之前提交的那些事务的串行排序引起的。

假设锁是在易失性存储器中,在系统崩溃的瞬间所有锁的记录都丢失。某些事务(记录BEGIN记录但尚未记录END记录的事务)可能尚未完成。但由于在那一瞬间所有事物的锁集合都是不重叠的,因此在恢复过程中可以不加锁地通过执行恢复算法重建系统状态,当然这一恢复过程中不能有新的事务产生。

Multiple-Site Atomicity: Distributed Two-Phase Commit

如果事务需要在分布式的环境下执行,则需要使用结合了persis­tent senders, duplicate suppression和single-site transactions的两阶段提交协议。

这是因为分布式架构中,不同的节点之间只能知道自己的操作是否成功,而无法知道其他节点的操作的成功或失败。为了保证事务的特性,需要引入一个作为协调者的组件来统一掌控所有节点(参与者)的操作结果并最终指示这些节点的最终提交。

第一阶段:提交请求的投票阶段

  1. 协调者向所有参与者节点发起是否可以提交事务的询问;
  2. 参与者执行相关的事务操作,并记录Undo和Redo日志;
  3. 各个参与对询问进行响应,同意或者终止;

第二阶段:提交执行的完成阶段

  1. 协调者节点向所有参与者节点发出"正式提交"/"回滚操作"的请求;
  2. 参与者节点正式完成操作/参与者利用Undo信息进行回滚,并释放在整个事务期间内占用的资源;
  3. 参与者响应完成信息;
  4. 协调者收到所有信息后,完成事务;

通信流程参考:

img
img

2PC也存在一些缺点,其中一个就是执行过程中,节点处于阻塞状态;另一个就是出现节点崩溃时,只能依赖协调者去进行回滚;并且协调者还存在单点故障的问题。

redis设计与实现——压缩列表

发表于 2019-08-16

redis设计与实现——压缩列表

ziplist压缩列表是有序集合键(另一个是跳表)和哈希键(另一个是字典)的底层实现之一(在3.2之后不再是list的底层实现,被quicklist取代了)。如果一个列表键只包含少量的项,并且每个列表项要么是小整数值,要么是短的字符串,那么Redis就会用ziplist表示列表键。

另外,当一个哈希键只包含少量键值对,并且每个key和value要么是小整数值,要么是短的字符串,那么Redis就会用ziplist表示哈希键。

压缩列表的构成

压缩列表并没有使用一个数据结构去表示。为了节省内存,ziplist是通过特殊编码的连续内存块组成的顺序型结构。我们可以通过其创建步骤来看其结构内容,一个压缩列表包含了多个节点,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/* ziplist头: 2个32位的整数存总共字节数,1个16位的整数存item */
#define ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t))
/* 1个字节表示ziplist的最后一个item的size */
#define ZIPLIST_END_SIZE (sizeof(uint8_t))
/* 获取ziplist的zlbytes指针 */
#define ZIPLIST_BYTES(zl) (*((uint32_t*)(zl)))
/* 获取ziplist的zltail指针 */
#define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t))))
/* 获取zllen指针 */
#define ZIPLIST_LENGTH(zl) (*((uint16_t*)((zl)+sizeof(uint32_t)*2)))
/* 特殊值,0xFF用来标记压缩列表的末端 */
#define ZIP_END 255

/* Create a new empty ziplist. */
unsigned char *ziplistNew(void) {
unsigned int bytes = ZIPLIST_HEADER_SIZE+ZIPLIST_END_SIZE;
unsigned char *zl = zmalloc(bytes);
ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
ZIPLIST_LENGTH(zl) = 0;
zl[bytes-1] = ZIP_END;
return zl;
}

因此压缩列表的结构可以表示为(小端序):

1
2
3
4
5
6
/*
* <zlbytes> <zltail> <zllen> <entry> <entry> ... <entry> <zlend>
* [0f 00 00 00] [0c 00 00 00] [02 00] [00 f3] [02 f6] [ff]
* | | | | | |
* zlbytes zltail zllen "2" "5" end
*/

从这个例子中可以看到:

  • zlbytes为0x0f,表示压缩列表总长度为15;
  • zltail为0x0c,表示如果我们有一个指向压缩列表起始地址的指针p,那么只要指针p加上偏移12,则可以得到entry2的地址;
  • zllen为0x02,表示有2个列表节点;
  • zlend是特殊值0xFF,即255,用来标记压缩列表末端;

压缩列表节点的组成

Redis使用了一个结构来表示压缩列表的节点,这个结构体并不是真正的编码方式,只是用来做内部函数操作(主要是使用zipEntry函数根据p指针返回一个zlentry),另外还使用了一个函数来创建节点,压缩列表的节点可以保存一个字节数组或者是一个整数值。

1
2
3
4
5
6
7
8
9
typedef struct zlentry {
unsigned int prevrawlensize; /* prevrawlen的字节数,1或者5,即如果prevrawlen小于254,其就是1*/
unsigned int prevrawlen; /* 前一个节点长度 */
unsigned int lensize; /* 编码len的所需字节大小*/
unsigned int len; /* 当前节点长度 */
unsigned int headersize; /* header大小 = prevrawlensize + lensize. */
unsigned char encoding; /* 节点的编码方式:ZIP_STR_* or ZIP_INT_* */
unsigned char *p; /* 指向节点的指针 */
} zlentry;
1
2
3
4
5
6
7
8
// 根据节点指针p返回一个zlentry
void zipEntry(unsigned char *p, zlentry *e) {

ZIP_DECODE_PREVLEN(p, e->prevrawlensize, e->prevrawlen);
ZIP_DECODE_LENGTH(p + e->prevrawlensize, e->encoding, e->lensize, e->len);
e->headersize = e->prevrawlensize + e->lensize;
e->p = p;
}

为了节省内存,redis的压缩列表使用了节点的encoding记录了节点所保存的数据类型和长度。以下是不同的编码方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* Different encoding/length possibilities */
#define ZIP_STR_MASK 0xc0 // 字节数组的编码不会以11开头
#define ZIP_INT_MASK 0x30
#define ZIP_STR_06B (0 << 6) // 小于63的字节数组
#define ZIP_STR_14B (1 << 6) // 小于2^14-1字节的字节数组
#define ZIP_STR_32B (2 << 6) // 小于2^32-1字节的字节数组
#define ZIP_INT_16B (0xc0 | 0<<4) // 11000000,int16_t的整数
#define ZIP_INT_32B (0xc0 | 1<<4) // 11010000,int32_t的整数
#define ZIP_INT_64B (0xc0 | 2<<4) // 11100000,int64_t的整数
#define ZIP_INT_24B (0xc0 | 3<<4) // 11110000,24位有符号整数
#define ZIP_INT_8B 0xfe // 11111110,8位有符号整数

/* 1111xxxx,使用xxxx来保存一个介于0-12的值 */
#define ZIP_INT_IMM_MASK 0x0f
#define ZIP_INT_IMM_MIN 0xf1 /* 11110001,直接编码存储的最小值 */
#define ZIP_INT_IMM_MAX 0xfd /* 11111101,直接编码存储的最大值*/

可以看到,压缩列表的节点保存了整数和字符数组两种类型,并针对不同的长度做了不同的编码解释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// 检查字符串是否可以被编码成整数
int zipTryEncoding(unsigned char *entry, unsigned int entrylen, long long *v, unsigned char *encoding) {
long long value;

if (entrylen >= 32 || entrylen == 0) return 0;
// 尝试将entry转换为long long
if (string2ll((char*)entry,entrylen,&value)) {
/* Great, the string can be encoded. Check what's the smallest
* of our encoding types that can hold this value. */
if (value >= 0 && value <= 12) {
*encoding = ZIP_INT_IMM_MIN+value; // 直接将value保存在encoding后四位
} else if (value >= INT8_MIN && value <= INT8_MAX) {
*encoding = ZIP_INT_8B;
} else if (value >= INT16_MIN && value <= INT16_MAX) {
*encoding = ZIP_INT_16B;
} else if (value >= INT24_MIN && value <= INT24_MAX) {
*encoding = ZIP_INT_24B;
} else if (value >= INT32_MIN && value <= INT32_MAX) {
*encoding = ZIP_INT_32B;
} else {
*encoding = ZIP_INT_64B;
}
*v = value;
return 1;
}
return 0;
}

unsigned int zipStoreEntryEncoding(unsigned char *p, unsigned char encoding, unsigned int rawlen) {
unsigned char len = 1, buf[5];

// #define ZIP_IS_STR(enc) (((enc) & ZIP_STR_MASK) < ZIP_STR_MASK)
if (ZIP_IS_STR(encoding)) {
/* Although encoding is given it may not be set for strings,
* so we determine it here using the raw length. */
if (rawlen <= 0x3f) { // 如果数组长度小于64,用一个字节进行存储
if (!p) return len; // p为null时,只获取len
buf[0] = ZIP_STR_06B | rawlen;
} else if (rawlen <= 0x3fff) { // 同理,用两个字节存储
len += 1;
if (!p) return len;
buf[0] = ZIP_STR_14B | ((rawlen >> 8) & 0x3f);
buf[1] = rawlen & 0xff;
} else { // 用五个字节存储
len += 4;
if (!p) return len;
buf[0] = ZIP_STR_32B;
buf[1] = (rawlen >> 24) & 0xff;
buf[2] = (rawlen >> 16) & 0xff;
buf[3] = (rawlen >> 8) & 0xff;
buf[4] = rawlen & 0xff;
}
} else {
/* Implies integer encoding, so length is always 1. */
if (!p) return len;
buf[0] = encoding;
}

/* 将编码存储到p指针中 */
memcpy(p,buf,len);
return len;
}

因此其编码模式总结就是:

  • 字节数组编码
编码 编码长度 值
00bbbbbb 1字节 小于64字节的字节数组
01bbbbbb xxxxxxxx 2字节 小于16383字节的字节数组
10______ xxxxxxxx
xxxxxxxx xxxxxxxx xxxxxxxx
5字节 小于等于2^32-1的字节数组,有6个bit留空
  • 整数编码
编码 编码长度 值
1111xxxx 1字节 在[0,12]区间的整数
11111110 1字节 8bit有符号整数
11110000 1字节 24bit有符号整数
11100000 1字节 int64_t
11010000 1字节 int32_t
11000000 1字节 int16_t

content

虽然redis使用了zlentry作为内部节点的数据结构,但其真实编码表示并不是按照该结构体来计算的。redis对字节数组或者整数的编码方式可以参考节点插入的过程来解释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#define ZIPLIST_HEAD 0
#define ZIPLIST_TAIL 1

#define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t))))
#define ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t))

#define ZIPLIST_ENTRY_HEAD(zl) ((zl)+ZIPLIST_HEADER_SIZE)
#define ZIPLIST_ENTRY_TAIL(zl) ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl)))

unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) {
unsigned char *p;
p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl);
return __ziplistInsert(zl,p,s,slen); // 将长度为slen的s插入p所在位置
}

节点的插入是通过上面这个函数实现的,分为从头部和尾部进行插入。至于具体的entry表示方式则要看插入节点的具体实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/* Insert item at "p". */
unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen; // 存储当前的总长和将要存储的列表长度
unsigned int prevlensize, prevlen = 0;
size_t offset;
int nextdiff = 0;
unsigned char encoding = 0;
long long value = 123456789; /* initialized to avoid warning. Using a value
that is easy to see if for some reason
we use it uninitialized. */
zlentry tail;

/* Find out prevlen for the entry that is inserted. */
if (p[0] != ZIP_END) { // 如果p不是列表尾部
ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);// 计算prevlensize和prevlen
} else {
unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);// 获取最后一个节点
if (ptail[0] != ZIP_END) { // 列表不为空
prevlen = zipRawEntryLength(ptail); // 解码,获取最后一个节点的长度,其实就是将要插入节点的上一个节点长度
}
}

/* 尝试编码value */
if (zipTryEncoding(s,slen,&value,&encoding)) {
/* 'encoding' is set to the appropriate integer encoding */
reqlen = zipIntSize(encoding); // 根据encoding获取长度
} else {
/* 'encoding' is untouched, however zipStoreEntryEncoding will use the
* string length to figure out how to encode it. */
reqlen = slen; // 直接使用字符数组长度
}
/* We need space for both the length of the previous entry and
* the length of the payload. */
reqlen += zipStorePrevEntryLength(NULL,prevlen); // 计算编码prevlen需要的长度
reqlen += zipStoreEntryEncoding(NULL,encoding,slen); // 计算编码encoding需要的长度

/* 如果不是往尾部插入,则需要判断当前prevlen长度是否足够
* 由于我们是用prevlen来存储上一个节点的长度,即prevlen在1或者5个字节间选择 * 因此需要考虑到前一个节点的插入影响了原先的prelem编码长度 */
int forcelarge = 0;
nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
if (nextdiff == -4 && reqlen < 4) {
nextdiff = 0;
forcelarge = 1;
}

/* 由于重新分配内存,因此需要记录计算偏移 */
offset = p-zl; // 记录原偏移
zl = ziplistResize(zl,curlen+reqlen+nextdiff); // 重新分配内存
p = zl+offset; // 根据偏移获取p指针

/* Apply memory move when necessary and update tail offset. */
if (p[0] != ZIP_END) {
/* 如果不是在尾部插入,则需要把数据整体往后挪*/
memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

/* Encode this entry's raw length in the next entry. */
if (forcelarge)
zipStorePrevEntryLengthLarge(p+reqlen,reqlen);
else
zipStorePrevEntryLength(p+reqlen,reqlen);

/* 更新tail值 */
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

/* When the tail contains more than one entry, we need to take
* "nextdiff" in account as well. Otherwise, a change in the
* size of prevlen doesn't have an effect on the *tail* offset. */
zipEntry(p+reqlen, &tail);
if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
}
} else {
/* 在尾部插入则直接更新tail_offset */
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
}

/* 进行级联更新*/
if (nextdiff != 0) {
offset = p-zl;
zl = __ziplistCascadeUpdate(zl,p+reqlen);
p = zl+offset;
}

/* Write the entry */
p += zipStorePrevEntryLength(p,prevlen);// 记录prevlen
p += zipStoreEntryEncoding(p,encoding,slen); // 记录encoding
if (ZIP_IS_STR(encoding)) { // 记录字符数组
memcpy(p,s,slen);
} else { // 记录整数
zipSaveInteger(p,value,encoding);
}
ZIPLIST_INCR_LENGTH(zl,1); // ziplist的len加1
return zl;
}

因此一个节点的完整编码结构包含了prevlen,encoding和content三个部分,下图就是一个保存着整数值10086的编码结构。

img
img

级联更新

考虑这样的一种情况,如果目前所有节点的长度都在250-253字节之间,那么意味着记录这些节点只需要1字节长的prevlen。但此时如果将一个长度大于或等于254字节的新节点设置为ziplist的头部节点,那么将直接影响后续所有节点的prevlen编码长度。

Redis将会因此触发级联更新,即遍历所有需要更新的节点进行处理,直到不需要更新为止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), rawlen, rawlensize;
size_t offset, noffset, extra;
unsigned char *np;
zlentry cur, next;

while (p[0] != ZIP_END) {
zipEntry(p, &cur); // 根据p重建zlentry
rawlen = cur.headersize + cur.len; // 当前长度
rawlensize = zipStorePrevEntryLength(NULL,rawlen);

/* Abort if there is no next entry. */
if (p[rawlen] == ZIP_END) break;
zipEntry(p+rawlen, &next);

/* Abort when "prevlen" has not changed. */
if (next.prevrawlen == rawlen) break;

if (next.prevrawlensize < rawlensize) {
/* 只有需要扩展的才能引发连锁更新*/
offset = p-zl;
extra = rawlensize-next.prevrawlensize;
zl = ziplistResize(zl,curlen+extra);
p = zl+offset;

/* Current pointer and offset for next element. */
np = p+rawlen;
noffset = np-zl;

/* Update tail offset when next element is not the tail element. */
if ((zl+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) != np) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);
}

/* Move the tail to the back. */
memmove(np+rawlensize,
np+next.prevrawlensize,
curlen-noffset-next.prevrawlensize-1);
zipStorePrevEntryLength(np,rawlen);

/* Advance the cursor */
p += rawlen;
curlen += extra;
} else {
if (next.prevrawlensize > rawlensize) {
/* This would result in shrinking, which we want to avoid.
* So, set "rawlen" in the available bytes. */
zipStorePrevEntryLengthLarge(p+rawlen,rawlen);
} else {
zipStorePrevEntryLength(p+rawlen,rawlen);
}

/* Stop here, as the raw length of "next" has not changed. */
break;
}
}
return zl;
}

总结

  1. 压缩列表是列表键和哈希键的底层实现之一;
  2. 列表中包含多个节点,每个节点都可以包含一个字节数组或者整数;
  3. 添加或者删除节点都可以能引发连锁的更新操作;

redis设计与实现——整数集合

发表于 2019-08-13

redis设计与实现——整数集合

整数集合intset是集合键的底层实现之一,当一个集合只包含整数元素并且元素数量不多时,redis就会用intset作为集合键的底层实现。

整数集合的实现

intset的数据结构在intset.h/c的表示方式如下:

1
2
3
4
5
typedef struct intset {
uint32_t encoding; // 编码方式
uint32_t length; // 集合中元素数量
int8_t contents[]; // 保存元素的数组
} intset;

contents保存的就是intset中各个元素,虽然其声明为int8_t,但实际上其保存的类型取决于encoding的值。

encoding的可能选项有三种:

1
2
3
#define INTSET_ENC_INT16 (sizeof(int16_t))
#define INTSET_ENC_INT32 (sizeof(int32_t))
#define INTSET_ENC_INT64 (sizeof(int64_t))

另外,encoding的类型是由contents中最大的一个数决定的。contents数组则按小到大保存着所有元素。

创建空的intset时默认为int16:

1
2
3
4
5
6
7
/* Create an empty intset. */
intset *intsetNew(void) {
intset *is = zmalloc(sizeof(intset));
is->encoding = intrev32ifbe(INTSET_ENC_INT16);// 大小端转换
is->length = 0;
return is;
}

由于contents是一段连续的内存,并存储超过了一个字节,元素也是按照大小排序,因此需要考虑系统的大小端问题。redis都是按照小端来使用,在/src/endianconv.h中有一段相关的宏定义:

1
2
3
4
5
6
7
8
9
10
11
/* variants of the function doing the actual conversion only if the target
* host is big endian */
#if (BYTE_ORDER == LITTLE_ENDIAN)
#define intrev16ifbe(v) (v)
#define intrev32ifbe(v) (v)
#define intrev64ifbe(v) (v)
#else
#define intrev16ifbe(v) intrev16(v)
#define intrev32ifbe(v) intrev32(v)
#define intrev64ifbe(v) intrev64(v)
#endif

升级

每当redis要添加一个新元素到整数集合时,并且新元素的类型比当前整数集合的encoding要更长时,就需要先进行升级。

因此在这种情况下,添加一个新元素的步骤就是:升级、查找和插入。

首先要判断插入元素的长度:

1
2
3
4
5
6
7
8
static uint8_t _intsetValueEncoding(int64_t v) {
if (v < INT32_MIN || v > INT32_MAX)
return INTSET_ENC_INT64;
else if (v < INT16_MIN || v > INT16_MAX)
return INTSET_ENC_INT32;
else
return INTSET_ENC_INT16;
}

以下就是新元素插入的主题函数过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
uint8_t valenc = _intsetValueEncoding(value);// 获取新元素的长度
uint32_t pos;
if (success) *success = 1;

/* 因为引发升级的新元素长度总是比intset中现存所有元素的长度都大,因此其要么
* 大于所有元素,要么小于所有元素;
* 因此新元素只需放在底层数组开头或者末尾即可 */
if (valenc > intrev32ifbe(is->encoding)) {
/* This always succeeds, so we don't need to curry *success. */
return intsetUpgradeAndAdd(is,value); // 升级并插入元素
} else {
/* 如果集合中寻在该元素,则返回
* 如果不存在元素,pos将存着将要被插入的准确位置索引 */
if (intsetSearch(is,value,&pos)) {
if (success) *success = 0;
return is;
}

// 多申请一个空间
is = intsetResize(is,intrev32ifbe(is->length)+1);
// 如果没找到相应的pos(即小于value的最大整数所在位置)
if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1); // 迁移内存,腾出空间给新的数据
}

_intsetSet(is,pos,value); // 在pos插入元素
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}

由上可见,在不进行升级的情况,需要先找到对应的pos,即intset中小于value的最大元素,通过二分法查找:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
static int64_t _intsetGet(intset *is, int pos) {
return _intsetGetEncoded(is,pos,intrev32ifbe(is->encoding));
}

static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) {
int64_t v64;
int32_t v32;
int16_t v16;

if (enc == INTSET_ENC_INT64) {
memcpy(&v64,((int64_t*)is->contents)+pos,sizeof(v64));
memrev64ifbe(&v64);
return v64;
} else if (enc == INTSET_ENC_INT32) {
memcpy(&v32,((int32_t*)is->contents)+pos,sizeof(v32));
memrev32ifbe(&v32);
return v32;
} else {
memcpy(&v16,((int16_t*)is->contents)+pos,sizeof(v16));
memrev16ifbe(&v16);
return v16;
}
}

static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
int64_t cur = -1;

/* 空的intset,直接返回,pos设为0 */
if (intrev32ifbe(is->length) == 0) {
if (pos) *pos = 0;
return 0;
} else {
/* 如果value大于intset的最大值,则pos设为intset的长度
* 如果value小于intset的最小值,将pos赋值为0
* 此举为是否移动内存元素的判断提供帮助 */
if (value > _intsetGet(is,max)) {
if (pos) *pos = intrev32ifbe(is->length);
return 0;
} else if (value < _intsetGet(is,0)) {
if (pos) *pos = 0;
return 0;
}
}

// 二分查找
while(max >= min) {
mid = ((unsigned int)min + (unsigned int)max) >> 1;
cur = _intsetGet(is,mid);
if (value > cur) {
min = mid+1;
} else if (value < cur) {
max = mid-1;
} else {
break;
}
}


if (value == cur) {
if (pos) *pos = mid; // 找到对应位置
return 1;
} else {
if (pos) *pos = min; // 找不到
return 0;
}
}

前面的判断语句,在pos小于当前长度的时候,需要将pos后面的元素都往后移动一个位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static void intsetMoveTail(intset *is, uint32_t from, uint32_t to) {
void *src, *dst;
uint32_t bytes = intrev32ifbe(is->length)-from;
uint32_t encoding = intrev32ifbe(is->encoding);

if (encoding == INTSET_ENC_INT64) {
src = (int64_t*)is->contents+from;
dst = (int64_t*)is->contents+to;
bytes *= sizeof(int64_t);
} else if (encoding == INTSET_ENC_INT32) {
src = (int32_t*)is->contents+from;
dst = (int32_t*)is->contents+to;
bytes *= sizeof(int32_t);
} else {
src = (int16_t*)is->contents+from;
dst = (int16_t*)is->contents+to;
bytes *= sizeof(int16_t);
}
/* 如果目标区域和源区域有重叠的话,memmove() 能够保证源串在被覆盖之前将重叠 * 区域的字节拷贝到目标区域中,复制后源区域的内容会被更改 */
memmove(dst,src,bytes);
}

至于具体的升级操作,则由intsetUpgradeAndAdd完成,包含了encoding升级和插入元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
uint8_t curenc = intrev32ifbe(is->encoding);
uint8_t newenc = _intsetValueEncoding(value);
int length = intrev32ifbe(is->length);
int prepend = value < 0 ? 1 : 0; // 简单通过正负判断插在开头还是尾部

/* 设置新的encoding和扩展空间 */
is->encoding = intrev32ifbe(newenc);
is = intsetResize(is,intrev32ifbe(is->length)+1);

/* 从尾部开始对所有数据进行迁移,重新分配空间 */
while(length--)
_intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));

/* 在头部或者尾部设置新的value*/
if (prepend)
_intsetSet(is,0,value);
else
_intsetSet(is,intrev32ifbe(is->length),value);
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}

总结

  1. 整数集合是集合键的底层实现之一;
  2. redis能够根据新加元素的类型,改变整个数组的类型;
  3. 整数集合只支持升级,不支持降级操作;

redis设计与实现——跳表

发表于 2019-08-07

redis设计与实现——跳表

skiplist跳跃表是一种有序数据结构,通过在每个节点中维持多个节点中维持多个指向其他节点的指针。节点查找的时间复杂度,平均是O(logN),最坏是O(N),还可以批量操作处理节点。关于skiplist,可以参考论文。

在Redis中,跳跃表是作为有序集合键的底层实现基础。

跳跃表的实现

Redis的跳跃表是在server.h/zskiplistNode和server.h/zskiplist两个结构体中定义的,其中前者表示表节点,后者用来保存节点的数量,头部和尾部节点指针等相关信息。

img
img

跳跃表节点

跳跃表节点的结构体定义为:

1
2
3
4
5
6
7
8
9
10
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
sds ele;
double score;
struct zskiplistNode *backward;
struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned long span;
} level[];
} zskiplistNode;
  • level:这是跳跃表节点的level数组,包含了多个元素,每个元素都不包含了一个指向前方其他节点的指针。span则是跨度,如上图所示,箭头上方的就是跨度数值,在查找某个节点的过程中,将沿途访过的所有层的跨度累积起来,就是当前节点所在的排位;
  • backward:后退指针,只有一个后退指针意味着每次只能往前一个节点;
  • score:跳跃表中所有节点按照分数值从小到大排序;
  • ele:指向一个SDS字符对象,ele是唯一的,因此score相同时,需要按照sds在字典序中的大小排序;

跳跃表

跳跃表的结构体定义为:

1
2
3
4
5
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level;
} zskiplist;

redis使用zskiplist来管理所有的节点,比如表头节点和表尾节点,跳跃表长度以及level则表示在O(1)复杂度内获取跳跃表中层高最大的那个节点的层数量。

另外表头节点虽然与其他节点构造一样,但其后退指针、分值和ele等属性都不使用,并且不参与zskiplist中level的计算。

创建跳跃表

创建跳跃表的操作主要是完成一些初始化的操作,其时间复杂度为O(1)。创建操作主要依赖两个函数:zslCreate()和zslCreateNode()。其中redis默认跳跃表的最大层级为64。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/* Should be enough for 2^64 elements */
#define ZSKIPLIST_MAXLEVEL 64 // zskiplist的最大层级为64

/* Create a new skiplist. */
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;

zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);//创建一个节点
// 表头指针的后退指针和分值,ele都不作使用
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}

/* Create a skiplist node with the specified number of levels.
* The SDS string 'ele' is referenced by the node after the call. */
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
zskiplistNode *zn =
zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->ele = ele;
return zn;
}

插入新节点

新节点的插入是通过zslInsert实现的,给定分数值和ele元素则可返回新的跳跃表节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
// 记录每一层插入节点的前面一个节点在skiplist中的排名
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;

serverAssert(!isnan(score));
x = zsl->header;
// 计算待插入点的位置
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
// 先根据score比较,相等即根据sds的字符串字典序比较
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;// 每一个层级的待插入位置,即当前层最后一个小于x的点
}
/* 重复插入相同的元素,这种情况是不会发生的
* 如果元素在内部,则zslInsert()的调用者应该在哈希表中进行测试是否在里面 */
level = zslRandomLevel();
// 如果计算出来的层级比当前层级高,则重设超出zsl原来层级的指针
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level,score,ele); // 创建新的节点
for (i = 0; i < level; i++) {
// 插入到当前位置(update[i]的前面)
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;

/* rank[0] - rank[i] 是x在第i层的前一个节点与x之间的距离*/
/* 更新插入点的跨度 */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
/* 更新插入点前一个节点的跨度 */
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}

/* 自增没有到达的层级的span */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}

x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}

其中随机层级的实现如下,redis的跳跃表最大层数为64,能够足够支撑优化2^64个元素的查找。其中获取随机层级时,越高的层级数出现的几率越小,而且每往上一个层级,其概率为1/4。

1
2
3
4
5
6
7
8
9
10
11
12
/* Returns a random level for the new skiplist node we are going to create.
* The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
* (both inclusive), with a powerlaw-alike distribution where higher
* levels are less likely to be returned. */
int zslRandomLevel(void) {
int level = 1;
// #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
// random()&0xFFFF的结果就是一个0-65535的数
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

总结一下

  • 跳跃表时有序集合的底层实现之一;
  • Redis跳跃表时由在server.h的zskiplistNode和zskiplist实现的;
  • 每个跳跃表节点层高都是1-64的随机数;
  • 多个节点可以包含相同的score,但ele时唯一的;

源码版本

截至2019/07/27,5.0之后的版本,commitId为:505a855000ef8f1fbea9cb41841fa8708175bba4

redis设计与实现——字典

发表于 2019-08-01

redis设计与实现——字典

在字典中,一个key和一个value进行关联,从而组成键值对,但C语言并没有内置了这种数据结构,因为redis自行构建了。字典是哈希键的底层实现之一。

字典的实现

Redis的字典使用了哈希表作为底层实现,每个表内部含有多个哈希表节点。

哈希表节点

首先是键值对的定义,即每个dictEntry结构保存着一个键值对。

1
2
3
4
5
6
7
8
9
10
typedef struct dictEntry {
void *key; // 键
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v; // 值
struct dictEntry *next; // 指向下一个哈希表节点,形成链表
} dictEntry;

key-value中的值可以是一个指针,也可能是一个整数或者double。next则是用来以链表的形式解决哈希冲突的问题。

哈希表

1
2
3
4
5
6
7
8
/* This is our hash table structure. Every dictionary has two of this as we
* implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
dictEntry **table; // 哈希表数组
unsigned long size; // 哈希表大小
unsigned long sizemask; // 哈希表掩码,用来计算索引值,等于size-1
unsigned long used; // 已有节点的个数
} dictht;

字典

为了使用方便,redis的字典在上面哈希表再多封装一层。

1
2
3
4
5
6
7
8
9
10
typedef struct dict {
// 类型特定的函数
dictType *type;
// 私有数据
void *privdata;
// 两个哈希表,其中一个用来存储当前使用的,另一个则是用来做rehash
dictht ht[2];
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
unsigned long iterators; /* number of iterators currently running */
} dict;

这里的type属性保存了一个指向dictType结构的指针,而每个dictType结构都包含了一组用于操作特定类型键值对的函数。而private属性则保存了需要传递给那些类型特定函数的可选参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typedef struct dictType {
// 计算哈希值的函数
uint64_t (*hashFunction)(const void *key);
// 复制key的函数
void *(*keyDup)(void *privdata, const void *key);
// 复制value的函数
void *(*valDup)(void *privdata, const void *obj);
// 对比key的函数
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
// 销毁key的函数
void (*keyDestructor)(void *privdata, void *key);
// 销毁value的函数
void (*valDestructor)(void *privdata, void *obj);
} dictType;

至此,Redis的封装层级就是dict->dictht->dictEntry

哈希算法

当需要添加一个新的键值对到dict里面,Redis会根据key计算出哈希值和索引值,再把包含键值对的哈希节点放到哈希表数组的指定索引上。

先是调用dictAdd

1
2
3
4
5
6
7
8
int dictAdd(dict *d, void *key, void *val)
{
dictEntry *entry = dictAddRaw(d,key,NULL);

if (!entry) return DICT_ERR;
dictSetVal(d, entry, val);
return DICT_OK;
}

其中dictAddRaw会增加一个dictEntry,但不会设置value值,而是由用户自行决定如何设置。同时这个API也直接暴露给用户,用户可以自行调用,比如设置非指针值。

1
2
entry = dictAddRaw(dict,mykey,NULL);
if (entry != NULL) dictSetSignedIntegerVal(entry,1000);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
long index;
dictEntry *entry;
dictht *ht;

// 判断是否需要rehashing
if (dictIsRehashing(d)) _dictRehashStep(d);

// 获取新元素index,如果已存在则返回NULL
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
return NULL;

// 如果需要rehasing,则插入到ht[1]中
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
// 分配内存
entry = zmalloc(sizeof(*entry));
// 插入到table的头部,这样就可以以O(1)的时间解决哈希冲突
entry->next = ht->table[index];
ht->table[index] = entry;
ht->used++;

/* Set the hash entry fields. */
dictSetKey(d, entry, key); // 设置key
return entry;
}

这里的关键是获取index,这样才能新增一个entry,并设置对应的key。准确来说,是先获取对应的hash值,再利用该hash值计算索引index。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 使用dict设置的哈希函数,计算key的哈希值
#define dictHashKey(d, key) (d)->type->hashFunction(key)

// 如果key已经存在,则返回-1
static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
unsigned long idx, table;
dictEntry *he;
if (existing) *existing = NULL; // existing用来获取当前的entry

// 在需要时扩展整个dict
if (_dictExpandIfNeeded(d) == DICT_ERR)
return -1;
// 在两个hash表中查找是否存在相同key
for (table = 0; table <= 1; table++) {
// 使用哈希和sizemark掩码计算index
idx = hash & d->ht[table].sizemask;
// 遍历idx对应的table slot,判断该table不含有相同的key
he = d->ht[table].table[idx];
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key)) {
if (existing) *existing = he;
return -1;
}
he = he->next;
}
if (!dictIsRehashing(d)) break; // 如果不是正在rehash,则直接break,不去ht[1]中寻找
}
return idx;
}

计算index的过程,首先是判断是否需要扩展dict,然后遍历两个哈希表,在dictEnrty数组中遍历,确保不含有相同的key。

解决键冲突

假设Redis计算得出k1和k2的索引值相同,则这就是发生了冲突。Redis使用链地址法解决冲突。每个哈希表节点都有一个next指针,在发生冲突时就使用next指针将k1和k2所在节点连接起来。同时,由于dictEntry节点组成链表没有指向尾部的指针,为了速度考虑,直接将新节点插入到头部。如上代码所示。

Rehash

首先来看扩展:正在rehashing的直接返回;第一次增加键值对的,直接扩展哈希表的大小到4;接下来是判断在什么情况下,redis会对哈希表进行扩展:

  • 服务器没有执行BGSAVE或者BGREWRITEAOF命令时,哈希表的负载因子大于或等于1;
  • 服务器正在执行BGSAVE或者BGREWRITEAOF命令时,哈希表的负载因子大于5;

之所以这样设计,是因为在子进程存在期间,操作系统是采用写时复制的技术,此时如果进行哈希扩展,就可能产生大量内存写入操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 至于扩展dict也比较直接
static int _dictExpandIfNeeded(dict *d)
{
// 正在rehashing的,直接返回
if (dictIsRehashing(d)) return DICT_OK;

// 第一次新增,则扩展到4;#define DICT_HT_INITIAL_SIZE 4
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

// static int dict_can_resize = 1;
// static unsigned int dict_force_resize_ratio = 5;

/*
在server.c里,会根据服务器是否存在有aof或者rdb的子进程,即是否在执行BGSAVE或者BGREWRITEAOF命令
void updateDictResizePolicy(void) {
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
dictEnableResize();
else
dictDisableResize();
}
*/
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize ||
d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
{
return dictExpand(d, d->ht[0].used*2);
}
return DICT_OK;
}

至于收缩,当哈希表的负载因子小于0.1时,Redis会进行收缩操作。这一个操作是在server.c里进行的。

1
2
3
4
5
6
7
8
9
10
#define HASHTABLE_MIN_FILL 10    /* Minimal hash table fill 10% */

int htNeedsResize(dict *dict) {
long long size, used;

size = dictSlots(dict);
used = dictSize(dict);
return (size > DICT_HT_INITIAL_SIZE &&
(used*100/size < HASHTABLE_MIN_FILL));
}

渐进式rehash

由前面的结构体可以看到dict中有ht[0]和ht[1],这样的设计可以保证在扩展或者收缩哈希表的时候可以将ht[0]里面的所有键值对rehash到ht[1]。而这一个步骤是分多次、渐进式地完成的,避免过大的计算量导致服务器在一段时间内停止服务。

在_dictExpandIfNeeded中,当满足扩展条件时会调用dictExpand。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
int dictExpand(dict *d, unsigned long size)
{
/* the size is invalid if it is smaller than the number of
* elements already inside the hash table */
if (dictIsRehashing(d) || d->ht[0].used > size)
return DICT_ERR;

dictht n; /* the new hash table */

/*
static unsigned long _dictNextPower(unsigned long size)
{
unsigned long i = DICT_HT_INITIAL_SIZE; // 4

if (size >= LONG_MAX) return LONG_MAX + 1LU;
while(1) {
if (i >= size)
return i;
i *= 2;
}
}
*/
unsigned long realsize = _dictNextPower(size); // 获取哈希表容量,4,8,16,32,直到比size大

/* Rehashing to the same table size is not useful. */
if (realsize == d->ht[0].size) return DICT_ERR;

/* Allocate the new hash table and initialize all pointers to NULL */
n.size = realsize;
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*)); // 分配新的表
n.used = 0;

/* Is this the first initialization? If so it's not really a rehashing
* we just set the first hash table so that it can accept keys. */
if (d->ht[0].table == NULL) {
d->ht[0] = n;
return DICT_OK;
}

/* Prepare a second hash table for incremental rehashing */
d->ht[1] = n;
d->rehashidx = 0; // 更新rehashidx变量,后续有用
return DICT_OK;
}

在分配完dict之后,就可以进行rehash的操作了。在redis中,有两种rehash方式。

  • dictRehashMilliseconds:按照ms计时的rehash操作,是databasesCron中针对redis的DB进行rehash。serverCron后台定时任务会每次调用databasesCron()进而调用incrementallyRehash,每隔一段时间就会执行。
1
2
3
4
5
6
7
8
9
10
11
/* Rehash for an amount of time between ms milliseconds and ms+1 milliseconds */
int dictRehashMilliseconds(dict *d, int ms) {
long long start = timeInMilliseconds();
int rehashes = 0;

while(dictRehash(d,100)) {
rehashes += 100;
if (timeInMilliseconds()-start > ms) break;
}
return rehashes;
}

在server.c里会调用这个函数,使用了CPU时间的1ms。该函数的作用,是对正在rehash的字典,每次执行1毫秒,每次循环100次的哈希表数据迁移。

1
2
3
4
5
6
7
8
9
10
11
12
13
int incrementallyRehash(int dbid) {
/* Keys dictionary */
if (dictIsRehashing(server.db[dbid].dict)) {
dictRehashMilliseconds(server.db[dbid].dict,1);
return 1; /* already used our millisecond for this loop... */
}
/* Expires */
if (dictIsRehashing(server.db[dbid].expires)) {
dictRehashMilliseconds(server.db[dbid].expires,1);
return 1; /* already used our millisecond for this loop... */
}
return 0;
}
  • _dictRehashStep:这是一个单步的rehasing,在执行对dict的增删改查中都会被调用
1
2
3
static void _dictRehashStep(dict *d) {
if (d->iterators == 0) dictRehash(d,1);
}

比如前面的添加键值对,或者查找操作。但相对其它的操作会在两个哈希表中进行,字典的添加只会在ht[1]中直接新增

1
2
3
4
5
6
7
8
9
10
11
12
13
 dictEntry *dictAddRaw(dict *d, void *key)   // 新增一个entry
{
if (dictIsRehashing(d)) _dictRehashStep(d); // 如果在rehash执行一步rehash
// 正在rehashing的话,直接在ht[1]中添加
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
}

dictEntry *dictFind(dict *d, const void *key)
{
// ...
if (dictIsRehashing(d)) _dictRehashStep(d);
// ...
}

上面两个函数都调用了通用的rehash函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* 最多读取n*10个空节点,避免rehash阻塞太久*/
if (!dictIsRehashing(d)) return 0;

while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;

/* 确保rehashidx不会溢出 */
assert(d->ht[0].size > (unsigned long)d->rehashidx);
/* 遍历找到非空节点 */
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
de = d->ht[0].table[d->rehashidx];
/* 将当前节点的所有key-value对转存到ht[1]中 */
while(de) {
uint64_t h;

nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
d->ht[0].table[d->rehashidx] = NULL; // 释放该节点
d->rehashidx++;
}

/* 检查是否已经完全rehash */
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}

/* More to rehash... */
return 1;
}

因为哈希表可能存在大量的空节点,redis的做法是每次遍历10n个节点,如果还没找到非空节点即返回,这里的n是step数。这样就可以避免rehash的时候,阻塞太久。

另外,结合incrementallyRehash该函数来看,考虑到渐进式rehash在服务器比较空闲的时候将会长时间存在使用两个哈希表的时候。因此在redis的周期函数中,会花费1ms来辅助rehash。具体可以参考server.c/databasesCron():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void databasesCron(void) {
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
// ....

// 这一步有可能产生缩容
for (j = 0; j < dbs_per_call; j++) {
tryResizeHashTables(resize_db % server.dbnum);
resize_db++;
}
/* Rehash */
/* 前提是配置了activerehashing,允许服务器在周期函数中辅助进行渐进式rehash,默认值是1,server.activerehashing = CONFIG_DEFAULT_ACTIVE_REHASHING; */
if (server.activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
int work_done = incrementallyRehash(rehash_db);
if (work_done) {
/* 如果已经进行了定时rehash,则停止循环,等待下一轮cron */
break;
} else {
/* 否则,会移动到下一个redis db进行 */
rehash_db++;
rehash_db %= server.dbnum;
}
}
}
}
}

总结rehash的详细步骤:

  1. 为 ht[1] 分配空间, 让字典同时持有 ht[0] 和 ht[1] 两个哈希表;
  2. 在字典中维持一个索引计数器变量 rehashidx , 并将它的值设置为 0 , 表示 rehash 工作正式开始;
  3. 在 rehash 进行期间,每次对字典执行添加、删除、查找或者更新操作时,程序除了执行指定的操作以外,还会顺带将 ht[0] 哈希表在 rehashidx 索引上的所有键值对 rehash 到 ht[1] ,当 rehash 工作完成之后,程序将 rehashidx 属性的值增一。另外,每一步都会遍历至多十个节点,以找到非空节点;
  4. 随着字典操作的不断执行,最终在某个时间点上,ht[0] 的所有键值对都会被 rehash 至 ht[1],这时程序将 rehashidx 属性的值设为 -1 ,表示 rehash 操作已完成。

通过将rehash键值对的计算工作分摊到增删改查的操作中,避免了对服务器性能造成影响。

关于rehash,推荐一篇文章,主要讲的是线上遇到的在rehash期间,同时有两个hash表在使用,会使得redis内存使用量瞬间突增的问题:美团针对Redis Rehash机制的探索和实践

redis设计与实现——链表

发表于 2019-07-26

redis设计与实现——链表

链表是数据结构中一种很常见的实现类型,redis也不例外,其主要实现在adlist.h和adlist.c里。

链表与链表节点的实现

如下所示,每个链表节点都用listNode结构来表示,并且用list来持有整个链表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typedef struct listNode {
struct listNode *prev; // 前置节点
struct listNode *next; // 后置节点
void *value; /节点值
} listNode;

typedef struct list {
listNode *head; // 表头节点
listNode *tail; // 表尾节点
void *(*dup)(void *ptr); // 复制链表节点所包含的值
void (*free)(void *ptr); // 释放链表节点所包含的值
int (*match)(void *ptr, void *key); // 对比链表节点所保存的值与另一个输入是否相等
unsigned long len; // 链表长度
} list;

由此可见,redis的链表是一个双端的(具备prev和next指针)、无环的(表头节点的prev和表尾节点的next都指向NULL)、带链表长度计数器(len属性)、多态(使用void*指针来保存节点值)。

链表和链表节点的API

函数 作用
listEmpty 移除链表所有元素,但不销毁list本身
listRelease 销毁整个链表
listAddNodeHead 添加一个节点到链表头
listAddNodeTail 添加一个节点到链表尾
listInsertNode 可选地将新节点插入到指定节点的前面或者后面
listDelNode 删除一个节点
listGetIterator 返回链表头部或者尾部的迭代器
listReleaseIterator 销毁迭代器
listNext 返回迭代器的下一个链表节点
listDup 拷贝整个链表
listSearchKey 在列表中搜索与给定键匹配的节点。(需要实现定义match函数)
listIndex 返回链表中特定索引的节点,可以使用负数
listRotate 反转链表——将尾部节点插入到头部
listJoin 合并两个链表,并把其中一个置空

redis设计与实现——字符串

发表于 2019-07-21

简单动态字符串

Redis没有使用C语言传统的以空字符结尾的字符串表示,而是自己构造了一种名为简单动态字符串(simple dynamic string,SDS)的抽象类型。

除了在诸如打印日志等无需对字符串进行修改的地方使用C字符串之外,其它场合一般使用SDS。其优点:

  • 使用起来更加简单;
  • 二进制安全;
  • 计算效率高;
  • 兼容普通的C字符串函数;

SDS的定义

在sds.h/sdshdr的结构中有这么一个值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
typedef char *sds;
/* Note: sdshdr5 is never used, we just access the flags byte directly.
* However is here to document the layout of type 5 SDS strings. */
struct __attribute__ ((__packed__)) sdshdr5 {
unsigned char flags; /* 3 lsb of type, and 5 msb of string length */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr8 {
uint8_t len; /* used */
uint8_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr16 {
uint16_t len; /* used */
uint16_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 {
uint32_t len; /* used */
uint32_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {
uint64_t len; /* used */
uint64_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};

这里flags的低三位表示类型(是sdshdr8还是16),高五位没被使用;len记录当前字节数组长度,alloc记录当前字节数组分配的内存大小,都不包含'\0';buf保存真实字符串的值,以及结尾的'\0'。

这里使用了attribute ((packed)),它的作用是编译器取消结构在编译过程中的优化对齐,按照实际占用字节数进行对齐。我们可以打印出字节长度分别为:

1
2
3
4
sizeof(struct sdshdr8)  // 3->3, result of llvm
sizeof(struct sdshdr16) // 6->5
sizeof(struct sdshdr32) // 12->9
sizeof(struct sdshdr64) // 24->17

SDS与C字符串的区别

常数复杂度获取字符串长度

这一点得益于结构体重的len字段,与C字符串不同,redis获取字符串长度的复杂度从O(N)降到O(1)。

杜绝缓冲区的溢出

C库中有一个<string.h>/strcat函数可以将两个字符串进行拼接,但C库中这个函数是假设使用者为目的字符串分配了足够多的内存,否则会产生缓冲区溢出。

与C字符串不同,SDS使用了另外的拼接函数:sdscatlen

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* Append the specified binary-safe string pointed by 't' of 'len' bytes to the
* end of the specified sds string 's'.
*
* After the call, the passed sds string is no longer valid and all the
* references must be substituted with the new pointer returned by the call. */
sds sdscatlen(sds s, const void *t, size_t len) {
size_t curlen = sdslen(s);

s = sdsMakeRoomFor(s,len);
if (s == NULL) return NULL;
memcpy(s+curlen, t, len);
sdssetlen(s, curlen+len);
s[curlen+len] = '\0';
return s;
}

这里的关键是sdsMakeRoomFor函数,其它部分只是做一些内存拷贝和长度的重新设置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
sds sdsMakeRoomFor(sds s, size_t addlen) {
void *sh, *newsh;
// sdsavail: 获取可用长度,这里的s是指向buf的,通过buf进行寻址
// 获取头部(结构体)指针:#define SDS_HDR_VAR(T,s) struct sdshdr##T *sh = (void*)((s)-(sizeof(struct sdshdr##T)));
// sdsavail的计算方式:sh->alloc - sh->len;
size_t avail = sdsavail(s);
size_t len, newlen;
char type, oldtype = s[-1] & SDS_TYPE_MASK; // s-1就是flags
int hdrlen;

/* 有足够的空间就可以直接返回 */
if (avail >= addlen) return s;

len = sdslen(s); // O(1)获取字符串长度
sh = (char*)s-sdsHdrSize(oldtype);// 获取头部(结构体)指针
newlen = (len+addlen);// 新的字符串使用长度
// 新字符串小于1M时,预分配两倍空间
// 新字符串大于1M时,预分配多1M的空间
if (newlen < SDS_MAX_PREALLOC) // SDS_MAX_PREALLOC (1024*1024)
newlen *= 2;
else
newlen += SDS_MAX_PREALLOC;

type = sdsReqType(newlen); // 重新计算字符串类型

/* Don't use type 5: the user is appending to the string and type 5 is
* not able to remember empty space, so sdsMakeRoomFor() must be called
* at every appending operation. */
// SDS_TYPE_5直接按SDS_TYPE_8来计算
if (type == SDS_TYPE_5) type = SDS_TYPE_8;

hdrlen = sdsHdrSize(type);// 新类型的长度

if (oldtype==type) {
// #define s_realloc realloc
// 如果类型没变化,直接在原sds上重新分配内存
newsh = s_realloc(sh, hdrlen+newlen+1);
if (newsh == NULL) return NULL;
s = (char*)newsh+hdrlen;
} else {
/* Since the header size changes, need to move the string forward,
* and can't use realloc */
// 如果类型发生了变化,则重新malloc分配空间
newsh = s_malloc(hdrlen+newlen+1);
if (newsh == NULL) return NULL;
memcpy((char*)newsh+hdrlen, s, len+1);// 将原字符串内容拷贝到新开辟的内存中
s_free(sh); // 释放原sds内存
// 设置flag,len和alloc等字段
s = (char*)newsh+hdrlen;
s[-1] = type;
sdssetlen(s, len);
}
sdssetalloc(s, newlen);
return s;
}

这里的关键策略是针对不同长度的字符串做不同的分配策略:

  • 新字符串小于1M时,预分配两倍空间
  • 新字符串大于1M时,预分配多1M的空间

另外就是不使用SDS5,将其当作SDS8来使用。

由于redis可能出现频繁修改字符串的场景,这种预分配的策略可以使得SDS将连续增长N次字符串所需要的内存重分配次数从必定N次降低为最多N次。

减少修改字符串时带来的内存重分配次数

字符串的创建

直接看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
sds sdsnewlen(const void *init, size_t initlen) {
void *sh;
sds s;
char type = sdsReqType(initlen);
/* Empty strings are usually created in order to append. Use type 8
* since type 5 is not good at this. */
if (type == SDS_TYPE_5 && initlen == 0) type = SDS_TYPE_8;
int hdrlen = sdsHdrSize(type);
unsigned char *fp; /* flags pointer. */

// +1是因为字符串都以'\0'结尾,但其又是二进制安全的,即字符串中间可以出现字符'\0',因为sds有长度属性
sh = s_malloc(hdrlen+initlen+1);
if (init==SDS_NOINIT) // const char *SDS_NOINIT = "SDS_NOINIT";
init = NULL;
else if (!init)
memset(sh, 0, hdrlen+initlen+1);
if (sh == NULL) return NULL; // 如果sh是NULL,直接返回NULL
s = (char*)sh+hdrlen;
fp = ((unsigned char*)s)-1; // fp就是flag指针
switch(type) {
case SDS_TYPE_5: {
// #define SDS_TYPE_BITS 3
// flag的前五位保存长度,后三位是类型type,因此结构体中sdshdr5不含有len
*fp = type | (initlen << SDS_TYPE_BITS);
break;
}
case SDS_TYPE_8: {
SDS_HDR_VAR(8,s);
sh->len = initlen;
sh->alloc = initlen;
*fp = type;
break;
}
case SDS_TYPE_16: {
SDS_HDR_VAR(16,s);
sh->len = initlen;
sh->alloc = initlen;
*fp = type;
break;
}
case SDS_TYPE_32: {
SDS_HDR_VAR(32,s);
sh->len = initlen;
sh->alloc = initlen;
*fp = type;
break;
}
case SDS_TYPE_64: {
SDS_HDR_VAR(64,s);
sh->len = initlen;
sh->alloc = initlen;
*fp = type;
break;
}
}
if (initlen && init)
memcpy(s, init, initlen);
s[initlen] = '\0'; // 以'\0'结尾
return s;
}

这里有两个特殊的地方:一是sds都以'\0'结尾;二是sdshdr5用flag这个字段可以同时保存type和len。

惰性空间的释放

惰性空间的释放主要体现在SDS字符串的缩短操作,redis中的sdstrim提供了这样的一个操作:sdstrim 函数接受一个 SDS 和一个 C 字符串作为参数,从 SDS 左右两端分别移除所有在 C 字符串中出现过的字符(《redis设计与实现》一书有误)。源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
sds sdstrim(sds s, const char *cset) {
char *start, *end, *sp, *ep;
size_t len;

sp = start = s;
ep = end = s+sdslen(s)-1;
// 分别从头尾开始便利,即移除掉cset中的字符
while(sp <= end && strchr(cset, *sp)) sp++;
while(ep > sp && strchr(cset, *ep)) ep--;
len = (sp > ep) ? 0 : ((ep-sp)+1);
if (s != sp) memmove(s, sp, len); //内存拷贝,即将中间段的字符串拷贝到头部指针
s[len] = '\0';
sdssetlen(s,len);
return s;
}
// s="AA...AA.a.aa.aHelloAWorld :::" ->sdstrim(s,"Aa. :")-> "HelloAWorld"

由此,可以看到sdstrim并没有释放原空间,即alloc不变,变的是len。这样后续再需要扩展的时候,len后的空间能够再被利用。

事实上,redis的确提供了释放空间的函数:

1
2
3
4
5
/* Free an sds string. No operation is performed if 's' is NULL. */
void sdsfree(sds s) {
if (s == NULL) return;
s_free((char*)s-sdsHdrSize(s[-1])); // #define s_free free
}

二进制的安全

这里的关键是SDS是使用len属性的值而不是空字符来判断字符串是否结束,这种二进制安全的做法使得redis不仅可以保存文本数据,还可以保存任意格式的二进制数据。

兼容部分C字符串的函数

这是基于SDS遵循了C字符串以空字符串结尾的惯例,这些API都会将SDS保存的又用数据的末尾保存位空字符。

ZooKeeper: Wait-free coordination for Internet-scale systems——MIT6.824

发表于 2019-06-22

ZooKeeper

《ZooKeeper: Wait-free coordination for Internet-scale systems》

Abstract

ZooKeeper是一种用于协调分布式应用程序进程的服务,旨在提供一个简单而高性能的内核,用于在客户端中构建更复杂的进程协调原语。

ZooKeeper接口支持高性能服务实现。除了属性wait-free之外,ZooKeeper还为每个客户端提供FIFO请求执行保证,并为所有更改ZooKeeper状态的请求提供线性化。

Introduction

大规模的分布式应用需要多种不同形式的协调,配置就是其中最基本的配置形式之一。配置只是系统过程的操作参数列表,而更复杂的系统具有动态配置参数。

在设计ZooKeeper的API时,我们并不会使用阻塞原语。如果处理请求取决于响应和其他客户端的故障检测,则服务本身的实现变得更加复杂。因此,Zookeeper实现了一个API,它可以处理像文件系统一样分层组织的简单无等待数据对象。

ZooKeeper服务包含一组服务器,这些服务器使用复制来实现高可用和高性能。其高性能使包含大量进程的应用程序能够使用此类协调内核来管理协调的所有方面。我们能够使用简单的流水线架构来实现ZooKeeper,这使我们可以获得数千个未完成的请求,同时仍然保持低延迟。

为了保证更新操作满足线性化,系统实现了一种基于领导的原子广播协议,称为Zab。在客户端缓存数据是提高读取性能的重要技术,ZooKeeper使用监视机制使客户端能够缓存数据,而无需直接管理客户端缓存。

本文的主要贡献是:协调内核。其提出了一种无等待协调服务,具有普通的的一致性保证,可用于分布式系统。

The ZooKeeper service

ZooKeeper客户端库通过客户端API向ZooKeeper提交请求,在本节中,我们首先提供ZooKeeper服务的高级视图。 然后讨论客户端用于与ZooKeeper交互的API。

Service overview

ZooKeeper为其客户端提供了一组数据节点(znode)的抽象,这些节点根据分层名称空间进行组织,而这些层次中的znode是客户端通过ZooKeeper API操作的数据对象。分层名称空间通常用于文件系统。 它是组织数据对象的理想方式。

img
img

客户端能创建两种ZooKeeper节点:持久节点和临时节点。

在创建新的znode时,客户端可以设置顺序标志。使用顺序标志设置所创建的节点具有一个单调递增计数器值。如果n是新的znode而p是父znode,则n的序列值永远不会小于在p下创建的任何其他顺序znode的名称中的值。

ZooKeeper实现了watches,允许客户在不需要轮询的情况下及时收到变更通知。当客户端发出设置了监视标志的读取操作时,操作将正常完成,除非在返回的信息发生更改时服务器通知了客户端。watches是与会话相关的一次性触发器:一旦触发或会话结束,它们就会被注销。

例如,如果客户端在"/foo"更改两次之前发出getData("/foo",true),则客户端将获得一个监视事件,告知客户端"/foo"的数据已更改。

Data model

ZooKeeper的数据模型本质上是一个文件系统,它具有简单的API,完整的数据读写和带有分层key的键值表。与文件系统中的文件不同,znode不是为通用数据存储而设计的。相反,znodes是映射到客户端应用程序的抽象,通常对应于用于协调目的的元数据。以上图为例,我们有两个子树,一个用于应用程序1(/app1),另一个用于应用程序2(/app2)。应用程序1的子树实现了一个简单的组成员协议:每个客户端进程\(p_i\)在/app1下创建一个znode \(p_i\),只要进程正在运行,它就会持续存在。

尽管znode尚未设计用于通用数据存储,但ZooKeeper确实允许客户端存储一些可用于元数据或分布式计算中所配置的信息。

Sessions

客户端连接到ZooKeeper之后会启动一个session,session具有一个超时机制,如果客户端在其session中没有收到该超时机制的相关内容,ZooKeeper会认为客户端有故障。当客户端显式关闭session handler或ZooKeeper检测到客户端出现故障时,session结束。

Client API

create(path, data, flags):创建一个相关路径的znode;

delete(path, version):删除一个相关版本的节点;

exists(path, watch):判断相关路径的znode是否存在,watch标记强制客户端设置监视;

getData(path, watch):返回数据和元数据(例如版本信息);

setData(path, data, version):写入数据data[];

getChildren(path, watch):返回一系列子节点;

sync(path):使得client当前连接着的ZooKeeper服务器,和ZooKeeper的Leader节点同步(sync)一下数据。

所有方法都具有同步和异步版本。每种更新方法都采用预期的版本号,这样可以实现条件更新。如果znode的实际版本号与预期版本号不匹配,则更新将失败并显示版本错误。如果版本号为-1,则不执行版本检查。

ZooKeeper guarantees

ZooKeeper有两个基本的顺序保证:

  • Linearizable writes:更新ZooKeeper状态的所有请求都是可序列化的,并且与优先级有关;
  • FIFO client order:来自给定客户端的所有请求都按客户端发送的顺序执行。

我们这里所说的线性化是异步线性化,允许客户端有多个未完成的操作,因此我们可以确保同一个客户端的未完成操作的特定顺序或者确保其FIFO的顺序。

要知道这两个顺序保证如何相互影响,我们考虑以下的方案:多个进程的系统选择leader来命令工作进程的过程,此时新的leader修改更改大量的配置参数,并在完成后通知其它进程。这种场景有两个要求:

  • 当新leader开始进行更改时,我们不希望其他进程开始使用正在更改的配置;
  • 如果新配置文件在配置完全更新之前消失,我们不希望进程使用此部分配置;

分布式锁对于第一个要求有帮助,但无法解决第二个要求的问题。对于第二个要求,在使用ZooKeeper时,新leader可以讲路径指定为reader znode,其它进程仅在该znode存在时才可以使用该配置。新的leader通过删除ready,更改各种配置znode并创建ready来进行配置更改。所有的更改都以pipelined的方式异步发出。由于顺序保证,如果进程看到就绪的znode,它还必须看到新leader的所有配置更改。如果新的leader在创建就绪znode之前死亡,则其他进程知道配置尚未最终确定会不去使用它。

上述方案仍然存在一个问题:如果进程在新leader开始进行update之前看到ready,然后在update正在进行时开始读取配置,会发生什么。此问题通过通知的排序保证得以解决,如果读取ready znode的进程请求通知该znode的更改,它将在它可以读取任何新配置之前看到通知客户端更改的notifications。

ZooKeeper还提供了类似flush原语属性, sync使服务器在处理读取之前应用所有挂起的写入请求,而不会产生完全写入的开销,保证客户端在在重新读取配置之前发出写入来看到最新的信息。

ZooKeeper还具有以下两种活动性和持久性保证:如果大多数ZooKeeper服务器处于活动状态并且可以进行通信,则可以使用该服务;如果ZooKeeper服务成功响应变更请求,只要规定数量的服务器最终能够恢复,该变更就会在任何数量的故障中持续存在。

Examples of primitives

本章主要讲述了如何使用ZooKeeper API实现更加强大的原语。

Configuration Management

ZooKeeper可用于在分布式应用程序中实现动态配置。一般,配置存储在znode \(z_c\)中,进程以\(z_c\)的完整路径名启动。启动的进程通过读取\(z_c\)来获取其配置,并设置watch标记为true。如果配置更新了,则会通知进程去读取新配置,并再次设置watch标记为true。

Rendezvous

在分布式系统中,最终的系统配置并不总是有足够清晰的先验情景。例如,客户端可能希望启动主进程和多个工作进程,但启动进程由调度器完成,因此客户端不会提前知道提供给worker的地址和端口等可以连接到主服务器的进程信息。我们可以使用endezvous znode \(z_r\)处理这种情况。它是客户端创建的节点,客户端传递该节点的完整路径名作为主进程和工作进程的启动参数。这样,master启动时,就会在\(z_r\)中填充信息,而worker则可以读取其中的信息。

Group Membership

我们可以使用临时节点来实现组成员资格,具体来说,就是使用临时节点能够查看创建节点的会话状态。首先指定一个znode \(z_g\)来表示该组,当该组的成员启动时,会在\(z_g\)下创建一个临时的子znode。因此只需要通过列举\(z_g\)的后代,进程就可以获取改组信息。如果进程想要监视组成员身份的更改,则进程可以将监视标志设置为true,并在收到更改通知时刷新组信息

Simple Locks

ZooKeeper不是一个带锁的服务,使用ZooKeeper的应用通常使用同步原语来满足其需求。这里我们展示如何使用ZooKeeper实现锁,这样可以实现各种同步原语。

最简单的锁使用lock files,锁由znode表示,为了获取锁,客户端尝试着使用EPHEMERAL标记去创建指定的znode。如果创建成功,客户端则拥有锁。否则,客户端可以读取znode,并设置监视标志,以便在当前leader挂掉时收到通知。客户端在死亡或显式删除znode时释放锁,等待锁定的其他客户端在观察到被删除的znode后再次尝试获取锁定。

这种锁定协议存在两个问题:一是受到羊群效应(Herd Effect)的影响;二则是只实现了独占锁定。

Simple Locks without Herd Effect:我们定义了znode \(z_l\)来实现这样的锁,我们对所有请求锁的客户端进行排序,并且每个客户端按请求到达的顺序获得锁定。因此,希望获得锁定的客户执行以下操作:

1
2
3
4
5
6
7
8
9
Lock
1 n = create(l + “/lock-”, EPHEMERAL|SEQUENTIAL)
2 C = getChildren(l, false)
3 if n is lowest znode in C, exit
4 p = znode in C ordered just before n
5 if exists(p, true) wait for watch event
6 goto 2
Unlock
1 delete(n)

客户端创建节点,序号最小的获取锁。客户端只监控比自己小的那个节点。最小节点完成任务,发出通知,并释放。客户端获取通知后,获取所有节点,如果自己的序号最小,则获取锁,如果不是,监控比自己小的那个节点,依此类推。其它进程都只watch比它顺序小的进程对应的结点。

释放锁就像删除表示锁请求的znode n一样简单。通过在创建时使用EPHEMERAL标志,崩溃的进程将自动清除任何锁请求或释放它们可能具有的任何锁。

Read/Write Locks:为了实现读/写锁,我们稍微更改了锁过程,并具有单独的读锁定和写锁定过程。 解锁程序与全局锁情况相同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Write Lock
1 n = create(l + “/write-”, EPHEMERAL|SEQUENTIAL)
2 C = getChildren(l, false)
3 if n is lowest znode in C, exit
4 p = znode in C ordered just before n
5 if exists(p, true) wait for event
6 goto 2
Read Lock
1 n = create(l + “/read-”, EPHEMERAL|SEQUENTIAL)
2 C = getChildren(l, false)
3 if no write znodes lower than n in C, exit
4 p = write znode in C ordered just before n
5 if exists(p, true) wait for event
6 goto 3

每个进程都在ZooKeeper上创建一个临时的顺序结点,最小的一个或多个结点为当前的持锁者,多个是因为多个读操作可以并发。需要写锁的进程,监视比它顺序小的进程;对于需要读锁的进程,监视比它小的最后一个写进程对应的结点。当前结点释放锁后,所有Watch该结点的进程都会被通知到,他们成为新的持锁者。

Double Barrier:Double Barrier可以用来同步一个任务的开始和结束,当有足够多的进程进入barrier之后,才开始执行任务。当所有的进程都执行完各自的任务后,屏障才撤销。而ZooKeeper的实现过程:

我们用一个znode b代表barrier。进入barrier,客户端监视ready节点,通过判断该结点是否存在来决定是否启动任务。每个进程在进入时会创建一个znode作为b的子节点,并在它准备离开时取消该节点。当b的子znode的数量超过barrier阈值时,进程可以进入屏障,客户端收到ready节点创建的通知。当所有进程都移除了其子节点时,就可以认为任务结束,离开barrier。

ZooKeeper Implementation

ZooKeeper通过在组成服务的每个服务器备份ZooKeeper数据来提供高可用性。下图展示了其高级组建,收到写请求,服务器会通过请求处理器做执行准备,然后使用相关原子广播的实现协议,最后再提交对ZooKeeper数据库的修改,完全复制到整体的所有服务器。在读请求的情况下,服务器只读取本地数据库的状态并生成对请求的响应。每个ZooKeeper服务器都为客户端服务。客户端只连接一台服务器来提交请求。

img
img

备份数据库是包含了整个数据树的内存数据库,默认情况下,树中的每个znode最多存储1MB的数据。对于可恢复性,我们有效地将更新日志记录到磁盘,并且在将应用程序应用于内存数据库之前强制写入磁盘介质。

Request Processor

与客户端发送的请求不同,transactions是幂等的。 当leader收到写入请求时,它会计算应用写入时系统的状态,并将其转换为捕获新状态的事务。因为可能存在尚未应用于数据库的事务,所以对于未来的状态,我们必须对其进行计算。例如,客户端执行setData(),如果该请求中的版本号与正在更新的znode的未来版本号相匹配,那么该服务生成一个setDataTXN,包含新数据,新版本号和更新时间戳。如果发生错误,例如版本号不匹配或要更新的znode不存在,则生成errorTXN。

Atomic Broadcast

更新ZooKeeper状态的所有请求都会被转发到leader,再由leader执行请求并通过原子广播协议Zab广播到各个服务器。接收客户端请求的服务器会在转发相应的状态改变时响应客户端。而Zab则是使用默认的多数仲裁来commit,使用2f+1服务器时,我们可以容忍f故障。

另外,Zab提供了比常规原子广播更强的顺序保证,Zab保证leader的广播变更按照发送的顺序进行,并且前leader的所有变更都会在广播自己的变更之前传递给已建立的领导者。

我们使用TCP进行传输,因此网络可以保留消息顺序,这样我们就可以简化实现。

在正常操作期间,Zab按顺序提供所有消息,但是由于Zab不会持续记录每条消息的ID,因此Zab可能会在恢复期间重新发送消息。但因为ZooKeeper是幂等交易,所以只要按顺序交付,就可以接受多次交易。实际上,ZooKeeper要求Zab至少重新传递在上一个快照开始后传递的所有消息。

Replicated Database

每个副本都有一个ZooKeeper状态的内存副本,当ZooKeeper服务器从崩溃中恢复时,其需要恢复到此状态。因此ZooKeeper会使用定期快照,仅需要从快照开始后重新传递消息即可恢复。ZooKeeper快照为模糊快照,因为没有锁定ZooKeeper状态来拍摄快照;而是对树进行深度优先扫描,原子地读取每个znode的数据和元数据并将它们写入磁盘。由于生成模糊快照的过程中可能存在额外的状态更改,但因为状态更改是幂等的,只要我们按顺序应用状态更改,就不会影响最终的结果。

Client-Server Interactions

当服务器处理写入请求时,它还会发送并清除与该更新相关的任何监视通知。

读请求在每个服务器本地处理。每个读请求都使用zxid进行处理和标记,该zxid与服务器看到的最后一个事务相对应,并定义了与写请求相关的部分读请求顺序。通过在本地处理读取,我们获得了出色的读取性能。但其也存在缺点——不保证读取操作的优先顺序,即可能返回过时值。关于这个,ZooKeeper提供了Sync原语,确保follower和leader是同步的。在读取操作后,客户端调用Sync,使得同步请求添加到一个leader与该服务之间队列末尾,待leader提交了所有决议,再返回响应。

ZooKeeper服务器按FIFO顺序处理来自客户端的请求。响应包括zxid。如果客户端连接到新服务器,则该新服务器通过检查客户端的最后一个zxid与其最后一个zxid,如果客户端具有比服务器更新的视图,则服务器不会重新建立与客户端的会话,直到服务器已经赶上了其zxid。

为了检测客户端会话失败,ZooKeeper使用超时机制。如果没有其他服务器在会话超时内从客户端会话中收到任何内容,则leader确定其中存在故障。如果客户端无法对服务器发送请求或者心跳信息(低活动期)则它将连接到其他ZooKeeper服务器以重新建立其会话。为了防止session超时,ZooKeeper客户端在session空闲了s/3 ms后发送心跳,如果没有再2s /3 ms从服务器收到响应,则切换到新服务器。其中s是会话超时时间。

Variadic templates in C++

发表于 2019-06-02

Variadic templates

Reference from : https://eli.thegreenplace.net/2014/variadic-templates-in-c/

在C++11之前,编写一个具有任意数量参数的参数的唯一方法就是使用variadic函数,如printf,scanf之流就是这样实现的,使用了省略语法…和相关的va_ 宏定义。由于所有的类型解析都在运行时,并且必须要在va_arg中显式地使用强制转换,这些低级的内存操作,很容易带来代码的段错误。

而同样的,在C++11之前,模板的编写必须要声明固定数量的参数,无法表达具有可变数量参数的类或者函数模板。

Basic example

C++11的一个新特性就是可变参数模板,这个新特性使得我们以类型安全的方式编写接收任意数量参数的函数,并在编译时解析所有参数处理逻辑,而不是运行时。

1
2
3
4
5
6
7
8
9
10
11
template<typename T>
T adder(T v) {
return v;
}

template<typename T, typename... Args>
T adder(T first, Args... args) {
return first + adder(args...);
}

// Usage: long sum = adder(1, 2, 3, 8, 7);

上面的adder函数可以接受任意数量的参数,并且只要+运算符能够应用于这些参数,就可以正常编译,其中的检查是编译器完成的,遵循的是模板语法和重载规则。

typename... Args是模板参数包,而Args... args是函数参数包,而该模板的编写方式与编写递归代码一样,需要一个基本的、接受一个参数的adder函数。每次调用函数的时候,都会从模板参数包中剥离一个类型T,缩短一个参数,直到遇到第一个函数模板。调用过程如下:

1
2
3
4
5
T adder(T, Args...) [T = int, Args = <int, int, int, int>]
T adder(T, Args...) [T = int, Args = <int, int, int>]
T adder(T, Args...) [T = int, Args = <int, int>]
T adder(T, Args...) [T = int, Args = <int>]
T adder(T) [T = int]

Some simple variations

C++模版元编程中有一个模式匹配的概念,以下面的代码为例:

1
2
3
4
5
6
7
8
9
10
11
12
template<typename T>
bool pair_comparer(T a, T b) {
// In real-world code, we wouldn't compare floating point values like
// this. It would make sense to specialize this function for floating
// point types to use approximate comparison.
return a == b;
}

template<typename T, typename... Args>
bool pair_comparer(T a, T b, Args... args) {
return a == b && pair_comparer(args...);
}

上面的函数接受任意数量的参数,如果参数成对相等,那么最终返回true,如下:

1
pair_comparer(1.5, 1.5, 2, 2, 6, 6)

但如果我们将第一个参数改成1,那么将会编译报错。同理,如果参数个数不为偶数,编译也不会通过。

1
2
pair_comparer(1.5, 1.5, 2, 2, 6, 6, 7);
pair_comparer(1, 1.5, 2, 2, 6, 6);

如果想避免这个问题,我们可以添加一个单参数的模版函数,这样就可以避免奇数参数编译不通过:

1
2
3
4
template<typename T>
bool pair_comparer(T a) {
return false;
}

Performance

关于性能的考虑,可变参数模版并没有涉及真正的递归,而是在编译时预生成一系列函数调用,而且由于现代编译器会对代码进行内联优化,很可能最后编译的机器代码中并没有函数调用。与C风格的可变参数函数相比,va_宏实际上是在操纵运行时堆栈,在运行时解析C语言的可变参数。

Varidic data structures

这个案例就比较复杂了,在C++11之前要实现具有动态添加新字段的自定义数据结构,是比较困难的。以下面的代码为例,我们进行类型定义:

1
2
3
4
5
6
7
8
template <class... Ts> struct tuple {};

template <class T, class... Ts>
struct tuple<T, Ts...> : tuple<Ts...> {
tuple(T t, Ts... ts) : tuple<Ts...>(ts...), tail(t) {}

T tail;
};

我们先从基类开始,定义了一个空的tuple类模版,后面的特化则从参数包中剥离出第一个类型,以此定义了一个名为tail的成员。通过递归定义,当没有更多类型可以剥离时就停止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
tuple<double, uint64_t, const char*> t1(12.2, 42, "big");

/*
struct tuple<double, uint64_t, const char*> : tuple<uint64_t, const char*> {
double tail;
}

struct tuple<uint64_t, const char*> : tuple<const char*> {
uint64_t tail;
}

struct tuple<const char*> : tuple {
const char* tail;
}

struct tuple {
}
*/

上面对于数据结构的定义使我们创建了tuple,其数据结构的大小和成员的内部布局都是确定的。另外,要想访问元祖,我们应该使用get函数模板来访问,定义一个帮助器类型,它允许我们访问元组中第k个元素的类型:

1
2
3
4
5
6
7
8
9
10
11
template <size_t, class> struct elem_type_holder;

template <class T, class... Ts>
struct elem_type_holder<0, tuple<T, Ts...>> {
typedef T type;
};

template <size_t k, class T, class... Ts>
struct elem_type_holder<k, tuple<T, Ts...>> {
typedef typename elem_type_holder<k - 1, tuple<Ts...>>::type type;
};

elem_type_holder是另一个可变参数类模板。它需要一个数字k和元组类型作为模板参数。这是一个编译时模板元编程,作用于常量和类型:

1
2
3
4
5
6
7
8
9
10
11
struct elem_type_holder<2, tuple<T, Ts...>> {
typedef typename elem_type_holder<1, tuple<Ts...>>::type type;
}

struct elem_type_holder<1, tuple<T, Ts...>> {
typedef typename elem_type_holder<0, tuple<Ts...>>::type type;
}

struct elem_type_holder<0, tuple<T, Ts...>> {
typedef T type;
}

以elem_type_holder <2,some_tuple_type>为例,其从tuple的开头剥离了两种类型,并将其类型设置为第三种类型。接下来再实现get。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
template <size_t k, class... Ts>
typename std::enable_if<k == 0, typename elem_type_holder<0, tuple<Ts...>>::type&>::type
get(tuple<Ts...>& t) {
return t.tail;
}

template <size_t k, class T, class... Ts>
typename std::enable_if<k != 0, typename elem_type_holder<k, tuple<T, Ts...>>::type&>::type
get(tuple<T, Ts...>& t) {
tuple<Ts...>& base = t;
return get<k - 1>(base);
}

/*
tuple<double, uint64_t, const char*> t1(12.2, 42, "big");

std::cout << "0th elem is " << get<0>(t1) << "\n";
std::cout << "1th elem is " << get<1>(t1) << "\n";
std::cout << "2th elem is " << get<2>(t1) << "\n";
*/

Variadic templates for catch-all functions

假设我们想要实现一个可以打印出标准库容器的函数,并且适用于任何容器。对于vector,list,deque来说,他们的摹本参数只有两个:value type和allocator type。但对于map和set来说,它们的参数个数都不止两个。因此我们可以使用可变模板来实现这个功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
template <template <typename, typename...> class ContainerType,
typename ValueType, typename... Args>
void print_container(const ContainerType<ValueType, Args...>& c) {
for (const auto& v : c) {
std::cout << v << ' ';
}
std::cout << '\n';
}

template <typename T, typename U>
std::ostream& operator<<(std::ostream& out, const std::pair<T, U>& p) {
out << "[" << p.first << ", " << p.second << "]";
return out;
}

std::array

发表于 2019-05-30

std::array

std::array是在c++11中引入的,对原来C语言的数组的一个包装器,具有额外的优点。这是一种具有恒定大小元素的顺序容器。

其模版为:

1
2
template < class T, size_t N > 
class array;

T为元素类型,N为数组元素的个数。

头文件为:

1
#include <array>

Defining and Initializing an std::array<> object

举两个例子:

1
2
std::array<std::string, 200> arr1;
std::array<int, 10> arr3 = { 34, 45 };//Init: 34 , 45 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ,

std::array还提供了一个方法可以一次性对所有的元素设置相同的值。

1
2
3
std::array<int, 10> arr4;
// Fill all elements in array with same value
arr4.fill(4);

How to get the size of std::array

1
arr.size();

How to access elements in std::array

  • operator []:访问超出范围的元素时会引起undefined behaviour;
1
int x = arr[2];
  • at():访问超出范围的元素时会抛出out_of_range异常;
1
int x = arr.at(2);
  • std::tuple’s get<>():访问超出范围的元素时会编译错误;
1
int x = std::get<2>(arr);

How to Iterate over a std::array<> object

对array的遍历,存在四种方法:

  • 使用基于范围的迭代循环;
  • 使用循环;
  • 使用迭代器;
  • 使用for_each;
<i class="fa fa-angle-left"></i>1…567…28<i class="fa fa-angle-right"></i>

278 日志
29 标签
RSS
© 2025 LucienXian
由 Hexo 强力驱动
主题 - NexT.Pisces