LucienXian's Blog


  • 首页

  • 归档

  • 标签

Golang内存模型

发表于 2020-04-04

Golang内存模型

参考自:https://golang.org/ref/mem

Golang的内存模型描述了这样的一种场景:在一个goroutine中对一个变量的读取能保证是由不同gorountine写入相同变量所产生的。

Happens Before

在单个goroutine中,只有在满足不改变语言规范所定义的行为时,编译器才能对单个goroutine所执行的读写进行重新排序。但由于重新排序,一个goroutine所观察到的执行顺序可能与另一个goroutine察觉到的执行顺序不同。

为了指定读写要求,在go程序中定义了一个叫Happens Before的偏序关系——如果事件e1发生在事件e2之前,那么我们说e2发生在e1之后。同样,如果e1不在e2之前发生并且在e2之后也没有发生,那么我们说e1和e2同时发生。

在单个goroutine中,Happens Before的顺序就是程序所表现出来的顺序。

为了保证对变量的读取R可以读取到由特定的对变量的写入W,即W是R可以观察到的唯一写入,必须要满足以下两个条件:

  1. W发生到R之前;
  2. 任何对变量的其他写入要么发生在w之前,要么发生在r之后;

变量的初始化为零值,其实也是内存模型中的零值写入。

Synchronization

初始化

程序的初始化是在单个goroutine中进行的,但goroutine可以创建其他goroutine,这是并发的。

如果一个package引入了另一个package,即被引入的package会先初始化。

main.main的开始必须要在所有init函数完成之后。

Goroutine的创建

以下面的为例子,f()打印出hello world可能会在hello()结束后才打印。

1
2
3
4
5
6
7
8
9
10
var a string

func f() {
print(a)
}

func hello() {
a = "hello, world"
go f()
}

Goroutine的销毁

无法保证goroutine的退出在程序中的任何其他事件发生之前发生。

1
2
3
4
5
6
var a string

func hello() {
go func() { a = "hello" }()
print(a)
}

对a的赋值很可能在下面的print中看不到,因为缺乏同步。

Channel的同步

通道通信是goroutine之间同步的主要方法。channel的发送必定发生在该channel接受完成之前。

1
2
3
4
5
6
7
8
9
10
11
12
13
var c = make(chan int, 10)
var a string

func f() {
a = "hello, world"
c <- 0
}

func main() {
go f()
<-c
print(a)
}

这样就能保证打印出hello, world。

另外,channel的关闭会发生在返回零值的接收之前,这样用close(c)替代c<-0也可以产生相同的保证行为。

而对于缺乏buffer的channel,其接收会发生在该channel的发送完成之前,例如这样也可以保证打印出正确的hello world,这里的发送和接收顺序与上面的例子相反。

1
2
3
4
5
6
7
8
9
10
11
12
13
var c = make(chan int)
var a string

func f() {
a = "hello, world"
<-c
}

func main() {
go f()
c <- 0
print(a)
}

但如果channel是带有buffer,就无法保证打印出hello world了。

在容量为C的通道上的第k个接收发生在该通道的第k + C个发送完成之前,因为不从channel接收数据就无法继续写入。

该规则将前一个规则推广到缓冲通道。 它允许通过缓冲的通道对计数信号量进行建模:channel中的项目数量对应于活动使用的数量,channel的容量对应于同时使用的最大数量,发送一个项目获取信号量,接收项目则会释放信号量。通过这种操作就可以限制其并发。

1
2
3
4
5
6
7
8
9
10
11
12
var limit = make(chan int, 3)

func main() {
for _, w := range work {
go func(w func()) {
limit <- 1
w() // 不处理完成,无法释放该信号量
<-limit
}(w)
}
select{}
}

Locks

sync包里实现了两种锁相关的数据类型:sync.Mutex和sync.RWMutex。通过锁的使用,我们可以在goroutine中保证同步。这样的一个程序就可以顺利打印出hello world。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var l sync.Mutex
var a string

func f() {
a = "hello, world"
l.Unlock()
}

func main() {
l.Lock()
go f()
l.Lock()
print(a)
}

Once

sync包还提供了一种初始化的安全机制,通过使用Once数据类型,多个线程都可以执行once.Do(f),但只有一个会运行f(),其他的调用将会block直接f()返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var a string
var once sync.Once

func setup() {
a = "hello, world"
}

func doprint() {
once.Do(setup)
print(a)
}

func twoprint() {
go doprint()
go doprint()
}

在这种机制下,a的赋值将会在打印之前执行。

Incorrect synchronization

需要注意的是读取R可能会观察到与R同时发生的写入W所写入的值,但这并不意味着在R之后的读取会观察到在W之前所发生的写入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var a, b int

func f() {
a = 1
b = 2
}

func g() {
print(b)
print(a)
}

func main() {
go f()
g()
}

这里可能发生的情况是g()打印出了2和0,也就是即便g()已经读取了f()里面对b的写入,但这不意味着g()里面的a能够读取到f()中在b写入之前的a。

同理,类似的错误也会发生在同步的使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var a string
var done bool

func setup() {
a = "hello, world"
done = true
}

func doprint() {
if !done {
once.Do(setup)
}
print(a)
}

func twoprint() {
go doprint()
go doprint()
}

这里并不意味着能够观察到done设置为true,就隐式地说明a已经被初始化。

另一种典型错误则是忙等,这种情况下并不意味着done被设置为true后,能够表示a已经被初始化,可以跳出for循环。真实情况是,此时print(a),a可能还是空字符串。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var a string
var done bool

func setup() {
a = "hello, world"
done = true
}

func main() {
go setup()
for !done {
}
print(a)
}

应对这些问题也很简单,使用显式地同步。

decltype in c++11

发表于 2019-12-20

decltype

decltype是c++11引入的类型推导标记符,与auto类似。基本语法比较简单,就是给一个表达式,返回表达式的类型:

1
decltype ( expression )

这里只会查询表达式的返回类型,并不会对表达式进行求值。

decltype的判断规则是比较复杂的,主要分为以下几类:

  • 如果参数是无括号的标识表达式或无括号的类成员访问表达式,decltype会返回以该表达式命名的实体类型。但如果参数是一个重载函数,则会编译错误;
  • 若参数是其他类型为 T 的任何表达式
    • 表达式的值类型为临时值/亡值,则会返回T&&;
    • 表达式的值类型为左值,则会参会T&;
    • 纯右值,则会返回T;

需要注意的是,如果对象的名字带有括号,则它被当做通常的左值表达式,从而 decltype(x) 和 decltype((x)) 通常是不同的类型。

举个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct A { double x; };
const A* a;
int i=10;

decltype(a->x) y; // y 的类型是 double(其声明类型)
decltype((a->x)) z = y; // z 的类型是 const double&(被当作左值表达式)
decltype((i))b = i; // b 的类型是 int&

// 或者用在无名函数的类型推导上
auto f = [](int a, int b) -> int
{
return a * b;
};

decltype(f) g = f;
i = f(2, 2);
j = g(3, 3);

在日常编程中,用到decltype的情况还是比较少的,我们一般用在模版中,结合auto和尾返回类型,我们可以写出语言级别支持的简洁代码:

1
2
template<typename T, typename U>
auto foo(T t, U u) -> decltype(t + u) { return t + u; }

另外,要判断是否为左值,可以考虑使用c++11标准库提供的模版类来做检查:

1
is_lvalue_reference<decltype(++i)>::value;

Scaling Distributed Machine Learning with the Parameter Server——MIT6.824

发表于 2019-12-15

Scaling Distributed Machine Learning with the Parameter Server

这篇论文提出了一种用于解决分布式机器学习问题的参数服务器框架。通过将数据和工作负载均匀地分布在所有工作节点上,服务器节点则用来维护全局共享的参数(即一些向量和矩阵)。这个框架能够很好地管理节点之间的异步数据通信,并保持了灵活的一致性、弹性可伸缩性和容错能力。

这是它的开源实现:https://github.com/dmlc/parameter_server

Introduction

分布式的优化和推理正成为解决大规模机器学习问题的先决条件,因为数据的增长和模型复杂,很难通过单机去快速解决这些问题。因此,如此大量的计算工作和数据都需要仔细的系统设计,

而这些复杂的模型需要在所有工作节点中进行全局共享,由于需要经常访问共享参数,因此这会带来三个挑战:

  • 访问参数需要大量的网络带宽;
  • 许多机器学习算法都是顺序执行的,如果同步和机器延迟成本很高,对性能影响很大;
  • 大规模的容错能力;因为机器学习任务通常在云环境中执行,而云环境不够稳定可靠;

Contributions

参数服务器(Parameter Server)在学术界和工业界已经有了一定的影响力。本文主要描述其第三代开源实现。其注重于分布式推理的实现,提供了5个关键功能:

  1. 高效的通信:使用了不会阻塞计算的异步通信模型;
  2. 灵活的一致性模型:较为宽松的一致性降低了同步成本和延迟;
  3. 弹性可伸缩行:主要是在运行时添加新节点,无需重启;
  4. 容错性和耐用性:秒级恢复故障机器,不会中断计算,并使用向量明确网络分区和故障行为;
  5. 计算更简单:全局共享的参数是向量和矩阵的形式;

Machine Learning

该论文因为要在非常大型的训练数据中证明参数服务器的有效性,因此介绍了两种广泛使用的机器学习技术。

Risk Minimization

risk minimization是机器学习中最直观的一个问题,大概意思就是对预测误差的衡量,即通过risk minimization的模型来预测自变量x的值y。训练数据量与模型大小的有着重要的联系,详细的模型可能提高了准确性,却导致过拟合,反之则可能是欠拟合。为了解决这个问题,则通过正则化来实现在模型复杂度和训练误差之间取得平衡,即最小化训练数据预测误差损失和惩罚模型复杂度的正则器。

虽然这两个对于机器学习算法的性能有着很重要的影响,但不是评估参数服务器的关键,因此这里采用了比较简单的算法,一种叫次梯度的分布式下降算法( distributed subgradient descent)

在参数服务器中,训练数据分配到所有的worker,共同学习参数向量w。算法在每次迭代的时候,每个worker会独立地使用自己的训练数据来计算Δwi,这就是subgradient,参数向量w的移动方向,然后使用所有的subgradient来表示最终w的梯度。为了快速收敛,需要设计有效的学习率。算法如下所示

img
img

由于计算梯度的成本比较高,如果w的维度很高的话,计算将无法执行,因此每个worker都需要知道其训练数据所依赖的w的坐标范围,从而减少计算量。

论文的实验结果是,随着worker的增长,单机所需内存也在下降。如下图,对于100个worker,每个worker只需要使用参数的7.8%。拥有10,000名工人,这一比例降低到0.15%。

Generative Models

另一类主要的机器学习算法就是使用非监督算法来捕获数据的基础结构,具体不表,也是通过学习部分参数,然后进行聚合,但可能有点不同的是,有些算法不是使用的梯度下降,而是其它的比较方法。

Architecture

参数服务器可以同时运行多种算法,其将节点分为server group和好几个worker group。如下图所示,server group中的server节点负责维护全局共享的部分参数,而这些节点则通过相互通信完成参数迁移和复制,同时通过维护诸如节点状态和参数分配等原数据的一致性视图。

每个worker都运行一个应用程序,存储着部分的训练数据,worker之间是不能通讯的,只能与server节点交流,从而更新和检索参数。每个worker都有一个叫task scheduler的节点,负责为worker节点分配任务和监视进度。当有workers新增或移除节点·时,task scheduler负责重新分配未完成的任务。

参数服务器支持独立的参数名称空间,也允许多个worker group共享同一个名称空间。

(Key,Value) Vectors

节点之间的共享模型可以表示为一组键值对,并且假设所有的key都是有序的,例如损失函数最小化问题中,键值对是特征ID及其权重,对于LDA,则是单词ID和主题ID以及计数的组合。

Range Push and Pull

参数服务器中,worker与server的节点之间发送数据是通过推拉操作完成的,并且支持基于范围的推拉。worker将计算好的梯度push到server,然后worker从server读取新的参数。

1
2
w.push(R, dest) // 将key范围中w的所有现存项全部发往目的,这里的目的可以是特定节点,也可以是节点组
w.pull(R, dest) // 从目的位置读取key范围中w的所有现存项

User-Defined Functions on the Server

除了从worker聚合数据之外,server节点还可以执行用户自定义的功能,这里的好处是因为server节点往往具有共享参数更加完整更加新的的信息。

Asynchronous Tasks and Dependency

task是通过远程调用发出的,worker向server发出的消息是pull或者push其中一种,也可以是由调度程序发给任何节点的用户自定义功能。另外task可能包含任意数量的子task。

task是异步执行的,发出task之后,caller可以马上执行进一步的计算,caller仅在收到callee答复才能标记任务完成。默认情况下,callee并行执行任务,如果要实现序列化,则可以在task之间执行一个依赖关系。如下图就指示了三个迭代例子,其中10和11是独立的,但12依赖于11,因此callee在10中完成计算局部梯度之后可以立即开始11,但12则需要等到11的计算完成,返回数据之后才能开始。

Flexible Consistency

独立的任务虽然可以通过并行化CPU的使用,磁盘和网络带宽的方式提高系统效率,但也可能带来节点间数据不一致的问题,从而降低算法收敛速度,因此需要在系统效率和算法收敛速度之间进行取舍,但这里的权衡取舍又会依赖于以下几种因素:

  • 算法对数据不一致的敏感度;
  • 训练数据的特征相关性;
  • 硬件组件的容量差异;

参数服务器为算法设计人员提供了定义一致性模型的灵活性,下图就是通过任务依赖关系实现的三种不同模型:

img
img

Sequential:顺序执行所有任务,当前一个任务执行完成,才能启动下一个任务;

Eventual:所有任务同时执行,这只有在对延时敏感度很robust的算法中才会被使用;

Bounded Delay:就是前两种模型的折衷,即使用一个最大延时t,在t时刻之前完成前一个任务才能启动下一个任务。t=0就是Eventual,t=∞即Sequential;

另外,tasks之间的依赖关系可能是动态的,即可以根据系统情况来改变最大延时以平衡系统效率和优化算法的收敛性。

User-defined Filters

作为调度程序控制流的一个拓展,参数服务器支持用户自定义filter,以选择性地在节点之间同步各个键值对,从而更加细粒度地控制数据。这里的关键是,要明确优化算法本身拥有的跟参数有关的信息是什么。例如有些filter只会push自上次同步依赖变化超过阈值的item,有些则利用filterpush仅仅可能影响server权重的梯度。

Implementation

参数服务器使用一致性哈希来存储参数,并使用链式复制来备份内容,同时对基于key范围的通信进行了优化。

Vector Clock

参数服务器会将每个键值对与矢量时钟关联起来,该时钟会记录每个节点在该键值对上的时间,矢量时钟的一个好处就是可以用来跟踪聚合状态或者是拒绝重复发送数据。但矢量时钟的最初版需要O(mn)的空间来处理n个节点和m个参数,这样的成本太高了。

但由于参数服务器可以进行基于范围的通信模式,这样就会使得许多参数具有相同的时间戳,这样就可以将矢量时钟进行压缩。

一开始,所有的节点都共享一个range vector clock,覆盖了整个参数键空间,其最初的时间戳为0。每个range key都可以抽取子range,并最多创建3个新的矢量时钟,参考下面的算法,这样就可以大大降低参数数量:

Messages

节点可以将消息发送给耽搁节点或者是节点组,一条消息由key范围R中的键值对列表和相关范围的矢量时钟组成,这是参数服务器的基本通信格式,不仅用于共享参数,也会应用于任务。而对于任务,键值对则可能表示为(taskID,参数/返回值)。

参数服务器基本通信格式:[vc(R), (k1,v1), ..., (kp,vp)] kj ∈ R and j ∈ {1,...p}

由于机器学习问题往往需要高带宽,因此需要进行消息压缩:

  1. 如果迭代之间的训练数据保持不变,则可以要求接收节点缓存key列表,以后发送节点只需要发送该key的hash值即可。
  2. 由于键值对的value本身可能包含许多零项,另外用户定义的过滤器也可以将部分值归零,因此可以只发送非零的键值对即可。

Consistent Hashing

参数服务器将key的分区方式是通过将key和服务器节点ID插入到哈希环中,如下:

每个服务器节点管理一定的key范围,每个服务器节点都管理者它按逆时针方向到下一个节点之间的key range,这就是该key range的主节点。同样,每个节点都复制了按逆时针方向的k个节点的key range。另外为了改善平衡和提高恢复效率,物理服务器通过多个"虚拟服务器"在环中表示。例如k=2时,S1就会复制S2和S3管理的key range,这时S1就是这两个key range的slave

Replication and Consistency

Worker仅与该key range的master进行通信,在主server节点的修改和时间戳都需要复制到slave服务器。下面的左图就是这样的一个情况,

Worker1会push x到server1,server1执行完用户自定义函数后同步到slave server2,同步完成后,任务才算结束。

但简单的同步复制可能会使网络流量增加k倍,这对很多依赖于高网络带宽的机器学习应用是很致命的。因此参数服务器框架做了一个重要的优化:聚合后的复制,即servers先聚合从workers接收到的数据后再复制到slaves。如上右图所示,对于n个workers来说,复制带宽将会降低n倍。

Server Management

为了实现容错和动态扩展,参数服务器还必须支持添加和删除节点,为了方便起见,将会引入虚拟服务器。server 节点加入后,将会发生:

  1. server管理器为新节点分配一个key range以用作master节点。这可能会导致另一个key range分裂或者从某个终止节点中删除;
  2. 新加入的节点会获取k个key range,自身成为这些key range的slave;
  3. server管理器广播节点更改,接收方server将移除不属于自己管理的key range的数据,并将未完成的任务重新提交给新节点;

Worker Management

添加新的worker节点与添加server节点类似,但更简单:

  1. task调度器为新节点分配数据范围;
  2. 该节点从网络文件系统或者现有的工作程序中加载训练数据,接下来则是从server节点pull共享参数;
  3. task调度器广播修改,这可能会使部分workers释放部分训练数据;

当worker节点挂掉后,参数服务器提供了选项,用户可以自行控制恢复程序,这是因为:

  • 如果训练数据量巨大,则恢复worker节点可能比恢复server节点的成本更高;
  • 在优化过程中丢失少量训练数据通常只会对模型造成少许影响;

因此,算法设计者可能更喜欢继续操作而不替换失败的工作程序。

Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing——MIT6.824

发表于 2019-12-07

Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing

Abstract

本文介绍了弹性的分布式数据集,这是一种分布式的内存抽象形式,它可以让程序员以一种容错的方式在大型集群中进行内存计算。

Introduction

尽管想MapReduce之类的框架已经提供了关于访问集群的计算资源的抽象,但仍然缺乏对于分布式内存的抽象,这使得它们在处理需要重用多个计算中中间结果的应用程序上不够高效。像一些机器学习算法、图算法,交互式数据挖掘都需要对数据子集做临时的查询,但那些框架的做法往往是将其写入到外部的存储系统里,这里IO、序列化之类的开销非常大。

本文提出了一种弹性的分布式数据集的新抽象。其可以将中间结果明确地保存在内存中,控制其分区进行优化放置,并使用一组丰富的运算符进行操作。

Apache Spark应运而生。

Resilient Distributed Datasets (RDDs)

RDD Abstraction

所谓的弹性的分布式数据集,这里的弹性指的是在任何时候都可以进行重算,让用户不会感知到某部分内容曾经丢失过,这是Spark的核心抽象,是一种只读的、分区的数据记录集合。RDD的产生,要么通过从确定存储中获取,要么就是通过其它的RDD进行转换获取,这里的转换包括map、filter和join。

RDD应该有足够的信息,去记录自身是如何从其它数据集派生而来的。用户可以控制RDD的持久化和分区,比如指示重用的RDD和存储策略,也可以命令RDD的元素进行分区。分区依照特定规则将具有相同属性的数据记录放在一起,每个分区相当于一个数据集片段。

Spark Programming Interface

Spark会使用集成API的方式暴露RDD,其中每个数据集标示为一个对象,并使用对象上的方法进行调用转换。

首先是会对稳定存储中的数据通过转换的方式定义一个或多个RDD,然后在操作中使用这些RDD,比如是返回数据给应用程序,还是导出到存储系统。此外,还可以对RDD进行持久化来指示哪些RDD是需要重用的,默认将持久性RDD保存在内存中,在RAM不够的话,将会将其溢出到磁盘,当然还有很多种持久化策略。

Advantages of the RDD Model

RDD与分布式共享内存最大的区别就是,RDD只能通过粗粒度的转换得来,而DSM则可以读取/写入到每一个内存位置。这样RDD在处理容错时,就不会产生额外的有关checkpoint的开销,如果有分区丢失,RDD可以在不同的节点并行地重建,而不需要回滚整个系统。

由于RDD的不可变特性,系统可以通过运行较慢的备份副本来缓解慢速节点,而DSM在这种情况会因为多副本访问相同内存位置,而产生干扰更新。

Applications Not Suitable for RDDs

RDD更适用于在批处理应用程序中对全集数据执行相同的操作,在这种情况下,RDD能够很好地记录每一步的转换,并且能够在分区丢失时快速恢复。

而对于那些需要对共享状态进行异步更新的应用,RDD则不适合。

Spark Programming Interface

为了使用Spark,开发人员编写了一个驱动程序,用以连接到一组worker,并通过一个或多个RDD来调用action,同时该驱动程序上的Spark代码还可以跟踪RDD的lineage(血统?)。

这些worker是一个长期活跃的进程,在内存中存着RDD分区。

RDD Operations in Spark

下表列出了Spark中可用的RDD转换和可用操作:

Representing RDDs

论文中提到使用了大约14000行Scala代码实现了Spark,这个系统在Mesos集群上运行。每个Spark程序都作为一个单独的Mesos应用程序,具有独立的驱动程序和worker程序,并且这些应用程序之间的资源共享由Mesos处理。

论文中提到RDD的表示是一种基于图的表示。因此对于RDD的表示,则是通过暴露5个接口方法来实现的。

  • Partions:数据集的原子结构;
  • preferredLocations:能更快访问分区的系列节点;
  • dependencies:记录父子RDD的记录;
  • iterator:用于从父RDD计算子RDD;
  • patitioner:数据分区的元信息;

至于如何表示RDD的关系,由于RDD在物理上是分区的,散列在集群不同机器的内存上的,文中将其定义为窄依赖(Narrow Dependency)和宽依赖(Wide Dependency)两种。

  • 宽依赖:父RDD中的分区可能被子RDD中的多个分区所依赖;
  • 窄依赖:父RDD的每个分区至多被子RDD中的一个分区所依赖;像map/filter这些操作都属于窄依赖;

这两种依赖的差别在于:窄依赖可以pipeline执行,在失败时只需要重新执行对应的父RDD即可;而宽依赖则需要shuffle,并且如果出现故障恢复则需要重算所有父RDD;

Job Scheduling

每当用户在RDD执行action的时候,调度器就会检查RDD的谱系图,以构建要执行的DAG(有向无环图)。如下图所示:

每个stage内部都包含尽可能多的具有窄依赖的操作。这些stage的边界是宽依赖所必需执行的shuffle操作,另外任何已经计算出的分区都可以使父RDD的计算短路。调度程序会集群上启动任务以计算每个阶段中缺少的分区,直到它计算出目标RDD。

调度器会根据数据局部性的原则来执行delay scheduling算法:

  • 如果任务需要的数据分区在某节点的内存中,则将任务发送到节点上执行;
  • 否则,如果该分区有指定的位置,则直接发送给它;

对于宽依赖,Spark会在存有父分区的节点上暂存shuffle的中间记录,以便做容灾处理,就像mapreduce存下map的输出一样。

如果任务失败,只要stage的父stage还是可用的,就可以将task调度到另一个节点上重新运行即可。如果父stage也失效了,就会重新提交一个计算父stage数据的Task来并行计算丢失的分区。但论文也提到了Spark没有考虑调度器本身的高可用。

Interpreter Integration

Scala包含了交互shell,可以让用户从解释器中交互地运行Spark以查询大数据集。

Scala解释器通常会为用户输入的每一行编译一个类,并将其加载到JVM中通过调用一个函数来进行操作。该类会包含一个单例对象,对象则包含该行上的变量或者函数,并以初始化的方式运行该行代码。

另外,Spark的解释器还做了两处修改:

  1. 类传递:为了让工作节点能够获取在每一行上创建的类的字节码,解释器会通过HTTP为这些类提供服务;
  2. 修改代码的生成:因为每行代码的单例对象都是通过对应的静态方法访问,这意味着无法引用上一行定义的变量。因此需要修改代码的生成,以便直接引用每行对象的实例,如下图:

Memory Management

Spark提供了三种对持久化RDD的存储策略:

  • 未序列化Java对象存在内存;
  • 序列化的数据存于内存;
  • 磁盘存储;

第一种性能最好,因为可以直接访问在Java虚拟机内存里的RDD对象;第二种性能会降低,在空间有限的情况下可以让用户选择比Java对象图更高效的内存表示方式;第三种则是针对RDD太大无法保留在内存中,但每次使用都需要重新计算开销很大时,这个方法会很有用;

为了管理可用的有限内存,Spark在RDD级别使用了LRU逐出策略。当计算了一个新的RDD分区但没有足够存储空间时,就会通过LRU的方式逐出一个分区。除非是该RDD便是新分区对应的RDD,在这种情况下,Spark会将旧的分区保留在内存,避免同一个RDD的分区被循环地调进调出。

Support for Checkpointing

虽然lineage机制可以满足失败后RDD的重建恢复,但对于具有很长链条的RDD来说,恢复时间会很长。特别是包含了宽依赖的长lineage的RDD,因此能设置检查点的操作就会非常有用。Spark当前提供了为RDD设置检查点操作的API,可以用一个REPLICATE标志来持久化,用户自行决定使用方式。

redis设计与实现——AOF持久化

发表于 2019-11-05

AOF持久化

RDB持久化是通过保存数据库中的键值对来记录数据库状态的,而AOF则是通过保存redis服务器所执行的命令来完成记录的。

例如执行命令:

1
2
SET msg "hello"
RPUSH numbers 128 256 512

那么RDB的持久化就是保存msg和numbers的键值对,而AOF则是保存SET和RPUSH的命令。

AOF持久化的实现

AOF的持久化功能分为命令追加、文件写入、文件同步三个步骤。

命令追加

打开AOF持久化功能后,服务器在执行完一个写命令后,会以redis的协议格式将被执行的命令写到服务器状态的缓冲区,则redisServer结构的aof_buf字段。在大量写请求情况下,利用缓冲区缓存一部分命令,尔后再根据某种策略写入磁盘,减少IO。

1
2
3
struct redisServer {
sds aof_buf; /* AOF buffer, written before entering the event loop */
}

AOF文件的写入与同步

Redis的服务器进程中有一个事件循环,正如注释所说的,每次结束事件循环前都会调用flushAppendOnlyFile()函数,该函数则根据配置选项决定如何写入AOF文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void beforeSleep(struct aeEventLoop *eventLoop)  {
// ...
flushAppendOnlyFile(0);
// ...
}

#ifdef __linux__
#define redis_fsync fdatasync
#else
#define redis_fsync fsync
#endif

void aof_background_fsync(int fd) {
bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}

void flushAppendOnlyFile(int force) {
//...
try_fsync:
if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())
return;

if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
latencyStartMonitor(latency); //监控
redis_fsync(server.aof_fd); /* 同步到磁盘 */
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-fsync-always",latency);
server.aof_fsync_offset = server.aof_current_size;
server.aof_last_fsync = server.unixtime;
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
if (!sync_in_progress) {
aof_background_fsync(server.aof_fd); // 在额外的线程中开启一个任务去执行fsync()
server.aof_fsync_offset = server.aof_current_size;
}
server.aof_last_fsync = server.unixtime;
}
}
  • force:如果持久化策略为everysec,就有一定的可能延迟flush,因为后台进程可能还在进行fsync(),而如果force设成1,则无论什么情况都会进行写入。

另外由于在Linux中用户调用write函数时,操作系统会先将写入数据保存在一个内存缓冲区中,redis支持服务器配置appendfsync选项来定义上面的函数行为:

1
2
3
4
5
/* Append only defines */
#define AOF_FSYNC_NO 0
#define AOF_FSYNC_ALWAYS 1
#define AOF_FSYNC_EVERYSEC 2
#define CONFIG_DEFAULT_AOF_FSYNC AOF_FSYNC_EVERYSEC //默认
  • AOF_FSYNC_ALWAYS:将aof_buf缓冲区的所有内容写入并同步到AOF文件;
  • AOF_FSYNC_EVERYSEC:将aof_buf缓冲区的所有内容写入AOF文件,如果上次同步AOF文件的时间距离现在超过1秒,则再次进行同步;
  • AOF_FSYNC_NO:写入文件但不同步;

AOF文件的载入与数据还原

由于AOF文件包含了重建数据库的所有写命令,因此只需要重读执行一遍,就可以恢复服务器状态了。其实现函数为loadAppendOnlyFile()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
int loadAppendOnlyFile(char *filename) {
struct client *fakeClient; // 创建一个伪客户端
FILE *fp = fopen(filename,"r");
struct redis_stat sb;
int old_aof_state = server.aof_state;
long loops = 0;
off_t valid_up_to = 0; /* Offset of latest well-formed command loaded. */
off_t valid_before_multi = 0; /* Offset before MULTI command loaded. */

if (fp == NULL) {
serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
exit(1);
}

/* 特殊处理aof文件长度为0的情况 */
if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
server.aof_current_size = 0;
server.aof_fsync_offset = server.aof_current_size;
fclose(fp);
return C_ERR;
}

/* 参数关系aof,避免有新纪录写入同一个文件 */
server.aof_state = AOF_OFF;

fakeClient = createAOFClient();
startLoadingFile(fp, filename); // 做全局状态的标记,表示正在加载文件

/* 如果有RDB前缀,则需要加载RDB文件 */
char sig[5]; /* "REDIS" */
if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {
// ...
}

/* 读入AOF文件,一个一个命令执行. */
while(1) {
// ... 读取cmd

if (cmd == server.multiCommand) valid_before_multi = valid_up_to;

/* 在fake客户端上下文里执行命令 */
fakeClient->cmd = cmd;
if (fakeClient->flags & CLIENT_MULTI &&
fakeClient->cmd->proc != execCommand)
{
queueMultiCommand(fakeClient);
} else {
cmd->proc(fakeClient);
}

/* 该客户端不作回应 */
serverAssert(fakeClient->bufpos == 0 &&
listLength(fakeClient->reply) == 0);

/* 客户端不受blocked */
serverAssert((fakeClient->flags & CLIENT_BLOCKED) == 0);

// ...
}

// ....
}
  1. 由代码可见,首先是创建一个不带网络连接,不做回应不受blocked的客户端,因为执行命令只能在客户端上下文执行;
  2. 从AOF文件中分析并读出写命令;
  3. 用伪客户端执行该命令,知道所有命令处理完毕;

AOF重写

为了解决AOF文件体积膨胀的问题,Redis提供了AOF文件重写的功能,即Redis服务器会创建一个新的AOF文件来替代现有的AOF文件,并去除任何浪费空间的冗余命令。

AOF文件重写的实现

事实上,AOF文件重写并不会对老的AOF文件进行任何读取、分析或者写入操作,而是通过直接读取当前数据库的状态实现的。aof的重写是通过函数rewriteAppendOnlyFileRio实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
int rewriteAppendOnlyFileRio(rio *aof) {
dictIterator *di = NULL;
dictEntry *de;
size_t processed = 0;
int j;

// 遍历数据库
for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; // 写入select命令
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);

/* 写入select命令,指定数据库号码 */
if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
if (rioWriteBulkLongLong(aof,j) == 0) goto werr;

/* I遍历数据库中的每个key value */
while((de = dictNext(di)) != NULL) {
sds keystr;
robj key, *o;
long long expiretime;

keystr = dictGetKey(de);
o = dictGetVal(de);
initStaticStringObject(key,keystr);

expiretime = getExpire(db,&key);

/* 根据key的类型进行重写*/
if (o->type == OBJ_STRING) {
/* Emit a SET command */
char cmd[]="*3\r\n$3\r\nSET\r\n";
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
/* Key and value */
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
if (rioWriteBulkObject(aof,o) == 0) goto werr;
} else if (o->type == OBJ_LIST) {
if (rewriteListObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_SET) {
if (rewriteSetObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_ZSET) {
if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_HASH) {
if (rewriteHashObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_STREAM) {
if (rewriteStreamObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_MODULE) {
if (rewriteModuleObject(aof,&key,o) == 0) goto werr;
} else {
serverPanic("Unknown object type");
}
/* 如果key带有过期时间,需要保存过期时间 */
if (expiretime != -1) {
char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr;
}
/* 从父进程中读取diff的内容 */
if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
processed = aof->processed_bytes;
aofReadDiffFromParent();
}
}
dictReleaseIterator(di);
di = NULL;
}
return C_OK;

werr:
if (di) dictReleaseIterator(di);
return C_ERR;
}

另外,以写入集合键为例,可以看到为了避免在执行命令时导致客户端输入缓冲区溢出,重写快速链表、哈希表、集合和有序集合这种带有多个元素的key时,会先检查key包含的元素数量。如果超过了AOF_REWRITE_ITEMS_PER_CMD,则会使用多条命令进行重写。默认是64。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#define AOF_REWRITE_ITEMS_PER_CMD 64

int rewriteListObject(rio *r, robj *key, robj *o) {
long long count = 0, items = listTypeLength(o);

if (o->encoding == OBJ_ENCODING_QUICKLIST) {
//........
while (quicklistNext(li,&entry)) {
if (count == 0) {
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
AOF_REWRITE_ITEMS_PER_CMD : items; // 判断key元素是否超过AOF_REWRITE_ITEMS_PER_CMD
if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
if (rioWriteBulkString(r,"RPUSH",5) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
}

// 写入value,省略

if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0; // 如果超过了则使用多条RPUSH命令重写
items--;
}
quicklistReleaseIterator(li);
} else {
// ...
}
return 1;
}

AOF后台重写

为了避免函数会阻塞服务器处理客户端的请求,Redis将AOF重写放到子进程中执行,同时为了避免在子进程执行AOF重写期间,由于服务器进程在处理新的请求,从而使得现有数据库状态发生改变,Redis设置了一个AOF重写缓冲区,在服务器创建完子进程后开始使用,当Redis执行完一个写命令之后,会同时将写命令发送到AOF缓冲区和AOF重写缓冲区。

1
2
3
4
5
6
7
8
9
10
11
12
13
void bgrewriteaofCommand(client *c) {
if (server.aof_child_pid != -1) {
addReplyError(c,"Background append only file rewriting already in progress");
} else if (hasActiveChildProcess()) {
server.aof_rewrite_scheduled = 1;
addReplyStatus(c,"Background append only file rewriting scheduled");
} else if (rewriteAppendOnlyFileBackground() == C_OK) {
addReplyStatus(c,"Background append only file rewriting started");
} else {
addReplyError(c,"Can't execute an AOF background rewriting. "
"Please check the server logs for more information.");
}
}

首先判断是否已经存在相关bgrewrite子进程,倘若有会在这些命令完成后执行。否则会fork出子进程。在子进程完成aof重写后,会发一个信号给父进程,父进程会调用backgroundRewriteDoneHandler()将aof重写缓冲区中的所有内容写入到新的aof文件中,然后进行原子性地覆盖旧的aof文件。重写缓冲区的内容是通过aofRewriteBufferWrite写入到新的aof文件中的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
ssize_t aofRewriteBufferWrite(int fd) {
listNode *ln;
listIter li;
ssize_t count = 0;

listRewind(server.aof_rewrite_buf_blocks,&li);
// 逐个地将aof_rewrite_buf_blocks缓冲区中的内容重写到aof文件
while((ln = listNext(&li))) {
aofrwblock *block = listNodeValue(ln);
ssize_t nwritten;

if (block->used) {
nwritten = write(fd,block->buf,block->used);
if (nwritten != (ssize_t)block->used) {
if (nwritten == 0) errno = EIO;
return -1;
}
count += nwritten;
}
}
return count;
}

No compromises: distributed transactions with consistency, availability, and performance——MIT6.824

发表于 2019-10-10

No compromises: distributed transactions with consistency, availability, and performance

Abtract

本文展示了一个名为FaRM的主存分布式计算平台,可以提供强串行化、高性能、高可用和耐用性等特质,为此设计了新的事务,复制和恢复协议。

Introduction

具有高可用性和严格序列化的事务虽然简化了编程,但在一定程度上也影响了系统的性能。因此,像Dynamo或者Memcached之类的系统通过不支持事务或者提供弱一致性来提高性能。有些系统则是仅在所有数据都停留在一台机器中时才提供事务。因此需要程序员费心思去考虑数据分区的问题。

FaRM提供了分布式的ACID事务,具有严格的可分级性,高可用性,高吞吐量和低延迟。设计的协议则利用数据中心中出现的两种硬件趋势——具有RDMA的快速网络和廉价的DRAM提供,通过在电源故障时将DRAM的内容写入SSD来实现非易失性。FaRM的协议遵循三个原则来解决CPU瓶颈:减少消息计数,使用单向的RDMA读写而不是消息,并有效地利用并行性。

FaRM通过使用单向RDMA操作进一步降低了CPU开销,因为并不会使用到远程CPU。为了使用单向的RDMA,需要设计新的恢复协议(例如RDMA的数据请求是通过网卡提供的,不能简单地在期限到时拒绝传入请求)。另外,恢复协议借助并行性,在集群中均匀地分配每个状态的恢复,并在每台机器的core之间作并行恢复。

Hardware trends

前面讲过,FaRM的提出利用了两种硬件趋势——非易失性DRAM和具有RDMA的快速网络。

RDMA networking

FaRM尽可能使用单向的RDMA操作,这是一种远程直接数据存取,是为了解决网络传输中服务器端数据处理的延迟而产生的。文中的实验发现,RDMA读取比可靠性的RPC执行性能高2倍,而RDMA的性能瓶颈是网卡的消息速率。另外,RDMA和RPC都会受到CPU的限制,因此降低CPU开销才是挖掘硬件潜力的好方法。

Programming model and architecture

FaRM为应用程序提供了跨集群机器的全局地址空间的抽象,每个机器都运行独立的应用程序进程并存储对象在地址空间里。FaRM的API提供了对本地或者远程对象的透明访问,应用程序线程可以随时启动事务,在事务执行期间可以执行任意逻辑,随后可以调用FaRM来提交这些逻辑操作。

FaRM事务使用乐观并发控制,所有更新都被本地缓存,并且仅在成功提交后才对其他事务可见。如果并发事务冲突,事务的提交就会失败。

FaRM API还提供了无锁读取(优化的单对象只读事务)和位置提示,这样应用程序就可以将相关对象共存于同一组机器上,从而改善性能。

如下图所示,每台机器在用户进程中固定在每个硬件线程中的内核线程上运行FaRM,,每个内核线程运行一个事件循环,该循环执行应用程序代码并轮询RDMA完成队列。

随着计算机故障或添加新计算机,FaRM实例会随着时间推移逐步进行一系列的配置。配置是元组⟨i,S,F,CM⟩,其中i是唯一的,单调递增的64位配置标识符,S是配置中的一组计算机,F是从计算机到故障域的映射,CM则是配置管理器,FaRM使用Zookeeper来协调服务,确保机器就当前配置达成一致并进行存储。每个配置更改都会由CM调用一次Zookeeper,以更新配置。

FaRM中的全局地址空间由2GB的Region组成,每个Region都会备份到一个主备份和f个副备份中。每台机器在非易失性DRAM中存储多个Region,其他Region可以使用RDMA读取这些Region。读取对象必须要从包含该Region的主备份中完成,如果该Region位于本机,则使用局部地址空间读取。如果是远程,则使用单面RDMA读取。每个对象都有一个用于并行控制和复制的64位版本。Region标识符,即从主备份和副备份的映射由CM管理,并由线程与将单面RDMA读取发布到主备份所需的RDMA引用一起缓存。

所有机器都与CM沟通以分配新Region,从单调递增的计数器分配Region标识符,并选择该区域的副本。副本选择需要平衡存储在每台机器上的Region数量,同时受到以下限制:容量足够大,每个副本位于不同的故障域中,并且当应用程序指定位置限制时,该Region与目标Region位于同一位置。CM将准备消息发送给具有Region标识符的所选副本。如果所有副本都报告分配区域成功,则CM向所有副本发送一条提交消息。这是一个两阶段协议。

每台机器还存储实现FIFO队列的环形缓冲区。它们用作事务日志或消息队列。发送者会使用对尾部进行的单面RDMA写入,将记录追加到日志中。NIC会确认这些写入,但是不会涉及接收方的CPU。接收者定期轮询日志的开头以处理记录。

Distributed transactions and replication

FaRM集成了事务和备份的协议可以很好地提高性能,传统协议相比,它使用的消息更少,并且利用单面RDMA读取和写入来提高CPU效率和降低延迟。FaRM使用非易失性DRAM中的主备份复制来存储数据和事务日志,并使用单个事务协调器直接与primary和backup进行通信。

下图是FaRM事务的timeline。虚线和实线分别表示RDMA的读写,点线表示硬件的响应,矩形是对象数据。

在执行阶段,事务使用单向RDMA读取对象,并且它们在本地缓存写操作。协调器还记录所有访问对象的地址和版本,如果primary和backup与协调器位于同一个机器,对象访问会使用本地内存而不是RDMA来读取和写入日志。

提交事务:

  1. Lock:协调器将LOCK记录写入每台机器上的日志,这些机器是写入对象的主要机器。Primary的机器通过锁定特定版本对象的方式来处理这些记录。如果获取到所有的lock,那么将发送一条报告消息,否则会终止事务;
  2. Validate:协调器对primary机器执行读取验证,主要是读取所有对象的版本号,看是否一致。验证默认是通过单边的RDMA读取完成的;
  3. Commit backups:协调器在每次备份时将COMMITBACKBACK记录写入非易失性日志,然后等待NIC硬件的确认,而不是中断backup机器的CPU;
  4. Commit primaries:在COMMITBACKBACK写入backup机器之后,协调器开始对每台机器提交COMMIT-PRIMARY。Primary会更新对象的版本号;
  5. Truncate:在协调器收到所有主节点的响应之后,就会通过在其他日志记录中附带截断事务的标识符来实现记录的截断;

正确性

提交的读写事务在获取所有写锁时是可序列化的,而提交的只读事务在上一次读取时是可序列化的。在没有失败的情况下,这等效于在序列化时间点原子地执行和提交整个事务。

为了确保跨故障的可序列化性,必须在写入COMMIT-PRIMARY之前等待所有备份硬件的确认。否则一旦主节点在不接收COMMIT-BACKUP记录的情况下挂掉了,那么就可能丢失掉某个region的修改。

由于读集仅存储在协调器中,因此如果协调器失败并且没有提交记录可以生存以证明验证是成功的,则事务将中止。协调器必须要在其中一个主数据库上等待成功提交,然后再向应用程序报告成功提交。否则,如果协调器和所有相关backup节点都挂掉了,那么事务就会被终止了。因为没有可以用来做验证的记录。

Performance

对于FaRM来说,其协议比传统的分布式提交协议具有更多的优势,以带有备份的两阶段提交协议——Spanner的协议为例,Spanner使用Paxos复制事务协调器及其参与者,它们是存储由事务读取数据或写入数据的机器。每个Paxos状态机在传统的两阶段提交协议中都扮演着单独的机器的角色。因此这需要2f +1个副本才能容忍f个故障,每个状态机操作至少需要2f +1个往返消息,则需要4P(2f +1)个消息(其中P是事务参与者的数量)。

FaRM使用primary-backup复制而不是Paxos状态机复制。这将数据副本的数量减少到f+1,减少了在事务处理期间传输的消息的数量。并且由于协调器直接与主备节点交流,进一步减少了延迟和消息数。此外,通过RDMA进行的读取验证可确保只读参与者主节点不占用CPU,并且对COMMIT-PRIMARY和COMMIT-BACKUP记录使用单向RDMA写操作可减少对远程CPU的等待,另外CPU也可以批处理和懒惰处理。

Failure recovery

FaRM中的故障恢复有以下五个阶段:故障检测,重新配置,事务状态恢复,批量数据恢复和分配器状态恢复。

Failure detection

FaRM使用租约来检测故障。除CM之外,每台计算机都在CM上拥有租约,而CM在其他每台计算机上都拥有租约。租约到期就会触发故障恢复,租约是通过3次握手来授予的。每台机器向CM发送一个租赁请求,并以一条消息作为响应,该消息既充当对该计算机的租赁授权,又充当CM的租赁请求。然后非CM的计算机回复该租赁请求就行。

为了确保高可用性,FaRM的租期非常短。FaRM使用了专门的队列来实现租约,以避免其它消息类型在共享队列中,影响了其的延迟。为了提高性能,避免为每台机器在CM上增加一个队列,FaRM使用无限带宽的技术发送和接受各种操作。

租约的续期是在CPU上实现的,FaRM使用了专门的租约管理器线程,该线程以最高的用户空间优先级运行。

Reconfiguration

重新配置协议将FaRM实例从一种配置移到另一种。以下是重新配置的时间图:

  1. 怀疑。当某个机器的租约在CM到期时,它将怀疑该机器发生了故障。然后屏蔽所有外部客户端请求。
  2. 探测。新的CM向配置中的所有机器发出RDMA读取,但被怀疑的机器除外。同时也怀疑任何读取失败的机器,新CM仅在获得大多数探测的响应后才继续进行重新配置,避免CM处于小分区。
  3. 更新配置。在收到对探针的答复后,新的CM尝试将存储在Zookeeper中的配置数据更新为⟨c+ 1,S,F和CMid⟩,其中c是当前配置标识符,S是已回复的探测器,F是计算机到故障域的映射,而CMid是其自身的标识符。
  4. 重新映射区域。新CM重新分配先前映射到故障机器的区域,以将副本数恢复到f+1。
  5. 发送新配置。重新映射区域后,CM将NEW-CONFIG消息发送到配置中的所有计算机,其中包含配置标识符,其自身的标识符,配置中其他计算机的标识符以及区域到计算机的所有新映射。
  6. 应用新配置。当机器收到配置标识符大于其自身配置的NEW-CONFIG时,它将更新其当前配置标识符及其区域映射的缓存副本,并分配空间以容纳分配给它的所有新区域副本。
  7. 提交新配置。一旦CM从配置中的所有计算机接收到NEW-CONFIG-ACK消息,它将等待以确保先前配置中授予该配置中的计算机的租约均已到期。然后,CM将NEW-CONFIG-COMMIT发送给所有配置成员,这些成员拿到了租约的授权;

Transaction state recovery

在配置更改之后,FaRM使用分布在因事务而修改对象副本所产生的日志来恢复事务状态。下图展示了事务恢复状态的timeline,FaRM通过在集群中的线程和机器之间分配工作来实现快速恢复。

  1. Block access to recovering regions.

当一个primary挂掉,backup会被配置选举成新的primary,此时所有对相关区域的访问都会被屏蔽,知道上图第四步完成,重新获取读写锁。

  1. Drain logs.

要确保跨配置的一致性,一般是拒绝来自旧配置的消息。但FaRM无法这样做,因为NIC会提交写入事务日志的COMMIT-BACKUP和COMMIT-PRIMARY记录,而不会考虑它们的发布配置。FaRM通过drain日志的方式解决这个问题,即在收到NEW-CONFIGCOMMIT消息时都会处理其日志中的所有记录。完成后,它们会将配置标识符记录在变量LastDrained中,配置标识符小于或等于LastDrained的事务日志记录将会被拒绝。

  1. Find recovering transactions.

所有机器必须就给定事务是否为恢复事务达成一致,FaRM通过在重新配置阶段在通信中附带一些额外的元数据来实现此目的。协调器读取每台计算机上的LastDrained变量,对于自LastDrained之后其映射被更改的每个区域r,CM都会在NEW-CONFIG消息中向该计算机发送两个配置标识符——LastPrimaryChange[r]和LastReplicaChange[r],分别是r的主备对象更改时的最后一个配置标识符,在配置c-1中开始提交的事务将在配置c中恢复。

用于恢复事务的记录可以分布在不同主数据库的日志中,以及由事务更新的备份机器中。region的每个备份都将NEED-RECOVERY消息与配置标识符,区域标识符以及更新该区域的恢复事务标识符一起发送给主数据库。

  1. Lock recovery.

每个region的primary都会一直等到本机的日志排干并且等待收到每台backup的NEED-RECOVERY消息,然后才去构建完整的恢复事务集合。然后,它通过其线程上的标识符对事务进行分片,以便每个线程t恢复具有协调器线程标识符t的事务状态。同时,主数据库中的线程并行地从尚未本地存储的备份中获取所有事务日志记录,然后锁定通过恢复事务修改的任何对象。

当某个区域的锁恢复完成时,该区域就处于活动状态,本地和远程协调器可以获得本地指针和RDMA引用。

  1. Replicate log records.

primary日志中的线程通过向backup发送缺失的事务的REPLICATE-TXSTATE消息来进行记录。该消息包含区域标识符,当前配置标识符以及与LOCK记录相同的数据。

  1. Vote.

正在恢复事务的协调器根据事务更新的每个区域的投票来决定是提交还是中止事务。

  1. Decide.

如果协调器收到来自任何region的commit-primary投票,则决定进行事务。否则,它将等待所有区域投票,如果至少一个区域对 commit-backup投票,而其他所有区域被事务投票锁定、提交备份或截断,则它将等待提交。

Recovering data

FaRM必须在某个region的新备份中恢复(重新复制)数据,以确保将来可以容忍复制失败。一个区域的新备份最初具有新分配的零区域副本。它将区域划分为多个工作线程,以并行方式恢复该工作线程。

在复制到备份之前,必须检查每个恢复的对象。如果对象的版本大于本地版本,则备份将通过比较和交换锁定本地版本,更新对象状态,然后将其解锁。

Recovering allocator state

FaRM分配器将区域划分为块(1 MB),用作分配小对象的slabs。它保留了两个元数据:块标头(包含对象的大小)和slab的空闲列表。

分配新块时,块头将复制到备份中。这样可确保它们在发生故障后在新的主数据库上可用。slab空闲列表仅保留在primary上,以减少对象分配的开销。

redis设计与实现——RDB持久化

发表于 2019-09-27

redis设计与实现——RDB持久化

由于Redis是内存数据库,在服务器进程退出时,服务器状态也会丢失不见,因此Redis提供了RDB持久化功能,可以帮助把内存中的数据库状态保存到磁盘里面,避免数据丢失。

RDB持久化既可以手动执行,也可以服务器配置定期执行,执行后会生成一个经过压缩的二进制RDB文件。

RDB文件的创建与载入

有两个Redis命令可以生成RDB文件——SAVE和BGSAVE,前者会阻塞Redis服务器进程,直到创建完RDB文件,后者则是fork出一个子进程来负责创建RDB文件。

在redis/src/rdb.c中存在实际创建RDB文件的函数rdbSave(),SAVE命令和BGSAVE命令都会以不同方式调用这个函数。

SAVE命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void saveCommand(client *c) {
if (server.rdb_child_pid != -1) { // 正在执行BGSAVE
addReplyError(c,"Background save already in progress");
return;
}
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
// 调用rdbSave保存文件
if (rdbSave(server.rdb_filename,rsiptr) == C_OK) {
addReply(c,shared.ok);
} else {
addReply(c,shared.err);
}
}

redis 的事件循环中会去检测redisServer的saveparams字段,判断是否执行BGSAVE,在执行完之后,子进程调用_exit()退出,避免因为父进程正在对文件进行操作而子进程直接回写文件缓冲区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
pid_t childpid;
long long start;

if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;

server.dirty_before_bgsave = server.dirty;
server.lastbgsave_try = time(NULL);
openChildInfoPipe();

start = ustime();
if ((childpid = fork()) == 0) { // Fork一个子进程
int retval;

/* Child */
closeListeningSockets(0); // 关闭子进程的监听
redisSetProcTitle("redis-rdb-bgsave");
retval = rdbSave(filename,rsi); // 调用rdbSave
if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);

if (private_dirty) {
serverLog(LL_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}

server.child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_RDB);
}
exitFromChild((retval == C_OK) ? 0 : 1); // 调用_exit(retcode);
} else {
/* Parent */
// 父进程会记录一些BGSAVE状态
}
return C_OK; /* unreached */
}

无论是SAVE还是BGSAVE,最终都需要调用rdbSave完成工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
int rdbSave(char *filename, rdbSaveInfo *rsi) {
char tmpfile[256];
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
FILE *fp;
rio rdb;
int error = 0;

// 创建一个临时的rdb文件
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
char *cwdp = getcwd(cwd,MAXPATHLEN);
serverLog(LL_WARNING,
"Failed opening the RDB file %s (in server root dir %s) "
"for saving: %s",
filename,
cwdp ? cwdp : "unknown",
strerror(errno));
return C_ERR;
}

// 初始化 static const rio rioFileIO
rioInitWithFile(&rdb,fp);

if (server.rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);

// 保存rdb文件
if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
errno = error;
goto werr;
}

/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;

/* 使用原子性的重命名操作 */
if (rename(tmpfile,filename) == -1) {
char *cwdp = getcwd(cwd,MAXPATHLEN);
serverLog(LL_WARNING,
"Error moving temp DB file %s on the final "
"destination %s (in server root dir %s): %s",
tmpfile,
filename,
cwdp ? cwdp : "unknown",
strerror(errno));
unlink(tmpfile);
return C_ERR;
}

serverLog(LL_NOTICE,"DB saved on disk");
server.dirty = 0; // 重设dirty属性
server.lastsave = time(NULL);
server.lastbgsave_status = C_OK;
return C_OK;

werr:
serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
return C_ERR;
}

由此可见,rdbSave的操作主要分为两步:

  • 先将数据写到一个临时文件——tmp-%d.rdb;
  • 调用原子性的重命名操作;

自动间隔保存

对于BGSAVE命令,Redis支持用户可以通过制定配置文件或者传入启动参数的方式设置save选项。

  • save 900 1:服务器在900秒之内,对数据库进行了至少一次修改;

redis支持多RDB配置,满足任意一个就可以触发BGSAVE。在redisServer结构体中,存在serverparams字段记录了save条件。该字段结构有两个field:时间和修改次数。

1
2
3
4
5
6
7
8
9
10
11
12
struct saveparam {
time_t seconds;
int changes;
};

struct redisServer {
// ...
struct saveparam *saveparams; /* Save points array for RDB */
int saveparamslen; /* Number of saving points */
char *rdb_filename; /* Name of RDB file */
// ...
}

Redis的服务器周期性操作函数serverCron默认每100ms执行一次,其中一项工作就是检查save选项设置的保存条件是否满足。除了需要检查是否满足在规定时间内操作数据库的次数,还要检查上一次bgsave是否成功,如果不成功的话,需要等待CONFIG_BGSAVE_RETRY_DELAY秒,默认是5秒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
// ...
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
ldbPendingChildren()) {
// 检查是否只在bgsave或者存在aof子进程
} else {
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;

// 检查多个触发条件
// 是否满足操作数和时间
// 上一次bgsave是否成功,如果不成功要等待CONFIG_BGSAVE_RETRY_DELAY秒, #define CONFIG_BGSAVE_RETRY_DELAY 5
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...", sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(server.rdb_filename,rsiptr);// 调用bdsave
break;
}
}
}
// ...
}

RDB文件结构

本节主要介绍RDB的文件结构,具体的代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10]; // 标识rdb文件
int j;
uint64_t cksum; // 校验和
size_t processed = 0;

if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum;
// RDB文件的开头,5字节的REDIS和四字节的RDB文件版本
// #define RDB_VERSION 9,当前是9
// 当格式更改不再兼容后向时,此数字将递增
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
// 遍历数据库,dump数据
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
dict *d = db->dict; // 获取所有的键值对
if (dictSize(d) == 0) continue; // 保存非空数据库
di = dictGetSafeIterator(d);

// #define RDB_OPCODE_SELECTDB 254。保存一字节长,表示接下来会读入一个数据库号码,该号码可以使得服务器调用SELECT命令切换数据库
if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
if (rdbSaveLen(rdb,j) == -1) goto werr;

// 写入一个 RESIZE DB 操作码,#define RDB_OPCODE_RESIZEDB 251
// 该数字只是一个重建哈希表的大小参考,不限制实际读取
// 接下来会写入键值对个数
uint64_t db_size, expires_size;
db_size = dictSize(db->dict);
expires_size = dictSize(db->expires);
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
if (rdbSaveLen(rdb,db_size) == -1) goto werr;
if (rdbSaveLen(rdb,expires_size) == -1) goto werr;

/* 遍历该db,写入所有键值对*/
while((de = dictNext(di)) != NULL) {
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;

initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
// 写入expire time, type, key, value
if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;


if (flags & RDB_SAVE_AOF_PREAMBLE &&
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
processed = rdb->processed_bytes;
aofReadDiffFromParent();
}
}
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}

// ....其它操作

/* 写入EOF #define RDB_OPCODE_EOF 255 /*
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;

/* CRC64 checksum. It will be zero if checksum computation is disabled, the
* loading code skips the check in this case. */
cksum = rdb->cksum;
memrev64ifbe(&cksum);
// 写入校验和
if (rioWrite(rdb,&cksum,8) == 0) goto werr;
return C_OK;

werr:
if (error) *error = errno;
if (di) dictReleaseIterator(di);
return C_ERR;
}

因此RDB的文件结构可以总结为五个部分:

REDIS db_version Databases EOF check_sum
REDIS 0009 kv内容 255 8字节无符号整数

其中DataBase部分会保存多个非空数据库,总结可以分为三个部分,1字节长的标示码,整数的db序列号和键值对

RDB_OPCODE_SELECTDB | db_number | kv对

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;

/* 带有过期时间的键值对保存 */
if (expiretime != -1) {
if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
}

/* 保存LRU信息 */
if (savelru) {
uint64_t idletime = estimateObjectIdleTime(val);
idletime /= 1000; /* Using seconds is enough and requires less space.*/
if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
if (rdbSaveLen(rdb,idletime) == -1) return -1;
}

/* 保存LFU信息 */
if (savelfu) {
uint8_t buf[1];
buf[0] = LFUDecrAndReturn(val);

if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
}

/* 保存type,和键值对 */
if (rdbSaveObjectType(rdb,val) == -1) return -1;
if (rdbSaveStringObject(rdb,key) == -1) return -1;
if (rdbSaveObject(rdb,val,key) == -1) return -1;

/* Delay return if required (for testing) */
if (server.rdb_key_save_delay)
usleep(server.rdb_key_save_delay);

return 1;
}

键值对的保存结构是通过函数rdbSaveKeyValuePair()实现的,并且带有过期时间的键值对和不带有的都混在一起保存。其中如果有过期时间,则通过开头的RDB_OPCODE_EXPIRETIME_MS进行标示。至于保存类型则有其中,都是1字节长。key都是字符串对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#define RDB_TYPE_STRING 0
#define RDB_TYPE_LIST 1
#define RDB_TYPE_SET 2
#define RDB_TYPE_ZSET 3
#define RDB_TYPE_HASH 4
#define RDB_TYPE_ZSET_2 5 /* ZSET version 2 with doubles stored in binary. */
#define RDB_TYPE_MODULE 6
#define RDB_TYPE_MODULE_2 7
/* Object types for encoded objects. */
#define RDB_TYPE_HASH_ZIPMAP 9
#define RDB_TYPE_LIST_ZIPLIST 10
#define RDB_TYPE_SET_INTSET 11
#define RDB_TYPE_ZSET_ZIPLIST 12
#define RDB_TYPE_HASH_ZIPLIST 13
#define RDB_TYPE_LIST_QUICKLIST 14
#define RDB_TYPE_STREAM_LISTPACKS 15

结构就是:

RDB_OPCODE_EXPIRETIME_MS | ms | TYPE | key | value

或者:

TYPE | key | value

redis设计与实现——数据库

发表于 2019-09-17

redis设计与实现——数据库

服务器中的数据库

redis服务器将所有的数据库都保存在服务器状态redis.h/redisServer结构的db数组里,每一个redisDb代表一个数据库,redis默认创建16个数据库。

1
2
3
4
5
6
7
8
9
10
11
12
struct redisServer {
// ...
redisDb *db; // 保存着服务器中所有的数据库
int dbnum; /* Total number of configured DBs */
};

// 初始化db配置
#define CONFIG_DEFAULT_DBNUM 16
void initServerConfig(void) {
// ...
server.dbnum = CONFIG_DEFAULT_DBNUM;
}

切换数据库

由于每个Redis客户端都有自己的目标数据库,客户端通过SELECT命令来切换目标数据库。而server.h的结构体client中就有一个指向redisDb的指针。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 同样client会含有指向db的指针
typedef struct client {
//...
redisDb *db; /*指向当前被选中的db */
} client;

// 通过select命令切换数据库
int selectDb(client *c, int id) {
if (id < 0 || id >= server.dbnum)
return C_ERR;
c->db = &server.db[id];
return C_OK;
}

目前没有命令可以获取当前db的index,但可以通过设置唯一名字并获取clientInfo的方法动态获取index:https://stackoverflow.com/questions/50534492/redis-how-to-get-current-database-name

数据库键空间

Redis是一个键值对数据库服务器,由上面可知每个数据库都由一个redisDb结构表示,其中redisDb的dict字段保存了数据库中的所有键值对。

1
2
3
4
5
6
7
8
9
10
typedef struct redisDb {
dict *dict; /* key空间 */
dict *expires; /* 国企高管时间 */
dict *blocking_keys; /* 客户端正在等待数据的key*/
dict *ready_keys; /* 接受了push命令的key */
dict *watched_keys; /* 被监控的key*/
int id; /* Database ID */
long long avg_ttl; /* 用来做统计,平均ttl */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;

添加新键

添加新键值对,实际上就是将键值对添加到键空间字段中,key为字符串对象,值为任意一种类型的redis对象。

以set命令为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
long long milliseconds = 0; /* initialized to avoid any harmness warning */
// 如果设置了国旗时间则校验expire是否为数字
if (expire) {
if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
return;
if (milliseconds <= 0) {
addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
return;
}
if (unit == UNIT_SECONDS) milliseconds *= 1000;
}
// 实现NX和XX两种添加方式
// #define OBJ_SET_NX (1<<0) /* Set if key not exists. */
//#define OBJ_SET_XX (1<<1) /* Set if key exists. */
if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
(flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
{
addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);
return;
}
setKey(c->db,key,val);// 设置key
server.dirty++;
if (expire) setExpire(c,c->db,key,mstime()+milliseconds);//设置国旗时间
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
"expire",key,c->db->id);
addReply(c, ok_reply ? ok_reply : shared.ok);

在设置过期时间的操作中,可以看到,虽然key和expire是分开存放在redisDb结构体中的,但实际上两者指向了同一个对象。

1
2
3
4
5
6
7
8
9
void setExpire(redisDb *db, robj *key, long long when) {    // 设置过期时间
dictEntry *kde, *de;

/* Reuse the sds from the main dict in the expire dict */
kde = dictFind(db->dict,key->ptr); // 找到对应的字典节点
serverAssertWithInfo(NULL,key,kde != NULL);
de = dictReplaceRaw(db->expires,dictGetKey(kde)); // 将过期时间expire加入到dict,其中共用同一个字符串对象实例
dictSetSignedIntegerVal(de,when);
}

其他的删改查操作,都是在dict字段上面封装了一层。

设置键的生存时间或过期时间

设置过期时间

通过EXPIRE或PEXPIRE命令可以为key设置秒级或毫秒级的生存时间(Time To Live,TTL)。也可以用EXPRIEAT或者PEXPIREAT设置一个过期时间戳。事实上,这三个命令的底层实现都是通过时间戳的设置方式来完成过期时间设置的。

参考源码,这个函数是EXPIRE, PEXPIRE, EXPIREAT和PEXPIREAT四个命令的底层实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void expireGenericCommand(client *c, long long basetime, int unit) {
robj *key = c->argv[1], *param = c->argv[2];
long long when; /* 毫秒级别的unix时间戳 */

if (getLongLongFromObjectOrReply(c, param, &when, NULL) != C_OK)
return;

if (unit == UNIT_SECONDS) when *= 1000;
when += basetime;

/* No key, return zero. */
if (lookupKeyWrite(c->db,key) == NULL) {
addReply(c,shared.czero);
return;
}

// ....
}

保存过期时间

redisDb结构的expires字段保存了数据库中所有key的过期时间,这是一个字典,其key是一个指向某个键对象的指针,而value则是一个long long类型的整数,一个毫秒级别的unix时间戳。

移除过期时间

使用PERSIST命令可以移除一个键的过期时间,实际上就是删除expires字段中该键与过期时间的项。

1
2
3
4
5
6
7
8
9
10
11
12
13
void persistCommand(client *c) {
if (lookupKeyWrite(c->db,c->argv[1])) {
// 调用 dictDelete(db->expires,key->ptr) 删除
if (removeExpire(c->db,c->argv[1])) {
addReply(c,shared.cone);
server.dirty++;
} else {
addReply(c,shared.czero);
}
} else {
addReply(c,shared.czero);
}
}

计算并返回剩余生存时间

TTL和PTTL命令则是返回以秒为单位或者毫秒为单位的键剩余时间,实现比较简单,就是计算键的过期时间与当前时间之间的差。

过期键删除策略

Redis采用了两种删除策略:惰性删除和定期删除。其中,惰性删除是一种对CPU时间最友好的策略,程序只会在取出键时才会对键进行过期检查。

惰性删除策略的实现

过期键的惰性删除策略都必须要经常函数db.c/expireIfNeeded函数实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
int expireIfNeeded(redisDb *db, robj *key) {
if (!keyIsExpired(db,key)) return 0; // key不存在,什么都不处理

if (server.masterhost != NULL) return 1; // 只对master库进行删除

/* 删除key */
server.stat_expiredkeys++;
// 当一个主库key被删除时,会向从库发一条del命令和被启动的AOF文件追加del
propagateExpire(db,key,server.lazyfree_lazy_expire);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id);
return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key);
}

int dbAsyncDelete(redisDb *db, robj *key) {
/* 删除expire字段的dict不会释放空间,因为该字典与主字典是共享内存 */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

// 如果对象很小,以惰性删除的方式实际上更慢
dictEntry *de = dictUnlink(db->dict,key->ptr);
if (de) {
robj *val = dictGetVal(de);
size_t free_effort = lazyfreeGetFreeEffort(val);

/* 创建一个后台任务,添加对象到lazy free list里 */
if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
atomicIncr(lazyfree_objects,1);
bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
dictSetVal(db->dict,de,NULL);
}
}

/* 释放键值对 */
if (de) {
dictFreeUnlinkedEntry(db->dict,de);
if (server.cluster_enabled) slotToKeyDel(key);//集群删除
return 1;
} else {
return 0;
}
}

调用dictDelete的时候不会删除dict对象,只会删除expires对象,尽管它们公用key对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 同步删除,通过调用dictDelte实现
int dbSyncDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
if (dictDelete(db->dict,key->ptr) == DICT_OK) {
if (server.cluster_enabled) slotToKeyDel(key);
return 1;
} else {
return 0;
}
}

// 在实现删除expire字段,但不删除共享key的实现上,主要利用了dict底层结构中的dictType字段,该字段定义了dict的各种操作。
typedef struct dictType { // 各种字典操作
unsigned int (*hashFunction)(const void *key); // 计算hash值的函数
void *(*keyDup)(void *privdata, const void *key); // 键复制
void *(*valDup)(void *privdata, const void *obj); // 值复制
int (*keyCompare)(void *privdata, const void *key1, const void *key2); // 键比较
void (*keyDestructor)(void *privdata, void *key); // 键销毁
void (*valDestructor)(void *privdata, void *obj); // 值销毁
} dictType;

// 而在初始化expires时,则将keyDestructor和valDestructor设置为了NULL

定期删除策略的实现

过期键的定期删除策略时由expire.c/activeExpireCycle()实现的,它会在规定的时间内,多次去遍历服务器中的各个数据库,从数据库的expires字段中随机抽查一部分键的过期时间。

AOF、ROB和复制功能对过期键的处理

  • 产生的新RDB文件和重写的AOF文件都不会包含已过期的键;
  • 当主服务器删除一个键之后,会向所有从服务器发del命令;
  • 当一个过期键被删除之后,服务器会追加一条del命令到现有AOF文件的末尾;
  • 从服务器即使发现过期键也不删除,而是等待master节点发来del命令;

redis设计与实现——对象

发表于 2019-09-03

redis设计与实现——对象

前面介绍了那么多数据结构,但redis并不是直接使用它们组成键值对,二是在上面封装了一层创建了一个对象系统。另外,redis的对象系统还实现了基于引用计数的内存回收机制和访问时间记录信息,从而能删除那些空转时长较大的key。

对象的类型与编码

redis使用对象来表示数据库中的key和value,redis的对象实现数据结构在src/server.h中:

1
2
3
4
5
6
7
8
9
10
11
12
#define LRU_BITS 24

// 使用bit field节省空间,https://www.geeksforgeeks.org/bit-fields-c/
typedef struct redisObject {
unsigned type:4; // 类型,4bit
unsigned encoding:4; // 编码,4bit
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount; // 引用计数
void *ptr; // 指向底层数据结构的指针
} robj;

类型

object的type字段用于记录对象的类型,分别是字符串、列表、哈希、集合和有序集合。对于redis保存的键值对来说,key总是字符串对象,而value则是上面所说的五种,可用TYPE命令获取对象类型。

1
2
3
4
5
#define OBJ_STRING 0    /* String object. */
#define OBJ_LIST 1 /* List object. */
#define OBJ_SET 2 /* Set object. */
#define OBJ_ZSET 3 /* Sorted set object. */
#define OBJ_HASH 4 /* Hash object. */

编码和底层实现

对象的ptr指针指向了对象的底层数据结构,但这些数据结构是由对象的encoding属性决定。encoding的取值如下:

1
2
3
4
5
6
7
8
9
10
11
#define OBJ_ENCODING_RAW 0     /* Raw representation */
#define OBJ_ENCODING_INT 1 /* Encoded as integer */
#define OBJ_ENCODING_HT 2 /* Encoded as hash table */
#define OBJ_ENCODING_ZIPMAP 3 /* Encoded as zipmap */
#define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old list encoding. */
#define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define OBJ_ENCODING_INTSET 6 /* Encoded as intset */
#define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
#define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string encoding */
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */
#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks */

除了OBJ_LIST之外,其它每种类型的对象至少使用了两种不同的编码,使得redis可以根据不同的使用场景来为一个对象设置不同的编码从而优化使用效率。

类型 编码 对象
OBJ_STRING OBJ_ENCODING_INT 用整数值实现的字符串对象
OBJ_STRING OBJ_ENCODING_EMBSTR 用embstr编码sds的字符串对象
OBJ_STRING OBJ_ENCODING_RAW 使用sds实现的字符串对象
OBJ_LIST OBJ_ENCODING_QUICKLIST 使用quicklist实现的列表对象
OBJ_HASH OBJ_ENCODING_ZIPLIST 使用压缩列表实现的哈希对象
OBJ_HASH OBJ_ENCODING_HT 使用字典实现的哈希对象
OBJ_SET OBJ_ENCODING_HT 使用哈希实现的集合对象
OBJ_SET OBJ_ENCODING_INSET 使用整数集合实现的集合对象
OBJ_ZSET OBJ_ENCODING_ZIPLIST 使用压缩列表实现的有序集合对象
OBJ_ZSET OBJ_ENCODING_SKIPLIST 使用跳表实现的有序集合对象

字符串对象

由上表可得,字符串的对象编码有三种:int, raw和embst。

当字符串是可以用long类型保存的整数,则转为long。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
robj *tryObjectEncoding(robj *o) {
long value;
sds s = o->ptr;
size_t len;

/* 确保是一个字符串对象 */
serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);

/* 使用某些特殊的编码方式编码raw和emstr */
if (!sdsEncodedObject(o)) return o;

/* 不对共享对象进行编码 */
if (o->refcount > 1) return o;

/* 编码字符串长度小于或等于20,且能够转换成long */
len = sdslen(s);
if (len <= 20 && string2l(s,len,&value)) {
/* 使用共享的整数数据,节省内存
* shared是server的共享数据,保存一些常用数据,
* 用户在使用这部分数据时不用新申请内存 */
if ((server.maxmemory == 0 ||
!(server.maxmemory_policy & MAXMEMORY_FLAG_NO_SHARED_INTEGERS)) &&
value >= 0 &&
value < OBJ_SHARED_INTEGERS)
{
decrRefCount(o);
incrRefCount(shared.integers[value]);
return shared.integers[value];
} else {
if (o->encoding == OBJ_ENCODING_RAW) {
sdsfree(o->ptr);
o->encoding = OBJ_ENCODING_INT; // 用int编码
o->ptr = (void*) value;
return o;
} else if (o->encoding == OBJ_ENCODING_EMBSTR) {
decrRefCount(o);
return createStringObjectFromLongLongForValue(value);
}
}
}

/* 对于保存的字符串值长度小于44的进行embstr编码 */
if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) {
robj *emb;

if (o->encoding == OBJ_ENCODING_EMBSTR) return o;
emb = createEmbeddedStringObject(s,sdslen(s));
decrRefCount(o);
return emb;
}

/* We can't encode the object...
*
* Do the last try, and at least optimize the SDS string inside
* the string object to require little space, in case there
* is more than 10% of free space at the end of the SDS string.
*
* We do that only for relatively large strings as this branch
* is only entered if the length of the string is greater than
* OBJ_ENCODING_EMBSTR_SIZE_LIMIT. */
trimStringObjectIfNeeded(o);

/* Return the original object. */
return o;
}

如果字符串对象保存的字符串值小于或等于44,则用embstr编码的方式,否则用raw编码的方式。之所以选择44个字节,是因为使用了jemalloc,需要将embstr类型的字符串限定在64字节。而redis object占用了16个字节,当字符串长度小于44时sds会采用占用3字节的sdshdr8保存字符串,因此16+3+44=63,再加上字符串末尾的'\0',刚好是64。

1
2
3
4
5
6
7
8
9
10
11
#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 44
robj *createStringObject(const char *ptr, size_t len) {
if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT)
return createEmbeddedStringObject(ptr,len);
else
return createRawStringObject(ptr,len);
}

robj *createRawStringObject(const char *ptr, size_t len) {
return createObject(OBJ_STRING, sdsnewlen(ptr,len));
}

其中embstr编码是用来保存短字符串的一种优化的编码方式,虽然其跟raw一样都是采用redisobject结构和sdshdr结构来保存字符串对象,但embstr是调用一次内存分配函数来分配一块连续的空间(raw是调用两次,空间不连续)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
robj *createEmbeddedStringObject(const char *ptr, size_t len) {
robj *o = zmalloc(sizeof(robj)+sizeof(struct sdshdr8)+len+1);
struct sdshdr8 *sh = (void*)(o+1);

o->type = OBJ_STRING;
o->encoding = OBJ_ENCODING_EMBSTR;
o->ptr = sh+1;
o->refcount = 1;
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
} else {
o->lru = LRU_CLOCK();
}

sh->len = len;
sh->alloc = len;
sh->flags = SDS_TYPE_8;
if (ptr == SDS_NOINIT)
sh->buf[len] = '\0';
else if (ptr) {
memcpy(sh->buf,ptr,len);
sh->buf[len] = '\0';
} else {
memset(sh->buf,0,len+1);
}
return o;
}

通过将相对于raw两次的内存分配和释放次数降低到一次,并且保存了一块连续的内存空间,也很好地利用了缓存的优势。另外,该编码方式是创建一种unmodifiable string,redis不提供直接修改其的方法。要修改该字符串对象,只能先转为raw。

列表对象

在redis3.2.9之后,quicklist取代了ziplist和linkedlist,成为了列表对象的底层实现。创建一个新的列表对象:

1
2
3
4
5
6
robj *createQuicklistObject(void) {
quicklist *l = quicklistCreate();
robj *o = createObject(OBJ_LIST,l);
o->encoding = OBJ_ENCODING_QUICKLIST;
return o;
}

由于列表对象只有一种编码方式,因此只是简单调用了quicklistCreate()。

哈希对象

哈希对象有两种编码方式:ziplist或者hashtable。默认是ziplist

1
2
3
4
5
6
robj *createHashObject(void) {
unsigned char *zl = ziplistNew();
robj *o = createObject(OBJ_HASH, zl);
o->encoding = OBJ_ENCODING_ZIPLIST;
return o;
}

为了探究其编码转换和插入生成哈希对象的方式,我们先来看hset命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void hsetCommand(client *c) {
int i, created = 0;
robj *o;

if ((c->argc % 2) == 1) {
addReplyError(c,"wrong number of arguments for HMSET");
return;
}
// 从db中查找或者创建一个哈希对象
if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
hashTypeTryConversion(o,c->argv,2,c->argc-1);// 尝试转换编码

for (i = 2; i < c->argc; i += 2)
// 真正去添加新的键值对
created += !hashTypeSet(o,c->argv[i]->ptr,c->argv[i+1]->ptr,HASH_SET_COPY);

/* HMSET (deprecated) and HSET return value is different. */
char *cmdname = c->argv[0]->ptr;
if (cmdname[1] == 's' || cmdname[1] == 'S') {
/* HSET */
addReplyLongLong(c, created); // 通知客户端更改了多少个
} else {
/* HMSET */
addReply(c, shared.ok);
}
signalModifiedKey(c->db,c->argv[1]);// 通知数据变更
notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id);// 推送变更的订阅消息
server.dirty++;
}

首先来看hashTypeLookupWriteOrCreate。

1
2
3
4
5
6
7
8
9
10
11
12
13
robj *hashTypeLookupWriteOrCreate(client *c, robj *key) {
robj *o = lookupKeyWrite(c->db,key);// 从db中查找
if (o == NULL) {
o = createHashObject();// 不存在则新创建一个
dbAdd(c->db,key,o);
} else {
if (o->type != OBJ_HASH) {
addReply(c,shared.wrongtypeerr);
return NULL;
}
}
return o;
}

接着来看hashTypeTryConversion,通过检查ptr对应sds长度是否比hash_max_ziplist_value更大,则转换到哈希编码的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#define OBJ_HASH_MAX_ZIPLIST_VALUE 64
void hashTypeTryConversion(robj *o, robj **argv, int start, int end) {
int i;

if (o->encoding != OBJ_ENCODING_ZIPLIST) return;

for (i = start; i <= end; i++) {
if (sdsEncodedObject(argv[i]) &&
sdslen(argv[i]->ptr) > server.hash_max_ziplist_value)
{
hashTypeConvert(o, OBJ_ENCODING_HT);
break;
}
}
}

hashTypeSet的作用是往哈希对象添加数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#define HASH_SET_TAKE_FIELD (1<<0)
#define HASH_SET_TAKE_VALUE (1<<1)
#define HASH_SET_COPY 0
int hashTypeSet(robj *o, sds field, sds value, int flags) {
int update = 0;

if (o->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl, *fptr, *vptr;

zl = o->ptr;
fptr = ziplistIndex(zl, ZIPLIST_HEAD);
if (fptr != NULL) {
fptr = ziplistFind(fptr, (unsigned char*)field, sdslen(field), 1); // 从跳跃表中查找对应的field
if (fptr != NULL) {
/* 拿到value对应的指针为止,在field之后 */
vptr = ziplistNext(zl, fptr);
serverAssert(vptr != NULL);
update = 1;

/* 删除当前的值 */
zl = ziplistDelete(zl, &vptr);

/* 插入新的值 */
zl = ziplistInsert(zl, vptr, (unsigned char*)value,
sdslen(value));
}
}

if (!update) {
/* 将field/value对插入到ziplist的尾部,其中filed在value的前面*/
zl = ziplistPush(zl, (unsigned char*)field, sdslen(field),
ZIPLIST_TAIL);
zl = ziplistPush(zl, (unsigned char*)value, sdslen(value),
ZIPLIST_TAIL);
}
o->ptr = zl;

/* 检查是否需要把ziplist编码转换为哈希编码,这是另一种转换编码的条件,如果哈希对象的键值对个数大于 512则需要转换编码*/
// #define OBJ_HASH_MAX_ZIPLIST_ENTRIES 512
if (hashTypeLength(o) > server.hash_max_ziplist_entries)
hashTypeConvert(o, OBJ_ENCODING_HT);
} else if (o->encoding == OBJ_ENCODING_HT) {
// 哈希编码的哈希对象其中每个键值对都是使用字典的键值对保存,并且key和value都是字符串对象
dictEntry *de = dictFind(o->ptr,field);
if (de) {
sdsfree(dictGetVal(de));
if (flags & HASH_SET_TAKE_VALUE) {
dictGetVal(de) = value;
value = NULL;
} else {
dictGetVal(de) = sdsdup(value);
}
update = 1;
} else {
sds f,v;
if (flags & HASH_SET_TAKE_FIELD) {
f = field;
field = NULL;
} else {
f = sdsdup(field);
}
if (flags & HASH_SET_TAKE_VALUE) {
v = value;
value = NULL;
} else {
v = sdsdup(value);
}
dictAdd(o->ptr,f,v);
}
} else {
serverPanic("Unknown hash encoding");
}

/* Free SDS strings we did not referenced elsewhere if the flags
* want this function to be responsible. */
if (flags & HASH_SET_TAKE_FIELD && field) sdsfree(field);
if (flags & HASH_SET_TAKE_VALUE && value) sdsfree(value);
return update;
}

由此可见,当哈希对象同时满足以下两个条件才会使用ziplist编码:

  • 哈希对象保存的所有key/value的字符串长度都小于64个字节;
  • 哈希对象保存的键值对个数小于512个;

以上代码都在t_hash.c中。

集合对象

集合对象有两种编码方式:intset或者hashtable

以sadd命令对集合对象的编码方式做解释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
int isSdsRepresentableAsLongLong(sds s, long long *llval) {
return string2ll(s,sdslen(s),llval) ? C_OK : C_ERR;
}

robj *setTypeCreate(sds value) {
// 判断sds是否能用longlong表示
if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
return createIntsetObject(); // 返回整数集合编码的对象
return createSetObject(); // 返回hash编码的对象
}

// 在src/object.c中实现
robj *createIntsetObject(void) {
intset *is = intsetNew(); // 底层数据结构 intset
robj *o = createObject(OBJ_SET,is);
o->encoding = OBJ_ENCODING_INTSET;
return o;
}

robj *createSetObject(void) {
dict *d = dictCreate(&setDictType,NULL); // 底层数据结构 字典
robj *o = createObject(OBJ_SET,d);
o->encoding = OBJ_ENCODING_HT;
return o;
}

void saddCommand(client *c) {
robj *set;
int j, added = 0;

set = lookupKeyWrite(c->db,c->argv[1]); // 寻找key对应对象
if (set == NULL) {
set = setTypeCreate(c->argv[2]->ptr); // 新key则创建一个
dbAdd(c->db,c->argv[1],set);
} else {
if (set->type != OBJ_SET) {
addReply(c,shared.wrongtypeerr);
return;
}
}

for (j = 2; j < c->argc; j++) {
if (setTypeAdd(set,c->argv[j]->ptr)) added++;
}
if (added) {
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
}
server.dirty += added; // add了多少次,添加到执行命令数量里
addReplyLongLong(c,added); 返回结果给客户端
}

而添加新的值的方式,则是通过调用setTypeAdd()实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
int setTypeAdd(robj *subject, sds value) {
long long llval;
if (subject->encoding == OBJ_ENCODING_HT) {
dict *ht = subject->ptr;
dictEntry *de = dictAddRaw(ht,value,NULL);
if (de) {
dictSetKey(ht,de,sdsdup(value));
dictSetVal(ht,de,NULL);
return 1;
}
} else if (subject->encoding == OBJ_ENCODING_INTSET) {
if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
uint8_t success = 0;
subject->ptr = intsetAdd(subject->ptr,llval,&success);
if (success) {
/* 如果集合对象的个数太多(默认是多于512),则转为哈希编码 */
if (intsetLen(subject->ptr) > server.set_max_intset_entries)
setTypeConvert(subject,OBJ_ENCODING_HT);
return 1;
}
} else {
/* 无法转为整数,则使用哈希编码 */
setTypeConvert(subject,OBJ_ENCODING_HT);

/* The set *was* an intset and this value is not integer
* encodable, so dictAdd should always work. */
serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK); // 添加新的数据到字典
return 1;
}
} else {
serverPanic("Unknown set encoding");
}
return 0;
}

由此可见,当集合对象的个数大于server.set_max_intset_entries(默认为512)或者集合对象保存了非整数值的元素,则需要使用哈希编码。否则可以用整数集合编码。

有序集合对象

有序集合zset有两种编码方式,一种是ziplist,另一种就是skiplist。

我们通过zadd命令来看,这两种编码的使用和转换方法,在src/t_zset.c实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
void zaddCommand(client *c) {
zaddGenericCommand(c,ZADD_NONE);
}

void zaddGenericCommand(client *c, int flags) {
/* 前面有一系列参数的初始化,包括对客户端的响应,添加的参数统计等等
* 比如初始化 elements = c->argc-scoreidx; elements /= 2; */

/* 解析所有的score,保证事务,要么全部完成,要么就什么都不做 */
scores = zmalloc(sizeof(double)*elements); // 初始化分数值
for (j = 0; j < elements; j++) {
if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL) // 把所有传递进的分值放到scores里
!= C_OK) goto cleanup;
}
/* 查找key是否存在 */
zobj = lookupKeyWrite(c->db,key);
if (zobj == NULL) {
if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
// zset_max_ziplist_entries设置为0或者长度大于zset_max_ziplist_value(默认为64)
if (server.zset_max_ziplist_entries == 0 ||
server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
{
// 创建zset对象,该zset对象使用skiplist编码
zobj = createZsetObject();
} else {
// 创建ziplist编码的zset对象
zobj = createZsetZiplistObject();
}
dbAdd(c->db,key,zobj);
} else {
if (zobj->type != OBJ_ZSET) {
addReply(c,shared.wrongtypeerr);
goto cleanup;
}
}

for (j = 0; j < elements; j++) {
double newscore;
score = scores[j];
int retflags = flags;

ele = c->argv[scoreidx+1+j*2]->ptr;
// 往有序列表插入或者更新一个新的元素
int retval = zsetAdd(zobj, score, ele, &retflags, &newscore);
if (retval == 0) {
addReplyError(c,nanerr);
goto cleanup;
}
if (retflags & ZADD_ADDED) added++;
if (retflags & ZADD_UPDATED) updated++;
if (!(retflags & ZADD_NOP)) processed++;
score = newscore;
}
server.dirty += (added+updated);

reply_to_client:
if (incr) { /* ZINCRBY or INCR option. */
if (processed)
addReplyDouble(c,score);
else
addReplyNull(c);
} else { /* ZADD. */
addReplyLongLong(c,ch ? added+updated : added);
}

cleanup:
zfree(scores);
if (added || updated) {
signalModifiedKey(c->db,key);
notifyKeyspaceEvent(NOTIFY_ZSET,
incr ? "zincr" : "zadd", key, c->db->id);
}
}

添加或者更新元素的主要实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) {
/* Turn options into simple to check vars. */
int incr = (*flags & ZADD_INCR) != 0;
int nx = (*flags & ZADD_NX) != 0;
int xx = (*flags & ZADD_XX) != 0;
*flags = 0; /* We'll return our response flags. */
double curscore;

/* 检查分值是否为nan*/
if (isnan(score)) {
*flags = ZADD_NAN;
return 0;
}

/* 更具编码区更新有序列表,压缩列表 */
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *eptr;
// 元素存在
if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
/* NX? Return, same element already exists. */
if (nx) {
*flags |= ZADD_NOP;
return 1;
}

/* 增加score */
if (incr) {
score += curscore;
if (isnan(score)) {
*flags |= ZADD_NAN;
return 0;
}
if (newscore) *newscore = score;
}

/* 删除后重新插入 */
if (score != curscore) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
*flags |= ZADD_UPDATED;
}
return 1;
} else if (!xx) {
/* 优化: 检查元素是否太大,或者有序列表太长,如果满足了则进行转换 */
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
// 有序列表长度超过zset_max_ziplist_entries(默认64)
// 元素的字符串长度超过zset_max_ziplist_entries(默认128)
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||
sdslen(ele) > server.zset_max_ziplist_value)
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
if (newscore) *newscore = score;
*flags |= ZADD_ADDED;
return 1;
} else {
*flags |= ZADD_NOP;
return 1;
}
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplistNode *znode;
dictEntry *de;

de = dictFind(zs->dict,ele);
if (de != NULL) {
/* 元素已经存在 */
if (nx) {
*flags |= ZADD_NOP;
return 1;
}
curscore = *(double*)dictGetVal(de);

/* Prepare the score for the increment if needed. */
if (incr) {
score += curscore;
if (isnan(score)) {
*flags |= ZADD_NAN;
return 0;
}
if (newscore) *newscore = score;
}

/* Remove and re-insert when score changes. */
if (score != curscore) {
znode = zslUpdateScore(zs->zsl,curscore,ele,score);
/* 并没有移除原来的元素,而是更新表示哈希表的字典 */
dictGetVal(de) = &znode->score; /* Update score ptr. */
*flags |= ZADD_UPDATED;
}
return 1;
} else if (!xx) {
ele = sdsdup(ele);
znode = zslInsert(zs->zsl,score,ele);//往跳跃表添加一个元素
serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK); // 往哈希表添加一个元素
*flags |= ZADD_ADDED;
if (newscore) *newscore = score;
return 1;
} else {
*flags |= ZADD_NOP;
return 1;
}
} else {
serverPanic("Unknown sorted set encoding");
}
return 0; /* Never reached. */
}

由此可见,在创建skiplist编码的有序集合时,会创建一个zset对象。该zset包含一个字节和一个跳跃表。但两者只会存储一份数据,hashTable和skiplist共享元素的成员和分值。这样就可以保证在执行ZSCORE命令时,通过哈希表可以在O(1)的时间获取结果,而执行ZRANK,ZRANGE这些则可以用skiplist更快得到范围结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;

robj *createZsetObject(void) {
zset *zs = zmalloc(sizeof(*zs));
robj *o;

zs->dict = dictCreate(&zsetDictType,NULL);
zs->zsl = zslCreate();
o = createObject(OBJ_ZSET,zs);
o->encoding = OBJ_ENCODING_SKIPLIST;
return o;
}

类型检查与命令多态

类型检查的实现

为了确保指定类型的键才可以执行某些特定的命令,在执行命令之前会先检查输入键的类型正确与否。

例如当我们使用LLEN命令,其会去检查操作对象是否为一个列表键。

1
2
3
4
5
void llenCommand(client *c) {
robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
if (o == NULL || checkType(c,o,OBJ_LIST)) return; // 检查类型是否LIST
addReplyLongLong(c,listTypeLength(o));
}

有些命令还需要检查对象的编码方式,然后根据不同的编码调用不同的函数。这就是命令多态的来源。

内存回收

C语言不能自动做垃圾回收,因此redis构造了一个引用计数的技术来做内存回收。即redisobject1中refcount字段。

  • 当创建新对象时,引用计数为1;
  • 当对象被一个新程序使用时,引用计数+1;
  • 当对象不再被一个程序使用时,引用计数-1;
  • 当对象的引用计数为0,对象所占用的内存被释放;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 递增引用计数
#define OBJ_SHARED_REFCOUNT INT_MAX
void incrRefCount(robj *o) {
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount++;
}

// 递减引用计数
void decrRefCount(robj *o) {
if (o->refcount == 1) {
switch(o->type) {
case OBJ_STRING: freeStringObject(o); break;
case OBJ_LIST: freeListObject(o); break;
case OBJ_SET: freeSetObject(o); break;
case OBJ_ZSET: freeZsetObject(o); break;
case OBJ_HASH: freeHashObject(o); break;
case OBJ_MODULE: freeModuleObject(o); break;
case OBJ_STREAM: freeStreamObject(o); break;
default: serverPanic("Unknown object type"); break;
}
zfree(o);
} else {
if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--;
}
}

// 将引用计数设为0,但不释放对象。通常用于传递对象到一个新的函数里
// 例如:functionThatWillIncrementRefCount(resetRefCount(CreateObject(...)));
robj *resetRefCount(robj *obj) {
obj->refcount = 0;
return obj;
}

对象共享

为了节省内存,redis会创建一些特殊对象用于全局共享。例如redis会创建10000个字符串对象,包含了从0到9999的所有整数值。那么当服务器要用到这些对象时,会直接取出共享对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
void trimStringObjectIfNeeded(robj *o) {
// ....
if ((server.maxmemory == 0 ||
!(server.maxmemory_policy & MAXMEMORY_FLAG_NO_SHARED_INTEGERS)) &&
value >= 0 &&
value < OBJ_SHARED_INTEGERS)
{
decrRefCount(o);//销毁原字符串对象
incrRefCount(shared.integers[value]);//共享对象引用计数+1
return shared.integers[value];//返回共享对象
}
// ...
}

在server.c中预先创建10000个对象。

1
2
3
4
5
6
7
8
9
10
#define OBJ_SHARED_INTEGERS 10000
void createSharedObjects(void) {
//....
for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
shared.integers[j] =
makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
shared.integers[j]->encoding = OBJ_ENCODING_INT;
}
//...
}

因此对于这些共享对象,服务器会默认持有一个引用计数。

对象的空转时长

这部分特性通过redisobject的lru属性实现,该字段记录了最后一次被命令程序访问的时间。

使用OBJECT命令可以访问key对象,但不会修改其的lru属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
void objectCommand(client *c) {
robj *o;

if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
const char *help[] = {
"ENCODING <key> -- Return the kind of internal representation used in order to store the value associated with a key.",
"FREQ <key> -- Return the access frequency index of the key. The returned integer is proportional to the logarithm of the recent access frequency of the key.",
"IDLETIME <key> -- Return the idle time of the key, that is the approximated number of seconds elapsed since the last access to the key.",
"REFCOUNT <key> -- Return the number of references of the value associated with the specified key.",
NULL
};
addReplyHelp(c, help);
} else if (!strcasecmp(c->argv[1]->ptr,"refcount") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== NULL) return;
addReplyLongLong(c,o->refcount);
} else if (!strcasecmp(c->argv[1]->ptr,"encoding") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== NULL) return;
addReplyBulkCString(c,strEncoding(o->encoding));
} else if (!strcasecmp(c->argv[1]->ptr,"idletime") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== NULL) return;
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
addReplyError(c,"An LFU maxmemory policy is selected, idle time not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust.");
return;
}
addReplyLongLong(c,estimateObjectIdleTime(o)/1000);
} else if (!strcasecmp(c->argv[1]->ptr,"freq") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== NULL) return;
if (!(server.maxmemory_policy & MAXMEMORY_FLAG_LFU)) {
addReplyError(c,"An LFU maxmemory policy is not selected, access frequency not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust.");
return;
}
/* LFUDecrAndReturn should be called
* in case of the key has not been accessed for a long time,
* because we update the access time only
* when the key is read or overwritten. */
addReplyLongLong(c,LFUDecrAndReturn(o));
} else {
addReplySubcommandSyntaxError(c);
}
}

robj *objectCommandLookup(client *c, robj *key) {
dictEntry *de;
// 直接到db去查找key
if ((de = dictFind(c->db->dict,key->ptr)) == NULL) return NULL;
return (robj*) dictGetVal(de);
}

通过源码可见,这个命令在访问key对象时,不会修改对象的lru属性,因为时直接到db去查找状态的。

而更新lru字段的处理需要经过db.c。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 底层级的查找
robj *lookupKey(redisDb *db, robj *key, int flags) {
dictEntry *de = dictFind(db->dict,key->ptr);
if (de) {
robj *val = dictGetVal(de);

// 当存在rdb和aof子进程运行时,不进行lru更新,避免不断地写副本
if (server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
!(flags & LOOKUP_NOTOUCH))
{
// 使用lfu策略
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
updateLFU(val);
} else {
val->lru = LRU_CLOCK(); // 更新lru时间
}
}
return val;
} else {
return NULL;
}
}

redis设计与实现——quicklist

发表于 2019-08-30

quicklist

概述

A doubly linked list of ziplists

根据quicklist.c的注释,这种数据结构是一个以ziplist为节点的双向链表。在redis3.2之后,quicklist取代了压缩列表和linkedlist,成为了列表对象的唯一编码形式。commit 记录

之所以这样设计,是因为原先的linkedlist由于各个节点都是单独的内存,很容易造成内存碎片;而对于压缩列表,由于其每次修改都会引发内存的重新分配,导致大量的内存拷贝。经过对时间和空间的折中,选择了quicklist这种方法。

数据结构

首先来看节点,

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct quicklistNode {
struct quicklistNode *prev; // 前一个节点
struct quicklistNode *next; // 后一个节点
unsigned char *zl; // ziplist结构,压缩的ziplist会指向一个quicklistLZF结构
unsigned int sz; /* ziplist的大小 */
unsigned int count : 16; /* ziplist的item个数*/
unsigned int encoding : 2; /* ziplist是否压缩,1没有压缩,2压缩*/
unsigned int container : 2; /* 目前固定为2,表示使用ziplist作为数据容器 */
unsigned int recompress : 1; /* 是否压缩,1表示压缩。有些命令需要做解压,因此用该标记以便后续压缩*/
unsigned int attempted_compress : 1; /* 暂时不用管,自动测试用的 */
unsigned int extra : 10; /* 扩展字段,目前还没被使用,刚好凑成32bit */
} quicklistNode;

然后quicklist这个结构体将上面节点表示连起来:

1
2
3
4
5
6
7
8
typedef struct quicklist {
quicklistNode *head; // 头部节点
quicklistNode *tail; // 尾部节点
unsigned long count; /* ziplist的item个数总和 */
unsigned long len; /* 节点个数 */
int fill : 16; /* 单个ziplist的大小设置 */
unsigned int compress : 16; /* 节点的压缩设置 */
} quicklist;

fill的设置与单个quicklistNode的大小有关,当该值为正数时,表示节点指向的ziplist的数据项个数,因此16bit可以最多表示32k的个数;当该值为负数时,表示单个节点最多存储大小。(-1:4kb, -2:8kb, -3:16kb, -4:32kb, -5:64kb)。默认是-2,8kb。

1
2
3
4
5
6
7
8
9
#define FILL_MAX (1 << 15)
void quicklistSetFill(quicklist *quicklist, int fill) {
if (fill > FILL_MAX) {
fill = FILL_MAX;
} else if (fill < -5) {
fill = -5;
}
quicklist->fill = fill;
}

创建

1
2
3
4
5
6
7
8
9
10
11
quicklist *quicklistCreate(void) {
struct quicklist *quicklist;

quicklist = zmalloc(sizeof(*quicklist));
quicklist->head = quicklist->tail = NULL; // 不包含多余的头部节点
quicklist->len = 0;
quicklist->count = 0;
quicklist->compress = 0;
quicklist->fill = -2; // 默认的fill是-2,8kb
return quicklist;
}

插入push

push操作是通过quicklistpush实现的:

1
2
3
4
5
6
7
8
void quicklistPush(quicklist *quicklist, void *value, const size_t sz,
int where) {
if (where == QUICKLIST_HEAD) {
quicklistPushHead(quicklist, value, sz); // push头部
} else if (where == QUICKLIST_TAIL) {
quicklistPushTail(quicklist, value, sz); // push尾部
}
}

插入头部的函数,返回0表示已经存在头部,返回1表示创建了新的头部。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#if __GNUC__ >= 3
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
#else
#define likely(x) (x)
#define unlikely(x) (x)
#endif

int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_head = quicklist->head;
// 判断ziplist大小是否超过限制
if (likely(
_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
quicklist->head->zl =
ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD); // 插数据到头部节点
quicklistNodeUpdateSz(quicklist->head);
} else {
quicklistNode *node = quicklistCreateNode();//新建一个节点
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);// 新建一个ziplist并插入一个节点

quicklistNodeUpdateSz(node); // 更新节点的sz
_quicklistInsertNodeBefore(quicklist, quicklist->head, node); // 将该节点插入到原头部节点之前
}
quicklist->count++;
quicklist->head->count++;
return (orig_head != quicklist->head);
}

检查ziplist的大小是否满足要求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#define sizeMeetsSafetyLimit(sz) ((sz) <= SIZE_SAFETY_LIMIT)

REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node,
const int fill, const size_t sz) {
if (unlikely(!node))
return 0;

int ziplist_overhead;
/* size of previous offset */
if (sz < 254)
ziplist_overhead = 1;
else
ziplist_overhead = 5;

/* size of forward offset */
if (sz < 64)
ziplist_overhead += 1;
else if (likely(sz < 16384))
ziplist_overhead += 2;
else
ziplist_overhead += 5;

/* 忽略sz被编码成整数的情况*/
unsigned int new_sz = node->sz + sz + ziplist_overhead;
// 检查fill为负数时,是否超过容量大小
// 五种情况:static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536};
if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill)))
return 1;
else if (!sizeMeetsSafetyLimit(new_sz)) // fill为正数时,不能超过#define SIZE_SAFETY_LIMIT 8192,8kb
return 0;
else if ((int)node->count < fill) // fill为正数时,检查节点的ziplist项数
return 1;
else
return 0;
}

节点压缩

前面提到过,如果当前的节点需要进行压缩,zl数据指针将指向quicklistLZF结构体。

1
2
3
4
typedef struct quicklistLZF {
unsigned int sz; /* LZF size in bytes, compressed的长度*/
char compressed[]
} quicklistLZF;

具体的压缩操作函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/* Compress the ziplist in 'node' and update encoding details.
* Returns 1 if ziplist compressed successfully.
* Returns 0 if compression failed or if ziplist too small to compress. */
REDIS_STATIC int __quicklistCompressNode(quicklistNode *node) {
#ifdef REDIS_TEST
node->attempted_compress = 1;
#endif

/* 小于 #define MIN_COMPRESS_BYTES 48 不进行压缩 */
if (node->sz < MIN_COMPRESS_BYTES)
return 0;

quicklistLZF *lzf = zmalloc(sizeof(*lzf) + node->sz); // 分配内存

/* 如果压缩失败,或者压缩尺寸不够,节省的空间不足8字节则取消 */
// #define MIN_COMPRESS_IMPROVE 8
if (((lzf->sz = lzf_compress(node->zl, node->sz, lzf->compressed,
node->sz)) == 0) ||
lzf->sz + MIN_COMPRESS_IMPROVE >= node->sz) {
/* lzf_compress aborts/rejects compression if value not compressable. */
zfree(lzf);
return 0;
}
lzf = zrealloc(lzf, sizeof(*lzf) + lzf->sz); // 重新分配内存
zfree(node->zl); // 释放原来的节点
node->zl = (unsigned char *)lzf;
node->encoding = QUICKLIST_NODE_ENCODING_LZF;
node->recompress = 0;
return 1;
}

具体lzf压缩算法,可以参考lzf_compress函数

<i class="fa fa-angle-left"></i>1…456…28<i class="fa fa-angle-right"></i>

278 日志
29 标签
RSS
© 2025 LucienXian
由 Hexo 强力驱动
主题 - NexT.Pisces