Tallate

该吃吃该喝喝 啥事别往心里搁

两地三中心

为什么要建设多个 IDC(数据中心)

灾备。

为什么是两地三中心(同城+跨城)

最大可能性地避免天灾人祸。
但距离过远、数据传输速度慢,同时会带来数据一致性问题。

这体现了 CAP 原理:一致性(C)与高可用(A)不可兼得。

部署方式

全量灾备例子:新浪的弹性伸缩服务(back up、单元化)
部分灾备例子:支付宝早期架构

研发

业务分级

  1. SLA、SLO、SLI
  2. 为什么要做业务分级
    便于做灾备。

SRE(现已归入 devops 体系)

  1. 监控体系
    IDC
    网络
    基础服务
    应用服务
    流量
    安全
    用户(agent、听云、白山云)
  2. 灾备
  3. fire-help
    发现(完备的监控系统、混沌工程)、恢复(回滚)、解决、复盘

数据
实时性(有些业务一定要实时,有些不需要)

怎么做

  1. 调研方案
  2. 可行性分析
  3. 形成多种方案
    • 进行技术选型
    • 成本分析(人力、金钱)
    • 风险分析
    • 形成一套方案
  4. 决策

成本预算

确定资源

  1. 金钱待遇
  2. 业务
  3. 职级(向谁汇报)

立项

  1. 计算规划
  2. 分配资源
  3. 时间节点(中期汇报、日报)
  4. 风险点

启动

  1. 安排任务
  2. 通过按计划完成任务来提升自我的影响力

验收

  1. 干系方的验收
    包括让 QA、产品经理验收。
  2. 非干系方的验收
    比如让媒体报道,提升我们公司的影响力。
  3. 形成闭环
    review 计划,看是否达到了预期。

常用操作 - 映射(Mapping)

映射的定义

映射就像数据库中的 schema ,描述了数据在每个字段内如何存储,包括文档可能具有的字段或 属性 、 每个字段的数据类型—比如 string, integer 或 date —以及 Lucene 是如何索引和存储这些字段的。
Lucene 也没有映射的概念,映射是 Elasticsearch 将复杂 JSON 文档 映射 成 Lucene 需要的扁平化数据的方式。

  • 比如下面的索引名叫 data,其中定义了 people 和 transactions 类型:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    {
    "data": {
    "mappings": {
    "people": {
    "properties": {
    "name": {
    "type": "string",
    },
    "address": {
    "type": "string"
    }
    }
    },
    "transactions": {
    "properties": {
    "timestamp": {
    "type": "date",
    "format": "strict_date_optional_time"
    },
    "message": {
    "type": "string"
    }
    }
    }
    }
    }
    }
    会被转换为类似下面的映射保存:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    {
    "data": {
    "mappings": {
    "_type": {
    "type": "string",
    "index": "not_analyzed"
    },
    "name": {
    "type": "string"
    }
    "address": {
    "type": "string"
    }
    "timestamp": {
    "type": "long"
    }
    "message": {
    "type": "string"
    }
    }
    }
    }
    所以虽然创建一个文档后其类型就确定了,但是实际上这个文档所占用的空间是该索引内所有字段的总和
    所以有一条建议:一个索引中的类型应当都是相似的,他们有类似的字段,比如 man 和 woman 共享 name 属性;如果两个类型的字段集互不相同,创建一个 类型的文档后将浪费很多空间,而是应该将他们分到不同的索引中。

动态映射机制

在索引一个新的文档时,es 会自动为每个字段推断类型,这个过程称为动态映射。这意味着如果你通过引号( “123” )索引一个数字,它会被映射为 string 类型,而不是 long 。但是,如果这个域已经映射为 long ,那么 Elasticsearch 会尝试将这个字符串转化为 long ,如果无法转化,则抛出一个异常。

1
2
3
4
5
6
7
8
PUT movies
{
"mappings": {
"_doc": {
"dynamic": "false" // true, false, strict
}
}
}

如果是新增字段

  • 如果Dynamic设置为true,一旦有新增字段的文档写入,Mapping会被同时更新
  • 如果Dynamic设置为false,Mapping不会被更新,新增字段的数据无法被索引,但是信息会出现在_source中
  • 如果Dynamic被设置为strict,则文档写入失败

如果是更新字段

  • 对已有字段,一旦有数据写入,就不再支持修改字段定义
  • 如果希望改变字段类型,必须reindex重建索引

数组

ES不提供专门的数组类型,但是每个字段都可以包含多个相同类型的数值,所以ES天然是支持数组类型的。

多字段类型

有时候我们希望一个字段可以被多种方式检索,比如:

  • 通过不同语言检索
  • pinyin字段的搜索
  • 还支持为搜索和索引指定不同的analyzer

一些默认的映射

布尔型: true 或者 false | boolean
整数: 123 | long
浮点数: 123.45 | double
字符串,有效日期: 2014-09-15 | date
字符串: foo bar | string
整数 : byte, short, integer
浮点数: float

自定义映射

TODO
全文字符串域和精确值字符串域的区别
使用特定语言分析器
优化域以适应部分匹配
指定自定义数据格式

查看映射

1
2
GET /megacrp/_mapping
GET /megacrp/employee/_mapping

返回属性包括:

  • index 属性控制怎样索引字符串
    • analyzed 分析字符串再索引(全文索引),字符串且只有字符串可以取这个属性
    • not_analyzed 不分析、直接索引精确值
    • no 不索引、不能被搜索到
  • analyzer 对于 analyzed 字符串域,用 analyzer 属性指定在搜索和索引时使用的分析器,默认时 standard

更新映射

  • 可以通过更新一个映射来添加一个新域,并为其设置映射(后来版本取消了 string 类型,改成了text,要注意
  • 不能将一个存在的域从 analyzed 改为 not_analyzed。因为如果一个域的映射已经存在,那么该域的数据可能已经被索引。如果你意图修改这个域的映射,索引的数据可能会出错,不能被正常的搜索。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    PUT /gb
    {
    "mappings": {
    "testmapping" : {
    "properties" : {
    "tweetgjghjggh" : {
    "type" : "text",
    "analyzer": "english"
    },
    "date" : {
    "type" : "date"
    },
    "user_id" : {
    "type" : "long"
    }}}}}

常用操作 - 对象(文档 Document)

文档

  • ES是面向文档的,文档是所有可搜索数据的最小单位。
  • 文档会被序列化成JSON格式,保存在ES中;
    JSON对象由字段组成,每个字段都有对应的字段类型。
  • 每个文档都有一个Unique ID。
    这个Unique ID可以自己指定或由ES自动生成。

文档和字段 - Document、Field

一个文档是一个可被索引的基础信息单元,文档以 JSON 格式来表示。
在一个 index/type 里面,可以存储任意多的文档,每个文档都有唯一 id。
每个文档包含多个字段(fields),即 json 数据里的字段。

文档元数据

一个文档不仅仅包含它的数据,也包含 元数据 —— 有关 文档的信息。 三个必须的元数据元素如下:

  • _index
    一个 索引 应该是因共同的特性被分组到一起的文档集合。
    索引名字必须小写,不能以下划线开头,不能包含逗号。
  • _type
    Lucene 没有文档类型的概念,而是使用一个元数据字段_type 文档表示的对象类别,数据可能在索引中只是松散的组合在一起,但是通常明确定义一些数据中的子分区是很有用的,不同 types 的文档可能有不同的字段,但最好能够非常相似。
    一个 _type 命名可以是大写或者小写,但是不能以下划线或者句号开头,不应该包含逗号, 并且长度限制为 256 个字符。
    当我们要检索某个类型的文档时, Elasticsearch 通过在 _type 字段上使用过滤器限制只返回这个类型的文档。
  • _id
    文档唯一标识,和 _index 以及 _type 组合就可以唯一确定 Elasticsearch 中的一个文档。
    id 也可以由 Elasticsearch 自动生成。
  • _version
    在 Elasticsearch 中每个文档都有一个版本号。当每次对文档进行修改时(包括删除), _version 的值会递增。这个字段用来确保这些改变在跨多节点时以正确的顺序执行。
    版本号——不管是内部的还是引用外部的——都必须是在(0, 9.2E+18)范围内的一个 long 类型的正数。
  • _source
    即索引数据时发送给 Elasticsearch 的原始 JSON 文档。
  • _score
    相关性打分
  • _all
    整合所有字段内容到该字段,已被废除。

文档属性

文档里有几个最重要的设置:

  • type
    字段的数据类型,例如 string 或 date
  • index
    字段是否应当被当成全文来搜索(analyzed),或被当成一个准确的值(not_analyzed),还是完全不可被搜索( no )
  • analyzer
    确定在索引和搜索时全文字段使用的 analyzer
  • _source
    存储代表文档体的 JSON 字符串,和所有被存储的字段一样, _source 字段在被写入磁盘之前先会被压缩。这个字段有以下作用:
    1. 搜索结果包括了整个可用的文档——不需要额外的从另一个的数据仓库来取文档。
    2. 如果没有 _source 字段,部分 update 请求不会生效。
    3. 当你的映射改变时,你需要重新索引你的数据,有了_source 字段你可以直接从 Elasticsearch 这样做,而不必从另一个(通常是速度更慢的)数据仓库取回你的所有文档。
    4. 当你不需要看到整个文档时,单个字段可以从 _source 字段提取和通过 get 或者 search 请求返回。
      1
      2
      3
      4
      5
      GET /_search
      {
      "query": { "match_all": {}},
      "_source": [ "title", "created" ]
      }
    5. 调试查询语句更加简单,因为你可以直接看到每个文档包括什么,而不是从一列 id 猜测它们的内容。
      也可以调用下面的映射来禁用_source 字段:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      PUT /my_index
      {
      "mappings": {
      "my_type": {
      "_source": {
      "enabled": false
      }
      }
      }
      }

对象和文档

通常情况下,我们使用的术语 对象 和 文档 是可以互相替换的。不过,有一个区别:
一个对象仅仅是类似于 hash 、 hashmap 、字典或者关联数组的 JSON 对象,对象中也可以嵌套其他的对象。 对象可能包含了另外一些对象。
文档指最顶层或者根对象,这个根对象被序列化成 JSON 并存储到 Elasticsearch 中,指定了唯一 ID 及一些必须的文档元数据。

根对象

映射的最高一层被称为 根对象 ,它可能包含下面几项:

  • 一个 properties 节点,列出了文档中可能包含的每个字段的映射
  • 各种元数据字段,它们都以一个下划线开头,例如 _type 、 _id 和 _source
  • 设置项,控制如何动态处理新的字段,例如 analyzer 、 dynamic_date_formats 和 dynamic_templates
  • 其他设置,可以同时应用在根对象和其他 object 类型的字段上,例如 enabled 、 dynamic 和 include_in_all

操作类型

文档的CRUD:

  • Index
    Index操作——如果ID不存在——则创建新的文档,否则删除现有的再创建新的,版本号会增加
    PUT my_index/_doc/1
  • Create
    Create操作——如果ID已经存在——会失败
    PUT my_index/_create/1
    不指定ID,自动生成
    POST my_index/_doc
  • Read
    GET my_index/_doc/1
  • Update
    文档必须已经存在,更新只会对相应字段做增量修改
    POST my_index/_update/1
  • Delete
    my_index/_doc/1

常见返回

问题 原因
无法连接 网络故障或集群挂了
连接无法关闭 网络故障或节点出错
429 集群过于繁忙
4xx 请求体格式有问题
500 集群内部错误

更新 - PUT

更新现有的对象需要自己指定对象的 id,如果不存在将自动创建一个,文档更新后_version 字段的值也会相应提高。在内部,Elasticsearch 已将旧文档标记为已删除,并增加一个全新的文档。 尽管你不能再对旧版本的文档进行访问,但它并不会立即消失。当继续索引更多的数据,Elasticsearch 会在后台清理这些已删除文档。
检索 和 重建索引 步骤的间隔越小,变更冲突的机会越小。 但是它并不能完全消除冲突的可能性。 还是有可能在 update 设法重新索引之前,来自另一进程的请求修改了文档。为了避免数据丢失, update API 在 检索 步骤时检索得到文档当前的 _version 号,并传递版本号到 重建索引 步骤的 index 请求。 如果另一个进程修改了处于检索和重新索引步骤之间的文档,那么 _version 号将不匹配,更新请求将会失败。为了实现版本号控制只需要在请求参数中加入 version(如上所示)。

1
2
3
4
5
6
PUT /website/blog/123
{
"title": "My first blog entry",
"text": "Just trying this out...",
"date": "2014/01/01"
}

如果已经有自己的 _id 、而又想执行创建,那么我们必须告诉 Elasticsearch ,只有在相同的 _index 、 _type 和 _id 不存在时才接受我们的索引请求——而不是覆盖掉,有两种方式:

1
2
3
4
5
6
# 指定ID的index操作,其实是个upsert操作
PUT /website/blog/123?op_type=create
{ ... }
# 创建一个文档
PUT /website/blog/123/_create
{ ... }

文档是不可变的:他们不能被修改,只能被替换。 update API 必须遵循同样的规则。 从外部来看,我们在一个文档的某个位置进行部分更新。然而在内部, update API 简单使用与之前描述相同的 检索-修改-重建索引 的处理过程。 区别在于这个过程发生在分片内部,这样就避免了多次请求的网络开销。通过减少检索和重建索引步骤之间的时间,我们也减少了其他进程的变更带来冲突的可能性

创建 - POST

不需要指定对象 id,由 Elasticsearch 自动生成,自动生成的 ID 是 URL-safe、 基于 Base64 编码且长度为 20 个字符的 GUID 字符串。 这些 GUID 字符串由可修改的 FlakeID 模式生成,这种模式允许多个节点并行生成唯一 ID ,且互相之间的冲突概率几乎为零。

1
2
3
4
5
6
POST /website/blog/
{
"title": "My second blog entry",
"text": "Still trying this out...",
"date": "2014/01/01"
}

部分更新 - POST

update 请求最简单的一种形式是接收文档的一部分作为 doc 的参数, 它只是与现有的文档进行合并。对象被合并到一起,覆盖现有的字段,增加新的字段。

1
2
3
4
5
6
7
8
# 文档必须已经存在
POST /website/blog/1/_update
{
"doc" : {
"tags" : [ "testing" ],
"views": 0
}
}

使用脚本部分更新文档:脚本可以在 update API 中用来改变 _source 的字段内容, 它在更新脚本中称为 ctx._source ,运行在一个沙盒内,默认使用 Painless 语言作为脚本语言。下面这个脚本在页面不存在时执行新增并初始化 views=1(第一次运行这个请求时, upsert 值作为新文档被索引,初始化 views 字段为 1 ;在后续的运行中,由于文档已经存在, script 更新操作将替代 upsert 进行应用,对 views 计数器进行累加)、页面被浏览 2 次后执行删除,其他情况浏览量+1 并添加一个新标签:

1
2
3
4
5
6
7
8
9
10
11
12
13
POST /website/blog/zVmOW2EBsZ0GEqF92yf6/_update
{
"script" : {
"source" : "if(ctx._source.views == params.count) { ctx.op = 'delete'} ctx._source.views+=1; ctx._source.tags.add(params.new_tag)",
"params" : {
"new_tag" : "search",
"count": 2
}
},
"upsert": {
"views": 1
}
}

重试
正如之前所说,update 操作是检索-修改-重新索引的过程, 检索 和 重建索引 步骤的间隔越小,变更冲突的机会越小。 但是它并不能完全消除冲突的可能性。 还是有可能在 update 设法重新索引之前,来自另一进程的请求修改了文档。为了避免数据丢失, update API 在 检索 步骤时检索得到文档当前的 _version 号,并传递版本号到 重建索引 步骤的 index 请求。 如果另一个进程修改了处于检索和重新索引步骤之间的文档,那么 _version 号将不匹配,更新请求将会失败。
对于部分更新的很多使用场景,文档已经被改变也没有关系。 例如,如果两个进程都对页面访问量计数器进行递增操作,它们发生的先后顺序其实不太重要; 如果冲突发生了,我们唯一需要做的就是尝试再次更新。这可以通过 设置参数 retry_on_conflict 来自动完成, 这个参数规定了失败之前 update 应该重试的次数,它的默认值为 0

1
2
3
4
5
6
7
POST /website/blog/zVmOW2EBsZ0GEqF92yf6/_update?retry_on_conflict=5 
{
"script" : "ctx._source.views+=1",
"upsert": {
"views": 0
}
}

GET(搜索)

在请求的查询串参数中加上 pretty 参数,这将会调用 Elasticsearch 的 pretty-print 功能,该功能 使得 JSON 响应体更加可读,但其中的 _source 字段并不是被当成字符串打印出来,而是格式化成了 JSON 串:

1
2
3
GET /website/blog/123?pretty
GET /website/blog/123/_source
GET /website/blog/123?_source=title,text

将多个请求合并成一个,避免单独处理每个请求花费的网络延时和开销。 如果你需要从 Elasticsearch 检索很多文档,那么使用 multi-get 或者 mget API 来将这些检索请求放在一个请求中,将比逐个文档请求更快地检索到全部文档。
mget API 要求有一个 docs 数组作为参数,每个 元素包含需要检索文档的元数据, 包括 _index 、 _type 和 _id 。如果你想检索一个或者多个特定的字段,那么你可以通过 _source 参数来指定这些字段的名字:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
GET /_mget
{
"docs" : [
{
"_index" : "website",
"_type" : "blog",
"_id" : "zVmOW2EBsZ0GEqF92yf6"
},
{
"_index" : "website",
"_type" : "blog",
"_id" : 1,
"_source": "views"
}
]
}
GET /website/blog/_mget
{
"ids" : [ "2", "1" ]
}

HEAD(ping)

如果只想检查一个文档是否存在——根本不想关心内容——那么用 HEAD 方法来代替 GET 方法。

1
HEAD /website/blog/124

DELETE(删除)

1
DELETE /website/blog/123

bulk(批量操作)

每一行——包括最后一行——都必须以换行符结尾,格式如下所示:

1
2
3
4
{ action: { metadata }}\n
{ request body }\n
{ action: { metadata }}\n
{ request body }\n

action/metadata 行指定 哪一个文档 做 什么操作 。action 必须是以下选项之一:
create:如果文档不存在,那么就创建它。类似POSTPUT /_create
index:创建一个新文档或者替换一个现有的文档。类似POSTPUT
update:部分更新一个文档。类似POST /_update
delete:删除一个文档。类似DELETE
metadata 应该 指定被索引、创建、更新或者删除的文档的 _index 、 _type 和 _id ,每个请求的 metadata 都会覆盖请求 URL 中带上的默认元数据。
request body 行由文档的 _source 本身组成–文档包含的字段和值。它是 index、create、update 操作所必需的。
为什么不直接用一个 JSON 数组来保存?主要是考虑效率问题,解析为数组需要有更多的 RAM 空间,且 JVM 要花时间进行 gc。而直接使用原始数据只需要多注意每条数据之间的间隔(换行符)。
每个子请求都是独立执行,因此某个子请求的失败不会对其他子请求的成功与否造成影响。 如果其中任何子请求失败,最顶层的 error 标志被设置为 true ,并且在相应的请求报告出错误明细。这也意味着 bulk 请求不是原子的: 不能用它来实现事务控制。每个请求是单独处理的,因此一个请求的成功或失败不会影响其他的请求。

1
2
3
4
5
6
7
8
POST /_bulk
{ "delete": { "_index": "website", "_type": "blog", "_id": "123" }}
{ "create": { "_index": "website", "_type": "blog", "_id": "123" }}
{ "title": "My first blog post" }
{ "index": { "_index": "website", "_type": "blog" }}
{ "title": "My second blog post" }
{ "update": { "_index": "webiite", "_type": "blog", "_id": "123", "_retry_on_conflict" : 3} }
{ "doc" : {"title" : "My updated blog post"} }

批量请求的大小有一个最佳值,大于这个值,性能将不再提升,甚至会下降。 但是最佳值不是一个固定的值,它完全取决于硬件、文档的大小和复杂度、索引和搜索的负载的整体情况
幸运的是,很容易找到这个 最佳点 :通过批量索引典型文档,并不断增加批量大小进行尝试。 当性能开始下降,那么你的批量大小就太大了。一个好的办法是开始时将 1,000 到 5,000 个文档作为一个批次, 如果你的文档非常大,那么就减少批量的文档个数。并且请求的文档也最好不要太大,一个好的批量大小在开始处理后所占用的物理大小约为 5-15 MB。

写入(POST、PUT、DELETE)流程源码分析

refresh和flush - 实时性和可靠性之间的权衡

刷盘流程

近实时性

ES中数据写入后并不能被马上查到,而是必须先执行refresh,默认是1s,最快可到100ms。

可靠性

搜索系统对可靠性要求都不高,一般数据的可靠性通过将原始数据存储在另一个存储系统来保证,当搜索系统的数据发生丢失时,再从其他存储系统导一份数据过来重新rebuild就可以了。
ES采用多副本模型,可以避免单机发生故障时丢失数据,但是ES同时为了提升读写性能,一般是每隔一段时间才会把Lucene的Segment flush到磁盘实现持久化,这样减少了磁盘IO,但是数据未flush期间,如果发生了宕机就很容易导致数据的丢失。对于这个问题,ES中的解决方法类似数据库中的CommitLog,ES中引入了一个TransLog。
可以通过设置TransLog的Flush频率来控制写入缓存的数据什么时候刷到磁盘上,要么是按请求,每次请求都Flush;要么是按时间,每隔一段时间Flush一次。一般为了性能考虑,会设置为每隔5秒或者1分钟Flush一次,Flush间隔时间越长,可靠性就会越低。

ES的刷盘流程

之前我们已经讨论了数据如何定位到某个node、某个shard。

  1. 在每个shard上,数据会先写入Lucene,此时数据还在内存里;
    写Lucene内存后还不可被搜索,需要先通过Refresh将内存对象转成完整Segment后,再次reopen后才可被搜索。
    但是简单的Get操作是GetById的,这种查询可以直接从TransLog中查询,因此这种情况下是实时的。
  2. 接着去写TransLog,写完TransLog后会刷新TransLog数据到磁盘上;

    和数据库不同,数据库是先写CommitLog再写内存,而ES是先写内存(Lucene)再写TransLog,原因是Lucene的内存写入有很复杂的逻辑,比如分词、字段长度超过限制等,很容易失败,为了避免TransLog中有大量无效记录,减少recover的复杂度和提高速度,所以把写Lucene放到了前面。

  3. 等到TransLog数据被刷新到磁盘上后,返回写成功给用户。
  4. 隔一段比较长的时间后,Lucene会把内存中新生成的Segment Flush到磁盘,之后就会把TransLog清空掉。

ES会丢失数据吗?

  • Lucene每隔1秒生成Segment文件,此时Segment还在缓存中,还未刷盘,如果这时挂掉,内存中的数据仍然可以从TransLog中恢复
  • TransLog中的数据是每隔5秒刷新到磁盘,显然这还不能保证数据安全,最多会导致丢失TransLog中5秒内的数据,可以通过配置增加TransLog刷磁盘的频率来增加数据可靠性,但是会对性能有比较大的影响
  • 即使Master分片所在节点宕掉,导致TransLog丢失了,仍然可以从副本恢复

文档更新(部分更新)

Lucene中不支持文档的部分更新,因此需要在Elasticsearch中实现该功能:

  1. 收到Update请求后,从Segment或TransLog中读取该id的完整文档,记录版本为V1;
  2. 将版本V1的文档和请求中的部分字段文档合并,同时更新内存中的versionMap,得到V2,之后Update请求就变成了对V2的Index请求;
  3. 加锁;
  4. 再次从versionMap中读取该id的最大版本号V2,如果没有再从Segment或TransLog中读取,但是versionMap中基本都可以获取到;
  5. 检查版本是否冲突(V1和V2),如果冲突则回退到开始的Update阶段重新执行,否则继续执行Index请求;
  6. 在Index阶段,首先版本+1得到V3,再将文档加入到Lucene中去,Lucene中会删除同id的旧文档,然后再新增文档。写入成功后,将V3更新到versionMap中;
  7. 释放锁。

文档操作类型

文档操作分为单个的(Index)和批量的(Bulk),它们最终都会被统一封装为批量操作请求(BulkRequest)。

请求入口

在ES中,所有action的入口都注册在ActionModule中,比如Bulk Request有两个注册入口:

1
2
3
4
actions.register(BulkAction.INSTANCE, TransportBulkAction.class,
TransportShardBulkAction.class);

registerHandler.accept(new RestBulkAction(settings, restController));

对于Rest请求,会在RestBulkAction中解析请求,并最终转换成TransportAction处理。

比如对请求:localhost:9200/website/blog/123

1
2
3
4
5
{
"title": "My first blog entry",
"text": "Just trying this out...",
"date": "2014/01/01"
}

会先被dispatch到RestIndexAction,然后转发给TransportBulkAction#doExecute,下面对文档写入流程的分析也将从这个入口开始。

文档写入流程

由上边对ES数据模型的讨论可知,ES文档的写入必须是先成功写入到主分片,然后才能复制到相关的副分片。
多节点多分片模型

  1. 第一个接收请求的节点是协调节点;
  2. 先根据_routing规则选择发给哪个shard(分片);
    优先使用IndexRequest中的设置,其次使用mapping中的配置,如果都没有则使用_id作为路由参数;
  3. 从集群的meta中找出该shard的节点,此时,请求会被转发到primary shard所在的节点;
  4. 请求接着发送给primary shard执行写操作;
  5. primary shard执行成功后再发送给多个replica shard;
  6. 请求在多个replica shard上执行成功并返回给协调节点后,写入执行成功,协调节点返回结果给客户端。

从上述写入的概述可知,写入流程具体的,可以分为协调节点主分片节点副分片节点三种角色的写入过程。
ES写入流程

协调节点处理流程

1、自动创建索引
入口:TransportBulkAction#doExecute
找出请求中需要自动创建的索引

1
2
3
4
5
6
7
8
9
10
11
12
for (String index : indices) {
boolean shouldAutoCreate;
try {
shouldAutoCreate = shouldAutoCreate(index, state);
} catch (IndexNotFoundException e) {
shouldAutoCreate = false;
indicesThatCannotBeCreated.put(index, e);
}
if (shouldAutoCreate) {
autoCreateIndices.add(index);
}
}

执行创建索引的请求:

1
2
3
4
5
6
7
void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.index(index);
createIndexRequest.cause("auto(bulk api)");
createIndexRequest.masterNodeTimeout(timeout);
createIndexAction.execute(createIndexRequest, listener);
}

2、路由请求
入口:TransportBulkAction.BulkOperation#doRun
不同类型的请求路由逻辑也不同:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
switch (docWriteRequest.opType()) {
// 创建索引、mapping请求
case CREATE:
case INDEX:
IndexRequest indexRequest = (IndexRequest) docWriteRequest;
final IndexMetaData indexMetaData = metaData.index(concreteIndex);
MappingMetaData mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
Version indexCreated = indexMetaData.getCreationVersion();
indexRequest.resolveRouting(metaData);
indexRequest.process(indexCreated, mappingMd, concreteIndex.getName());
break;
// 更新文档请求
case UPDATE:
TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest);
break;
// 删除文档操作
case DELETE:
docWriteRequest.routing(metaData.resolveWriteIndexRouting(docWriteRequest.parent(), docWriteRequest.routing(), docWriteRequest.index()));
// check if routing is required, if so, throw error if routing wasn't specified
if (docWriteRequest.routing() == null && metaData.routingRequired(concreteIndex.getName(), docWriteRequest.type())) {
throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id());
}
break;
default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
}

然后计算文档ID的hash值,将其分配给对应的shard:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 根据文档ID分配给对应的shardId
// first, go over all the requests and create a ShardId -> Operations mapping
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest request = bulkRequest.requests.get(i);
if (request == null) {
continue;
}
String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
shardRequests.add(new BulkItemRequest(i, request));
}

if (requestsByShard.isEmpty()) {
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
return;
}

3、轮询分片,分发请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
// 当前节点ID
String nodeId = clusterService.localNode().getId();
for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
// 对每个分片
final ShardId shardId = entry.getKey();
final List<BulkItemRequest> requests = entry.getValue();
// 创建该分片的批量操作请求
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
requests.toArray(new BulkItemRequest[requests.size()]));
bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
bulkShardRequest.timeout(bulkRequest.timeout());
if (task != null) {
bulkShardRequest.setParentTask(nodeId, task.getId());
}
// 执行该请求
shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
...
}

4、将分片请求发往节点
代码入口:TransportReplicationAction.ReroutePhase#doRun
将请求路由到主分片所在的节点上,并重试失败的操作。

1
2
3
4
5
6
7
8
9
10
11
12
// 找到主分片
final ShardRouting primary = primary(state);
if (retryIfUnavailable(state, primary)) {
return;
}
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
// 主分片在当前节点就直接本地执行,否则就调用该远程节点执行
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
performLocalAction(state, primary, node, indexMetaData);
} else {
performRemoteAction(state, primary, node);
}

主分片节点处理流程

如上所述,协调节点会将请求发送给主分片所在节点,该节点接收请求,并执行该请求对应的处理器。
消息接收入口:TransportReplicationAction.PrimaryOperationTransportHandler#messageReceived
主节点执行逻辑:ReplicationOperation#execute
1、判断活跃的shard是否足够
代码入口:ReplicationOperation#checkActiveShardCount
活跃的分片越多,执行写入后同步的备份也越多,数据也越不容易丢失;默认为1,表示主分片可用就执行写入。
2、主分片执行
代码入口:ReplicationOperation.Primary#perform

1
2
3
4
5
6
public PrimaryResult perform(Request request) throws Exception {
PrimaryResult result = shardOperationOnPrimary(request, indexShard);
assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest()
+ "] with a primary failure [" + result.finalFailure + "]";
return result;
}

3、主分片执行索引操作
代码中需要对请求进行dispatch,TransportShardBulkAction#executeBulkItemRequest,以UPDATE操作为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateRequest, IndexShard primary,
IndexMetaData metaData, BulkShardRequest request,
int requestIndex, UpdateHelper updateHelper,
LongSupplier nowInMillis,
final MappingUpdatePerformer mappingUpdater) throws Exception {
BulkItemRequest primaryItemRequest = request.items()[requestIndex];
assert primaryItemRequest.request() == updateRequest
: "expected bulk item request to contain the original update request, got: " +
primaryItemRequest.request() + " and " + updateRequest;

BulkItemResultHolder holder = null;
// There must be at least one attempt
// 保证至少执行一次,因此重试
int maxAttempts = Math.max(1, updateRequest.retryOnConflict());
for (int attemptCount = 0; attemptCount < maxAttempts; attemptCount++) {

holder = executeUpdateRequestOnce(updateRequest, primary, metaData, request.index(), updateHelper,
nowInMillis, primaryItemRequest, request.items()[requestIndex].id(), mappingUpdater);

// It was either a successful request, or it was a non-conflict failure
if (holder.isVersionConflict() == false) {
return holder;
}
}
// We ran out of tries and haven't returned a valid bulk item response, so return the last one generated
return holder;
}

路径很长,最后调用了InternalEngine#index

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public IndexResult index(Index index) throws IOException {
assert Objects.equals(index.uid().field(), uidField) : index.uid().field();
final boolean doThrottle = index.origin().isRecovery() == false;
try (ReleasableLock releasableLock = readLock.acquire()) {
ensureOpen();
assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
assert assertVersionType(index);
try (Releasable ignored = versionMap.acquireLock(index.uid().bytes());
Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) {
lastWriteNanos = index.startTime();
/* A NOTE ABOUT APPEND ONLY OPTIMIZATIONS:
* if we have an autoGeneratedID that comes into the engine we can potentially optimize
* and just use addDocument instead of updateDocument and skip the entire version and index lookupVersion across the board.
* Yet, we have to deal with multiple document delivery, for this we use a property of the document that is added
* to detect if it has potentially been added before. We use the documents timestamp for this since it's something
* that:
* - doesn't change per document
* - is preserved in the transaction log
* - and is assigned before we start to index / replicate
* NOTE: it's not important for this timestamp to be consistent across nodes etc. it's just a number that is in the common
* case increasing and can be used in the failure case when we retry and resent documents to establish a happens before relationship.
* for instance:
* - doc A has autoGeneratedIdTimestamp = 10, isRetry = false
* - doc B has autoGeneratedIdTimestamp = 9, isRetry = false
*
* while both docs are in in flight, we disconnect on one node, reconnect and send doc A again
* - now doc A' has autoGeneratedIdTimestamp = 10, isRetry = true
*
* if A' arrives on the shard first we update maxUnsafeAutoIdTimestamp to 10 and use update document. All subsequent
* documents that arrive (A and B) will also use updateDocument since their timestamps are less than maxUnsafeAutoIdTimestamp.
* While this is not strictly needed for doc B it is just much simpler to implement since it will just de-optimize some doc in the worst case.
*
* if A arrives on the shard first we use addDocument since maxUnsafeAutoIdTimestamp is < 10. A` will then just be skipped or calls
* updateDocument.
*/
final IndexingStrategy plan;

if (index.origin() == Operation.Origin.PRIMARY) {
plan = planIndexingAsPrimary(index);
} else {
// non-primary mode (i.e., replica or recovery)
plan = planIndexingAsNonPrimary(index);
}

final IndexResult indexResult;
if (plan.earlyResultOnPreFlightError.isPresent()) {
indexResult = plan.earlyResultOnPreFlightError.get();
assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
} else if (plan.indexIntoLucene) {
indexResult = indexIntoLucene(index, plan);
} else {
indexResult = new IndexResult(
plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
}
if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
final Translog.Location location;
if (indexResult.getResultType() == Result.Type.SUCCESS) {
location = translog.add(new Translog.Index(index, indexResult));
} else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
// if we have document failure, record it as a no-op in the translog with the generated seq_no
location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().toString()));
} else {
location = null;
}
indexResult.setTranslogLocation(location);
}
if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
versionMap.maybePutIndexUnderLock(index.uid().bytes(),
new IndexVersionValue(translogLocation, plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm()));
}
if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo());
}
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
return indexResult;
}
} catch (RuntimeException | IOException e) {
try {
maybeFailEngine("index", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw e;
}
}

副分片执行

同样是在主节点的ReplicationOperation#execute中,需要调用副分片的写入接口。
1、调用副分片
代码入口:ReplicationOperation#performOnReplicas

1
2
3
4
5
6
7
8
9
10
11
private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest, final long globalCheckpoint) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
}

totalShards.incrementAndGet();
pendingActions.incrementAndGet();
// 发HTTP请求给副分片
replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, new ActionListener<ReplicaResponse>() {
...
}

2、副分片接收请求写入文档的流程与主分片基本一致。
消息接收入口是:TransportReplicationAction.ReplicaOperationTransportHandler#messageReceived

GET流程源码分析

实时性

Elasticsearch中的GET请求也能保证是实时的,因为GET请求会直接读内存中尚未Flush到磁盘的TransLog。
但是GET请求只支持通过doc_id进行查询,所以对于条件查询(Search)依然无法实现实时。

GET执行流程

GET指的是单个文档的查询请求,文档的唯一标识是ID,因此GET就是根据ID来找到一个文档。
ES集群中的节点分为协调节点数据节点,GET请求会先打到协调节点,然后转发到数据节点上;如果一个节点执行失败,则转发到其他节点上进行读取。
ES的GET流程

协调节点执行流程

代码入口:TransportSingleShardAction.AsyncSingleAction#perform
注意AsyncSingleAction构造方法中会先准备集群状态、节点列表等信息。
1、计算文档所在的shardid,即它所在的分片;
在构造方法中,会先根据请求进行路由:`TransportGetAction#shards

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private AsyncSingleAction(Request request, ActionListener<Response> listener) {
this.listener = listener;

ClusterState clusterState = clusterService.state();
if (logger.isTraceEnabled()) {
logger.trace("executing [{}] based on cluster state version [{}]", request, clusterState.version());
}
// 集群nodes列表
nodes = clusterState.nodes();
ClusterBlockException blockException = checkGlobalBlock(clusterState);
if (blockException != null) {
throw blockException;
}

String concreteSingleIndex;
if (resolveIndex(request)) {
concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName();
} else {
concreteSingleIndex = request.index();
}
this.internalRequest = new InternalRequest(request, concreteSingleIndex);
// 解析请求,更新自定义routing
resolveRequest(clusterState, internalRequest);

blockException = checkRequestBlock(clusterState, internalRequest);
if (blockException != null) {
throw blockException;
}
// 根据路由算法计算得到目的shard迭代器,或者根据优先级选择目标节点
this.shardIt = shards(clusterState, internalRequest);
}

2、发送请求
检查目标节点是不是本地节点,如果是则直接调本地的TransportService#sendLocalRequest;如果是远程节点则执行远程调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
final TransportRequest request,
final TransportResponseHandler<T> handler) {
try {
Transport.Connection connection = getConnection(node);
sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler);
} catch (NodeNotConnectedException ex) {
// the caller might not handle this so we invoke the handler
handler.handleException(ex);
}
}

public Transport.Connection getConnection(DiscoveryNode node) {
if (isLocalNode(node)) {
return localNodeConnection;
} else {
return transport.getConnection(node);
}
}

数据节点执行流程

代码入口:TransportSingleShardAction.ShardTransportHandler#messageReceived

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
}
Response response = shardOperation(request, request.internalShardId);
channel.sendResponse(response);
}

protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());

if (request.refresh() && !request.realtime()) {
indexShard.refresh("refresh_flag_get");
}

GetResult result = indexShard.getService().get(request.type(), request.id(), request.storedFields(),
request.realtime(), request.version(), request.versionType(), request.fetchSourceContext());
return new GetResponse(result);
}

1、读取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private GetResult innerGet(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType,
FetchSourceContext fetchSourceContext, boolean readFromTranslog) {
fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);
final Collection<String> types;
// 处理_all选项
if (type == null || type.equals("_all")) {
types = mapperService.types();
} else {
types = Collections.singleton(type);
}

Engine.GetResult get = null;
for (String typeX : types) {
Term uidTerm = mapperService.createUidTerm(typeX, id);
if (uidTerm != null) {
// 调用Engine读取数据
get = indexShard.get(new Engine.Get(realtime, readFromTranslog, typeX, id, uidTerm)
.version(version).versionType(versionType));
if (get.exists()) {
type = typeX;
break;
} else {
get.release();
}
}
}

...

// 过滤返回结果
// 根据type、id、DocumentMapper等信息从刚刚获取的信息中获取数据,对指定的field、source进行过滤
// 把结果存于GetResult返回
return innerGetLoadFromStoredFields(type, id, gFields, fetchSourceContext, get, mapperService);
}

2、从InternalEngine读取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public Engine.GetResult get(Engine.Get get) {
readAllowed();
return getEngine().get(get, this::acquireSearcher);
}

public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
assert Objects.equals(get.uid().field(), uidField) : get.uid().field();
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
SearcherScope scope;
// 处理realtime选项,判断是否需要刷盘
if (get.realtime()) {
VersionValue versionValue = null;
// versionMap写入索引的时候添加的,不会写入磁盘
try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
// we need to lock here to access the version map to do this truly in RT
versionValue = getVersionFromMap(get.uid().bytes());
}
if (versionValue != null) {
if (versionValue.isDelete()) {
return GetResult.NOT_EXISTS;
}
// 版本是否冲突
if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
throw new VersionConflictEngineException(shardId, get.type(), get.id(),
get.versionType().explainConflictForReads(versionValue.version, get.version()));
}
if (get.isReadFromTranslog()) {
// this is only used for updates - API _GET calls will always read form a reader for consistency
// the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
if (versionValue.getLocation() != null) {
try {
Translog.Operation operation = translog.readOperation(versionValue.getLocation());
if (operation != null) {
// in the case of a already pruned translog generation we might get null here - yet very unlikely
TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig
.getIndexSettings().getIndexVersionCreated());
return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader)),
new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(), reader, 0));
}
} catch (IOException e) {
maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
throw new EngineException(shardId, "failed to read operation from translog", e);
}
} else {
trackTranslogLocation.set(true);
}
}
// 执行刷盘操作
refresh("realtime_get", SearcherScope.INTERNAL);
}
scope = SearcherScope.INTERNAL;
} else {
// we expose what has been externally expose in a point in time snapshot via an explicit refresh
scope = SearcherScope.EXTERNAL;
}
// 调用Searcher读取数据
// no version, get the version from the index, we know that we refresh on flush
return getFromSearcher(get, searcherFactory, scope);
}
}
0%