ES4_1集群原理

对ES源码的阅读记录。

ES调试环境的搭建

ES调试环境的初始化见我的另一篇文档:
https://tallate.github.io/c395b48b.html#%E8%B0%83%E8%AF%95debug-es

ES的分布式原理

集群 - Cluster

一个集群就是由一个或多个节点组织在一起,它们共同持有全部的数据。
一个集群有一个唯一的名字标识 cluster.name ,其节点只能通过指定某个集群的名字,来加入这个集群,默认情况下,ES会自动发现同一网段内的节点,自动组成集群,当然也可以手动配置集群内节点的地址(discovery.zen.ping.unicast.hosts)。
集群有一个主节点,这个主节点是选举产生的,但是需要注意的是ES是一个去中心化的集群:

  • 对内,有一个主节点负责管理集群状态,包括管理分片的状态和副本的状态,以及节点的发现和删除;
  • 对外,没有主节点,第一个接收请求的节点作为协调节点

集群监控

1
2
# 测试环境es集群状态查看
curl http://ip:9219/_cluster/health

Elasticsearch 的集群监控信息中包含了许多的统计数据,其中最为重要的一项就是 集群健康 , 它在 status 字段中展示为:

  • green:所有的主分片和副本分片都正常运行
  • yellow:所有的主分片都正常运行,但不是所有的副本分片都正常运行,比如刚创建时没有被分配到任何节点
  • red:有主分片没能正常运行

    比如服务器磁盘容量超过85%时创建了一个新的索引

集群状态监控

  • 集群状态
    http://localhost:9200/_cluster/state
    主要看集群节点信息、其中哪个是主节点。
  • 集群统计
    http://localhost:9200/_cluster/stats
    集群中有几个节点、多少文档、多少索引等。
  • 集群任务管理
    http://localhost:9200/_tasks
  • 节点信息
    http://localhost:9200/_nodes
  • 活跃线程信息
    http://localhost:9200/_nodes/hot_threads

节点 - Node

一个节点是集群中的一个服务器,即一个 Elasticsearch 实例。
作为集群的一部分,它存储数据,参与集群的索引和搜索功能。

主节点 - Master-eligible 和 Master 和 选举

每个节点启动后,默认就是一个Master eligible节点。

可以设置node.master: false禁止,即设置成false后就无法成为主节点了。

Master-eligible节点可以参加选主流程,成为Master节点。

  • 当第一个节点启动时候,它会将自己选举成Master节点。
  • 每个节点上都保存了集群状态,但只有Master节点才能修改集群的状态信息(为保持一致性因此只有Master节点才能修改)

    集群状态信息包括:所有节点信息、索引及其相关的Mapping和Setting信息、分片的路由信息。

Master-eligible选主流程:

  • Master-eligible会互相Ping对方,对所有可以成为Master的节点,根据Node Id字典排序,Node Id低的会成为被选举的节点;
  • 如果对某个节点的投票数达到一定的值(可以成为Master节点数n / 2 + 1)并且该节点自己也选举自己,则该节点成为Master,否则重新选举一直到满足上述条件。
  • 其他节点会加入集群,但是不承担Master节点的角色;
  • 一旦发现被选中的主节点丢失,就会选举出新的Master节点。

当一个节点被选举成为主节点时,它将负责管理集群范围内的所有变更,例如增加、删除索引,或者增加、删除节点等。
主节点并不需要全权负责涉及到文档级别的变更和搜索等操作(这和ES中协调节点的概念有关),所以当集群只拥有一个主节点的情况下,即使流量增加它也不会成为瓶颈。

协调节点Coordinating Node和数据节点Data Node

协调节点并不是物理意义上的一个节点,而是指ES集群中第一个接收请求的节点,这个节点需要负责将请求路由到数据节点执行对应的操作,如果是Search请求,协调节点还需要对结果进行合并排序分页等操作。

节点类型配置

节点类型 配置参数 默认值
master eligible node.master true
data node.data true
ingest node.ingest true
coordinating only 设置上面三个参数全部为false
machine learning node.ml true(需要enable x-pack)

脑裂问题及脑裂避免

ES集群是有一个节点作为Master的,它负责集群内的状态同步。
如果集群内出现了复数个Master,则说明集群发生了脑裂,进而可能导致集群内分片副本数据的不同步。
当出现网路分区时,一个节点(或多个)和其他节点无法连接,这时可能发生脑裂问题。
ES的脑裂
如上图所示,node1和其他节点的网络断开:

  • node2和node3会重新选举Master
  • node1自己还是作为Master,组成一个集群,同时更新Cluster State
  • 此时出现了2个Master,维护不同的Cluster State,当网络恢复时,无法选择正确恢复

发生脑裂的原因

  1. 网络分区
    因为是在内网通信,出现网络通信问题的可能性较小,可以看下脑裂发生时的内网流量情况;
  2. 负载过大
    一般Master节点也会作为data节点提供服务,当它处理数据请求的负载较大时,可能会导致该ES实例停止响应,并重新选举Master;
    较大规模的内存回收操作也能造成ES进程失去响应。

脑裂避免

1、将Master和Data节点分离
如果Master节点仅仅负责集群状态管理,则发生负载过高的可能性相对会小一些。

1
2
node.master=true
node.data=false

2、限定一个选举条件,设置quorum(仲裁),只有在Master eligible节点数大于quorum时,才能进行选举
大多数节点:quorum = (master节点总数 / 2) + 1
当3个master eligible时,设置discovery.zen.minimum_master_nodes为2,即可避免脑裂

1
2
3
4
5
6
# 探活时间,默认3秒,即Master节点3秒没有应答就认为死掉了
# 增加这个值可以减小误判的可能,但是也会导致Master节点真宕机的情况不能被及时发现
discovery.zen.ping_timeout
# 一个节点需要看到至少有该数量的具有Master节点资格的节点后才能在集群中做操作
# 默认是1
discovery.zen.minimum_master_nodes

3、从7.0开始,不需要minimum_master_nodes这个配置了
7.0开始移除了minimum_master_nodes参数,让Elasticsearch自己选择可以形成仲裁的节点
典型的主节点选举现在只需要很短的时间就可以完成,集群的伸缩变得更安全、更容易,并且可能造成丢失数据的系统配置选项更少了。
节点更清楚地记录它们的状态,有助于诊断为什么它们不能加入集群或为什么无法选举出主节点。

故障转移

同机多节点

当在同一台机器上启动了第二个节点时,只要它和第一个节点有同样的 cluster.name 配置,它就会自动发现集群并加入到其中。

不同机器多节点

在不同机器上启动节点的时候,为了加入到同一集群,你需要配置一个可连接到的单播主机列表,设置其他节点的 ip:port 列表:

1
discovery.zen.ping.unicast.hosts: ["host1", "host2"]

故障转移机制

故障转移
所有新近被索引的文档都将会保存在主分片上,然后被并行的复制到对应的副本分片上。这就保证了我们既可以从主分片又可以从副本分片上获得文档,即使主分片不可用了,服务仍然可用。如上图所示。
如果集群缺失主节点,集群不能正常工作。所以发生的第一件事情就是选举一个新的主节点。但是需要注意的是集群必须存在且仅存在一个主节点,否则会出现脑裂现象,因此 Elasticsearch 提供了很多参数来避免这个情况。
如果索引缺失主分片,索引也不能正常工作。 如果此时来检查集群的状况,我们看到的状态将会为 red :不是所有主分片都在正常工作。在其它节点上存在着这两个主分片的完整副本, 所以新的主节点立即将这些分片在 Node 2 和 Node 3 上对应的副本分片提升为主分片, 此时集群的状态将会为 yellow ,因为在创建索引时设置了每个主分片需要对应 2 份副本分片:

1
number_of_replicas : 2

重启节点后,集群可以将缺失的副本分片再次进行分配,如果 Node 1 依然拥有着之前的分片,它将尝试去重用它们,同时仅从主分片复制发生了修改的数据文件。

集群扩容

ES扩容限制

  • ES的replica shard可以扩增,但是primary shard无法动态增加,因此读性能可以随时扩展,但是ES中的写操作是需要转发到primary shard的,这意味着写性能的弹性是有限的;
  • shard数量固定的前提下,只能在节点规模上进行扩展,最多一个shard一个节点,每个shard占用一台服务器的所有资源,可用性也会更高,只要每个shard留有一个primary shard即可。

配置节点

集群是由一个或者多个拥有相同 cluster.name 配置的节点组成, 它们共同承担数据和负载的压力。当有节点加入集群中或者从集群中移除节点时,集群将会重新平均分布所有的数据

水平扩容

水平扩容1
主分片的数目在索引创建时 就已经确定了下来。实际上,这个数目定义了这个索引能够 存储 的最大数据量。但是,读操作——搜索和返回数据——可以同时被主分片 或 副本分片所处理,所以当你拥有越多的副本分片时,也将拥有越高的吞吐量。
动态调整副本分片数目的命令:

1
2
3
4
PUT /blogs/_settings
{
"number_of_replicas" : 2
}

水平扩容2
为了提高性能,要么是为已有的结点分配更多的资源,要么是分配更多的结点并扩展当前集群。但是不能通过为已有的分片分配更多的资源或者为一个结点分配更多的分片来提高性能,因为一个结点内的所有分片共享这个结点(JVM)的资源,最终还是必须增加更多的硬件资源来提升吞吐量。
更多的副本分片数提高了数据冗余量:按照上面的节点配置,我们可以在失去 2 个节点的情况下不丢失任何数据(因为每个结点上要么存在一个分片要么存在一个分片的冗余)。

更多关于扩容的知识需要阅读相关文献,比如:扩容设计

数据迁移

MySQL 与 ES 同步

为什么要同步 MySQL 和 ES

为了使海量数据能够提供实时快速的查询,mysql 很显然力不从心,于是我们需要利用 es 提供大数据搜索服务,典型的场景就是:产品或者商品搜索。
MySQL 的优势:

  1. 关系数据模型
    • 比较普遍;
    • 应用层的对象可以很“干净地”(类似一一对应)映射到数据库表上;
    • 关联多张表很方便。
  2. 易用
    • 成熟的产品(如 Amazon RDS)
    • 大家非常熟悉
    • 很方便在网上找到支持

MySQL 的问题:

  1. 数据模型问题
    • 表增多后可能会变得很复杂;
    • schema(MySQL 中与 database 等价,Oracle 中代表当前用户拥有的数据库中对象集合)不灵活,结构在确定后就不容易更改了,如果真要修改也可能需要另数据库停工一段时间;
  2. 查询慢
    • 关联容易拖慢速度
    • 表会变得很大
    • 时间序列化数据(Time series data)操作(比如按时间排序)比较困难,一种解决方案:TimescaleDB
    • 应用层的后处理(数据查出来后的处理)开销比较大
  3. 伸缩问题
    • 数据库会变得越来越大
    • 垂直扩展(扩展现有系统部件,比如增加 CPU/RAM/disk 到当前节点)需要暂停当前服务
    • 水平扩展困难(增加节点)
  4. 不满足搜索需求

注意事项

不要完全从 MySQL 迁移到 Elasticsearch,尽管 Elasticsearch 在搜索、文档分析、日志等方面很优秀。主要出于以下原因:

  1. ES 是复杂的,会遇到很多技术挑战;
    • 将 MySQL 数据库表映射到 ES 的文档;
    • 在 ES 中处理关联关系比较复杂:MySQL 可以很容易地利用外键来关联多个表,但是 ES 中关联两个 Type(相当于表)很困难;
    • 处理 update 和 delete 操作时必须手动处理关联类型;
  2. ES不提供事务支持
  3. update 和 delete 操作比较难处理;

所以,如果已经存在有一个 MySQL 数据库的情况下,最好:

  1. 将 MySQL 作为主数据存储,ES 作为第二存储,使用 ES 专门应对较苛刻的搜索需求;
  2. 使用 MySQL 处理关联和事务。

一种架构方案

MySQL与ES同步的一种架构

Kafka

  1. 将数据集合与存储解耦
  2. 增强数据通道 pipeline 的可靠性(Kafka 的一些特性,比如消息队列持久化、重试)
  3. 增强数据通道 pipeline 的可伸缩性(增加 broker、并行的 consumer/producer)

Data Transformation

  1. 增加传输规则(校验、enrichment、denormalization、rollup)
  2. 将数据写到 ES 的索引上
  3. 错误处理(网络问题、ES 负载/超时问题、映射冲突)
  4. 多个 worker(消息处理者)提高整体吞吐量
  5. 提供实时和异步的 worker

ES

  1. 基于文档的数据库
    • 灵活(mapping 是动态的)
    • 数据结构丰富(JSON、嵌套对象)
    • 和现有系统整合容易(REST API)
  2. 查询效率高
    • 分片可以灵活地在节点间分配
    • (没看懂)Rolling indexes for Time series data == querying only the indexes needed (versus entire MySQL table)
  3. 搜索
    • 丰富的内置查询
    • 强大的聚合 & 子聚合
  4. 伸缩性
    • 对分片和索引的控制
    • 可以通过增加节点和集群来水平伸缩
    • 打包旧的数据/索引很方便,可以容易地释放系统资源
  5. 能适应快速的产品需求迭代

ES 缺点

  1. 数据更新比较复杂(update by query、upsert、script security issues)
  2. 不是真的 schema-less
    Elasticsearch 是一个 schema-less 的系统,但并不代表 no shema,而是会尽量根据 JSON 源数据的基础类型猜测你想要的字段类型映射。
  3. 重新索引比较耗时(添加域、映射冲突)
  4. 仍然需要在应用中对索引进行管理(映射、setting、template、naming pattern、data retention、backup/restore)
  5. 操作 ES 更花力气(部署、配置、性能优化、监控)

下一步

  1. 更多索引管理
    • 更好地支持索引的不同 type
    • 增加 API 和工具
    • 避免过度分片,可能引起集群稳定性问题
  2. 更多关注 update 操作
    • 通过 update by query/script 更新域
    • 更快的重新索引(添加新域、更改旧域映射)
    • slow update/reindexing can affect other system operations/transactions
    • Data denormalization vs joins
  3. 更完善的生产环境监控

ES集群的启动源码分析

节点的启动

启动节点前期主要是解析各种参数、检查内部环境(Lucene)等,我主要感兴趣的是初始化完毕后对各子模块的加载过程,子模块的start方法基本就是初始化内部数据、创建线程池、启动线程池等操作。

  1. 入口
    Elasticsearch.main
  2. 初始化
    Bootstrap.init
  3. 检测外部环境
    Bootstrap#setup
    初始化 node 时重写 validateNodeBeforeAcceptingRequests,在其中包含了检查逻辑
    -> BootstrapChecks.check
  4. 启动各子模块
    Bootstrap.start
    调各子模块的start来初始化。
  5. 启动集群
    nodeConnectionsService.start()
  6. 启动网络监听
    开始接收客户端的连接请求
    transportService.start()
  7. 启动发现
    discovery.start()
  8. 开始加入集群
    discovery.startInitialJoin()

节点启动流程大致如上所示,其中重要的几点,这里接下来再展开说说。

keepalive

唯一的用户线程,作用是保持进程运行。
Bootstrap#keepAliveThread

节点关闭

Bootstrap#stop
-> IOUtils#close:关闭 node,注意 Node 实现了 Closable
-> Node#close

选举主节点源码分析

ES选主算法是基于Bully算法实现的,Bully算法简单地说就是:

  1. 每个节点都有一个ID;
  2. 每次选出集群内现存最大的ID作为新的Master节点。

Bully算法存在的问题:

  • 在ID最大的节点不稳定的场景下会有问题。
    主要是ID最大的节点挂了又重启,会触发集群重复选择该节点为Master。
    ES通过推迟选举来解决这个问题,只要当前主节点不挂掉就不重新选主,但是容易产生脑裂(双主),脑裂问题在ES中是通过“法定得票人数过半”来解决的。

ES选主的实际算法实现叫做ZenDiscovery,其流程如下:

选举相关概念

  • 选举(Election)
  • 主从(主:Leader、Coordinator、Master,从:Follower、Slave)
  • 分布式哈希表(DHT)
  • 多数派(法定人数、Quorum)
  • 分区和脑裂:集群中出现双主或多主。
  • Bully 算法
  • Paxos 算法

配置

  • node.master:决定当前节点是否具备 Master 资格
  • discovery.zen.minimum_master_nodes
  • discovery.zen.ignore_non_master_pings

本地节点实例的创建

管理节点配置
Node#localNodeFactory

选主流程

ZenDiscovery 的选主流程如下:

  1. 每个节点计算最小的已知节点 ID,该节点为临时 Master,向该节点发送领导投票;
  2. 如果一个节点收到足够多的票数,并且该节点也为自己投票,那么它将扮演领导者的角色,开始发布集群状态。

选举临时 Master

Node#start
-> ZenDiscovery#startInitialJoin
-> ZenDiscovery#JoinThreadControl#startNewThreadIfNotRunning
-> threadPool.generic().execute:使用 generic 线程池执行选主流程
-> ZenDiscovery#innerJoinCluster:加入集群
-> ZenDiscovery#findMaster:查找当前集群的活跃 Master,或者从候选者中选择新的 Master
-> ZenDiscovery#pingAndWait:ping 一下所有节点(除了本节点),获取 pingResponses
-> 构建activeMasters列表,将每个节点所认为的当前 Master 节点加入 activeMasters 列表
-> 构建masterCandidates列表,从 pingResponses 列表中去掉不具备 Master 资格的节点
-> 如果 activeMasters 为空,则从 masterCandidates 中选举,否则从 activeMasters 中选择最合适的作为 Master

从 masterCandidates 选主

activeMasters 为空一般发生在集群刚启动或大规模重启的情况下。

ZenDiscovery#findMaster
-> ElectMasterService#hasEnoughCandidates:当前候选人数是否达到法定人数,若未达到则直接令选举失败
-> ElectMasterService#electMaster:当候选者达到法定人数后,从中选出一个作为 Master,选择前需要先用自定义比较函数进行排序
-> MasterCandidate.compare 自定义的排序逻辑

排序条件:

  1. 版本号大的优先;
  2. 具备 Master 资格的优先;
  3. 节点 ID 小的优先。

从 activeMasters 列表中选主

此时列表中存储着集群当前活跃的 Master,从这些已知的 Master 节点中选择一个作为选举结果。

ZenDiscovery#findMaster
-> ElectMasterService#tieBreakActiveMasters:使用自定义比较函数排序后取第一个
-> ElectMasterService#compareNodes

排序条件:

  1. 具备 Master 资格的优先;
  2. 节点 ID 小的优先。

收集投票进行统计

ZenDiscovery#handleJoinRequest
-> NodeJoinController#handleJoinRequest
-> NodeJoinController.ElectionContext#addIncomingJoin 将收到的连接存储到 NodeJoinController.ElectionContext#joinRequestAccumulator 中

NodeJoinController.ElectionContext#getPendingMasterJoinsCount:节点检查收到的投票是否足够时,就是检查加入它的连接数是否足够,其中会去掉没有 Master 资格节点的投票

加入集群

如果按以上逻辑选举出的临时 Master 是本节点:

  1. 等待足够多的具备 Master 资格的节点加入本节点,直到投票达到法定人数,完成选举;
  2. 超时(默认 30 秒且可配置)后还没有满足数量的 join 请求,则选举失败,需要进行新一轮选举;
  3. 成功后发布新的 clusterState。

ZenDiscovery#innerJoinCluster
NodeJoinController#waitToBeElectedAsMaster

如果按以上逻辑选举出的临时 Master 并非本节点:

  1. 不再接受其他节点的 join 请求;
  2. 向 Master 发送 join 请求,并等待回复。超时时间默认为 1 分钟(可配置),如果遇到异常,则默认重试 3 次(可配置)。
  3. 最终当选的 Master 会先发布集群状态,再确认客户的 join 请求,因此,joinElectedMaster 返回代表收到了 join 请求的确认,并且已经收到了集群状态。本步骤检查收到的集群状态中的 Master 节点如果为空,或者当选的 Master 不是之前选择的节点,则重新选举。

ZenDiscovery#innerJoinCluster
-> ZenDiscovery#joinElectedMaster