ES2_2分布式文档存储

[x] ES如何保证断电时数据也不会丢失

分片原理 - Shard

一个 分片 是一个底层的 工作单元 ,它仅保存了 全部数据中的一部分,是一个 Lucene 实例,所以它本身就是一个完整的搜索引擎。
一个索引可以被划分成多个分片,创建索引时可指定分片数量,默认是 5。
每个分片是一个 Lucene 实例,它本身也是一个功能完善并且独立的“索引”,这个“索引” 可以被放置到集群中的任何节点上。
分片是 ES 中集群管理的最小单位,在此基础上,ES 允许:

  • 允许你水平分割/扩展你的内容容量
  • 允许你在分片(位于多个节点上)之上进行分布式的、并行的操作,进而提高性能/吞吐量

分片类似 DB 里的分库分表,是为了解决数据量很大查询效率低下的问题,同时突破单节点磁盘限制。
集群的配置非常灵活,比如对于一个需要占据 100G 磁盘空间的索引,5 个分片每个分片大小 20G,假设单节点磁盘上限 100G,可以有以下几种方案:

  1. 单节点一个分片:单次只能在 100G 数据里查询一条数据,磁盘占用率 100%。
  2. 单节点 5 个分片:每个分片存储 20G 数据,可以 5 个任务并行查询,磁盘占用率 100%,索引大小上限 100G,无法再插入新数据。
  3. 集群(2 个节点)5 个分片:一个节点 3 片,另一个节点 2 片,可以并行查找,同时单节点磁盘占用量 <60%,索引最大存储上限为 200G。
  4. 集群(5 个节点)5 个分片:每个节点包含一个分片,单节点磁盘占用量 20%,索引最大存储上限为 500G。

主分片和副分片Replica

分片也分为主分片和副分片,也存在主分片的选举问题,选出主分片后由主分片来执行所有写请求,副分片来分担搜索请求。
索引内任意一个文档都归属于一个主分片,所以主分片的数目决定着索引能够保存的最大数据量,在索引建立的时候就已经确定了主分片数,但是副本分片数可以随时修改。
一个副本分片只是一个主分片的拷贝。副本分片作为硬件故障时保护数据不丢失的冗余备份,并为搜索和返回文档等读操作提供服务。

副分片(副本) - Replica

Elasticsearch 允许创建分片的一份或多份拷贝(默认复制 1 份),这些拷贝叫做副本。
在分片/节点失败的情况下,复制提供了高可用性
因为搜索可以在所有的复制上并行运行,复制可以扩展你的搜索量/吞吐量
复制分片不与相同的主分片置于同一节点上,否则失去备份的意义。
默认情况下,每个索引会有10个shard,5个primary shard,每个primary shard会有一个replica shard,即5个replica shard,最小的高可用配置是2台服务器(节点),其中每个节点上有5个shard,如果其中一个节点宕机了,因为还有一台机器,因此数据并不会丢失,另一个节点仍可以响应搜索请求。
可以在创建索引的时候指定:

1
curl -XPUT 'localhost:9200/test' -d `{"settings": {"number_of_replicas": 2}}`

总而言之:

  1. 每个索引可以被分成多个分片。
  2. 一个索引也可以被复制 0 次(即没有复制) 或多次。
  3. 一旦复制了,每个索引就有了主分片(作为复制源的分片)和复制分片(主分片的拷贝)。
  4. 分片和复制的数量可以在索引创建的时候指定。在索引创建之后,可以在任何时候动态地改变复制的数量,但是不能再改变分片的数量。

至于 ES 集群中如何分配分片与备份,都是 ES 内部维护管理的,对用户完全透明。

分片数设置

分片数的设置需要提前做好容量规划,不要过大也不要过小:

  • 如果分片数过小
    会导致后续无法通过增加节点来实现水平扩展

    主分片无法动态增加
    如果要做数据迁移,也会因为单个分片数据量太大,导致数据重新分配耗时

  • 如果分片数过大
    影响搜索结果的相关性打分,影响统计结果的准确性
    单个节点上分片过多,会导致资源浪费,同时也会影响性能

    7.0开始,ES默认分片数改成了1,不会造成over-sharding问题

  • 如果主分片数过小:如果该索引增长很快,集群无法通过增加节点实现对这个索引的数据扩展
  • 如果主分片数过大:导致单个分片容量很小,引发一个节点上有过多分片,影响性能
  • 副本分片数设置过多,会降低集群整体的写入性能

分片数可以在创建索引时指定,也可以在运行时修改:

1
2
3
4
5
6
7
PUT /test
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
}
}

分布式文档存储

相关配置

  • wait_for_active_shards:开始执行写入操作前需要等待的活跃分片数量,主要用于维护写一致性。

文档路由

文档到分片的映射算法

1
shard = hash(routing) % number_of_primary_shards

routing 是一个可变值,默认是文档的 _id ,也可以设置成一个自定义的值。 routing 通过 hash 函数生成一个数字,然后这个数字再除以 number_of_primary_shards (主分片的数量)后得到 余数 。这个分布在 0 到 number_of_primary_shards-1 之间的余数,就是我们所寻求的文档所在分片的位置。
这就解释了为什么我们要在创建索引的时候就确定好主分片的数量 并且永远不会改变这个数量:因为如果数量变化了,那么所有之前路由的值都会无效,文档也再也找不到了。
所有的文档 API( get 、 index 、 delete 、 bulk 、 update 以及 mget )都接受一个叫做 routing 的路由参数 ,通过这个参数我们可以自定义文档到分片的映射。

主节点数固定不是一件好事,我们会在集群扩容里提到如何解决扩容问题。

hash算法

hash算法应能确保文档均匀分布在所用分片上,充分利用硬件资源,避免部分机器空闲,部分机器繁忙。

指定hash字段

1
2
3
4
5
PUT posts/_doc/100?routing=bigdata
{
"title": "Mastering Elasticsearch",
"body": "let's Rock"
}
  • 默认_routing值是文档ID
  • 可以自行指定_routing值

分发机制

可以将请求发送到 集群中的任何节点 ,包括主节点。每个节点都知道任意文档所处的位置,并且能够将我们的请求直接转发到存储我们所需文档的节点。无论我们将请求发送到哪个节点,它都能负责从各个包含我们所需文档的节点收集回数据,并将最终结果返回給客户端。 Elasticsearch 对这一切的管理都是透明的。
一个分片是一个 Lucene 实例,我们的文档被存储和索引到分片内,但是应用程序是直接与索引而不是与分片进行交互的。Elasticsearch 是利用分片将数据分发到集群内各处的。分片是数据的容器,文档保存在分片内,分片又被分配到集群内的各个节点里。当你的集群规模扩大或者缩小时,Elasticsearch 会自动的在各节点中迁移分片,使得数据仍然均匀分布在集群里。
我们可以发送请求到集群中的任一节点。 每个节点都有能力处理任意请求。 每个节点都知道集群中任一文档位置,所以可以直接将请求转发到需要的节点上。但是为了更好地实现负载均衡,我们一般会轮询节点,每次接受请求的节点称为 (协调节点),比如下面的 Node1。

一致性(Consistency)

为了保证一致性,在默认设置下,即使仅仅是在试图执行一个_写_操作之前,主分片都会要求 必须要有 **规定数量(quorum)**(或者换种说法,也即必须要有大多数)的分片副本处于活跃可用状态,才会去执行_写_操作(其中分片副本可以是主分片或者副本分片)。这是为了避免在发生网络分区故障(network partition)的时候进行_写_操作,进而导致数据不一致。

注意:下面的参数只对 ElasticSearch 5.0 以下的版本有效,在 ElasticSearch 5.0 之后貌似使用 wait_for_active_shards 代替了 consistency。所以之前的参数了解即可,实际可以参考:https://www.elastic.co/guide/en/elasticsearch/reference/6.2/indices-create-index.html

consistency 有三种取值:

  1. quorum(规定大多数,默认)
    1
    int( (primary + number_of_replicas) / 2 ) + 1
  2. one(只要主分片 ok 就可以)
  3. all(必须要主分片和所有副本分片的状态都没问题才允许执行写操作)

timeout

如果没有足够的副本分片数,Elasticsearch 会等待,希望更多的分片出现。默认情况下,它最多等待 1 分钟。可以设置 timeout 的值来修改等待时间。

乐观并发控制

ES中的文档是不可变更的,如果更新一个文档,会将该文档标记为删除,同时增加一个全新的文档,同时文档的version字段+1。

并发更新问题
如上图所示 Web-2 不知道自己的对象拷贝已经过期,结果执行更新时会认为库存尚充足。
像这样的变更越频繁,读数据和更新数据的间隙越长,也就越可能丢失变更。
在数据库领域中,有两种策略通常被用来确保并发更新时变更不会丢失:

  • 悲观并发控制
    这种方法被关系型数据库广泛使用,它假定有变更冲突可能发生,因此阻塞访问资源以防止冲突。 一个典型的例子是读取一行数据之前先将其锁住,确保只有放置锁的线程能够对这行数据进行修改。
  • 乐观并发控制
    Elasticsearch 中使用的这种方法假定冲突是不可能发生的,并且不会阻塞正在尝试的操作。 然而,如果源数据在读写当中被修改,更新将会失败。应用程序接下来将决定该如何解决冲突。 例如,可以重试更新、使用新的数据、或者将相关情况报告给用户。

Elasticsearch 是分布式的。当文档创建、更新或删除时, 新版本的文档必须复制到集群中的其他节点。Elasticsearch 也是异步和并发的,这意味着这些复制请求被并行发送,并且到达目的地时也许 顺序是乱的。 Elasticsearch 需要一种方法确保文档的旧版本不会覆盖新的版本。

  1. 内部版本号
    当我们之前讨论 index , GET 和 delete 请求时,我们指出每个文档都有一个 _version (版本)号,当文档被修改时版本号递增。 Elasticsearch 使用这个 _version 号来确保变更以正确顺序得到执行。如果旧版本的文档在新版本之后到达,它可以被简单的忽略。
    当我们在更新数据时,需要指定想要修改文档的_version,如果该版本不是当前版本号,我们的请求将会失败。
    1
    2
    3
    4
    5
    PUT /website/blog/1?version=1 
    {
    "title": "My first blog entry",
    "text": "Starting to get the hang of this..."
    }
  2. 内部版本号
    通过version来进行内部版本控制在新版中已经被废除
    改为了if_seq_no + if_primary_term组合的方式来进行并发控制
    1
    2
    3
    4
    5
    PUT products/_doc/1?if_seq_no=0&if_primary_term=1
    {
    "title": "iphone",
    "count": 100
    }
  3. 外部版本号
    一个常见的设置是使用其它数据库作为主要的数据存储,使用 Elasticsearch 做数据检索,Elasticsearch 提供了一种机制来重用主数据库中已经存在的版本号字段。
    外部版本号的处理方式和我们之前讨论的内部版本号的处理方式有些不同, Elasticsearch 不是检查当前 _version 和请求中指定的版本号是否相同, 而是检查当前 _version 是否 小于 指定的版本号。 如果是则请求成功,外部的版本号作为文档的新 _version 进行存储,这意味着每次请求必须先在主数据库中增加版本号的值,再将新版本的数据索引到 Elasticsearch 中。
    1
    2
    3
    4
    5
    PUT /website/blog/2?version=5&version_type=external
    {
    "title": "My first external blog entry",
    "text": "Starting to get the hang of this..."
    }

创建、索引、删除

新建、索引和删除 请求都是 写 操作, 必须在主分片上面完成之后才能被复制到相关的副本分片,所以需要:

  1. 根据 id 确定其所属的主分片位置
  2. 主分片若执行成功,将请求并行转发到副本分片上;
  3. Node3 上的主分片执行成功,向协调节点报告成功,协调节点再汇报给客户端;
    新增、索引、删除

取回单个文档

可以从主分片或者从其它任意副本分片检索文档:

  1. 向协调节点发送请求,请求方式是轮询,比如上次从 2 获取到数据,则这次从 3 开始;
  2. 使用 id 确定文档所处主分片位置,如果存在多份则返回第一份即可;
  3. 协调节点将文档返回给客户端
    取回单个文档

局部更新

部分更新包括读取和写入两个过程:

  1. 同样先向协调节点发送更新请求;
  2. 从 id 计算出文档所在的主分片位置;
  3. 从主分片检索文档,修改 _source 字段中的 JSON ,并且尝试重新索引主分片的文档。如果文档已经被另一个进程修改,它会重试步骤 3 ,超过 retry_on_conflict 次后放弃。
  4. 如果成功地更新文档,它将新版本的文档(而不是更新请求本身,因为如果以错误的顺序到达,可能导致文档损坏)并行转发到其他节点的副本分片,重新建立索引。
  5. 一旦所有副本分片都返回成功,向协调节点也返回成功,协调节点向客户端返回成功。
    局部更新

mget(批量读)

mget 和 bulk API 的 模式类似于单文档模式。区别在于协调节点知道每个文档存在于哪个分片中。

  1. 将整个多文档请求分解成 每个分片 的多文档请求,并且将这些请求并行转发到每个参与节点;
  2. 协调节点一旦收到来自每个节点的应答,就将每个节点的响应收集整理成单个响应,返回给客户端。
    批量读

bulk(批量改)

  1. 协调节点接受请求,为每个节点创建一个批量请求,并将这些请求并行转发到每个包含主分片的节点主机;
  2. 主分片一个接一个按顺序执行每个操作。当每个操作成功时,主分片并行转发新文档(或删除)到副本分片,然后执行下一个操作。 一旦所有的副本分片报告所有操作成功,该节点将向协调节点报告成功,协调节点将这些响应收集整理并返回给客户端。

    bulk API 格式中有换行符,这是因为请求文档可能属于不同的主分片,如果将这些文档放到一个 JSON 数组中,则需要解析它们并重新创建请求数组,这需要占用大量 RAM 空间来存储原数据的副本,JVM 将不得不花费时间进行垃圾回收。
    批量改

数据副本模型

主从模型是分布式的经典模型之一,通过主节点的选举可以实现高可用。的那个然主从只能提高可用性,如果要性能上的可伸缩性,一般还会对数据进行 hash。
主从模型实际上也是下面将要讨论的分布式文档存储、搜索的基础。

HDFS、Cassandra 等使用的是对等模型。

PacificA 算法

ES 的主副本模型的实现参考了微软的 PacificA 算法,下面是算法中涉及到的几个概念:

  • Replica Group
  • Configuration
  • Configuration Version
  • Serial Number(SN)

PacificA 算法运行在分布式系统之上,对系统有以下几项假设:

  • 节点可以失效,对消息延迟的上限不做假设;
  • 消息可以丢失、乱序,但不能被篡改,即不存在拜占庭问题

    拜占庭问题

  • 网络分区可以发生,系统时钟可以不同步,但漂移是有限度的。

    漂移

存储管理 - 写入

数据的读取和更新策略,及使用多副本方式保证数据的可靠性和可用性。
数据写流程如下:
数据写入流程

  1. 写请求进入主副本节点;
  2. 节点为该请求分配 SN,并使用该 SN 创建 UpdateRequest,然后将该 UpdateRequest 插入自己的 prepareList;
  3. 主副本将携带 SN 的 UpdateRequest 发往从副本节点,从节点收到后同样插入 prepareList;
  4. 主副本节点接收到所有从副本节点的 ACK 响应,确认该数据已经被写入到所有的从副本节点;
  5. 此时达到了可提交的状态,主副本将此 UpdateRequest 放入 committedList,committedList 向前移动;
  6. 主副本节点回复客户端更新成功。
  7. 主节点向从节点发送 commit 通知,告诉它们自己的 committed point 位置,从节点收到通知后根据指示来移动 committed point 到相同位置。

数据存储流程中遵循不变式Commit Invariant

  1. 主节点 committedList 是任何一个从节点 prepareList 的前缀(子集);
  2. 任一从节点上的 committedList 是主节点上 committedList 的前缀(子集)。