写点什么

顺序、时钟与分布式系统

作者:
  • 2022 年 9 月 25 日
    北京
  • 本文字数:9081 字

    阅读完需:约 30 分钟

顺序、时钟与分布式系统

Ordering

现实生活中时间可以记录事情发生的时刻、比较事情发生的先后顺序。


分布式系统的一些场景也需要记录和比较不同节点间事件发生的顺序。如数据写入先后顺序,事件发生的先后顺序等等。

关系

复习下离散数学中关系:


假设 A 是一个集合 {1,2,3,4} ;R 是集合 A 上的关系,例如{<1,1>,<2,2>,<3,3>,<4,4>,<1,2>,<1,4>,<2,4>,<3,4>}


  • 自反性:任取一个 A 中的元素 x,如果都有<x,x>在 R 中,那么 R 是自反的。

  • <1,1>,<2,2>,<3,3>,<4,4>

  • 对称性:任取一个 A 中的元素 x,y,如果<x,y> 在关系 R 上,那么<y,x> 也在关系 R 上,那么 R 是对称的。

  • 反对称性:任取一个 A 中的元素 x,y(x!=y),如果<x,y> 在关系 R 上,那么<y,x> 不在关系 R 上,那么 R 是反对称的。

  • 对于 <1,2>,有 <2,1> 不在 R 中;对于<2,4> 有<4,2>不在 R 中;对于<3,4> 有<4,3> 不在 R 中,满足。

  • 传递性:任取一个 A 中的元素 x,y,z,如果<x,y>,<y,z> 在关系 R 上,那么 <x,z> 也在关系 R 上,那么 R 是对称的。

  • <1,1><1,2>在 R 中,并且<1,2>在 R 中;<1,1><1,4>在 R 中,并且<1,4>在 R 中;<2,2><2,4>在 R 中,并且<2,4>在 R 中;<3,3><3,4>在 R 中,并且<3,4>在 R 中;等等其他,满足。

  • 完全性(全关系):包含了自反性;对集合 A 中所有<x,y>,都有关系 x 到 y 或 y 到 x;

  • R 中并没有<1, 3>,所以不满足完全性

偏序 The Partial Ordering

集合内只有部分元素之间是可以比较的。


偏序关系的定义(R 为 A 上的偏序关系):设 R 是集合 A 上的一个二元关系,若 R 满足:


  • 反对称性:对任意 x,y∈A,若 xRy,且 yRx,则 x=y;

  • 传递性:对任意 x, y,z∈A,若 xRy,且 yRz,则 xRz

  • 自反性:对任意 x∈A,有 xRx;


一个 partitial ordering 关系满足的条件是自反的,反对称的和可传递的,因此在 partitial ordering 中,可能有两个元素之间是不相关的。

全序 The Total Ordering

集合内只有部分元素之间是可以比较的。


比如:比如复数集中并不是所有的数都可以比较大小,那么“大小”就是复数集的一个偏序关系。


全序关系的定义:


  • 反对称性:对任意 x,y∈A,若 xRy,且 yRx,则 x=y;

  • 传递性:对任意 x, y,z∈A,若 xRy,且 yRz,则 xRz

  • 完全性(total relation 全关系):对任意 x,y∈A,由 xRy 或 yRx (包括了自反性)


完全性本身也包括了自反性,所以全序关系是偏序关系。


所以偏序中满足完全性就是全序了。


一个 total ordering 关系满足的条件是反对称的,可传递的和完全性,因此在 total ordering 中,两个元素一定是有关系的,要么是 a<>b 或 b<>a。

happens before

在分布式系统中,一个进程包含一系列的事件,对于同一进程内的事件,如果 a happens before b,那么 a 发生在 b 之前。并且,假定收或发消息都是一个事件。


happens before 的定义如下(用->表示)


  • 如果 a 和 b 在同一进程中,并且 a 发生在 b 之前,那么 a->b

  • 如果 a 是一个进程发消息的事件,b 是另一个进程接收这条消息的事件,则 a->b

  • 如果 a->b 且 b->c,那么 a->c。

  • 如果同时不满足 a->b,且 b->a,那么说 a 和 b 是并发的 concurrent



[图来自 Time, Clocks, and the Ordering of Events in a Distributed System]


以一个例子来说明 happens before 关系,如上图,垂直线上代表一个进程,从下往上,时间依次增加,水平的距离代表空间的隔离。原点代表一个事件,而曲线代表一条消息。


从图中很容易地看出,如果一个事件 a,能通过进程的线和消息线,到达 b,那么 a->b。


在图中,p3 和 q4 是并行的事件,因为,只有到了 p4 才能确定 q4 的发生,而 q3 也只能确定 p1 发生。

clock 时钟

物理时钟

晶振和时钟偏移

计算机有固定频率晶体的震荡次数,晶体的振荡周期决定了单机的时钟精度。


时钟频率也可能因为温度等外部因素导致时钟偏移,普通的石英晶体的漂移大约


原子钟的漂移约为 所以原子钟精度远远高于石英晶体。

分布式下带来的问题

不同机器上的物理时钟难以同步,导致无法区分在分布式系统中多个节点的事件时序。即使设置了 NTP 时间同步节点间也存在毫秒级别的偏差,因而分布式系统需要有另外的方法记录事件顺序关系。


1978 年 Lamport 在《Time, Clocks and the Ordering of Events in a Distributed System》中提出了逻辑时钟的概念,来解决分布式系统中区分事件发生的时序问题。

逻辑时钟 Logical clocks

逻辑时钟指的是分布式系统中用于区分事件的发生顺序的时间机制。 从某种意义上讲,现实世界中的物理时间其实是逻辑时钟的特例。


Logical Clock 解决的问题是找到一种方法,给分布式系统中所有时间定一个序,这个序能够正确地排列出具有因果关系的事件(注意,是不能保证并发事件的真实顺序的),使得分布式系统在逻辑上不会发生因果倒置的错误。因果一致性

Lamport timestamps

论文

Time, Clocks, and the Ordering of Events in a Distributed System

Lamport timestamps

Leslie Lamport 在 1978 年提出逻辑时钟的概念,并描述了一种逻辑时钟的表示方法,这个方法被称为 Lamport 时间戳(Lamport timestamps)。


分布式系统中按是否存在节点交互可分为三类事件:


  • 发生在节点内部

  • 发送事件

  • 接收事件


时钟的定义如下


  • 对于一个进程 i,Ci(a)表示进程 i 中事件 a 的发生时间

  • 对于整个系统来讲,对于任意的事件 b,其发生时间为 C(b),当 b 为进程 j 的事件时,则 C(b) = Cj(b)为了使得事件按照正确的排序,需要使得如果事件 a 发生在事件 b 之前,那么 a 发生的时间要小于 b,如下


for any events a, bif a->b then C(a) < C(b)
复制代码


根据关系->的定义,我们可以得出


  • 如果 a 和 b 都是进程 i 中的事件,且 a 发生在 b 之前,那么 Ci(a) < Ci(b)

  • 如果事件 a 发送消息给事件 b,a 属于进程 i,b 属于进程 j,那么 Ci(a) < Cj(b)



为了让系统满足上述条件,在实现中,需要满足以下原则


  • 对于每个进程,相邻的事件的时钟要增加 1

  • (a) 如果事件 a 是进程 i 发送消息 m 的事件,发送时带时间戳 Tm = Ci(a),(b)事件 b 是进程 j 接受消息 m 的事件,那么事件 b 的取值为 max(进程 b 的当前时钟,Tm+1)


假设有事件 a、b,C(a)、C(b)分别表示事件 a、b 对应的 Lamport 时间戳,如果 a->b,则 C(a) < C(b),a 发生在 b 之前(happened before)。


所以 Lamport timestamps 原理如下:


  • 每个事件对应一个 Lamport 时间戳,初始值为 0

  • 如果事件在节点内发生,时间戳加 1

  • 如果事件属于发送事件,时间戳加 1 并在消息中带上该时间戳

  • 如果事件属于接收事件,时间戳 = Max(本地时间戳,消息中的时间戳) + 1


通过该定义,事件集中 Lamport 时间戳不等的事件可进行比较,我们获得事件的偏序关系(partial order)。



上图更形象的解释了事件之间的关系。


以 B4 事件为基准:


  • B4 左边深灰色的区域的事件,都发生在 B4 前,和 B4 具有因果关系,这些事件属于与 B4 因果关系中的因(cause)

  • B4 右边的深红色区域的事件,都发生在 B4 后,和 B4 具有因果关系,这些事件属于与 B4 因果关系中的果(effect)

  • B4 上下的白色区域是跟 B4 无关的事件,可以认为是并发关系(concurrent)

  • 在浅灰色和浅红色区域中的事件,C2、A3 两个事件与 B4 是并行关系,根据 Lamport timestamps 的定义,将他们判定为与 B4 具前后关系。(所以 Lamport timestamps 并不能严格的表示并行关系)

Lamport timestamps 与偏序关系

Lamport timestamps 只保证因果关系(偏序)的正确性,不保证绝对时序的正确性。

Lamport logical clock

由于 Lamport timestamps 只能得到偏序关系,如果要得到全序关系,就需要给 Ci(a) = Cj(b)的事件定一个先后顺序。


total order 的事件关系=>定义如下:


如果事件 a 发生在进程 Pi,事件 b 发生在进程 Pj,那么当满足下列两者条件之一时,a=>b


  • Ci(a) < Cj(b)

  • Ci(a) = Cj(b) 且 Pi < Pj


根据以上条件,对于任意的两个事件,都能判断出它们之间的关系,因此是 total ordering 的。


当 Lamport timestamp 一致时,通过义 Pi < Pj 来定义顺序,确保分布式场景下各个进程间发生的事件的全序定义。至于 Pj < Pj:可采用不同的方式,Lamport Logical Clock 提到的 arbitrary total ordering。

vector clock

Lamport timestamp 得到的是全序关系,但无法严格表示对于没有因果关系、存在同时发生关系(concurrent)的事件。


Vector clock 是在 Lamport timestamp 基础上改进的一种逻辑时钟方法,它构不但记录本节点的 Lamport timestamp,同时也记录了其他节点的 Lamport timestamp。


原理如下:


  • 本地 vector clock 的 clock 数组中每一个逻辑时间(clock)对应一个进程的 clock

  • 初始化 vector clock 中每一个逻辑时间为 0;

  • 每一次处理内完内部事件,将 vector clock 中自己的逻辑时间戳+1;

  • 每发送一个消息的时候,将 vector clock 中自己的逻辑时间+1,且将其和消息一起发送出去

  • 每接收到一个消息的时候,需要将本地的 vector clock 中自己的逻辑时间戳+1,且将自己 vector clock 中的逻辑时间和消息中携带的进行比较,取最大的更新本地 vector clock 中的逻辑时间。



图来源于wikipedia


vector clock 判定并发关系:


  • 事件 i、事件 j 对应的 vector clock 中,每一个进程 Pk 的逻辑时间戳都满足 Vi[Pk]<Vj[Pk]时,我们称事件 i happen before 事件 j;

  • vector clock 中,存在 P1、P2,使得 Vi[P1]<Vj[P1],Vi[P2]>Vj[P2],我们称事件 i 和事件 j 是并发关系(没有因果关系);


和之前 lamport timestamp 的一样,以 B4 事件为基准(vector clock 为[A:2,B:4,C:1]),根据 vector clock 的判定,可以判断出


  • 灰色区域的事件 happens before B4 事件,B4 事件 happens before 红色区域的事件

  • 白色区域与 B4 事件没有因果关系。


特性:


  • vector clock 不需要在节点之间同步时钟,不需要在所有节点上维护一段数据的版本数;

  • 缺点是时钟值的大小随着节点增多和时间不断增长

version vector

分布式系统多个副本被同时更新时,会导致副本之间数据的不一致。version vector 用于来发现这些不一致的冲突。


version vector 只能发现冲突,无法解决冲突;当然也可以通过再添加一个维度信息 timestamp,发生冲突时进行比较,但是又回到了物理时钟不同步的问题。


下图展示了数据由不同副本处理后导致的不同版本冲突。


D5 时发现了数据的冲突,这时会将不同版本数据都存储下来,一般由客户端来解决冲突。



version vector 与 vector clock 的差异


  • vector clocks 使用 receive 和 send 方法来更新 clock,而 version vector 使用 sync 方法来更新。

  • vector clocks 是给事件定序的,确定事件的因果关系;而 version vector 是确定同一个数据不同版本的因果关系。

分布式与时钟

分布式系统中,每个节点的物理时钟是不同步的,都有一定的差异。


这样就带来了一些分布式系统实现的难题,如基于 MVCC 实现的事务,基于 MVCC 实现事务会要求版本之间能判断先后顺序,只有确定先后才知道应该用哪一个版本的数据,确定先后顺序就涉及到时间,而不同机器之间的本地时钟是无法保证一致的,所以这就需要确保时钟的同步。


而通常解决方案有两种:


  • 中心化的时钟方案,如 Timestamp oracle(TSO)

  • 无中心化的时钟方案,如 google True Time,Hybrid Logic Time

Timestamp oracle

如果我们整个系统不复杂,而且没有跨全球的需求,这时用一台中心授时服务就可以了。


如 TiDB 使用的就是 TSO 方案,tipb 作为一个 TSO 集群,来提供授时服务。


使用 TSO 的好处在于因为只有一个中心授时,所以我们一定能确定所有时间的时间,但 TSO 需要关注几个问题:


  • 网络延时:因为所有的事件都需要从 TSO 获取时间,所以 TSO 只适合小集群部署,不能是那种全球级别的数据库

  • 性能:每个事件都需要从 TSO 获取时间,所以 TSO 需要非常高的性能

  • 容错:TSO 是一个单点,需要考虑节点的 failover

True Time

由于节点间 NTP 是有偏差的,且可能出现时间回退的情况,所以 NTP 无法准确的判定事件的全序关系。在 Google Spanner 里面,通过引入 True Time 来解决了分布式时间问题。

True Time 实现

Spanner 通过使用 GPS + 原子钟 atomic clock 来对集群的机器时间进行校对,保证了集群机器的时间戳差距不会超过一个上限值(ε)。


用两种技术来处理,是因为导致这两种技术的失败的原因是不同的。


  • GPS 会有一个天线,电波干扰会导致其失灵。原子钟很稳定。

  • 当 GPS 失灵的时候,原子钟仍然能保证在相当长的时间内,不会出现偏差。

API

  • TT.now() : 返回一个当前时间,其位于范围区间[earliest,latest]

  • TT.after(t) : 当前时间是否在 t 之后

  • TT.before(t) : 当前时间是否在 t 之前


虽然 spanner 引入了 TrueTime 可以得到全球范围的时序一致性,但由于 TrueTime 返回的时间仍然有一定的偏差,如果要给两个事件定序,就需要等待 2 个偏差的时间间隔,来确保其先后顺序。


  • 事件 a:[Tai, Taj], Taj-Tai=ε

  • 事件 b:[Tbi, Tbj], Tbj-Tbi=ε

  • 所以要确定 b>a, 那么就要确保 Tbi > Taj, 就需要在事件 b 进行等待,以确保:事件b时间 - 事件a时间 > 2ε

Hybrid logical clock

HLC

Logical Physical Clocks and Consistent Snapshots in Globally Distributed Databases


TrueTime 需要硬件的支持,所以有一定的成本,而 HLC 无需硬件支持也能解决分布式下时间问题。


HLC 同时使用了物理时钟和逻辑时钟(physical clock + logical clock),能够保证单点的时间发生器是单调递增的,同时能够尽量控制不同节点之间的时钟偏差在规定的偏差范围内。


判断两个事件的先后顺序:先判断物理时间,再判断逻辑时间。

HLC 的算法

l.j 维护的是节点 j 当前已知的最大的物理时间(wall time),c.j 则是当前的逻辑时间。


// 在节点j上面:初始化: l.j = 0,c.j = 0。initially l.j :=0; c.j := 0
// 本地事件或者发送消息时,// 如果本地时钟pt大于当前的混合逻辑时钟的l,// 则将l更新成本地时钟,将c清零。// 否则,l保持不变,将c加1。Send or local event{ l'.j := l.j; l.j := max(l'.j, pt.j); // 本地物理时间pt if (l.j = l'.j) then c.j := c.j+1 else c.j := 0; Timestamp with l.j, c.j}
// 收到消息时// l在 当前的逻辑时钟的l、机器的本地时钟pt、收到消息里面带的l,三者中取最大的。// 如果l部分是更新为本地时钟了,则将c清零。否则,c取较大的那个l对应到的c加1。Receive event of message m{ l'.j := l.j; l.j := max(l'.j, l.m, pt.j); if (l.j = l'.j = l.m) then c.j := max(c.j, c.m) + 1 elseif (l.j=l'.j) then c.j := c.j + 1 elseif (l.j=l.m) then c.j := c.m + 1 else c.j := 0 Timestamp with l.j, c.j}
复制代码

特性

HLC 算法保证了 HLC 时间有如下特性:


  • 事件 e 发生在事件 f 之前,那么事件 e 的 HLC 时间一定小于事件 f 的 HLC 时间:(l.e, c.e) < (l.f, c.f)

  • 本地 WallTime 大于等于本地物理时间(l.e ≥ pt.e):HLC 时间总是不断递增,不会随着物理时间发生回退。

  • 对事件 e,l.e 是事件 e 能感知的到的最大物理时间值:如果 l.e > pt.e,那么一定存在着一个发生在 e 之前的事件 g,有 pt.g=l.e。简单来说是如果出现 l.e > pt.e 肯定是因为有一个 HLC 时间更大的的节点把当前节点的 HLC 时间往后推了。

  • WallTime 和物理时钟的偏差是有界的(ε ≥ |pt.e - l.e| ):因为节点之间通过 NTP 服务校时,那么节点之间的物理时钟偏差一定小于某个值ε。那么对于任一事件 b 和 e,如果 b hb e,那么事件 b 的物理时间 pt.b 一定满足 pt.e + ε ≥ pt.b。结合特性 3 存在一个事件 g 满足,l.e = pt.g。那么 pt.e + ε ≥ l.e=pt.g > pt.e。

开源实现

CockroachDB 采用基于 NTP 时钟同步的 HLC 去中心化方案。

时钟同步

所有节点间的 RPC 消息都会把时间戳带入到消息中,接收到消息的节点会通过消息中的时间戳更新自己的时间, 从而达到节点间时间同步的效果。

代码分析

参考:https://github.com/cockroachdb/cockroach/blob/v1.1.3/pkg/util/hlc/hlc.go


HLC 定义


// Timestamp represents a state of the hybrid logical clock.type Timestamp struct {    // Holds a wall time, typically a unix epoch time    // expressed in nanoseconds.    WallTime int64 `protobuf:"varint,1,opt,name=wall_time,json=wallTime" json:"wall_time"`    // The logical component captures causality for events whose wall    // times are equal. It is effectively bounded by (maximum clock    // skew)/(minimal ns between events) and nearly impossible to    // overflow.    Logical int32 `protobuf:"varint,2,opt,name=logical" json:"logical"`}
复制代码


  • WallTime:本地已知物理时钟

  • Logical:逻辑时钟

  • Timestamp:HLC,单调递增


获取物理时钟


// PhysicalNow returns the local wall time. It corresponds to the physicalClock// provided at instantiation. For a timestamp value, use Now() instead.func (c *Clock) PhysicalNow() int64 {    c.mu.Lock()    defer c.mu.Unlock()    return c.getPhysicalClockLocked()}
// getPhysicalClockLocked returns the current physical clock and checks for// time jumps.func (c *Clock) getPhysicalClockLocked() int64 { // physicalClock 就是 UnixNano newTime := c.physicalClock()
if c.mu.lastPhysicalTime != 0 { interval := c.mu.lastPhysicalTime - newTime // 检查时钟是否回退 if interval > int64(c.maxOffset/10) { c.mu.monotonicityErrorsCount++ log.Warningf(context.TODO(), "backward time jump detected (%f seconds)", float64(-interval)/1e9) } }
c.mu.lastPhysicalTime = newTime return newTime}

// UnixNano returns the local machine's physical nanosecond// unix epoch timestamp as a convenience to create a HLC via// c := hlc.NewClock(hlc.UnixNano, ...).func UnixNano() int64 { return timeutil.Now().UnixNano()}
复制代码


获取当前 HLC 时钟


// Now returns a timestamp associated with an event from// the local machine that may be sent to other members// of the distributed network. This is the counterpart// of Update, which is passed a timestamp received from// another member of the distributed network.func (c *Clock) Now() Timestamp {    c.mu.Lock()    defer c.mu.Unlock()    if physicalClock := c.getPhysicalClockLocked(); c.mu.timestamp.WallTime >= physicalClock {        // The wall time is ahead, so the logical clock ticks.        c.mu.timestamp.Logical++    } else {        // Use the physical clock, and reset the logical one.        c.mu.timestamp.WallTime = physicalClock        c.mu.timestamp.Logical = 0    }    return c.mu.timestamp}
复制代码


  • 如果当前物理时钟小于 WallTime,则将逻辑时钟+1

  • 如果当前物理时钟大于 WallTime,则更新 WallTime 为当前物理时钟,且将逻辑时钟设置为 0


节点时钟同步


节点之间通过在 RPC 请求中携带 HLC 时间来进行时钟同步。


// sendSingleRange gathers and rearranges the replicas, and makes an RPC call.func (ds *DistSender) sendSingleRange(    ctx context.Context, ba roachpb.BatchRequest, desc *roachpb.RangeDescriptor,) (*roachpb.BatchResponse, *roachpb.Error) {    ......        br, err := ds.sendRPC(ctx, desc.RangeID, replicas, ba)    if err != nil {        log.ErrEvent(ctx, err.Error())        return nil, roachpb.NewError(err)    }
// If the reply contains a timestamp, update the local HLC with it. if br.Error != nil && br.Error.Now != (hlc.Timestamp{}) { ds.clock.Update(br.Error.Now) } else if br.Now != (hlc.Timestamp{}) { ds.clock.Update(br.Now) } ......}

// Update takes a hybrid timestamp, usually originating from// an event received from another member of a distributed// system. The clock is updated and the hybrid timestamp// associated to the receipt of the event returned.// An error may only occur if offset checking is active and// the remote timestamp was rejected due to clock offset,// in which case the timestamp of the clock will not have been// altered.// To timestamp events of local origin, use Now instead.func (c *Clock) Update(rt Timestamp) Timestamp { c.mu.Lock() defer c.mu.Unlock() // 如果本地物理时间pt physicalClock := c.getPhysicalClockLocked()
// 大于本地WallTime且大于rt.WallTime: // 更新本地WallTime=pt,且logical=0 if physicalClock > c.mu.timestamp.WallTime && physicalClock > rt.WallTime { // Our physical clock is ahead of both wall times. It is used // as the new wall time and the logical clock is reset. c.mu.timestamp.WallTime = physicalClock c.mu.timestamp.Logical = 0 return c.mu.timestamp }
// In the remaining cases, our physical clock plays no role // as it is behind the local or remote wall times. Instead, // the logical clock comes into play. // 如果rt.WallTime > 本地WallTime: // 检查rt.WallTime与pt是否大于时钟偏差; // 本地WallTime=rt.WallTime,logical++ if rt.WallTime > c.mu.timestamp.WallTime { offset := time.Duration(rt.WallTime-physicalClock) * time.Nanosecond if c.maxOffset > 0 && offset > c.maxOffset { log.Warningf(context.TODO(), "remote wall time is too far ahead (%s) to be trustworthy - updating anyway", offset) } // The remote clock is ahead of ours, and we update // our own logical clock with theirs. c.mu.timestamp.WallTime = rt.WallTime c.mu.timestamp.Logical = rt.Logical + 1 } else if c.mu.timestamp.WallTime > rt.WallTime { // 如果本地WallTime>rt.WallTime:logical++ // Our wall time is larger, so it remains but we tick // the logical clock. c.mu.timestamp.Logical++ } else { // Both wall times are equal, and the larger logical // clock is used for the update. if rt.Logical > c.mu.timestamp.Logical { c.mu.timestamp.Logical = rt.Logical } c.mu.timestamp.Logical++ } return c.mu.timestamp}
复制代码

参考


发布于: 刚刚阅读数: 3
用户头像

关注

还未添加个人签名 2018.06.12 加入

还未添加个人简介

评论

发布
暂无评论
顺序、时钟与分布式系统_分布式_楚_InfoQ写作社区