Tallate

该吃吃该喝喝 啥事别往心里搁

Doc Values / fielddata - 正排索引

在搜索的时候,我们能通过搜索关键词快速得到结果集。当排序的时候,我们需要倒排索引里面某个字段值的集合,此时倒排索引无法发挥作用。换句话说,我们需要 转置 倒排索引。转置 结构在其他系统中经常被称作 列存储 。实质上,它将所有单字段的值存储在单数据列中,这使得对其进行操作是十分高效的,例如排序。
ES有2种方法实现:

  • Fielddata(可以存储Text类型)
  • Doc Values(列式存储,对Text类型无效)
Doc Values Field data
何时创建 索引时,和倒排索引一起创建 搜索时动态创建
创建位置 磁盘文件 JVM Heap
优点 避免大量内存占用 索引速度快,不占用额外的磁盘空间
缺点 降低索引速度,占用额外磁盘空间 文档过多时,动态创建开销大,占用过多JVM Heap
缺省值 ES 2.x 之后 ES 1.x 及之前

当 working set 远小于节点的可用内存,系统会自动将所有的文档值保存在内存中,使得其读写十分高速; 当其远大于可用内存,操作系统会自动把 Doc Values 加载到系统的页缓存中,从而避免了 jvm 堆内存溢出异常。

关闭 Doc Values

Doc Value默认是启用的,可以通过Mapping设置关闭

1
2
3
4
5
6
7
8
9
PUT test_keyword/_mapping
{
"properties": {
"user_name": {
"type": "keyword",
"doc_values": false
}
}
}
  • 关闭有什么好处?增加索引速度、减少磁盘占用空间
  • 关闭会有什么问题?如果后续需要重新打开,则需要重建索引
  • 什么时候需要关闭?明确不需要做排序及聚合分析

排序与相关性

  • sort将目标字段转换为排序所需的格式,date 字段的值表示为自 epoch (January 1, 1970 00:00:00 UTC)以来的毫秒数,通过 sort 字段的值进行返回。
  • 如果字段是一个数组(多值),可以使用 mode 指定其中 min 、 max 、 avg 或是 sum 进行排序;
  • 评分的计算方式取决于 查询类型 不同的查询语句用于不同的目的: fuzzy 查询会计算与关键词的拼写相似程度,terms 查询会计算 找到的内容与关键词组成部分匹配的百分比,但是通常我们说的 relevance 是我们用来计算全文本字段的值相对于全文本检索词相似程度的算法,Elasticsearch 的相似度算法 被定义为检索词频率/反向文档频率(TF/IDF),包括以下内容:
    • 检索词频率
        检索词在该字段出现的频率?出现频率越高,相关性也越高。 字段中出现过 5 次要比只出现过 1 次的相关性高。
    • 反向文档频率
        每个检索词在索引中出现的频率?频率越高,相关性越低。检索词出现在多数文档中会比出现在少数文档中的权重更低。
    • 字段长度准则
        字段的长度是多少?长度越长,相关性越低。 检索词出现在一个短的 title 要比同样的词出现在一个长的 content 字段权重更大。
  • 单个查询可以联合使用 TF/IDF 和其他方式,比如短语查询中检索词的距离或模糊查询里的检索词相似度。如果多条查询子句被合并为一条复合查询语句 ,比如 bool 查询,则每个查询子句计算得出的评分会被合并到总的相关性评分中。
  • 字符串索引后(analusis)会有变化,排序时希望使用原字段(not_analyzed)进行排序,我们想要对同一个字段索引两次,而不是在_source 中保存两份字符串字段,这可以通过为字段添加一个 not_analyzed 子字段来实现:主字段用于搜索、子字段用于排序:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    "tweet": { 
    "type": "string",
    "analyzer": "english",
    "fields": {
    "raw": {
    "type": "string",
    "index": "not_analyzed"
    }
    }
    }
    "sort" : {
    "date": {
    "order": "desc",
    "mode": "min"
    }
    }
  • 默认情况下,返回结果是按相关性倒序排列的
  • 可以在查询参数中加入 explain 参数解释排序结果
    1
    2
    3
    4
    GET /_search?explain 
    {
    "query" : { "match" : { "tweet" : "honeymoon" }}
    }

分页

几种分页方式及应用场景

  • Regular
    平时查询ES只会返回头部的10条数据,一般用于实时获取顶部的部分文档,例如查询最新的订单。
  • Scroll
    需要全部文档时,例如导出全部数据
  • Pagination
    from + size的方式
    如果需要深度分页,则选用Search After

from 和 size(分页)、Search After

ES中的分页是从每个分片上获取from + size条数据,然后协调节点聚合所有结果,再选取前from + size条数据。
因为是from + size,所以from特别大时会有深分页问题
解决办法是Search After

1
2
3
4
5
6
7
8
9
10
11
12
POST users/_search
{
"size": 1,
"query": {
"match_all": {}
},
"search_after": [13, "idididid"],
"sort": [
{"age": "desc"},
{"_id": "asc"}
]
}

缺点是:

  • 不支持指定页数(From)
  • 只能往下翻

需要指定搜索sort:

  • 需要保证值是唯一的,可以加入_id保证唯一性
  • 每次查询使用上一次查询得到的最后一个文档的sort值进行查询(即上边的13和”idididid”)

Search After会通过唯一排序值定位,将每次要处理的文档数都控制在size个。

游标查询 Scroll

scroll 查询 可以用来对 Elasticsearch 有效地执行大批量的文档查询,而又不用付出深度分页那种代价。
游标查询允许我们 先做查询初始化,然后再批量地拉取结果。 这有点儿像传统数据库中的 cursor
游标查询会取某个时间点的快照数据。 查询初始化之后索引上的任何变化会被它忽略。 它通过保存旧的数据文件来实现这个特性,结果就像保留初始化时的索引 视图 一样。
深度分页的代价根源是结果集全局排序,如果去掉全局排序的特性的话查询结果的成本就会很低。 游标查询用字段 _doc 来排序。 这个指令让 Elasticsearch 仅仅从还有结果的分片返回下一批结果。
启用游标查询可以通过在查询的时候设置参数 scroll 的值为我们期望的游标查询的过期时间。 游标查询的过期时间会在每次做查询的时候刷新,所以这个时间只需要足够处理当前批的结果就可以了,而不是处理查询结果的所有文档的所需时间。 这个过期时间的参数很重要,因为保持这个游标查询窗口需要消耗资源,所以我们期望如果不再需要维护这种资源就该早点儿释放掉。 设置这个超时能够让 Elasticsearch 在稍后空闲的时候自动释放这部分资源。

1
2
3
4
5
6
GET /old_index/_search?scroll=1m // 保持游标查询窗口一分钟。
{
"query": { "match_all": {}},
"sort" : ["_doc"], // 关键字 _doc 是最有效的排序顺序。
"size": 1000
}

这个查询的返回结果包括一个字段 _scroll_id, 它是一个 base64 编码的长字符串。现在我们能传递字段 _scroll_id_search/scroll 查询接口获取下一批结果:

1
2
3
4
5
GET /_search/scroll
{
"scroll": "1m", // 注意再次设置游标查询过期时间为一分钟。
"scroll_id" : "cXVlcnlUaGVuRmV0Y2g7NTsxMDk5NDpkUmpiR2FjOFNhNnlCM1ZDMWpWYnRROzEwOTk1OmRSamJHYWM4U2E2eUIzVkMxalZidFE7MTA5OTM6ZFJqYkdhYzhTYTZ5QjNWQzFqVmJ0UTsxMTE5MDpBVUtwN2lxc1FLZV8yRGVjWlI2QUVBOzEwOTk2OmRSamJHYWM4U2E2eUIzVkMxalZidFE7MDs="
}

这个游标查询返回的下一批结果。 尽管我们指定字段 size 的值为 1000,我们有可能取到超过这个值数量的文档。 当查询的时候, 字段 size 作用于单个分片,所以每个批次实际返回的文档数量最大为 size * number_of_primary_shards
当没有更多结果返回的时候,我们就处理完所有匹配的文档了。

缺点:

  • Scroll会创建一个快照,如果查询期间有新的数据写入以后,无法被查到

算分优化 - Function Score Query

算分函数

ES提供了几种默认的计算分值的函数:

weight

设置权重

field_value_factor

使用某个数值修改_score的值,比如乘以某个系数
原算分乘以某个字段得到最终结果,比如下面就是乘以原文档中的count字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
GET doc/_search
{
"query": {
"function_score": {
"query": {
"match": {
"title": "a"
}
},
"field_value_factor": {
"field": "count"
}
}
}
}

还可以根据某个函数来计算评分,比如如下命令新算分 = 老算分 * log(1 + factor * count)

1
2
3
4
5
6
7
8
9
10
11
12
13
GET doc/_search
{
"query": {
"function_score": {
...
"field_value_factor": {
"field": "count",
"modifier": "log1p",
"factor": 0.1
}
}
}
}

random_score

为每一个用户使用一个不同的,随机算分结果
使用场景:让每个用户能看到不同的随机排名,但是也希望同一个用户访问时,结果的相对顺序保持一致

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
GET doc/_search
{
"query": {
"function_score": {
"query": {
"match": {
"title": "a"
}
},
"random_score": {
"seed": 911119
}
}
}
}

衰减函数

以某个字段的值为标准,距离某个值越近,得分越高

script_score

自定义脚本完全控制所需逻辑
elasticsearch painless脚本评分
Elasticsearch中使用painless实现评分

boost

  • boost mode
    multiply:默认方式,算分与函数值的乘积
    sum:算分与函数的和
    min / max:算分与函数取最小/最大值
    replace:使用函数值取代算分
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    GET doc/_search
    {
    "query": {
    "function_score": {
    "query": {
    "match": {
    "title": "a"
    }
    },
    "field_value_factor": {
    "field": "count"
    },
    "boost_mode": "sum"
    }
    }
    }
  • max boost
    将算分控制在一个最大值

算分原理

计算目标文档和origin之间的距离

  • NumericFieldDataScoreFunction#distance
    数值距离=max(0, |doubleValue - origin| - offset)
  • GeoFieldDataScoreFunction#distance:
    地域距离

衰减函数

  • GaussDecayFunction
    高斯衰减=e^(distance^2 / 2scale)

为什么使用HBase

HBase是开源版的BigTable。

  • 高性能的列式存储
  • 高可靠的弹性伸缩

HBase VS RDBMS(传统关系数据库)

  • 数据类型
    RDBMS:关系模型,丰富的数据类型和存储方式
    HBase存的数据都是字符串,用户根据自己的需要解析字符串
  • 数据操作
    RDBMS:丰富的CRUD操作,多表连接
    HBase:不存在复杂的表与表之间的关系,只有简单的插入、查询、删除、清空等
  • 存储模式
    RDBMS:基于行模式存储,行被连续存储在磁盘页,在读取时需要顺序扫描行并筛选,如果每一行只有少量数据值对于查询是有用的,那么基于行模式的存储就会浪费许多磁盘空间和内存带宽;
    HBase:基于列存储,每个列族都由几个文件保存,不同列族的文件是分离的,可以支持更大并发的查询,因为仅需处理查询所需的列,而不需要像RDBMS那样处理整行;同一个列族的数据会被一起进行压缩,由于同一列族内的数据相似度较高,因此可以获得较高的压缩比。
  • 数据索引
    RDBMS会根据需要构建多个索引
    HBase只有一个索引:行键
  • 数据维护
    RDBMS更新后老数据会被替换
    HBase更新只会生成一个新版本,老版本数据仍然保留。
  • 可伸缩性
    RDBMS很难实现横向扩展。
    HBase可以灵活地水平扩展。
  • 事务
    HBase不支持事务,不能实现跨行更新的原子性。

使用HBase

  • Native Java API
  • HBase Shell
  • Thrift Gateway
  • REST Gateway
  • Pig
  • Hive

启动HBase

从官网下载HBase:https://hbase.apache.org/
注意兼容性:http://hbase.apache.org/book.html#hadoop
Hadoop安装后只包含HDFS和MapReduce,并不包含HBase,需要在Hadoop之上继续安装HBase。

编辑配置文件conf/hbase-site.xml,可以修改数据写入目录:

1
2
3
4
5
6
<configuration>
<property>
<name>hbase.rootdir</name>
<value>file:///DIRECTORY/hbase</value>
</property>
</configuration>

将 DIRECTORY 替换成期望写文件的目录. 默认 hbase.rootdir 是指向 /tmp/hbase-${user.name} ,重启时数据会丢失。
编辑环境变量conf/hbase-env.sh

1
2
3
4
# 如果JAVA_HOME已经有了就不用设置了
export JAVA_HOME=...
# 表示由hbase自己管理ZooKeeper,不需要单独的ZooKeeper
export HBASE_MANAGES_ZK=true

启动hbase:

1
./bin/start-hbase.sh

关闭hbase:

1
./bin/stop-hbase.sh

查看日志

如果启动失败后者后续的命令执行失败了,可以查看根目录下的日志:

1
vim logs/hbase-hgc-master-hgc-X555LD.log

Shell

使用shell连接HBase:

1
./bin/hbase shell
1
2
# 查看命令列表,要注意的是表名,行和列需要加引号
> help

create - 创建表、列族:

1
2
3
4
5
6
7
8
9
10
11
# 创建表
> create 'test', 't1'
# 创建表t1,列族为f1,列族版本号为5
> create 't1', {NAME => 'f1', VERSIONS => 5}
# 创建表t1,有3个列族分别为f1、f2、f3
> create 't1', {NAME => 'f1'}, {NAME => 'f2'}, {NAME => 'f3'}
> create 't1', 'f1', 'f2', 'f3'
# 创建表t1,根据分割算法HexStringSplit分布在15个Region里
> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
# 创建表t1,指定切分点
> create 't1', 'f1', {SPLIT => ['10', '20', '30', '40']}

list - 查询表信息

1
> list

put - 向表、行、列指定的单元格添加数据:

1
2
# 向表t1中的行row1和列f1:c1所对应的单元格中添加数据value1,时间戳为1421822284898
> put 't1', 'row1', 'f1:c1', 'value1', 1421822284898

get - 获取单元格数据

1
2
3
4
# 从表t1获取数据,行row1、列f1,时间范围为TIMERANGE,版本号为1的数据
> get 't1', 'row1', {COLUMN => 'f1:c1', TIMERANGE => [0, 1421822284900], VERSIONS => 1}
# 获取表t1、行row1、列f1上的数据
> get 't1', 'row1', 'f1'

Java API

原理

数据模型


  • HBase使用表来组织数据,表由行和列组成,列又划分为若干列族。

  • 每个表由若干行组成,每个行由行键标识。
    访问表中的行只能通过:单个行键、行键区间、全表扫描实现。
  • 列族
    列族是基本的访问控制单元。
    列族里的数据通过列限定符来定位。
  • 单元格
    在表中,可以通过行、列族和列限定符确定一个单元格Cell
    单元格中存储的数据没有数据类型,总是被视为字节数组。
    每个单元格中可以保存一个数据的多个版本,每个版本对应一个不同的时间戳。
  • 时间戳
    单元格中的数据通过时间戳进行索引,每次对一个单元格执行增删改操作都会隐式生成并存储一个时间戳。
  • 数据坐标
    HBase中可以根据<行键, 列族, 列限定符, 时间戳>的四元组来确定一条数据。

面向列存储

  • 行式数据库
    行式存储将每一行连续地存储在磁盘页中,要找一行数据就要连续地扫描磁盘。
    如果每行只有少量属性的值对查询有用,那么行式存储就会浪费非常多的磁盘空间和内存带宽。
    行式数据库主要适合于小批量的数据处理,如联机事务型数据处理,常见实现如MySQL。
  • 列式数据库
    以列为单位进行存储,关系中多个元组的同一列值会被存储到一起,而同一个元组中不同列则通常会被分别存储到不同的磁盘页中。
    列式数据库主要适用于批量数据处理Ad-Hoc Query,优点是可以降低IO开销,支持大量并发用户查询,因为仅需要处理可以回答这些查询的列,而不是分类整理与特定查询无关的数据行;具有较高的数据压缩比。
    列式数据库主要用于数据挖掘、决策支持和地理信息系统等查询密集型系统中,因为一次查询就可以得出结果,而不必每次都要遍历所有的数据库。
    缺点1:连接操作效率低,执行连接操作时需要昂贵的元组重构代价,因为一个元组的不同属性被分散到不同磁盘页中存储,当需要一个完整的元组时,就要从多个磁盘页中读取相应字段的值来重新组合得到原来的一个元组。
    缺点2:不适合频繁更新同一行元组的场景,理由同上,因为一个元组的不同属性分散到不同的磁盘页,因此写操作频繁会导致不能很好命中缓冲。因此HBase更适合数据被存储后不会发生修改的场景。

HBase架构

HBase系统架构

HBase的实现包含3个主要的功能组件:

  • 一个Master主服务器
    Master负责管理HBase表的分区(Region)信息
    比如一个表包含哪些Region,这些Region被划分到哪台Region服务器上。
    同时也负责维护Region服务器列表,实时监测集群中的Region服务器,把特定的Region分配到可用的Region服务器上,并确保整个集群内部不同Region服务器之间的负载均衡。
    负责Region集群的故障转移,当某个Region服务器因出现故障而失效时,Master会把故障服务器上存储的Region重新分配给其他可用的Region服务器。
    负责模式变化,如表和列族的创建。
  • 许多个Region服务器
    Region服务器负责存储和维护分配给自己的Region,处理来自客户端的读写请求。
  • 库函数
    链接到每个客户端
    客户端并不是直接从Master上读取数据,而是先获取Region的存储位置后再直接从Region服务器上读取数据。而且需要注意的是客户端不直接和Master交互,而是从ZooKeeper上获取Region信息,这可以保证Master的负载尽可能小。

客户端

客户端会访问HBase的服务端接口,并缓存已经访问过的Region位置信息,用来提高后续访问数据的速度。

ZooKeeper服务器

  • Master将Region服务器的状态注册到ZooKeeper
  • Master选举。
  • 保存-ROOT-表和Master的地址,然后客户端可以根据-ROOT-表来一级一级找到所需的数据

Master服务器

  • 管理对表的CRUD操作
  • 实现不同Region之间的负载均衡
  • 在Region分裂或合并后,重新调整Region的分布
  • 将发生故障失效的Region迁移到其他Region服务器

Region服务器

Region服务器的主要职责:

  • 维护分配给自己的Region
  • 响应用户的读写请求

Region一般采用HDFS作为底层文件存储系统,并依赖HDFS来实现数据复制维护数据副本的功能。

Region的定位 - 如何找到一个Region

每个Region都有一个RegionID来标识它的唯一性,要定位一个Region可以使用<表名, 开始主键, RegionID>的三元组。
HBase还会维护一张<Region标识符, Region服务器>的映射表,被称为元数据表,又名 .META.表
如果一个HBase表中的Region特别多,一个服务器存不下.META.表,则.META.表也会被分区存储到不同的服务器上,并用一张根数据表来维护所有元数据的具体位置,又名 -ROOT-表,-ROOT-表是不能被分割的,永远只会被存储到一个唯一的Region中。

HBase元数据的三层结构

Region与行

HBase中的行是根据行键的字典序进行维护的,表中包含的行的数量可能非常大,需要通过行键对表中的行进行分区(Region)。
Region包含了位于某个值区间内的所有数据,它是负载均衡数据分发的基本单位,这些Region会被Master分发到不同的Region服务器上。
每个Region的默认大小是100MB200MB,当一个Region包含的数据达到一个阈值时,会被自动分裂成两个新的Region,通常一个Region服务器上会放置101000个Region。

Region服务器的存储结构

Region服务器内部维护了一系列Region对象和一个HLog文件

  • 每个Region由多个Store组成,每个Store对应了表中的一个列族的存储。
    每个Store又包含一个MemStore和若干StoreFile,前者是内存缓存,后者是磁盘文件,使用B树结构组织,底层实现方式是HDFS的HFile(会对内容进行压缩)。

  • HLog是磁盘上的记录文件,记录着所有的更新操作。

  • 每个Store对应了表中一个一个列族,包含了一个MemStore若干个StoreFile
    其中,MemStore是在内存中的缓存,保存最近更新的数据;StoreFile由HDFS的HFile实现,底层是磁盘中的文件,这些文件都是B树结构,方便快速读取,而且HFile的数据块通常采用压缩方式存储,可以大大减少网络和磁盘IO。
    Region存储结构

流程 - 用户读写数据

  • 写入流程
    用户写入 -> 路由Region服务器 -> HLog -> MemStore -> commit()返回给客户端

    HLog是WAL(Write Ahead Log),因此在MemStore之前写入

  • 读取流程
    用户读取 -> 路由Region服务器 -> MemStore -> StoreFile

流程 - 缓存刷新

  • 周期性刷新
    周期性调用Region.flushcache() -> 将MemStore缓存中的内容写到磁盘StoreFile中 -> 清空缓存 -> 在HLog中写入一个标记表示缓存已刷到StoreFile
    每次缓存刷新都会在磁盘上生成一个新的StoreFile文件,因此每个Store会包含多个StoreFile文件
  • 启动刷新
    启动时检查HLog -> 确认最后一次刷新后是否还有发生写入 -> 如果有发生则将这些更新写入MemStore -> 刷新缓存写入到StoreFile -> 删除旧的HLog文件 -> 开始为用户提供数据访问服务

    如果最后一次刷新后没有新数据,说明所有数据已经被永久保存。

流程 - StoreFile合并

如《缓存刷新》流程所述,每次MemStore刷新都会在磁盘上生成一个新的StoreFile,这样系统中每个Store都会有多个StoreFile,要找到Store中某个值就必须查找所有这些StoreFile文件,非常耗时。
因此,为了减少耗时,系统会调用Store.compact()把多个StoreFile合并成一个大文件。
这个合并操作比较耗费资源,因此只会在StoreFile文件的数量达到一个阈值时才会触发合并操作。

Store的工作原理

如《Region存储结构》所示,Region服务器是HBase的核心模块,而Store是Region服务器的核心,每个Store对应了表中的一个列族的存储,每个Store包含一个MemStore缓存和若干个StoreFile文件。

  • 写入数据优先写入MemStore,写满时刷新到StoreFile
  • 随着StoreFile数量不断增加,达到阈值时触发文件合并操作
  • 当StoreFile文件越来越大,达到阈值时,会触发文件分裂操作,同时当前的一个父Region会被分裂成2个子Region,父Region会下线,新分裂出的2个子Region会被Master分配到相应的Region服务器上。

StoreFile的合并和分裂

HLog的工作原理

在分布式环境下,系统出错可能导致数据丢失,比如Region故障导致MemStore缓存中的数据被清空了。HBase采用HLog来保证系统故障时的恢复。

  • HLog是每个Region服务器仅配置一个
    一个Region服务器包含多个Region,这些一台Region服务器上的Region会共用一个HLog
    这样做的好处是:一台Region服务器不需要打开多个日志文件,减少磁盘寻址次数,提高写操作性能。
    这样的坏处是:如果一个Region服务器发生故障,为了恢复其上的Region对象,需要按所属Region对HLog进行拆分,然后分发到其他Region服务器上执行恢复操作。
  • HLog是WAL(Write Ahead Log)
    用户数据需要先写HLog才能写入MemStore,并且直到MemStore缓存内容对应的日志已经被写入磁盘,该缓存内容才会被刷新到磁盘。
  • 数据恢复
    ZooKeeper会实时监测每个Region服务器的状态,当某个Region服务器发生故障,ZooKeeper会通知Master。
    Master会处理该故障服务器上的HLog文件,注意HLog会包含来自多个Region对象的日志记录,系统会根据每条日志所属的Region对象对HLog数据进行拆分,分别放到对应Region对象的目录下,然后再将失效的Region重新分配到可用的Region服务器中,并把与该Region对象相关的HLog日志记录也发送给相应的Region服务器。
    Region服务器领取到分配给自己的Region对象以及与之相关的HLog日志记录以后,会重演一遍日志记录中的操作,把日志记录中的数据写入MemStore缓存,然后刷新到磁盘的StoreFile文件中,完成数据恢复。

Ingest Node

Ingest Node提供了一种类似Logstash的功能:

  • 预处理能力,可拦截Index或Bulk API的请求
  • 对数据进行转换,并重新返回给Index或Bulk API
    比如为某个字段设置默认值、重命名某个字段的字段名、对字段值进行Split操作
    支持设置Painless脚本,对数据进行更加复杂的加工。

相对Logstash来说:

- Logstash Ingest Node
数据输入与输出 支持从不同的数据源读取,并写入不同的数据源 支持从ES REST API获取数据,并且写入ES
数据缓冲 实现了简单的数据队列,支持重写 不支持缓冲
数据处理 支持大量插件、支持定制开发 内置插件,支持开发Plugin(但是添加Plugin需要重启)
配置和使用 增加了一定的架构复杂度 无需额外部署

构建Ingest Node - Pipeline & Processor

ES-IngestNode

  • Pipeline
    管道会对通过的数据(文档),按照顺序进行加工
  • Processor
    对加工的行为进行抽象封装

创建pipeline

为ES添加一个Pipeline:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
PUT _ingest/pipeline/blog_pipeline
{
"description": "a blog pipeline",
"processors": [
{
"split": {
"field": "tags",
"separator": ","
}
},

{
"set":{
"field": "views",
"value": 0
}
}
]
}

查看Pipeline:

1
GET _ingest/pipeline/blog_pipeline

测试Pipeline:

1
2
3
4
5
6
7
8
9
10
11
12
POST _ingest/pipeline/blog_pipeline/_simulate
{
"docs": [
{
"_source": {
"title": "Introducing cloud computering",
"tags": "openstack,k8s",
"content": "You konw, for cloud"
}
}
]
}
  • 可以看到tags被拆分成了数组
  • 最终文档中新增了一个views字段

使用Pipeline更新文档:

1
2
3
4
5
6
PUT tech_blogs/_doc/2?pipeline=blog_pipeline
{
"title": "Introducing cloud computering",
"tags": "openstack,k8s",
"content": "You konw, for cloud"
}

但是使用_update_by_query更新文档时可能会报错:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
POST /tech_blogs/_update_by_query?pipeline=blog_pipeline
{
}

{
...
"failures": [
{
"index": "tech_blogs",
"type": "doc",
"id": "1",
"cause": {
"type": "exception",
"reason": "java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: field [tags] of type [java.util.ArrayList] cannot be cast to [java.lang.String]",
"caused_by": {
"type": "illegal_argument_exception",
"reason": "java.lang.IllegalArgumentException: field [tags] of type [java.util.ArrayList] cannot be cast to [java.lang.String]",
"caused_by": {
"type": "illegal_argument_exception",
"reason": "field [tags] of type [java.util.ArrayList] cannot be cast to [java.lang.String]"
}
},
"header": {
"processor_type": "split"
}
},
"status": 500
}
]
}

是因为对已经拆分过的字段再用split processor拆分,相当于要对数组类型的字段做字符串切分操作。
为了避免这种情况,可以通过加条件来忽略已经处理过的文档:

1
2
3
4
5
6
7
8
9
10
11
12
POST tech_blogs/_update_by_query?pipeline=blog_pipeline
{
"query": {
"bool": {
"must_not": {
"exists": {
"field": "views"
}
}
}
}
}

构建pipeline

processor的种类比较多,这里列出一部分。

字段拆分 - split

ES的_ingest命令可以分析pipeline:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "to split blog tags",
"processors": [
{
"split": {
"field": "tags",
"separator": ","
}
}
]
},
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"title": "Introducing big data......",
"tags": "hadoop,elasticsearch,spark",
"content": "You konw, for big data"
}
},
{
"_index": "index",
"_id": "idxx",
"_source": {
"title": "Introducing cloud computering",
"tags": "openstack,k8s",
"content": "You konw, for cloud"
}
}
]
}
  • pipeline中只有一个processor,它将文档的tags字段按”,”拆分为数组
  • 文档有一个tags字段,但是原始值中多个标签被拼成了一个字符串

字段值重置 - set

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "to split blog tags",
"processors": [
{
"split": {
"field": "tags",
"separator": ","
}
},

{
"set":{
"field": "views",
"value": 0
}
}
]
},
"docs": [
{
"_index":"index",
"_id":"id",
"_source":{
"title":"Introducing big data......",
"tags":"hadoop,elasticsearch,spark",
"content":"You konw, for big data"
}
},
{
"_index":"index",
"_id":"idxx",
"_source":{
"title":"Introducing cloud computering",
"tags":"openstack,k8s",
"content":"You konw, for cloud"
}
}

]
}
  • 添加文档时,使用processor set来增加一个新字段views

文档关联

范式化

关系数据库一般会考虑Normalize数据,而在Elasticsearch中,往往考虑Denormalize。

  • 查询性能好
  • 无需表连接
  • 无需行锁

关联

ES不擅长处理关联关系,一般采用以下4种方法处理关联:

  • 对象类型
  • 嵌套对象(Nested Object)
  • 父子关联关系(Parent / Child)
  • 应用端关联
Nested Object Parent / Child
优点 文档存储在一起,读取性能高 父子文档可以独立更新
缺点 更新嵌套的子文档时,需要更新整个文档 需要额外的内存维护关系,读取性能相对较差
适用场景 子文档偶尔更新,以查询为主 子文档更新频繁

Nested

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 插入两条数据
PUT blog/doc/1
{
"content": "i love you",
"time": "2021-12-12T12:12:12",
"user": {
"userId": 1,
"userName": "Jack",
"city": "shanghai"
}
}
PUT blog/doc/2
{
"content": "i dont love you",
"time": "2021-12-12T12:12:12",
"user": [{
"userId": 2,
"userName": "Jack Mike",
"city": "hebei"
},
{
"userId": 3,
"userName": "Joe",
"city": "beijing"
}]
}

// 搜索
POST blog/_search
{
"query": {
"bool": {
"must": [
{"match": {"content": "you"}},
{"match": {"user.userName": "Jack"}},
{"match": {"user.city": "beijing"}}
]
}
}
}

上面的搜索初看没什么问题,但是实际上查出了我们不需要的数据。

  • **Jack Mike来自hebei,Joe来自beijing,并不符合查询条件的“来自beijing的Jack”。

出现这个问题的原因是:

  • ES存储时,Nested对象的边界没有被考虑在内,JSON格式被处理成了扁平式的键值对结构

Nested Data Type可以解决这个问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 先指定user是nested域
PUT blog
{
"mappings": {
"doc": {
"properties": {
"content": {
"type": "text"
},
"time": {
"type": "date"
},
"user": {
"type": "nested",
"properties": {
"userId": {
"type": "long"
},
"userName": {
"type": "text"
}
}
}
}
}
}
}
// 查询
POST blog/_search
{
"query": {
"bool": {
"must": [
{"match": {"content": "you"}},
{
"nested": { // 指定对嵌套对象的查询
"path": "user", // 指定路径,就是嵌套对象是哪个域
"query": {
"bool": {
"must": [
{"match": {"user.userName": "Jack"}},
{"match": {"user.city": "beijing"}}
]
}
}
}
}
]
}
}
}

像这么定义索引的话相同搜索条件就查不出来了,当然查询条件的city改成”hebei”的话就能重新查出来了。

  • 其中,Nested数据结构,允许对象数组中的对象被独立索引
  • 用nested和properties关键字将所有actors索引到了多个分隔的文档
  • 在内部,Nexted文档会被存到两个Lucene对象中,

Parent / Child

Nested方式关联的局限性:

  • 每次更新,需要重新索引整个对象(包括根对象和嵌套对象)

ES中Parent / Child的关系是类似关系数据库中的Join查询。

  • 父文档和子文档是两个独立的文档;
  • 更新父文档无需重新索引子文档,子文档被添加、更新或删除也不会影响到父文档和其他的子文档。

创建索引,设置mapping:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
PUT my_blogs
{
"mappings": {
"doc": {
"properties": {
"blog_comments_relation": {
"type": "join", // 声明join类型
"relations": { // 声明parent和child关系
"blog": "comment" // parent名称和child名称
}
}
"content": {
"type": "text"
},
"title": {
"type": "keyword"
}
}
}
}
}

索引父子文档并查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#索引父文档
PUT my_blogs/_doc/blog1
{
"title":"Learning Elasticsearch",
"content":"learning ELK @ geektime",
"blog_comments_relation":{
"name":"blog"
}
}

#索引父文档
PUT my_blogs/_doc/blog2
{
"title":"Learning Hadoop",
"content":"learning Hadoop",
"blog_comments_relation":{
"name":"blog"
}
}

#索引子文档
PUT my_blogs/_doc/comment1?routing=blog1
{
"comment":"I am learning ELK",
"username":"Jack",
"blog_comments_relation":{
"name":"comment",
"parent":"blog1"
}
}

#索引子文档
PUT my_blogs/_doc/comment2?routing=blog2
{
"comment":"I like Hadoop!!!!!",
"username":"Jack",
"blog_comments_relation":{
"name":"comment",
"parent":"blog2"
}
}

#索引子文档
PUT my_blogs/_doc/comment3?routing=blog2
{
"comment":"Hello Hadoop",
"username":"Bob",
"blog_comments_relation":{
"name":"comment",
"parent":"blog2"
}
}
  • 索引子文档时设置routing=父文档,保证父文档和子文档在一个分片上
  • 同时在parent中设置其父文档的

根据需要查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#根据父文档ID查看
GET my_blogs/_doc/blog2

# Parent Id 查询
POST my_blogs/_search
{
"query": {
"parent_id": {
"type": "comment",
"id": "blog2"
}
}
}

# Has Child 查询,返回父文档
POST my_blogs/_search
{
"query": {
"has_child": {
"type": "comment",
"query" : {
"match": {
"username" : "Jack"
}
}
}
}
}

# Has Parent 查询,返回相关的子文档
POST my_blogs/_search
{
"query": {
"has_parent": {
"parent_type": "blog",
"query" : {
"match": {
"title" : "Learning Hadoop"
}
}
}
}
}

#通过ID ,访问子文档,不会返回_source
GET my_blogs/_doc/comment3
#通过ID和routing ,访问子文档,能返回_source
GET my_blogs/_doc/comment3?routing=blog2

#更新子文档
PUT my_blogs/_doc/comment3?routing=blog2
{
"comment": "Hello Hadoop??",
"blog_comments_relation": {
"name": "comment",
"parent": "blog2"
}
}

查询子文档:

  • has_parent

根据子文档查父文档:

  • parent_id
  • has_child

数据建模

过程:

  1. 概念模型
  2. 逻辑模型
    实体属性
    实体之间的关系
    搜索相关的配置
  3. 数据模型
    索引分片数
    mapping字段配置、关系处理

字段建模

字段类型

text:

  • 用于全文本字段,文本会被Analyzer分词
  • 默认不支持聚合分析及排序,需要设置fielddata为true

keyword:

  • 用于id、枚举及不需要分词的文本
  • 适用于Filter(精确匹配)、sorting、aggregation

设置多字段类型:

  • 默认会为文本类型设置成text,并且设置一个keyword的子字段
  • 在处理人类语言时,通过增加“英文”、”拼音”、”标准”分词器,提高搜索体验

结构化数据:

  • 精确数据类型,可以用byte的情况下就不要用long
  • 枚举类型设置为keyword,即便是数字也应该设置成keyword,获取更好的性能

搜索和分词

  • 如果不需要搜索、排序和聚合分析,则设置enable为false
    不需要检索的话,将index设置为false
  • 对需要搜索的字段,设置存储粒度
    index_options / norms
    不需要归一化数据时,也可以关闭,节约磁盘存储

聚合及排序

  • 如果不需要聚合或排序
    设置Doc_values / fielddata为false
  • 对更新频繁、聚合查询频繁的keyword类型的字段
    设置eager_global_ordinals为true,利用缓存提高聚合性能

额外存储

_source设置enabled为false可以节约磁盘空间
但是一般不会把_source关掉,而是优先考虑增加压缩比,因为关掉后无法再看到_source字段,且无法做Reindex和Update

数据建模最佳实践

关联关系

  1. 优先考虑Denormalization
  2. 当数据包含多数值对象,同时有查询需求时,使用Nested Object
  3. 关联文档更新非常频繁时,使用Parent / Child

避免过多字段

过多字段带来的问题:

  1. 不容易维护
  2. Mapping信息保存在Cluster State中,数据量过大的话会对集群性能造成影响(Cluster State信息需要和所有节点同步)
  3. 删除或修改数据需要reindex

默认最大字段数是1000,可以设置index.mapping.total_fields.limit来修改

避免正则查询

正则查询存在的问题:

  • 正则、通配符查询、前缀查询属于Term查询,但是性能不够好
  • 特别是将通配符放在开头的话,性能极差

避免空值引起的聚合不准

比如下面插入两条文档,一条文档的rating值为null:

1
2
3
4
5
6
7
8
PUT ratings/doc/1
{
"rating":5
}
PUT ratings/doc/2
{
"rating":null
}

聚合分析结果中可以看到,total虽然是2,但是avg结果却是5:

1
2
3
4
5
6
7
8
9
10
11
POST ratings/_search
{
"size": 0,
"aggs": {
"avg": {
"avg": {
"field": "rating"
}
}
}
}

解决办法是给null取默认值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
DELETE ratings
PUT ratings
{
"mappings": {
"doc": {
"properties": {
"rating": {
"type": "long",
"null_value": 1.0
}
}
}
}
}

为索引的Mapping加入Meta信息

1
2
3
4
5
6
7
8
PUT softwares
{
"mappings": {
"_meta": {
"software_version_mapping": "1.0"
}
}
}

Mapping的设置是一个迭代的过程:

  • 加入新的字段很容易(必要时需要update_by_query)
  • 更新删除字段不允许(需要Reindex重建数据)
  • 最好能对Mapping加入Meta信息,更好地进行版本管理
  • 可以考虑将Mapping文件上传git进行管理
0%