private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateRequest, IndexShard primary,
                                                         IndexMetaData metaData, BulkShardRequest request,
                                                         int requestIndex, UpdateHelper updateHelper,
                                                         LongSupplier nowInMillis,
                                                         final MappingUpdatePerformer mappingUpdater) throws Exception {
    BulkItemRequest primaryItemRequest = request.items()[requestIndex];
    assert primaryItemRequest.request() == updateRequest
            : "expected bulk item request to contain the original update request, got: "
            + primaryItemRequest.request() + " and " + updateRequest;

    BulkItemResultHolder holder = null;
    // There must be at least one attempt, so the retry count is floored at one execution
    int maxAttempts = Math.max(1, updateRequest.retryOnConflict());
    for (int attemptCount = 0; attemptCount < maxAttempts; attemptCount++) {
        // ... a single update attempt is executed here and its result assigned to holder (elided) ...

        // It was either a successful request, or it was a non-conflict failure
        if (holder.isVersionConflict() == false) {
            return holder;
        }
    }
    // We ran out of tries and haven't returned a valid bulk item response, so return the last one generated
    return holder;
}
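For context, `maxAttempts` above is driven by the client. Below is a minimal sketch of issuing such an update, assuming the 6.x transport `Client` API; the index name, type, id, and doc contents are placeholders:

import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentType;

public class RetryOnConflictExample {
    // An update the primary may retry up to three times on a version conflict before giving up.
    static UpdateResponse updateWithRetries(Client client) {
        UpdateRequest request = new UpdateRequest("my_index", "_doc", "1")       // placeholder index/type/id
                .doc("{\"counter\": 2}", XContentType.JSON)
                .retryOnConflict(3);  // becomes maxAttempts = max(1, 3) in executeUpdateRequest
        return client.update(request).actionGet();
    }
}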
public IndexResult index(Index index) throws IOException {
    assert Objects.equals(index.uid().field(), uidField) : index.uid().field();
    final boolean doThrottle = index.origin().isRecovery() == false;
    try (ReleasableLock releasableLock = readLock.acquire()) {
        ensureOpen();
        assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
        assert assertVersionType(index);
        try (Releasable ignored = versionMap.acquireLock(index.uid().bytes());
             Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) {
            lastWriteNanos = index.startTime();
            /* A NOTE ABOUT APPEND ONLY OPTIMIZATIONS:
             * if we have an autoGeneratedID that comes into the engine we can potentially optimize
             * and just use addDocument instead of updateDocument and skip the entire version and index lookup across the board.
             * Yet, we have to deal with multiple document delivery; for this we use a property of the document that is added
             * to detect if it has potentially been added before. We use the document's timestamp for this since it's something
             * that:
             *  - doesn't change per document
             *  - is preserved in the transaction log
             *  - and is assigned before we start to index / replicate
             * NOTE: it's not important for this timestamp to be consistent across nodes etc. it's just a number that is in the common
             * case increasing and can be used in the failure case, when we retry and resend documents, to establish a happens-before relationship.
             * for instance:
             *  - doc A has autoGeneratedIdTimestamp = 10, isRetry = false
             *  - doc B has autoGeneratedIdTimestamp = 9, isRetry = false
             *
             * while both docs are in flight, we disconnect on one node, reconnect and send doc A again
             *  - now doc A' has autoGeneratedIdTimestamp = 10, isRetry = true
             *
             * if A' arrives on the shard first we update maxUnsafeAutoIdTimestamp to 10 and use updateDocument. All subsequent
             * documents that arrive (A and B) will also use updateDocument since their timestamps are less than maxUnsafeAutoIdTimestamp.
             * While this is not strictly needed for doc B, it is just much simpler to implement since it will just de-optimize some doc in the worst case.
             *
             * if A arrives on the shard first we use addDocument since maxUnsafeAutoIdTimestamp is < 10. A' will then just be skipped or calls
             * updateDocument.
             */
            final IndexingStrategy plan;
            if (index.origin() == Operation.Origin.PRIMARY) {
                plan = planIndexingAsPrimary(index);
            } else {
                // non-primary mode (i.e., replica or recovery)
                plan = planIndexingAsNonPrimary(index);
            }

            final IndexResult indexResult;
            if (plan.earlyResultOnPreFlightError.isPresent()) {
                indexResult = plan.earlyResultOnPreFlightError.get();
                assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
            } else if (plan.indexIntoLucene) {
                indexResult = indexIntoLucene(index, plan);
            } else {
                indexResult = new IndexResult(plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing,
                        plan.currentNotFoundOrDeleted);
            }
            if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
                final Translog.Location location;
                if (indexResult.getResultType() == Result.Type.SUCCESS) {
                    location = translog.add(new Translog.Index(index, indexResult));
                } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                    // if we have a document failure, record it as a no-op in the translog with the generated seq_no
                    location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(),
                            indexResult.getFailure().toString()));
                } else {
                    location = null;
                }
                indexResult.setTranslogLocation(location);
            }
            if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) {
                final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
                versionMap.maybePutIndexUnderLock(index.uid().bytes(),
                        new IndexVersionValue(translogLocation, plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm()));
            }
            if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo());
            }
            indexResult.setTook(System.nanoTime() - index.startTime());
            indexResult.freeze();
            return indexResult;
        }
    } catch (RuntimeException | IOException e) {
        try {
            maybeFailEngine("index", e);
        } catch (Exception inner) {
            e.addSuppressed(inner);
        }
        throw e;
    }
}
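The long append-only note above can be condensed into a small standalone sketch (a hypothetical class, not the engine's actual implementation) of how a maxUnsafeAutoIdTimestamp watermark decides between addDocument and updateDocument:

import java.util.concurrent.atomic.AtomicLong;

// A standalone sketch of the append-only decision described in the comment above:
// docs with auto-generated IDs can skip the version lookup and use addDocument,
// unless a retry may already have delivered them once.
class AppendOnlySketch {
    private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);

    /** Returns true if the doc must go through updateDocument (deduplicating) rather than addDocument. */
    boolean mayHaveBeenIndexedBefore(long autoGeneratedIdTimestamp, boolean isRetry) {
        if (isRetry) {
            // A retried doc could already be in flight or indexed: raise the watermark so that any later
            // doc with a smaller-or-equal timestamp is also treated as potentially duplicated.
            maxUnsafeAutoIdTimestamp.updateAndGet(cur -> Math.max(cur, autoGeneratedIdTimestamp));
            return true;
        }
        // Non-retry docs are only unsafe if a retry with a larger-or-equal timestamp arrived earlier.
        return autoGeneratedIdTimestamp <= maxUnsafeAutoIdTimestamp.get();
    }
}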
private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest, final long globalCheckpoint) {
    if (logger.isTraceEnabled()) {
        logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
    }
    // ... the rest of the method, which hands the request to the replicas proxy and
    // ultimately reaches TransportService.sendRequest below, is elided ...
public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
                                                      final TransportRequest request,
                                                      final TransportResponseHandler<T> handler) {
    try {
        Transport.Connection connection = getConnection(node);
        sendRequest(connection, action, request, TransportRequestOptions.EMPTY, handler);
    } catch (NodeNotConnectedException ex) {
        // the caller might not handle this so we invoke the handler
        handler.handleException(ex);
    }
}
public Transport.Connection getConnection(DiscoveryNode node) {
    if (isLocalNode(node)) {
        return localNodeConnection;
    } else {
        return transport.getConnection(node);
    }
}
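The catch block in sendRequest routes connection-lookup failures into the same callback used for remote failures, so callers have a single error path. A generic sketch of that pattern (hypothetical Sender/ResponseHandler types, not the Elasticsearch API):

// Hypothetical types illustrating the "fail through the callback" pattern used above.
interface ResponseHandler<T> {
    void handleResponse(T response);
    void handleException(Exception e);
}

class SenderSketch {
    <T> void send(String nodeId, Object request, ResponseHandler<T> handler) {
        final Object connection;
        try {
            connection = lookupConnection(nodeId);   // may throw if the node is not connected
        } catch (Exception e) {
            handler.handleException(e);              // surface the failure via the callback, never to the caller
            return;
        }
        doSend(connection, request, handler);
    }

    private Object lookupConnection(String nodeId) { /* resolve local or remote connection */ return new Object(); }
    private <T> void doSend(Object connection, Object request, ResponseHandler<T> handler) { /* transmit asynchronously */ }
}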
public Engine.GetResult get(Engine.Get get) {
    readAllowed();
    return getEngine().get(get, this::acquireSearcher);
}
public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
    assert Objects.equals(get.uid().field(), uidField) : get.uid().field();
    try (ReleasableLock ignored = readLock.acquire()) {
        ensureOpen();
        SearcherScope scope;
        // handle the realtime option and decide whether a refresh is needed
        if (get.realtime()) {
            VersionValue versionValue = null;
            // the version map entry is added when the document is indexed; it is never written to disk
            try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
                // we need to lock here to access the version map to do this truly in RT
                versionValue = getVersionFromMap(get.uid().bytes());
            }
            if (versionValue != null) {
                if (versionValue.isDelete()) {
                    return GetResult.NOT_EXISTS;
                }
                // check for a version conflict
                if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
                    throw new VersionConflictEngineException(shardId, get.type(), get.id(),
                            get.versionType().explainConflictForReads(versionValue.version, get.version()));
                }
                if (get.isReadFromTranslog()) {
                    // this is only used for updates - API _GET calls will always read from a reader for consistency
                    // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
                    if (versionValue.getLocation() != null) {
                        try {
                            Translog.Operation operation = translog.readOperation(versionValue.getLocation());
                            if (operation != null) {
                                // in the case of an already pruned translog generation we might get null here - yet very unlikely
                                TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation,
                                        engineConfig.getIndexSettings().getIndexVersionCreated());
                                return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader)),
                                        new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(), reader, 0));
                            }
                        } catch (IOException e) {
                            maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
                            throw new EngineException(shardId, "failed to read operation from translog", e);
                        }
                    } else {
                        trackTranslogLocation.set(true);
                    }
                }
                // trigger a refresh so the pending change becomes visible to the internal searcher
                refresh("realtime_get", SearcherScope.INTERNAL);
            }
            scope = SearcherScope.INTERNAL;
        } else {
            // we expose what has been externally exposed in a point-in-time snapshot via an explicit refresh
            scope = SearcherScope.EXTERNAL;
        }
        // read the data through a Searcher
        // no version, get the version from the index, we know that we refresh on flush
        return getFromSearcher(get, searcherFactory, scope);
    }
}
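To tie the realtime branch back to the API surface: with realtime=true (the default) a document that was indexed but not yet refreshed is still returned, at the cost of a possible translog read or internal refresh, while realtime=false only sees the last externally refreshed view. A minimal sketch, assuming the 6.x Java Client; index name, type, and id are placeholders:

import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;

public class RealtimeGetExample {
    // realtime=true: may read the document back from the translog or trigger an internal refresh.
    static GetResponse realtimeGet(Client client) {
        return client.get(new GetRequest("my_index", "_doc", "1")).actionGet();           // placeholder index/type/id
    }

    // realtime=false: only searches the last refreshed (external) point-in-time view.
    static GetResponse nonRealtimeGet(Client client) {
        return client.get(new GetRequest("my_index", "_doc", "1").realtime(false)).actionGet();
    }
}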