Merge branch 'stable-3.9'

* stable-3.9:
  Avoid duplicate indexing tasks for the same id

Change-Id: I9148522a46c44069733a285e2794a7277ade0412
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
index b3fb27c..15f7c13 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
@@ -56,34 +56,49 @@
 
   @Override
   protected void doIndex(String id, Optional<ChangeIndexEvent> indexEvent) {
-    attemptToIndex(id, indexEvent, 0);
+    scheduleIndexing(id, indexEvent, this::indexIfConsistent); // one-off if id already tracked
+  }
+
+  private void indexIfConsistent(String id) {
+    if (isChangeConsistent(id)) { // otherwise skip silently: no log, no retry
+      reindex(id);
+    }
+  }
+
+  private boolean isChangeConsistent(String id) {
+    // True only if the change notes are present and the checker reports the change consistent.
+    ChangeChecker changeChecker = changeCheckerFactory.create(id);
+    return changeChecker.getChangeNotes().isPresent() && changeChecker.isChangeConsistent();
   }
 
   @Override
-  protected void attemptToIndex(String id, Optional<ChangeIndexEvent> indexEvent, int retryCount) {
+  protected void attemptToIndex(String id) {
     ChangeChecker checker = changeCheckerFactory.create(id);
     Optional<ChangeNotes> changeNotes = checker.getChangeNotes();
     boolean changeIsPresent = changeNotes.isPresent();
     boolean changeIsConsistent = checker.isChangeConsistent();
     if (changeIsPresent && changeIsConsistent) {
-      reindexAndCheckIsUpToDate(id, indexEvent, checker, retryCount);
+      reindexAndCheckIsUpToDate(id, checker);
     } else {
+      Optional<IndexingRetry> retry = Optional.ofNullable(indexingRetryTaskMap.get(id));
       log.warn(
           "Change {} {} in local Git repository (event={}) after {} attempt(s)",
           id,
           !changeIsPresent
               ? "not present yet"
               : (changeIsConsistent ? "is" : "is not") + " consistent",
-          indexEvent,
-          retryCount);
-      if (!rescheduleIndex(id, indexEvent, retryCount + 1)) {
+          retry.flatMap(IndexingRetry::getEvent),
+          retry.map(IndexingRetry::getRetryNumber).orElse(0));
+      // The entry may have been removed by another task; rescheduleIndex handles that case.
+      retry.ifPresent(IndexingRetry::incrementRetryNumber);
+      if (!rescheduleIndex(id)) {
         log.error(
             "Change {} {} in the local Git repository (event={})",
             id,
             !changeIsPresent
                 ? "could not be found"
                 : (changeIsConsistent ? "was" : "was not") + " consistent",
-            indexEvent);
+            retry.flatMap(IndexingRetry::getEvent));
       }
     }
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java
index c4906a9..d05405f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java
@@ -52,7 +52,7 @@
 
   @Override
   protected void doIndex(String uuid, Optional<GroupIndexEvent> event) {
-    attemptToIndex(uuid, event, 0);
+    scheduleIndexing(uuid, event, this::reindex); // plain reindex if uuid is already tracked
   }
 
   @Override
@@ -66,9 +66,8 @@
   }
 
   @Override
-  protected void attemptToIndex(
-      String uuid, Optional<GroupIndexEvent> groupIndexEvent, int retryCount) {
-    reindexAndCheckIsUpToDate(uuid, groupIndexEvent, groupChecker, retryCount);
+  protected void attemptToIndex(String uuid) {
+    reindexAndCheckIsUpToDate(uuid, groupChecker); // event and retry count come from the retry map
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java
index 3787a80..2054f10 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java
@@ -52,7 +52,7 @@
 
   @Override
   protected void doIndex(String projectName, Optional<ProjectIndexEvent> event) {
-    attemptToIndex(projectName, event, 0);
+    scheduleIndexing(projectName, event, this::reindex); // plain reindex if already tracked
   }
 
   @Override
@@ -66,8 +66,8 @@
   }
 
   @Override
-  protected void attemptToIndex(String id, Optional<ProjectIndexEvent> indexEvent, int retryCount) {
-    reindexAndCheckIsUpToDate(id, indexEvent, projectChecker, retryCount);
+  protected void attemptToIndex(String id) {
+    reindexAndCheckIsUpToDate(id, projectChecker); // event and retry count come from the retry map
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java
index 5c64431..f3d4e70 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java
@@ -19,10 +19,13 @@
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
 import com.googlesource.gerrit.plugins.multisite.index.UpToDateChecker;
+import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 /**
  * Base class to handle forwarded indexing. This class is meant to be extended by classes used on
@@ -37,6 +40,7 @@
   private final int maxTries;
   private final ScheduledExecutorService indexExecutor;
   protected final OneOffRequestContext oneOffCtx;
+  protected final Map<T, IndexingRetry> indexingRetryTaskMap = new ConcurrentHashMap<>();
 
   ForwardedIndexingHandlerWithRetries(
       ScheduledExecutorService indexExecutor,
@@ -55,22 +59,34 @@
 
   protected abstract String indexName();
 
-  protected abstract void attemptToIndex(T id, Optional<E> indexEvent, int retryCount);
+  protected abstract void attemptToIndex(T id);
 
-  protected boolean rescheduleIndex(T id, Optional<E> indexEvent, int retryCount) {
-    if (retryCount > maxTries) {
+  protected boolean rescheduleIndex(T id) {
+    IndexingRetry retry = indexingRetryTaskMap.get(id);
+    if (retry == null) { // entry already removed from the map: nothing left to retry
+      log.info(
+          "{} {} successfully indexed by different task, rescheduling isn't needed",
+          indexName(),
+          id);
+      return true;
+    }
+    if (retry.getRetryNumber() > maxTries) {
       log.error(
           "{} {} could not be indexed after {} retries. {} index could be stale.",
           indexName(),
           id,
-          retryCount,
+          retry.getRetryNumber(),
           indexName());
+      if (!indexingRetryTaskMap.remove(id, retry)) { // removes only if still mapped to this entry
+        log.debug(
+            "{} {} not removed from retry map: racy addition of a new retry", indexName(), id);
+      }
       return false;
     }
 
     log.warn(
         "Retrying for the #{} time to index {} {} after {} msecs",
-        retryCount,
+        retry.getRetryNumber(),
         indexName(),
         id,
         retryInterval);
@@ -80,7 +96,7 @@
             () -> {
               try (ManualRequestContext ctx = oneOffCtx.open()) {
                 Context.setForwardedEvent(true);
-                attemptToIndex(id, indexEvent, retryCount);
+                attemptToIndex(id);
               } catch (Exception e) {
                 log.warn("{} {} could not be indexed", indexName(), id, e);
               }
@@ -90,20 +106,66 @@
     return true;
   }
 
-  public final void reindexAndCheckIsUpToDate(
-      T id, Optional<E> indexEvent, UpToDateChecker<E> upToDateChecker, int retryCount) {
-    reindex(id);
-
-    if (!upToDateChecker.isUpToDate(indexEvent)) {
-      log.warn("{} {} is not up-to-date. Rescheduling", indexName(), id);
-      rescheduleIndex(id, indexEvent, retryCount + 1);
+  public void scheduleIndexing(T id, Optional<E> event, Consumer<T> indexOnce) {
+    IndexingRetry retry = new IndexingRetry(event); // new entry: retry number starts at 0
+    if (indexingRetryTaskMap.put(id, retry) != null) { // put() replaces any entry mapped to id
+      indexOnce.accept(id); // id was already tracked: run the one-off action only
+      log.info(
+          "Skipping indexing because there is already a running task for the specified id. Index name: {}, task id: {}",
+          indexName(),
+          id);
       return;
     }
-    if (retryCount > 0) {
+    attemptToIndex(id); // no previous entry for id: attempt indexing now
+  }
+
+  public final void reindexAndCheckIsUpToDate(T id, UpToDateChecker<E> upToDateChecker) {
+    reindex(id);
+    IndexingRetry retry = indexingRetryTaskMap.get(id);
+    if (retry == null) {
+      log.warn("{} {} successfully indexed by different task", indexName(), id);
+      return;
+    }
+    if (!upToDateChecker.isUpToDate(retry.getEvent())) {
+      log.warn("{} {} is not up-to-date. Rescheduling", indexName(), id);
+      retry.incrementRetryNumber();
+      rescheduleIndex(id);
+      return;
+    }
+    // Index is up to date: log the outcome, then drop this entry from the retry map.
+    if (retry.getRetryNumber() > 0) {
       log.warn(
-          "{} {} has been eventually indexed after {} attempt(s)", indexName(), id, retryCount);
+          "{} {} has been eventually indexed after {} attempt(s)",
+          indexName(),
+          id,
+          retry.getRetryNumber());
     } else {
       log.debug("{} {} successfully indexed", indexName(), id);
     }
+    if (!indexingRetryTaskMap.remove(id, retry)) {
+      log.debug(
+          "{} {} not removed from retry map: racy addition of a new retry", indexName(), id);
+    }
+  }
+
+  public class IndexingRetry { // per-id retry state kept in indexingRetryTaskMap
+    private final Optional<E> event; // event handed to scheduleIndexing for this id
+    private int retryNumber = 0; // NOTE(review): plain int, unsynchronized; confirm access pattern
+
+    public IndexingRetry(Optional<E> event) {
+      this.event = event;
+    }
+
+    public int getRetryNumber() {
+      return retryNumber;
+    }
+
+    public Optional<E> getEvent() {
+      return event;
+    }
+
+    public void incrementRetryNumber() {
+      ++retryNumber;
+    }
   }
 }