Merge branch 'stable-3.9'
* stable-3.9:
Avoid duplicate indexing tasks for the same id
Change-Id: I9148522a46c44069733a285e2794a7277ade0412
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
index b3fb27c..15f7c13 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
@@ -56,34 +56,49 @@
@Override
protected void doIndex(String id, Optional<ChangeIndexEvent> indexEvent) {
- attemptToIndex(id, indexEvent, 0);
+ scheduleIndexing(id, indexEvent, this::indexIfConsistent);
+ }
+
+ private void indexIfConsistent(String id) {
+ if (isChangeConsistent(id)) {
+ reindex(id);
+ }
+ }
+
+ private boolean isChangeConsistent(String id) {
+ ChangeChecker checker = changeCheckerFactory.create(id);
+ Optional<ChangeNotes> changeNotes = checker.getChangeNotes();
+ return changeNotes.isPresent() && checker.isChangeConsistent();
}
@Override
- protected void attemptToIndex(String id, Optional<ChangeIndexEvent> indexEvent, int retryCount) {
+ protected void attemptToIndex(String id) {
ChangeChecker checker = changeCheckerFactory.create(id);
Optional<ChangeNotes> changeNotes = checker.getChangeNotes();
boolean changeIsPresent = changeNotes.isPresent();
boolean changeIsConsistent = checker.isChangeConsistent();
if (changeIsPresent && changeIsConsistent) {
- reindexAndCheckIsUpToDate(id, indexEvent, checker, retryCount);
+ reindexAndCheckIsUpToDate(id, checker);
} else {
+ IndexingRetry retry = indexingRetryTaskMap.get(id);
log.warn(
"Change {} {} in local Git repository (event={}) after {} attempt(s)",
id,
!changeIsPresent
? "not present yet"
: (changeIsConsistent ? "is" : "is not") + " consistent",
- indexEvent,
- retryCount);
- if (!rescheduleIndex(id, indexEvent, retryCount + 1)) {
+ retry.getEvent(),
+ retry.getRetryNumber());
+
+ retry.incrementRetryNumber();
+ if (!rescheduleIndex(id)) {
log.error(
"Change {} {} in the local Git repository (event={})",
id,
!changeIsPresent
? "could not be found"
: (changeIsConsistent ? "was" : "was not") + " consistent",
- indexEvent);
+ retry.getEvent());
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java
index c4906a9..d05405f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java
@@ -52,7 +52,7 @@
@Override
protected void doIndex(String uuid, Optional<GroupIndexEvent> event) {
- attemptToIndex(uuid, event, 0);
+ scheduleIndexing(uuid, event, this::reindex);
}
@Override
@@ -66,9 +66,8 @@
}
@Override
- protected void attemptToIndex(
- String uuid, Optional<GroupIndexEvent> groupIndexEvent, int retryCount) {
- reindexAndCheckIsUpToDate(uuid, groupIndexEvent, groupChecker, retryCount);
+ protected void attemptToIndex(String uuid) {
+ reindexAndCheckIsUpToDate(uuid, groupChecker);
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java
index 3787a80..2054f10 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java
@@ -52,7 +52,7 @@
@Override
protected void doIndex(String projectName, Optional<ProjectIndexEvent> event) {
- attemptToIndex(projectName, event, 0);
+ scheduleIndexing(projectName, event, this::reindex);
}
@Override
@@ -66,8 +66,8 @@
}
@Override
- protected void attemptToIndex(String id, Optional<ProjectIndexEvent> indexEvent, int retryCount) {
- reindexAndCheckIsUpToDate(id, indexEvent, projectChecker, retryCount);
+ protected void attemptToIndex(String id) {
+ reindexAndCheckIsUpToDate(id, projectChecker);
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java
index 5c64431..f3d4e70 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java
@@ -19,10 +19,13 @@
import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
import com.googlesource.gerrit.plugins.multisite.index.UpToDateChecker;
+import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
/**
* Base class to handle forwarded indexing. This class is meant to be extended by classes used on
@@ -37,6 +40,7 @@
private final int maxTries;
private final ScheduledExecutorService indexExecutor;
protected final OneOffRequestContext oneOffCtx;
+ protected final Map<T, IndexingRetry> indexingRetryTaskMap = new ConcurrentHashMap<>();
ForwardedIndexingHandlerWithRetries(
ScheduledExecutorService indexExecutor,
@@ -55,22 +59,34 @@
protected abstract String indexName();
- protected abstract void attemptToIndex(T id, Optional<E> indexEvent, int retryCount);
+ protected abstract void attemptToIndex(T id);
- protected boolean rescheduleIndex(T id, Optional<E> indexEvent, int retryCount) {
- if (retryCount > maxTries) {
+ protected boolean rescheduleIndex(T id) {
+ IndexingRetry retry = indexingRetryTaskMap.get(id);
+ if (retry == null) {
+ log.info(
+ "{} {} successfully indexed by different task, rescheduling isn't needed",
+ indexName(),
+ id);
+ return true;
+ }
+ if (retry.getRetryNumber() > maxTries) {
log.error(
"{} {} could not be indexed after {} retries. {} index could be stale.",
indexName(),
id,
- retryCount,
+ retry.getRetryNumber(),
indexName());
+ if (!indexingRetryTaskMap.remove(id, retry)) {
+ log.debug(
+ "{} {} not removed from retry map because of racy addition of a new retry indexing retry");
+ }
return false;
}
log.warn(
"Retrying for the #{} time to index {} {} after {} msecs",
- retryCount,
+ retry.getRetryNumber(),
indexName(),
id,
retryInterval);
@@ -80,7 +96,7 @@
() -> {
try (ManualRequestContext ctx = oneOffCtx.open()) {
Context.setForwardedEvent(true);
- attemptToIndex(id, indexEvent, retryCount);
+ attemptToIndex(id);
} catch (Exception e) {
log.warn("{} {} could not be indexed", indexName(), id, e);
}
@@ -90,20 +106,66 @@
return true;
}
- public final void reindexAndCheckIsUpToDate(
- T id, Optional<E> indexEvent, UpToDateChecker<E> upToDateChecker, int retryCount) {
- reindex(id);
-
- if (!upToDateChecker.isUpToDate(indexEvent)) {
- log.warn("{} {} is not up-to-date. Rescheduling", indexName(), id);
- rescheduleIndex(id, indexEvent, retryCount + 1);
+ public void scheduleIndexing(T id, Optional<E> event, Consumer<T> indexOnce) {
+ IndexingRetry retry = new IndexingRetry(event);
+ if (indexingRetryTaskMap.put(id, retry) != null) {
+ indexOnce.accept(id);
+ log.info(
+ "Skipping indexing because there is already a running task for the specified id. Index name: {}, task id: {}",
+ indexName(),
+ id);
return;
}
- if (retryCount > 0) {
+ attemptToIndex(id);
+ }
+
+ public final void reindexAndCheckIsUpToDate(T id, UpToDateChecker<E> upToDateChecker) {
+ reindex(id);
+ IndexingRetry retry = indexingRetryTaskMap.get(id);
+ if (retry == null) {
+ log.warn("{} {} successfully indexed by different task", indexName(), id);
+ return;
+ }
+ if (!upToDateChecker.isUpToDate(retry.getEvent())) {
+ log.warn("{} {} is not up-to-date. Rescheduling", indexName(), id);
+ retry.incrementRetryNumber();
+ rescheduleIndex(id);
+ return;
+ }
+
+ if (retry.getRetryNumber() > 0) {
log.warn(
- "{} {} has been eventually indexed after {} attempt(s)", indexName(), id, retryCount);
+ "{} {} has been eventually indexed after {} attempt(s)",
+ indexName(),
+ id,
+ retry.getRetryNumber());
} else {
log.debug("{} {} successfully indexed", indexName(), id);
}
+ if (!indexingRetryTaskMap.remove(id, retry)) {
+ log.debug(
+ "{} {} not removed from retry map because of racy addition of a new retry indexing retry");
+ }
+ }
+
+ public class IndexingRetry {
+ private final Optional<E> event;
+ private int retryNumber = 0;
+
+ public IndexingRetry(Optional<E> event) {
+ this.event = event;
+ }
+
+ public int getRetryNumber() {
+ return retryNumber;
+ }
+
+ public Optional<E> getEvent() {
+ return event;
+ }
+
+ public void incrementRetryNumber() {
+ ++retryNumber;
+ }
}
}