Ingest Node

An ingest node provides functionality similar to Logstash:

  • Preprocessing: it can intercept Index and Bulk API requests
  • Data transformation: documents are transformed, then handed back to the Index or Bulk API
    For example, setting a default value for a field, renaming a field, or splitting a field value.
    Painless scripts are also supported for more complex processing (see the sketch below).
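
For instance, a minimal sketch of a script processor tested via _simulate (the content_length field here is illustrative, not part of the original example):

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "script": {
          "source": "ctx.content_length = ctx.content.length()"
        }
      }
    ]
  },
  "docs": [
    { "_source": { "content": "You know, for search" } }
  ]
}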

Compared with Logstash:

|  | Logstash | Ingest Node |
| --- | --- | --- |
| Input & output | Reads from and writes to many different data sources | Only receives data through the ES REST API, and only writes to ES |
| Buffering | Implements a simple data queue; supports replay | No buffering |
| Processing | Large set of plugins; supports custom development | Built-in processors; custom plugins are supported (but adding a plugin requires a restart) |
| Deployment & usage | Adds some architectural complexity | No extra deployment needed |

Building an Ingest Node - Pipeline & Processor

[Figure: an ingest node runs documents through a pipeline of processors]

  • Pipeline
    A pipeline applies its processors, in order, to each document that passes through it
  • Processor
    An abstraction that encapsulates a single transformation step

Creating a pipeline

Add a pipeline to ES:

PUT _ingest/pipeline/blog_pipeline
{
  "description": "a blog pipeline",
  "processors": [
    {
      "split": {
        "field": "tags",
        "separator": ","
      }
    },
    {
      "set": {
        "field": "views",
        "value": 0
      }
    }
  ]
}

View the pipeline:

GET _ingest/pipeline/blog_pipeline
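
The response echoes the stored definition, keyed by pipeline id; roughly (abridged, exact metadata may vary by version):

{
  "blog_pipeline": {
    "description": "a blog pipeline",
    "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      },
      {
        "set": {
          "field": "views",
          "value": 0
        }
      }
    ]
  }
}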

Test the pipeline:

POST _ingest/pipeline/blog_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "title": "Introducing cloud computing",
        "tags": "openstack,k8s",
        "content": "You know, for cloud"
      }
    }
  ]
}
  • tags has been split into an array
  • a new views field has been added to the resulting document (abridged response below)
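
The _simulate response wraps each result in a doc object; roughly (the timestamp value is illustrative):

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_source": {
          "title": "Introducing cloud computing",
          "tags": ["openstack", "k8s"],
          "content": "You know, for cloud",
          "views": 0
        },
        "_ingest": {
          "timestamp": "2020-01-01T00:00:00.000Z"
        }
      }
    }
  ]
}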

Index a document through the pipeline:

PUT tech_blogs/_doc/2?pipeline=blog_pipeline
{
  "title": "Introducing cloud computing",
  "tags": "openstack,k8s",
  "content": "You know, for cloud"
}
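
To confirm the pipeline ran, fetch the document back; tags should now be an array and views should be 0:

GET tech_blogs/_doc/2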

However, updating documents with _update_by_query may fail:

POST /tech_blogs/_update_by_query?pipeline=blog_pipeline
{
}

{
  ...
  "failures": [
    {
      "index": "tech_blogs",
      "type": "doc",
      "id": "1",
      "cause": {
        "type": "exception",
        "reason": "java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: field [tags] of type [java.util.ArrayList] cannot be cast to [java.lang.String]",
        "caused_by": {
          "type": "illegal_argument_exception",
          "reason": "java.lang.IllegalArgumentException: field [tags] of type [java.util.ArrayList] cannot be cast to [java.lang.String]",
          "caused_by": {
            "type": "illegal_argument_exception",
            "reason": "field [tags] of type [java.util.ArrayList] cannot be cast to [java.lang.String]"
          }
        },
        "header": {
          "processor_type": "split"
        }
      },
      "status": 500
    }
  ]
}

This happens because the split processor runs again on a field that has already been split: it attempts a string split on what is now an array.
To avoid this, add a query condition so that already-processed documents are skipped:

POST tech_blogs/_update_by_query?pipeline=blog_pipeline
{
  "query": {
    "bool": {
      "must_not": {
        "exists": {
          "field": "views"
        }
      }
    }
  }
}
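
Alternatively, on newer versions (6.5+) the processor itself can carry an if condition, which keeps the pipeline idempotent however it is invoked. A minimal sketch under that assumption (instanceof is false for a missing field, so the condition is null-safe; set's override: false leaves an existing views value untouched):

PUT _ingest/pipeline/blog_pipeline
{
  "description": "a blog pipeline",
  "processors": [
    {
      "split": {
        "if": "ctx.tags instanceof String",
        "field": "tags",
        "separator": ","
      }
    },
    {
      "set": {
        "field": "views",
        "value": 0,
        "override": false
      }
    }
  ]
}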

Building a pipeline

There are many kinds of processors; only a few are covered here.

Splitting a field - split

The _ingest endpoint can also simulate an inline pipeline definition, without storing it first:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "to split blog tags",
    "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "title": "Introducing big data......",
        "tags": "hadoop,elasticsearch,spark",
        "content": "You know, for big data"
      }
    },
    {
      "_index": "index",
      "_id": "idxx",
      "_source": {
        "title": "Introducing cloud computing",
        "tags": "openstack,k8s",
        "content": "You know, for cloud"
      }
    }
  ]
}
  • The pipeline contains a single processor, which splits each document's tags field into an array on ","
  • Each document has a tags field whose original value is several tags joined into one string

Setting a field value - set

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "to split blog tags",
    "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      },
      {
        "set": {
          "field": "views",
          "value": 0
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "title": "Introducing big data......",
        "tags": "hadoop,elasticsearch,spark",
        "content": "You know, for big data"
      }
    },
    {
      "_index": "index",
      "_id": "idxx",
      "_source": {
        "title": "Introducing cloud computing",
        "tags": "openstack,k8s",
        "content": "You know, for cloud"
      }
    }
  ]
}
  • When documents are indexed, the set processor adds a new views field (a template-value variant is sketched below)
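
set can also take a Mustache template as its value; for example, stamping each document with the ingest timestamp (a small sketch; the indexed_at field name is illustrative):

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "set": {
          "field": "indexed_at",
          "value": "{{_ingest.timestamp}}"
        }
      }
    ]
  },
  "docs": [
    { "_source": { "title": "Introducing cloud computing" } }
  ]
}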