ES2_3: Source Code Analysis of Document Operations

Source Code Analysis of the Write Flow (POST, PUT, DELETE)

refresh and flush: the trade-off between near-real-time visibility and reliability

Flush-to-disk flow

Near real-time visibility

In ES, newly written data cannot be searched immediately; it only becomes visible after a refresh. The default refresh interval is 1s, and at the fastest it can be brought down to about 100ms.
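
As a rough illustration (my_index is a hypothetical index name), the refresh interval can be tuned per index, and a refresh can also be triggered manually:

PUT /my_index/_settings
{
  "index": { "refresh_interval": "1s" }
}

POST /my_index/_refresh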

Reliability

Search systems generally do not have strict reliability requirements; data reliability is usually guaranteed by keeping the original data in another storage system, and when the search system loses data, it can simply be re-imported from that system and rebuilt.
ES uses a multi-replica model to avoid losing data when a single machine fails. At the same time, to improve read and write performance, ES only flushes Lucene segments to disk periodically for persistence, which reduces disk I/O; but if the node crashes before a flush happens, the un-flushed data is easily lost. To address this, similar to the CommitLog in a database, ES introduces a TransLog.
The TransLog flush frequency controls when data in the write buffer is persisted to disk: either per request (flush on every request), or periodically (flush once every fixed interval). For performance, it is usually set to flush every 5 seconds or every minute; the longer the flush interval, the lower the reliability.
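
A minimal sketch of the corresponding index settings (my_index is hypothetical): durability can be per request ("request") or asynchronous with a sync interval:

PUT /my_index/_settings
{
  "index.translog.durability": "async",
  "index.translog.sync_interval": "5s"
}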

ES flush-to-disk flow

We have already discussed how data is routed to a particular node and shard.

  1. On each shard, the data is first written into Lucene; at this point it is still in memory.
    After being written into the Lucene in-memory buffer it is not yet searchable; a Refresh must first turn the in-memory objects into a complete segment, which is then reopened before it becomes searchable.
    A simple Get operation, however, is a GetById lookup, and such a lookup can be served directly from the TransLog, so it is real-time.
  2. Next the TransLog is written; after the write, the TransLog data is flushed to disk.

    Unlike a database, which writes the CommitLog first and memory second, ES writes memory (Lucene) first and the TransLog second. The reason is that the Lucene in-memory write involves complex logic (analysis, field-length limits, and so on) and can easily fail; to avoid filling the TransLog with invalid records and to reduce the complexity and cost of recovery, the Lucene write is done first.

  3. Once the TransLog data has been flushed to disk, a success response is returned to the user.
  4. After a relatively long interval, Lucene flushes the newly generated in-memory segments to disk, after which the TransLog is truncated (this can also be triggered manually via the flush API shown below).
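
As a small illustration (my_index is hypothetical), the flush described in step 4 can also be triggered manually; the flush API commits the in-memory segments to disk and clears the TransLog:

POST /my_index/_flush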

Can ES lose data?

  • Lucene generates a new segment roughly every second; at that point the segment is still in the cache and has not been flushed to disk. If the node crashes at this moment, the in-memory data can still be recovered from the TransLog.
  • With the periodic setting described above, TransLog data is only synced to disk every 5 seconds, which clearly does not fully guarantee safety: up to 5 seconds of TransLog data can be lost. Reliability can be improved by syncing the TransLog more often, but at a significant performance cost.
  • Even if the node holding the primary shard goes down and its TransLog is lost, the data can still be recovered from a replica.

Document update (partial update)

Lucene does not support partial updates of a document, so Elasticsearch has to implement them itself (a request-level example follows the list):

  1. On receiving the Update request, read the full document with that id from a segment or the TransLog and record its version as V1;
  2. Merge the V1 document with the partial document in the request and update the in-memory versionMap; the result is V2, and the Update request now becomes an Index request for V2;
  3. Acquire the lock;
  4. Read the latest version of that id from the versionMap again (call it V2); if it is not there, fall back to reading from a segment or the TransLog, although in practice it can almost always be found in the versionMap;
  5. Check for a version conflict (V1 vs V2); if there is a conflict, go back to the start of the Update phase and retry, otherwise continue with the Index request;
  6. In the Index phase, first increment the version to obtain V3, then write the document into Lucene (Lucene deletes the old document with the same id and then adds the new one); after a successful write, store V3 in the versionMap;
  7. Release the lock.
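
A hedged, request-level view of this optimistic-concurrency loop, reusing the website/blog/123 document used elsewhere in this article (the new title is illustrative): retry_on_conflict tells ES how many times to repeat the read-merge-write cycle on a version conflict instead of failing immediately:

POST /website/blog/123/_update?retry_on_conflict=3
{
  "doc": { "title": "My first blog entry (edited)" }
}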

Document operation types

Document operations come in single form (Index) and batch form (Bulk); both are ultimately wrapped uniformly into a bulk operation request (BulkRequest).
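
A minimal example of the bulk API (index, type, and ids are illustrative); each operation is an action line optionally followed by its source:

POST /_bulk
{ "index": { "_index": "website", "_type": "blog", "_id": "123" } }
{ "title": "My first blog entry", "text": "Just trying this out...", "date": "2014/01/01" }
{ "delete": { "_index": "website", "_type": "blog", "_id": "124" } }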

Request entry point

In ES, the entry points of all actions are registered in ActionModule; for example, a Bulk request has two registration points:

actions.register(BulkAction.INSTANCE, TransportBulkAction.class,
        TransportShardBulkAction.class);

registerHandler.accept(new RestBulkAction(settings, restController));

For REST requests, the request is parsed in RestBulkAction and eventually converted into a TransportAction for processing.

For example, a request to localhost:9200/website/blog/123 with the body:

{
    "title": "My first blog entry",
    "text": "Just trying this out...",
    "date": "2014/01/01"
}

is first dispatched to RestIndexAction and then forwarded to TransportBulkAction#doExecute. The analysis of the document write flow below starts from this entry point.

Document write flow

From the earlier discussion of the ES data model, an ES document must first be written successfully to the primary shard before it is replicated to the relevant replica shards.
[Figure: multi-node, multi-shard model]

  1. The first node to receive the request acts as the coordinating node;
  2. It first selects the target shard according to the _routing rule (see the sketch after this list);
    the setting in the IndexRequest takes priority, then the configuration in the mapping; if neither is present, the _id is used as the routing value;
  3. The node holding that shard is looked up in the cluster meta, and the request is forwarded to the node holding the primary shard;
  4. The request is then executed as a write operation on the primary shard;
  5. After the primary shard succeeds, the request is sent on to the replica shards;
  6. Once the request has succeeded on the replica shards and the responses have been returned, the write is considered successful and the coordinating node returns the result to the client.
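
A sketch of step 2: the shard is chosen by the routing formula, roughly shard_num = hash(_routing) % number_of_primary_shards (ES uses a Murmur3 hash of the routing value). A custom routing value can be supplied on the request, for example (routing value user1 is illustrative):

PUT /website/blog/123?routing=user1
{
  "title": "My first blog entry"
}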

From the overview above, the write flow can be broken down into the write processes of three roles: the coordinating node, the primary shard node, and the replica shard node.
[Figure: ES write flow]

Coordinating node processing flow

1. Auto-create indices
Entry point: TransportBulkAction#doExecute
First, find the indices in the request that need to be auto-created:

for (String index : indices) {
    boolean shouldAutoCreate;
    try {
        shouldAutoCreate = shouldAutoCreate(index, state);
    } catch (IndexNotFoundException e) {
        shouldAutoCreate = false;
        indicesThatCannotBeCreated.put(index, e);
    }
    if (shouldAutoCreate) {
        autoCreateIndices.add(index);
    }
}

Then execute the create-index requests:

void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
    CreateIndexRequest createIndexRequest = new CreateIndexRequest();
    createIndexRequest.index(index);
    createIndexRequest.cause("auto(bulk api)");
    createIndexRequest.masterNodeTimeout(timeout);
    createIndexAction.execute(createIndexRequest, listener);
}
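
Whether missing indices may be auto-created like this is controlled by the action.auto_create_index setting. A hedged example, assuming a version where the setting is dynamically updatable (older versions configure it in elasticsearch.yml):

PUT /_cluster/settings
{
  "persistent": { "action.auto_create_index": "false" }
}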

2. Route the request
Entry point: TransportBulkAction.BulkOperation#doRun
Different request types use different routing logic:

switch (docWriteRequest.opType()) {
    // create / index document requests
    case CREATE:
    case INDEX:
        IndexRequest indexRequest = (IndexRequest) docWriteRequest;
        final IndexMetaData indexMetaData = metaData.index(concreteIndex);
        MappingMetaData mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
        Version indexCreated = indexMetaData.getCreationVersion();
        indexRequest.resolveRouting(metaData);
        indexRequest.process(indexCreated, mappingMd, concreteIndex.getName());
        break;
    // update document request
    case UPDATE:
        TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest);
        break;
    // delete document request
    case DELETE:
        docWriteRequest.routing(metaData.resolveWriteIndexRouting(docWriteRequest.parent(), docWriteRequest.routing(), docWriteRequest.index()));
        // check if routing is required, if so, throw error if routing wasn't specified
        if (docWriteRequest.routing() == null && metaData.routingRequired(concreteIndex.getName(), docWriteRequest.type())) {
            throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id());
        }
        break;
    default:
        throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
}

Then the hash of the document ID (or routing value) is computed and each request is assigned to its corresponding shard:

// assign each request to a ShardId based on its document ID
// first, go over all the requests and create a ShardId -> Operations mapping
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
    DocWriteRequest request = bulkRequest.requests.get(i);
    if (request == null) {
        continue;
    }
    String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
    ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
    List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
    shardRequests.add(new BulkItemRequest(i, request));
}

if (requestsByShard.isEmpty()) {
    listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
    return;
}

3. Iterate over the shards and dispatch the requests

final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
// ID of the current (coordinating) node
String nodeId = clusterService.localNode().getId();
for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
    // for each shard
    final ShardId shardId = entry.getKey();
    final List<BulkItemRequest> requests = entry.getValue();
    // build the bulk request for this shard
    BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
            requests.toArray(new BulkItemRequest[requests.size()]));
    bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
    bulkShardRequest.timeout(bulkRequest.timeout());
    if (task != null) {
        bulkShardRequest.setParentTask(nodeId, task.getId());
    }
    // execute the per-shard request
    shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
        ...
    });
}

4. Send the per-shard requests to their nodes
Entry point: TransportReplicationAction.ReroutePhase#doRun
Route each request to the node holding the primary shard, retrying failed operations.

// locate the primary shard
final ShardRouting primary = primary(state);
if (retryIfUnavailable(state, primary)) {
    return;
}
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
// if the primary shard is on the local node, execute locally; otherwise call the remote node
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
    performLocalAction(state, primary, node, indexMetaData);
} else {
    performRemoteAction(state, primary, node);
}

Primary shard node processing flow

As described above, the coordinating node sends the request to the node holding the primary shard; that node receives the request and runs the corresponding handler.
Message entry point: TransportReplicationAction.PrimaryOperationTransportHandler#messageReceived
Primary execution logic: ReplicationOperation#execute
1. Check whether enough shard copies are active
Entry point: ReplicationOperation#checkActiveShardCount
The more copies that are active, the more replicas are synchronized after the write and the less likely data is to be lost. The default is 1, meaning the write proceeds as long as the primary shard is available.
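
A minimal request-level example (the value 2 is illustrative): the required number of active shard copies can be raised per request via wait_for_active_shards:

PUT /website/blog/123?wait_for_active_shards=2
{
  "title": "My first blog entry"
}
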
2. Execute on the primary shard
Entry point: ReplicationOperation.Primary#perform

public PrimaryResult perform(Request request) throws Exception {
    PrimaryResult result = shardOperationOnPrimary(request, indexShard);
    assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest()
            + "] with a primary failure [" + result.finalFailure + "]";
    return result;
}

3. Execute the index operation on the primary shard
The request is dispatched by operation type in TransportShardBulkAction#executeBulkItemRequest. Taking the UPDATE operation as an example:

private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateRequest, IndexShard primary,
                                                         IndexMetaData metaData, BulkShardRequest request,
                                                         int requestIndex, UpdateHelper updateHelper,
                                                         LongSupplier nowInMillis,
                                                         final MappingUpdatePerformer mappingUpdater) throws Exception {
    BulkItemRequest primaryItemRequest = request.items()[requestIndex];
    assert primaryItemRequest.request() == updateRequest
            : "expected bulk item request to contain the original update request, got: " +
            primaryItemRequest.request() + " and " + updateRequest;

    BulkItemResultHolder holder = null;
    // There must be at least one attempt, hence the retry loop
    int maxAttempts = Math.max(1, updateRequest.retryOnConflict());
    for (int attemptCount = 0; attemptCount < maxAttempts; attemptCount++) {

        holder = executeUpdateRequestOnce(updateRequest, primary, metaData, request.index(), updateHelper,
                nowInMillis, primaryItemRequest, request.items()[requestIndex].id(), mappingUpdater);

        // It was either a successful request, or it was a non-conflict failure
        if (holder.isVersionConflict() == false) {
            return holder;
        }
    }
    // We ran out of tries and haven't returned a valid bulk item response, so return the last one generated
    return holder;
}

The call path is long, and it eventually reaches InternalEngine#index:

public IndexResult index(Index index) throws IOException {
    assert Objects.equals(index.uid().field(), uidField) : index.uid().field();
    final boolean doThrottle = index.origin().isRecovery() == false;
    try (ReleasableLock releasableLock = readLock.acquire()) {
        ensureOpen();
        assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
        assert assertVersionType(index);
        try (Releasable ignored = versionMap.acquireLock(index.uid().bytes());
                Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) {
            lastWriteNanos = index.startTime();
            /* A NOTE ABOUT APPEND ONLY OPTIMIZATIONS:
             * if we have an autoGeneratedID that comes into the engine we can potentially optimize
             * and just use addDocument instead of updateDocument and skip the entire version and index lookupVersion across the board.
             * Yet, we have to deal with multiple document delivery, for this we use a property of the document that is added
             * to detect if it has potentially been added before. We use the documents timestamp for this since it's something
             * that:
             *  - doesn't change per document
             *  - is preserved in the transaction log
             *  - and is assigned before we start to index / replicate
             * NOTE: it's not important for this timestamp to be consistent across nodes etc. it's just a number that is in the common
             * case increasing and can be used in the failure case when we retry and resent documents to establish a happens before relationship.
             * for instance:
             *  - doc A has autoGeneratedIdTimestamp = 10, isRetry = false
             *  - doc B has autoGeneratedIdTimestamp = 9, isRetry = false
             *
             * while both docs are in in flight, we disconnect on one node, reconnect and send doc A again
             *  - now doc A' has autoGeneratedIdTimestamp = 10, isRetry = true
             *
             * if A' arrives on the shard first we update maxUnsafeAutoIdTimestamp to 10 and use update document. All subsequent
             * documents that arrive (A and B) will also use updateDocument since their timestamps are less than maxUnsafeAutoIdTimestamp.
             * While this is not strictly needed for doc B it is just much simpler to implement since it will just de-optimize some doc in the worst case.
             *
             * if A arrives on the shard first we use addDocument since maxUnsafeAutoIdTimestamp is < 10. A` will then just be skipped or calls
             * updateDocument.
             */
            final IndexingStrategy plan;

            if (index.origin() == Operation.Origin.PRIMARY) {
                plan = planIndexingAsPrimary(index);
            } else {
                // non-primary mode (i.e., replica or recovery)
                plan = planIndexingAsNonPrimary(index);
            }

            final IndexResult indexResult;
            if (plan.earlyResultOnPreFlightError.isPresent()) {
                indexResult = plan.earlyResultOnPreFlightError.get();
                assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
            } else if (plan.indexIntoLucene) {
                indexResult = indexIntoLucene(index, plan);
            } else {
                indexResult = new IndexResult(
                        plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
            }
            if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
                final Translog.Location location;
                if (indexResult.getResultType() == Result.Type.SUCCESS) {
                    location = translog.add(new Translog.Index(index, indexResult));
                } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                    // if we have document failure, record it as a no-op in the translog with the generated seq_no
                    location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().toString()));
                } else {
                    location = null;
                }
                indexResult.setTranslogLocation(location);
            }
            if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) {
                final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
                versionMap.maybePutIndexUnderLock(index.uid().bytes(),
                        new IndexVersionValue(translogLocation, plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm()));
            }
            if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo());
            }
            indexResult.setTook(System.nanoTime() - index.startTime());
            indexResult.freeze();
            return indexResult;
        }
    } catch (RuntimeException | IOException e) {
        try {
            maybeFailEngine("index", e);
        } catch (Exception inner) {
            e.addSuppressed(inner);
        }
        throw e;
    }
}

Replica shard execution

Still inside ReplicationOperation#execute on the primary node, the write interface of the replica shards must be called.
1. Call the replica shards
Entry point: ReplicationOperation#performOnReplicas

private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest, final long globalCheckpoint) {
    if (logger.isTraceEnabled()) {
        logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
    }

    totalShards.incrementAndGet();
    pendingActions.incrementAndGet();
    // send the request to the replica shard over the transport layer
    replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, new ActionListener<ReplicaResponse>() {
        ...
    });
}

2. On a replica shard, the flow of receiving the request and writing the document is essentially the same as on the primary shard.
Message entry point: TransportReplicationAction.ReplicaOperationTransportHandler#messageReceived

Source Code Analysis of the GET Flow

Real-time behavior

GET requests in Elasticsearch are guaranteed to be real-time, because a GET can read directly from the in-memory TransLog that has not yet been flushed to disk.
However, GET only supports lookup by doc_id, so conditional queries (Search) still cannot be real-time.
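
A small illustration using the same document: GET is real-time by default, and request parameters can change this behavior:

GET /website/blog/123                  # real-time by default (sees data not yet refreshed)
GET /website/blog/123?realtime=false   # read only what the last refresh exposed
GET /website/blog/123?refresh=true     # refresh the shard before reading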

GET execution flow

GET refers to a single-document lookup; a document is uniquely identified by its ID, so GET means finding a document by its ID.
Nodes in an ES cluster act as coordinating nodes and data nodes. A GET request first reaches a coordinating node and is then forwarded to a data node; if execution on one node fails, the request is forwarded to another copy for reading.
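
A hedged example of addressing such a lookup (values are illustrative): if the document was indexed with a custom routing value the same routing must be supplied on the GET, and preference can influence which shard copy serves the read:

GET /website/blog/123?routing=user1
GET /website/blog/123?preference=_local
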
[Figure: ES GET flow]

Coordinating node execution flow

Entry point: TransportSingleShardAction.AsyncSingleAction#perform
Note that the AsyncSingleAction constructor first prepares the cluster state, node list, and related information.
1. Compute the shardId of the document, i.e., the shard it lives on
In the constructor, the request is first routed via TransportGetAction#shards:

private AsyncSingleAction(Request request, ActionListener<Response> listener) {
    this.listener = listener;

    ClusterState clusterState = clusterService.state();
    if (logger.isTraceEnabled()) {
        logger.trace("executing [{}] based on cluster state version [{}]", request, clusterState.version());
    }
    // list of nodes in the cluster
    nodes = clusterState.nodes();
    ClusterBlockException blockException = checkGlobalBlock(clusterState);
    if (blockException != null) {
        throw blockException;
    }

    String concreteSingleIndex;
    if (resolveIndex(request)) {
        concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName();
    } else {
        concreteSingleIndex = request.index();
    }
    this.internalRequest = new InternalRequest(request, concreteSingleIndex);
    // parse the request and resolve any custom routing
    resolveRequest(clusterState, internalRequest);

    blockException = checkRequestBlock(clusterState, internalRequest);
    if (blockException != null) {
        throw blockException;
    }
    // compute the target shard iterator via the routing algorithm, or pick the target node by preference
    this.shardIt = shards(clusterState, internalRequest);
}

2. Send the request
Check whether the target node is the local node; if so, call the local TransportService#sendLocalRequest directly, otherwise perform a remote call.

public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
                                                      final TransportRequest request,
                                                      final TransportResponseHandler<T> handler) {
    try {
        Transport.Connection connection = getConnection(node);
        sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler);
    } catch (NodeNotConnectedException ex) {
        // the caller might not handle this so we invoke the handler
        handler.handleException(ex);
    }
}

public Transport.Connection getConnection(DiscoveryNode node) {
    if (isLocalNode(node)) {
        return localNodeConnection;
    } else {
        return transport.getConnection(node);
    }
}

Data node execution flow

Entry point: TransportSingleShardAction.ShardTransportHandler#messageReceived

public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
    if (logger.isTraceEnabled()) {
        logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
    }
    Response response = shardOperation(request, request.internalShardId);
    channel.sendResponse(response);
}

protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
    IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
    IndexShard indexShard = indexService.getShard(shardId.id());

    if (request.refresh() && !request.realtime()) {
        indexShard.refresh("refresh_flag_get");
    }

    GetResult result = indexShard.getService().get(request.type(), request.id(), request.storedFields(),
            request.realtime(), request.version(), request.versionType(), request.fetchSourceContext());
    return new GetResponse(result);
}

1. Read the data

private GetResult innerGet(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType,
                           FetchSourceContext fetchSourceContext, boolean readFromTranslog) {
    fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);
    final Collection<String> types;
    // handle the _all option
    if (type == null || type.equals("_all")) {
        types = mapperService.types();
    } else {
        types = Collections.singleton(type);
    }

    Engine.GetResult get = null;
    for (String typeX : types) {
        Term uidTerm = mapperService.createUidTerm(typeX, id);
        if (uidTerm != null) {
            // call the Engine to read the data
            get = indexShard.get(new Engine.Get(realtime, readFromTranslog, typeX, id, uidTerm)
                    .version(version).versionType(versionType));
            if (get.exists()) {
                type = typeX;
                break;
            } else {
                get.release();
            }
        }
    }

    ...

    // filter the returned result:
    // using the type, id, DocumentMapper, etc., extract the data from the result obtained above,
    // apply the field / _source filtering, and wrap everything in a GetResult
    return innerGetLoadFromStoredFields(type, id, gFields, fetchSourceContext, get, mapperService);
}

2. Read the data from InternalEngine

public Engine.GetResult get(Engine.Get get) {
    readAllowed();
    return getEngine().get(get, this::acquireSearcher);
}

public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
    assert Objects.equals(get.uid().field(), uidField) : get.uid().field();
    try (ReleasableLock ignored = readLock.acquire()) {
        ensureOpen();
        SearcherScope scope;
        // handle the realtime option and decide whether a refresh is needed
        if (get.realtime()) {
            VersionValue versionValue = null;
            // the versionMap entry is added when the document is indexed; it is never written to disk
            try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
                // we need to lock here to access the version map to do this truly in RT
                versionValue = getVersionFromMap(get.uid().bytes());
            }
            if (versionValue != null) {
                if (versionValue.isDelete()) {
                    return GetResult.NOT_EXISTS;
                }
                // check for a version conflict
                if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
                    throw new VersionConflictEngineException(shardId, get.type(), get.id(),
                            get.versionType().explainConflictForReads(versionValue.version, get.version()));
                }
                if (get.isReadFromTranslog()) {
                    // this is only used for updates - API _GET calls will always read form a reader for consistency
                    // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
                    if (versionValue.getLocation() != null) {
                        try {
                            Translog.Operation operation = translog.readOperation(versionValue.getLocation());
                            if (operation != null) {
                                // in the case of a already pruned translog generation we might get null here - yet very unlikely
                                TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig
                                        .getIndexSettings().getIndexVersionCreated());
                                return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader)),
                                        new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(), reader, 0));
                            }
                        } catch (IOException e) {
                            maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
                            throw new EngineException(shardId, "failed to read operation from translog", e);
                        }
                    } else {
                        trackTranslogLocation.set(true);
                    }
                }
                // refresh so the latest document version becomes visible to the internal searcher
                refresh("realtime_get", SearcherScope.INTERNAL);
            }
            scope = SearcherScope.INTERNAL;
        } else {
            // we expose what has been externally expose in a point in time snapshot via an explicit refresh
            scope = SearcherScope.EXTERNAL;
        }
        // read the data through a Searcher
        // no version, get the version from the index, we know that we refresh on flush
        return getFromSearcher(get, searcherFactory, scope);
    }
}