前言
Google 能成为散播大数据火种的人,是有着历史的必然性的。作为一个搜索引擎,Google 在数据层面,面临着比任何一个互联网公司都更大的挑战。无论是 Amazon 这样的电商公司,还是 Yahoo 这样的门户网站,都只需要存储自己网站相关的数据。而 Google,则是需要抓取所有网站的网页数据并存下来。而且光存下来还不够,早在 1999 年,两个创始人就发表了 PageRank 的论文,也就是说,Google 不只是简单地根据网页里面的关键字来排序搜索结果,而是要通过网页之间的反向链接关系,进行很多轮的迭代计算,才能最终确认排序。而不断增长的搜索请求量,让 Google 还需要有响应迅速的在线服务。
- GFS 的论文发表于 2003 年,它主要是解决了数据的存储问题。作为一个上千节点的分布式文件系统,Google 可以把所有需要的数据都能很容易地存储下来。
- 光存下来还不够,我们还要基于这些数据进行各种计算。这个时候,就轮到 2004 年发表的 MapReduce 出场了。通过借鉴 Lisp,Google 利用简单的 Map 和 Reduce 两个函数,对于海量数据计算做了一次抽象,这就让“处理”数据的人,不再需要深入掌握分布式系统的开发了。而且他们推出的 PageRank 算法,也可以通过多轮的 MapReduce 的迭代来实现。
- 这样,无论是 GFS 存储数据,还是 MapReduce 处理数据,系统的吞吐量都没有问题了,因为所有的数据都是顺序读写。但是这两个,其实都没有办法解决好数据的高性能随机读写问题。因此,面对这个问题,2006 年发表的 Bigtable 就站上了历史舞台了。
- 到这里,GFS、MapReduce 和 Bigtable 这三驾马车的论文,就完成了“存储”“计算”“实时服务”这三个核心架构的设计。不过你还要知道,这三篇论文其实还依赖了两个基础设施。
- 第一个是为了保障数据一致性的分布式锁。Google 在发表 Bigtable 的同一年,就发表了实现了 Paxos 算法的 Chubby 锁服务的论文
- 第二个是数据怎么序列化以及分布式系统之间怎么通信。
GFS论文
在 《The Google File System》这篇论文发表之前,工业界的分布式系统最多也就是几十台服务器的 MPI 集群。而这篇 GFS 的论文一发表,一下子就拿出了一个运作在 1000 台服务器以上的分布式文件系统。并且这个文件系统,还会面临外部数百个并发访问的客户端,可以称得上是石破天惊。当然,在 18 年后的今天,开源社区里的各种分布式系统,也都远比当初的 GFS 更加复杂、强大。回顾这篇 18 年前的论文,GFS 可以说是“技术上辉煌而工程上保守”。说 GFS 技术上辉煌,是因为 Google 通过廉价的 PC 级别的硬件,搭建出了可以处理整个互联网网页数据的系统。而说 GFS 工程上保守,则是因为 GFS 没有“发明”什么特别的黑科技,而是在工程上做了大量的取舍(trade-off)。
GFS 定了三个非常重要的设计原则
- 以工程上“简单”作为设计原则。GFS 直接使用了 Linux 服务上的普通文件作为基础存储层,并且选择了最简单的单 Master 设计。单 Master 让 GFS 的架构变得非常简单,避免了需要管理复杂的一致性问题。不过它也带来了很多限制,比如一旦 Master 出现故障,整个集群就无法写入数据,而恢复 Master 则需要运维人员手动操作,所以 GFS 其实算不上一个高可用的系统。但另外一方面,GFS 还是采用了 Checkpoints、操作日志(Operation Logs)、影子 Master(Shadow Master)等一系列的工程手段,来尽可能地保障整个系统的“可恢复(Recoverable)”,以及读层面的“可用性(Availability)”。 The Google File System (一): Master的三个身份
- 根据硬件特性来进行设计取舍。2003 年,大家都还在用机械硬盘,随机读写的性能很差,所以在 GFS 的设计中,重视的是顺序读写的性能,对随机写入的一致性甚至没有任何保障。 The Google File System (二):如何应对网络瓶颈?
- 根据实际应用的特性,放宽了数据一致性(consistency)的选择。GFS 是为了在廉价硬件上进行大规模数据处理而设计的。所以 GFS 的一致性相当宽松。GFS 本身对于随机写入的一致性没有任何保障,而是把这个任务交给了客户端。对于追加写入(Append),GFS 也只是作出了“至少一次(At Least Once)”这样宽松的保障。The Google File System (三): 多写几次也没关系
本质上,GFS 是对上千台服务器、上万块硬盘的硬件做了一个封装,让 GFS 的使用者可以把 GFS 当成一块硬盘来使用。通过 GFS 客户端,无论你是要读还是写海量的数据,你都不需要去操心这些数据最终要存储到哪一台服务器或者哪一块硬盘上。你也不需要担心哪一台服务器的网线可能松了,哪一块硬盘可能坏了,这些问题都由 GFS 这个“分布式系统”去考虑解决了。
MapReduce
作为一个框架,MapReduce 设计的一个重要思想,就是让使用者意识不到“分布式”这件事情本身的存在。从设计模式的角度,MapReduce 框架用了一个经典的设计模式,就是模版方法模式。而从设计思想的角度,MapReduce 的整个流程,类似于 Unix 下一个个命令通过管道把数据处理流程串接起来。
要想让写 Map 和 Reduce 函数的人不需要关心“分布式”的存在,那么 MapReduce 框架本身就需要解决好三个很重要的问题:
- 第一个,自然是如何做好各个服务器节点之间的“协同”,以及解决出现各种软硬件问题后的“容错”这两部分的设计。
- 第二个,是上一讲我们没怎么关心的性能问题。尽量充分利用 MapReduce 集群的计算能力,并让整个集群的性能可以随硬件的增加接近于线性增长,可以说是非常大的一个挑战。
- 尽可能减少需要通过网络传输的数据。由于 MapReduce 程序的代码往往很小,可能只有几百 KB 或者几 MB,但是每个 map 需要读取的一个分片的数据是 64MB 大小。在分配 map 任务的时候,根据需要读取的数据在哪里,就把map 任务分配给所在节点的Worker进程。如果那台服务器上没有,那么它就会找离这台服务器最近的、有 worker 的服务器,来分配对应的任务。
- 尽可能让中间数据的数据量小一些。MapReduce 允许开发者自己定义一个 Combiner 函数。这个 Combiner 函数,会对在同一个服务器上所有 map 输出的结果运行一次,然后进行数据合并。
- 最后一个,还是要回到易用性。map 和 reduce 的任务都是在分布式集群上运行的,这个就给我们对程序 debug 带来了很大的挑战。
- 提供一个单机运行的 MapReduce 的库,这个库在接收到 MapReduce 任务之后,会在本地执行完成 map 和 reduce 的任务。这样,你就可以通过拿一点小数据,在本地调试你的 MapReduce 任务了,无论是 debugger 还是打日志,都行得通。
- 在 master 里面内嵌了一个 HTTP 服务器,然后把 master 的各种状态展示出来给开发者看到。
- MapReduce 框架里提供了一个计数器(counter)的机制。作为开发者,你可以自己定义几个计数器,然后在 Map 和 Reduce 的函数里去调用这个计数器进行自增。所有 map 和 reduce 的计数器都会汇总到 master 节点上,通过上面的 HTTP 服务器里展现出来。
在 MapReduce 任务提交了之后(注意与 Hadoop 不完全一样)
- 写好的 MapReduce 程序,已经指定了输入路径。所以 MapReduce 会先找到 GFS 上的对应路径,然后把对应路径下的所有数据进行分片(Split)。每个分片的大小通常是 64MB,这个尺寸也是 GFS 里面一个块(Block)的大小。接着,MapReduce 会在整个集群上,启动很多个 MapReduce 程序的复刻(fork)进程。
- 在这些进程中,有一个和其他不同的特殊进程,就是一个 master 进程,剩下的都是 worker 进程。然后,我们会有 M 个 map 的任务(Task)以及 R 个 reduce 的任务,分配给这些 worker 进程去进行处理。这里的 master 进程,是负责找到空闲的(idle)worker 进程,然后再把 map 任务或者 reduce 任务,分配给 worker 进程去处理。这里你需要注意一点,并不是每一个 map 和 reduce 任务,都会单独建立一个新的 worker 进程来执行。而是 master 进程会把 map 和 reduce 任务分配给有限的 worker,因为一个 worker 通常可以顺序地执行多个 map 和 reduce 的任务。
- 被分配到 map 任务的 worker 会读取某一个分片,分片里的数据就像上一讲所说的,变成一个个 key-value 对喂给了 map 任务,然后等 Map 函数计算完后,会生成的新的 key-value 对缓冲在内存里。
- 这些缓冲了的 key-value 对,会定期地写到 map 任务所在机器的本地硬盘上。并且按照一个分区函数(partitioning function),把输出的数据分成 R 个不同的区域。而这些本地文件的位置,会被 worker 传回给到 master 节点,再由 master 节点将这些地址转发给 reduce 任务所在的 worker 那里。
- 运行 reduce 任务的 worker,在收到 master 的通知之后,会通过 RPC(远程过程调用)来从 map 任务所在机器的本地磁盘上,抓取数据。当 reduce 任务的 worker 获取到所有的中间文件之后,它就会将中间文件根据 Key 进行排序。这样,所有相同 Key 的 Value 的数据会被放到一起,也就是完成了我们上一讲所说的混洗(Shuffle)的过程。
- reduce 会对排序后的数据执行实际的 Reduce 函数,并把 reduce 的结果输出到当前这个 reduce 分片的最终输出文件里。
- 当所有的 map 任务和 reduce 任务执行完成之后,master 会唤醒启动 MapReduce 任务的用户程序,然后回到用户程序里,往下执行 MapReduce 任务提交之后的代码逻辑。
MapReduce 的容错机制非常简单,就是重新运行和写 Checkpoints。
- worker 节点的失效(Worker Failure)。master 节点会定时地去 ping 每一个 worker 节点,一旦 worker 节点没有响应,我们就会认为这个节点失效了。解决也简单,换一台服务器重新运行这个 worker 节点被分配到的所有任务。
- master 节点的失效(Master Failure)。就任由 master 节点失败了,也就是整个 MapReduce 任务失败了。那么,对于开发者来说,解决这个问题的办法也很简单,就是再次提交一下任务去重试。谷歌也给出了一个很简单的解决方案,那就是让 master 定时把它里面存放的信息,作为一个个的 Checkpoint 写入到硬盘中去。一旦 master 失效,我们就可以启动一个新的 master,来读取 Checkpoints 数据,然后就可以恢复任务的继续执行了,而不需要重新运行整个任务。
Bigtable
即使有了 GFS 和 MapReduce,我们仍然有一个非常重要的需求没有在大型的分布式系统上得到满足,那就是可以高并发、保障一致性,并且支持随机读写数据的系统。
基于分库分表的方案,运维起来会很费劲,主要体现在以下三点
- 不得不进行的“翻倍扩容”,比如水平分库,把订单号 Hash 一下,然后取“模”(mod)个 4,拆分到 4 台不同的服务器的数据库里。当服务器性能出现瓶颈需要扩容的时候,我们常常只能采取“翻倍”分库增加服务器的方案。为什么呢?因为如果我们只增加 2 台服务器,把各个服务器的分片,从模上 4 变成模上 6,我们就需要在增加服务器之后,搬运大量的数据。并且这个数据搬运,不只是搬到新增加的服务器上,而是有些数据还要在原有的 4 台服务器上互相搬运。这个搬运过程需要占用大量的网络带宽和硬盘读写,所以很有可能要让数据库暂停服务。而我们希望的伸缩性是什么样的呢?自然是需要的时候,加 1 台服务器也加得,加 10 台服务器也加得。而用不上的时候,减少个 8 台 10 台服务器也没有问题,并且这些动作都不需要停机。
- 其次,是底层的数据分区策略对于应用不透明。比如公司是 2018 年成立,2019 年和 2020 年快速成长,每年订单数涨 10 倍,如果你用年份来进行订单的分片,那么服务器之间的负载就要差上十倍。而用日的话,双十一这样的大促也会让你猝不及防。那么,我们希望的分布式数据库是什么样的呢?自然是数据的分片是自适应的。比如 2019 年只有 100 万订单,那就分片到一个服务器节点上;2020 年有了 1000 万订单,自动给你分了 10 个节点;
- 天天跑机房的人肉运维,如果有一个 1000 台服务器的 MySQL 集群,每台服务器上都给插上 12 块硬盘,一共有 1 万 2 千块硬盘。这么多硬盘,我们到底要面临多少故障呢?基本上 3 天也要坏上一块硬盘。而我们希望的可运维性是怎么样的呢?最好是 1000 台节点的服务器,坏个 10 台 8 台没事儿,系统能够自动把这 10 台 8 台服务器下线,用剩下的 990 台继续完成服务。我们的运维人员只要 1 个月跑一趟机房批量换些机器就好
Bigtable 的设计中需要重点解决的问题
- 如何支撑好每秒十万、乃至百万级别的随机读写请求。
- 如何解决好“可伸缩性”和“可运维性”的问题。在一个上千台服务器的集群上,Bigtable 怎么能够做到自动分片和灾难恢复。
当然,除了这些目标之外,Bigtable 也放弃了很多目标,其中有两个非常重要:
- 第一个是放弃了关系模型,也不支持 SQL 语言;
- 第二个,则是放弃了跨行事务,Bigtable 只支持单行事务模型。 这两个问题,一直要到 10 年后的 Spanner 里,才被真正解决好。
Bigtable 的整体架构
数据模型,是一个很宽的稀疏表。每一张 Bigtable 的表都特别简单,每一行就是一条数据:
- 一条数据里面,有一个行键(Row Key),也就是这条数据的主键,Bigtable 提供了通过这个行键随机读写这条记录的接口。因为总是通过行键来读写数据,所以很多人也把这样的数据库叫做 KV 数据库。
- 每一行里的数据呢,你需要指定一些列族(Column Family),每个列族下,你不需要指定列(Column)。每一条数据都可以有属于自己的列,每一行数据的列也可以完全不一样,因为列不是固定的。这个所谓不是固定的,其实就是列下面没有值。因为 Bigtable 在底层存储数据的时候,每一条记录都要把列和值存下来,没有值,意味着对应的这一行就没有这个列。这也是为什么说 Bigtable 是一个“稀疏”的表。
- 列下面如果有值的话,可以存储多个版本,不同版本都会存上对应版本的时间戳(Timestamp),你可以指定保留最近的 N 个版本(比如 N=3,就是保留时间戳最近的三个版本),也可以指定保留某一个时间点之后的版本。
对于列族,它是一张“物理表”,同一个列族下的数据会在物理上存储在一起。而整个表,是一张“逻辑表”。Bigtable 的这个数据模型,使得我们能很容易地去增加列,而且增加列并不算是修改 Bigtable 里一张表的 Schema,而是在某些这个列需要有值的行里面,直接写入数据就好了。这里的列和值,其实是直接以 key-value 键值对的形式直接存储下来的。
- 数据分区,可伸缩的第一步。把一个数据表,根据主键的不同,拆分到多个不同的服务器上,在分布式数据库里被称之为数据分区( Paritioning)。MySQL 集群的分区之所以遇到种种困难,是因为我们通过取模函数来进行分区,也就是所谓的哈希分区,最大的问题,在于分区需要在一开始就设计好,而不是自动随我们的数据变化动态调整的,但是往往计划不如变化快。在 Bigtable 里,我们就采用了另外一种分区方式,也就是动态区间分区。我们不再是一开始就定义好需要多少个机器,应该怎么分区,而是采用了一种自动去“分裂”(split)的方式来动态地进行分区。我们的整个数据表,会按照行键排好序,然后按照连续的行键一段段地分区。如果某一段行键的区间里,写的数据越来越多,占用的存储空间越来越大,那么整个系统会自动地将这个分区一分为二,变成两个分区。而如果某一个区间段的数据被删掉了很多,占用的空间越来越小了,那么我们就会自动把这个分区和它旁边的分区合并到一起。
- 要是 MySQL 集群也用这样的分区方式,问题是不是就解决了?答案当然是办不到了。因为我们还需要有一套存储、管理分区信息的机制,这在哈希分片的 MySQL 集群里是没有的。在 Bigtable 里,我们是通过 Master 和 Chubby 这两个组件来完成这个任务的。这两个组件,加上每个分片提供服务的 Tablet Server,以及实际存储数据的 GFS,共同组成了整个 Bigtable 集群。
GFS 里采用了单Master 设计,所有的Worker 直接和Master通信,把各种信息放在Master 的内存里面,也就使得 Master 变成了一个单点故障点(SPOF-Single Point of Failure),当然,我们可以通过 Backup Master 以及 Shadow Master 等方式,来尽可能提升可用性,比如可以通过一个外部服务去监控 Master 的存活,等它挂了之后,自动切换到 Backup Master。但是,我们怎么知道 Master 是真的挂了,还是只是“外部服务”和 Master 之间的网络出现故障了呢?如果是后者的话,我们很有可能会遇到一个很糟糕的情况,就是系统里面出现了两个 Master。这个时候,可能两个客户端分别认为这两个 Master 是真的,当它们分头在两边写入数据的时候,我们就会遇到数据不一致的问题。那么 Chubby,就是这里的这个外部服务,不过 Chubby 不是 1 台服务器,而是 5 台服务器组成的一个集群,它会通过 Paxos 这样的共识算法,来确保不会出现误判。而且因为它有 5 台服务器,所以也一并解决了高可用的问题,就算挂个 1~2 台,也并不会丢数据。PS: 所以 Chubby的集群的价值 来源于 对“外部服务”的替代。
单机存储
Bigtable 为了做到高性能的随机读写,采用了下面这一套组合拳
- 如何提供高性能的随机数据写入?首先是将硬盘随机写,转化成了顺序写,也就是把 Bigtable 里面的提交日志(Commit Log)以及将内存表(MemTable)输出到磁盘的 Minor Compaction 机制。
- 当一个写请求过来的时候,如果写入的请求是合法的,对应的数据写入请求会以追加写的形式,写入到 GFS 上的提交日志文件中,这个写入对于 GFS 上的硬盘来说是一个顺序写。这个时候,我们就认为整个数据写入就已经成功了。在提交日志写入成功之后,Tablet Server 会再把数据写入到一张内存表(MemTable)中。当我们写入的数据越来越多,要超出我们设置的阈值的时候,Tablet Server 会把当前内存里的整个 MemTable 冻结,然后创建一个新的 MemTable。被冻结的这个 MemTable,一般被叫做 Immutable MemTable,它会被转化成一个叫做 SSTable 的文件,写入到 GFS 上,然后再从内存里面释放掉。这个写入过程,是完整写一个新文件,所以自然也是顺序写。
- 在插入数据和更新数据的时候,其实只是在追加一个新版本的数据。我们在删除数据的时候,也只是写入一个墓碑标记,本质上也是写入一个特殊的新版本数据。对于数据的“修改”和“删除”,其实是在两个地方发生的。第一个地方,是一个叫做 Major Compaction 的机制。按照前面的数据写入机制,随着数据的写入,我们会有越来越多的 SSTable 文件。这样我们就需要通过一个后台进程,来不断地对这些 SSTable 文件进行合并,以缩小占用的 GFS 硬盘空间。第二个地方,是在我们读取数据的时候。在读取数据的时候,我们其实是读取 MemTable 加上多个 SSTable 文件合并在一起的一个视图。也就是说,我们从 MemTable 和所有的 SSTable 中,拿到了对应的行键的数据之后,会在内存中合并数据,并根据时间戳或者墓碑标记,来对数据进行“修改”和“删除”,并将数据返回给到客户端。
- 如何提供高性能的随机数据读取?利用“局部性原理”,最近写入的数据,会保留在内存表里。最近被读取到的数据,会存放到缓存(Cache)里,而不存在的行键,也会以一个在内存里的布隆过滤器(BloomFilter)进行快速过滤,尽一切可能减少真正需要随机访问硬盘的次数。
- MemTable 的数据结构通常是通过一个 AVL 红黑树,或者是一个跳表(Skip List)来实现的。而 BigTable 的 Memtable 和 SSTable 的源码,一般被认为就是由 Google 开源的 LevelDB 来实现的。
- SSTable 的文件格式由两部分组成:第一部分,就是实际要存储的行键、列、值以及时间戳,这些数据会按照行键排序分成一个个固定大小的块(block)来进行存储,称之为数据块(data block)。第二部分,则是一系列的元数据和索引信息,这其中包括用来快速过滤当前 SSTable 中不存在的行键的布隆过滤器,以及整个数据块的一些统计指标,这些数据我们称之为元数据块(meta block)。因为 SSTable 里面的数据块是顺序存储的,所以要做 Major Compaction 的算法也很简单,就是做一个有序链表的多路归并就好了。当我们要在 SSTable 里查询数据的时候,我们先会去读取索引数据,找到要查询的数据在哪一个数据块里。然后再把整个数据块返回给到 Tablet Server,Tablet Server 再从这个数据块里,提取出对应的 KV 数据返回给 Bigtable 的客户端。在这个过程中,Bigtable 又利用了压缩和缓存机制做了更多的优化,首先,是通过压缩算法对每个块进行压缩,来减少存储需要的空间,以及后续的缓存需要的空间。其次,是把每个 SSTable 的布隆过滤器直接缓存在 Tablet Server 里,可以帮助我们快速判断,用户想要随机读的行键是否在这个 SSTable 文件里。
可以看到 Bigtable 的数据模型,其实是一系列的内存 + 数据文件 + 日志文件组合下封装出来的一个逻辑视图。数据库的存储引擎并不是用了什么高深的算法、特别的硬件,而是在充分考虑了硬件特性、算法和数据结构,乃至数据访问的局部性,综合到一起设计出来的一个系统。每一个环节都是教科书上可以找到的基础知识,但是组合在一起就实现了一个分布式数据库。而这个数据库暴露给用户的,也是一个非常简单的、类似于 Map 的根据键 - 值读写的接口。
分布式锁Chubby
GFS、MapReduce,以及 Bigtable都是一个单 Master 系统。而这就带来了一个问题,就是这个 Master 会成为整个系统的单点,一旦 Master 出现硬件故障,或者遇到 Master 网络不通的情况,整个集群就不能提供完整的服务了。所以,在 GFS 和 Bigtable 的论文里,我们看到它们都有对应的 Backup Master 机制。通过一个监控机制,当发现 Master 出现问题的时候,就自动切换到数据和 Master 完全同步的 Backup Master,作为系统的“灾难恢复”机制。乍一听,这个做法简单直接。不过如果仔细想一想,这个操作可没有那么容易实现。我们至少会遇到两个问题:
- 怎么能够做到 Backup Master 和 Master 完全同步?特别是当硬件、网络可能出现故障的情况下,我们怎么能够做到两边的数据始终同步。如果数据不能做到始终同步,那么当真有需要我们切换节点到 Backup Master 的时候,我们就会遇到数据丢失的情况。
- 监控程序本身也是一个单点,当我们的监控程序说 Master 挂了的时候,我们怎么知道 Master 是真的挂了,还只是监控程序到 Master 的网络中断了呢?如果是后者的话,会不会出现一个集群里有两个 Master 的情况?
这些问题(解决 Master 和 Backup Master 完全同步的问题)的本质,就是分布式共识问题。
从两阶段提交到 CAP 问题
在 GFS 的论文里,我们看到,GFS 的 Master 是有一个同步复制的 Backup Master 的。所有在 Master 上的操作,都要同步在 Backup Master 上写入成功之后,才算真正写入完成。这句话说起来很容易,可是实际上并不容易做到。因为同步复制要求下的数据写入操作,要跨越两个服务器。所以我们不能像前面 Bigtable 里的 SSTable 那样,只要预写日志(WAL)写入成功,就认为在 Master 上数据写入成功了。因为很有可能,同步在 Backup Master 里写入的数据,会由于硬件问题或者进程忽然被 kill 等原因失败了。这个时候,所谓的“同步复制”也就不复存在了。所以,让 Backup Master 和 Master 做到同步复制,本质上我们每次的成功就是一个分布式事务,也就是要么同时在 Master 和 Backup Master 上成功,要么同时失败。为了解决这个分布式事务问题,我们需要有一个机制,使得 Master 和 Backup Master 两边的数据写入可以互相协同。那么第一个被想到的解决办法,就是两阶段提交(2PC,Two Phases Commit)。
两阶段提交的过程其实非常直观,就是把数据的写入,拆分成了提交请求和提交执行这两个不同的阶段,然后通过一个协调者(Coordinator)来协调我们的 Master 和 Backup Master。两阶段提交虽然保障了一致性(C),但是牺牲了可用性(A)。无论是协调者,还是任何一个参与者出现硬件故障,整个服务器其实就阻塞住了,需要等待对应的节点恢复过来。两阶段提交里,任何一个服务器节点出问题,都会导致一次“单点故障”。其实两阶段提交也好,三阶段提交也好,最大的问题并不是在可用性和一致性之间的取舍。而是这两种解决方案,都充满了“单点故障”,特别是协调者。因为系统中有一个中心化的协调者,所以其实整个系统很容易出现“单点故障”。换句话说,就是整个系统的“容错”能力很差。所以,我们需要一个对单个节点没有依赖的策略,即使任何一个单个节点的故障,或者网络中断,都不会使得整个事务无法推进下去。
两阶段提交,是想要解决 Master 和 Backup Master 完全同步的问题。事实上,这个方式适用于所有的“分布式事务”问题。我们不是只能让两个参与者写入完全相同的数据,两阶段提交也完全可以在两个不同的物理数据库里,插入不同的数据。比如买卖房屋的例子,买房的银行账户的金额变动和卖房的银行账户的金额变动,可能就在两个不同的数据库。但是在实际银行转账的时候,需要两边的银行账户同步更新,不能出现一边钱没转出去,另一边钱已经收到的情况。这个特性,也就是我们常说的数据库的事务性。
Paxos
使用两阶段提交来实现 GFS 的 Master 和 Backup Master 的同步写入,好像有点杀鸡用牛刀的感觉。在这个同步复制的过程中,我们需要的并不是让 GFS 的文件系统的操作支持事务,我们对 GFS 的 Master 操作,最抽象来看,就是写日志。而 Backup Master 起到的作用,就是同步复制日志,每一条日志,都是对文件系统的一次操作。我们可以把这样的一条条操作看成是一个个事件,每一次事件,都让整个文件系统的状态进行了一次改变。所以本质上,这就是一个状态机(State Machine)。而让 Backup Master 和 Master 保持同步来保持高可用性,其实就是要在两个状态机之间做同步复制。所以这个问题,也就变成了一个状态机复制问题(replicated state machine)。
为了保障线性一致性,或者说系统的可线性化,我们必须让主从节点之间是同步复制的(异步复制是不行的)。而要做到高可用的同步复制,我们就需要 Paxos 这样的共识算法。
- prepare阶段,当提案者收到一条来自客户端的请求之后,它就会以提案者的身份发起提案。这个提案会广播给所有的接受者,这个广播请求被称为 Prepare 请求。这时的Acceptor只是返回告知提案者信息,它还没有真正接受请求。这个过程,本质上是提案者去查询所有的接受者,是否已经接受了别的提案。
- 当提案者收到超过半数的响应之后呢,整个提案就进入第二个阶段,也称之为 Accept 阶段。提案者会再次发起一个广播请求,提案者还是会等待至少一半的Acceptor返回的响应,如果其中有人拒绝,那么提案者就需要放弃这一轮的提案,重新再来;而当超过一半人表示接受请求的时候,提案者就认为提案通过了。 PS: 从某个提案者的视角看,协商过程是两轮广播,第一轮广播从接受者获悉其它提案者的提案,按统一规则(提案号最大)统一意见,第二轮广播则是对这个意见进行确认。提案者之间并不直接交互,Acceptor除了一个“存储”的作用外,还有一个信息转发的作用。从Acceptor的视角看,每个提案者提出的提案 都可能不一样。所以第一阶段,先经由Acceptor将 提案号最大的提案 尽可能扩散到提案者(即决定哪个提案者 是“意见领袖”)。第二阶段,再将“多数意见”形成“决议”(Acceptor持久化提案)
相信你也发现了 Paxos 算法的一个问题,那就是开销太大了。无论是否系统里面出现并发的情况,任何一个共识的达成,都需要两轮 RPC 调用。而且,所有的数据写入,都需要在所有的接受者节点上都写入一遍。在 Paxos 算法里,一个节点就需要承接所有的数据请求。虽然在可用性上,我们没有单点的瓶颈了,但是在性能上,我们的瓶颈仍然是单个节点。
通过 Chubby 转移可用性和“共识”问题
无论是 GFS 还是 Bigtable,其实都是一个单 Master 的系统。而为了让这样的系统保障高可用性,我们通常会采用两个策略。
- 第一个,是对它进行同步复制,数据会同步写入另外一个 Backup Master,这个方法最简单,我们可以用两阶段提交来解决。
- 第二个,是对 Master 进行监控,一旦 Master 出现故障,我们就把它切换到 Backup Master 就好了。
我们原先的系统只需要 Master,即使是做同步数据复制,也只需要通过两阶段提交这样的策略就可以了。而一旦出现单点故障,我们只需要做一件事情,就是把故障的节点切换到它同步备份的节点就行。而我们担心的系统里会有两个 Master 的问题,通过 Paxos 算法来解决就好了。只要通过 Paxos 算法,让一个一致性模块达成共识,当前哪一个是 Master 就好,其他的节点都通过这个一致性模块,获取到谁是最新的 Master 即可。
GFS 和 Bigtable 这些系统,仍然会采用单个 Master,然后对数据进行分区的方式,来提升整个系统的性能容量。但是在关键的元数据管理,以及 Master 节点挂掉切换的时候,会利用 Chubby 这个分布式锁服务,来确保整个分布式系统是有“共识”的,避免出现多个人都说自己是 Master 这样“真假美猴王”的情况出现。PS:大型分布式系统,并不是时时刻刻都会出现数据不一致的风险的,共识算法仅仅用来解决 容错恢复时出现两个 Master 的情况。以yarn为例,yarn 客户端会配置多 resourceManager地址,客户端自己随机连一个,比如客户端连接rm1,rm1 如果是master 就服务,如果不是可能报错也可能告诉客户端真实的master 地址,客户端连接另一个即可。多个rm 启动时 争当master,争上了按master角色 干活,没争上按 Backup Master 角色干活。
在 Chubby 里,它自己的多个节点,会先通过“共识”算法,确认一个 Master 节点(其他的节点,是作为“容错”而存在的)。这个 Master 节点,会作为系统中唯一的一个提案者(Proposer),所有对于 Chubby 的写入数据的请求,比如获取某个锁,都会发送到这个 Master节点,由它作为提案者发起提案,然后所有节点都会作为接受者来接受提案达成共识。只有一个提案者带来的好处就是,大部分时间,我们不太会因为两个 Proposer 之间竞争提案,而导致需要很多轮协商才能达成一致的情况。如果 Chubby 的 Master 挂掉了怎么办呢?我们可以通过剩下的节点,通过共识算法再找一个 Master 出来。而且如果是因为网络故障,导致有两个 Master 的话,也会很快通过共识算法确定一个 Master 出来。另外,两个 Master 其实只是一致性模块里的两个提案者,即使两边都接受外部请求,也都会通过共识算法,只选择一个值出来。
进化
- 作为一个“计算”引擎,MapReduce朝着以下方式进化
- 首先是编程模型。MapReduce 的编程模型还是需要工程师去写程序的,所以它进化的方向就是通过一门 DSL,进一步降低写 MapReduce 的门槛。在这个领域的第一阶段最终胜出的,是 Facebook 在 2009 年发表的 Hive。
- 其次是执行引擎。Hive 虽然披上了一个 SQL 的皮,但是它的底层仍然是一个个 MapReduce 的任务,所以延时很高
- 多轮迭代问题。在 MapReduce 这个模型里,一个 MapReduce 就要读写一次硬盘,而且 Map 和 Reduce 之间的数据通信,也是先要落到硬盘上的。这样,无论是复杂一点的 Hive SQL,还是需要进行上百轮迭代的机器学习算法,都会浪费非常多的硬盘读写。后来就有了 Spark,通过把数据放在内存而不是硬盘里,大大提升了分布式数据计算性能。
- 作为一个“在线服务”的数据库,Bigtable 的进化是这样的
- 首先是事务问题和 Schema 问题。Google 先是在 2011 年发表了 Megastore 的论文,在 Bigtable 之上,实现了类 SQL 的接口,提供了 Schema,以及简单的跨行事务。如果说 Bigtable 为了伸缩性,放弃了关系型数据库的种种特性。那么 Megastore 就是开始在 Bigtable 上逐步弥补关系型数据库的特性。
- 其次是异地多活和跨数据中心问题。Google 在 2012 年发表的 Spanner,能够做到“全局一致性”。
- 实时数据处理的抽象进化。首先是 Yahoo 在 2010 年发表了 S4 的论文,并在 2011 年开源了 S4。而几乎是在同一时间,Twitter 工程师南森·马茨(Nathan Marz)以一己之力开源了 Storm,并且在很长一段时间成为了工业界的事实标准。接着在 2011 年,Kafka 的论文也发表了。最早的 Kafka 其实只是一个“消息队列”,后来进化出了 Kafka Streams 这样的实时数据处理方案。2015 年,Google 发表的 Dataflow 的模型,可以说是对于流式数据处理模型做出了最好的总结和抽象。一直到现在,Dataflow 就成为了真正的“流批一体”的大数据处理架构。
- 将所有服务器放在一起的资源调度,因为数据中心里面的服务器越来越多,我们会发现原有的系统部署方式越来越浪费。原先我们一般是一个计算集群独占一系列服务器,而往往很多时候,我们的服务器资源都是闲置的。这在服务器数量很少的时候确实不太要紧,但是,当我们有数百乃至数千台服务器的时候,浪费的硬件和电力成本就成为不能承受之重了。于是,尽可能用满硬件资源成为了刚需。由此一来,我们对于整个分布式系统的视角,也从虚拟机转向了容器,这也是 Kubernetes 这个系统的由来。