ES3_7搜索源码分析

搜索源码分析。

Search源码流程

GET请求只能查询单个文档,通过_index、_type、_id三元组来确定唯一文档。
而Search在查到匹配的文档之后,还需要将从多个分片上得到的结果组合成单个排序列表,这个过程在协调节点上执行,被称为query then fetch

  • 索引的所有分片(中的某个副本)都需要参与搜索,因为查询的时候不知道文档位于哪个分片;
  • 协调节点需要将结果合并,再根据ID获取文档内容。

建立索引和搜索的过程(单机视角)

ES中的搜索主要分为精确和全文两类:

  • 精确值:比如日期、用户ID、IP地址等字段的完全匹配;
    对精确值的搜索遵循“要么匹配、要么不匹配的原则”。
  • 全文:对文本内容的模糊搜索,比如一条日志、邮件的内容等。
    全文搜索不能确定目标是不是自己要搜索的,而是只能给出一个相似度的评分,评分越高,越有可能是自己要找的文档。

在ES中执行搜索需要建立索引执行搜索这两个步骤:
ES的索引和搜索
在ES中建立索引和执行搜索的大致流程如上图所示。

这个过程比较关注单机视角下的文档处理,因此对最后两步的查找匹配文档相关性得分进行了省略,这部分需要在下面的分布式搜索流程中再作分析。

建立索引

如果是全文数据,则对文本内容进行分析,这项工作在 ES 中由分析器实现。分析器实现如下功能:

  • 字符过滤器。主要是对字符串进行预处理,例如,去掉HTML,将&转换成and等。
  • 分词器(Tokenizer)。将字符串分割为单个词条,例如,根据空格和标点符号分割,输出的词条称为词元(Token)。
  • Token过滤器。根据停止词(Stop word)删除词元,例如,and、the等无用词,或者根据同义词表增加词条,例如,jump和leap。
  • 语言处理。对上一步得到的Token做一些和语言相关的处理,例如,转为小写,以及将单词转换为词根的形式。语言处理组件输出的结果称为词(Term)。分析完毕后,将分析器输出的词(Term)传递给索引组件,生成倒排和正排索引,再存储到文件系统中。

执行搜索

搜索依赖Lucene完成,对于全文搜索:

  • 对检索字段使用建立索引时相同的分析器进行分析,产生Token列表(词法分析);
  • 根据查询语句的语法规则转换成一棵语法树(语法分析);
  • 查找符合语法树的文档;
  • 对匹配到的文档列表进行相关性评分,评分策略一般使用TF/IDF;
  • 根据评分结果进行排序。

分布式搜索过程(集群视角)

ES中的search请求必须询问所请求的索引中的所有分片的某个副本来得到结果。比如一个索引有5个主分片,每个主分片有1个副分片,则一次搜索会访问共5个分片,但并不一定都是主分片。
search请求由协调节点接收请求,转发到数据节点查询结果,并最终在协调节点中合并结果返回。
搜索相关类:

  • InitialSearchPhase(Query流程)
  • FetchSearchPhase(Fatch流程)

搜索流程中最重要的就是上面两个类,分别负责Query和Fetch流程。

Query流程

代码入口:InitialSearchPhase#run
1、查询广播到索引中每一个分片副本(主分片或副分片)
将请求体解析为SearchRequest
RestSearchAction#prepareRequest
构建本节点的shard列表和远程节点的shard列表,即localShardsIteratorremoteShardIterators,最终通过。
TransportSearchAction#executeSearch
merge过的shard列表传给RestSearchAction的构造
TransportSearchAction#searchAsyncAction
遍历所有shard发送请求
InitialSearchPhase#run

1
2
3
4
5
for (int index = 0; index < maxConcurrentShardRequests; index++) {
final SearchShardIterator shardRoutings = shardsIts.get(index);
assert shardRoutings.skip() == false;
performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
}

最终需要根据执行情况来判断是否要进入到下一阶段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(),
shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
@Override
public void innerOnResponse(FirstResult result) {
maybeFork(thread, () -> onShardResult(result, shardIt));
}

@Override
public void onFailure(Exception t) {
maybeFork(thread, () -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t));
}
});

private void onShardResult(FirstResult result, SearchShardIterator shardIt) {

onShardSuccess(result);

successfulShardExecution(shardIt);
}

private void successfulShardExecution(SearchShardIterator shardsIt) {
...
// 计数器累加
final int xTotalOps = totalOps.addAndGet(remainingOpsOnIterator);
// 检查是否收到全部回复
if (xTotalOps == expectedTotalOps) {
// 调用executeNextPhase,从而开始执行取回阶段
onPhaseDone();
} else if (xTotalOps > expectedTotalOps) {
throw new AssertionError("unexpected higher total ops [" + xTotalOps + "] compared to expected ["
+ expectedTotalOps + "]");
} else if (shardsIt.skip() == false) {
maybeExecuteNext();
}
}

2、每个分片在本地执行查询,并使用本地的Term/Document Frequency信息进行打分,添加结果到大小为from+size的本地有序优先队列中;
优先队列是一个存有topN匹配文档的有序列表。优先队列大小为分页参数from + size。
TODO
3、每个分片返回各自优先队列中所有文档的ID和排序值给协调节点,协调节点合并这些值到自己的优先队列中,产生一个全局排序后的列表。
为了避免在协调节点中创建的number_of_shards * (from + size)优先队列过大,应尽量控制分页深度。
TODO

Fetch阶段

Fetch阶段的主要目的是通过文档ID获取完整的文档内容。
1、协调节点向相关节点发送GET请求(即根据ID获取文档);
由上面对Query阶段的分析可知收到所有节点的回复后协调节点会触发Fetch流程(executeNextPhase)。
FetchSearchPhase#run
遍历查询阶段的shard列表,跳过查询结果为空的shard,对特定目标执行executeFetch获取数据。
FetchSearchPhase#innerRun

1
2
3
4
5
6
7
8
9
10
for (int i = 0; i < docIdsToLoad.length; i++) {
...
SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(),
searchShardTarget.getNodeId());
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getRequestId(), i, entry,
lastEmittedDocPerShard, searchShardTarget.getOriginalIndices());
executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(),
connection);
}

executeFetch执行查询请求,并在获取后调用countDown。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget,
final CountedCollector<FetchSearchResult> counter,
final ShardFetchSearchRequest fetchSearchRequest, final QuerySearchResult querySearchResult,
final Transport.Connection connection) {
context.getSearchTransport().sendExecuteFetch(connection, fetchSearchRequest, context.getTask(),
new SearchActionListener<FetchSearchResult>(shardTarget, shardIndex) {
@Override
public void innerOnResponse(FetchSearchResult result) {
counter.onResult(result);
}

@Override
public void onFailure(Exception e) {
try {
logger.debug(() -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.id()), e);
counter.onFailure(shardIndex, shardTarget, e);
} finally {
// the search context might not be cleared on the node where the fetch was executed for example
// because the action was rejected by the thread pool. in this case we need to send a dedicated
// request to clear the search context.
releaseIrrelevantSearchContext(querySearchResult);
}
}
});
}

2、分片所在节点接收到请求后,向协调节点返回数据;
3、协调节点等待所有文档获取完毕后,返回给客户端。
收集器定义如下,包括收到的shard数据存放在哪里,及收到完成后谁来处理:
FetchSearchPhase#innerRun

1
2
3
final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(r -> fetchResults.set(r.getShardIndex(), r),
docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
finishPhase, context);

每有一个shard执行完毕就执行一次counter.countDown()

1
2
3
4
5
6
void countDown() {
assert counter.isCountedDown() == false : "more operations executed than specified";
if (counter.countDown()) {
onFinish.run();
}
}

最后一个执行完毕后,执行finishPhase:

1
2
3
final Runnable finishPhase = ()
-> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
queryResults : fetchResults);

moveToNextPhase开始执行下一个阶段,可以看到在FetchSearchPhase的构造函数中构建了下一个需要执行的Phase,即ExpandSearchPhase

1
2
3
4
5
6
7
FetchSearchPhase(InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> resultConsumer,
SearchPhaseController searchPhaseController,
SearchPhaseContext context) {
this(resultConsumer, searchPhaseController, context,
(response, scrollId) -> new ExpandSearchPhase(context, response, // collapse only happens if the request has inner hits
(finalResponse) -> sendResponsePhase(finalResponse, scrollId, context)));
}

4、ExpandSearchPhase
判断是否启用字段折叠,然后处理折叠,并将处理后的结果返回给客户端。
5、执行完毕后,调用sendResponsePhase回复客户端。

执行搜索的数据节点流程

上面Query阶段和Fetch阶段都涉及到搜索流程,Query是根据条件过滤文档,Fetch是根据过滤后的结果取对应的文档内容。
1、注册搜索处理器
SearchTransportService#registerRequestHandler

1
org.elasticsearch.action.search.SearchTransportService#registerRequestHandler

2、响应Query请求
调用Lucene执行搜索,及聚合结果
Service#executeQueryPhase
3、响应Fetch请求
即GET操作