Merge branch 'stable-3.6' into stable-3.7

* stable-3.6:
  Specify cache.threads = 0 in the multi-site config documentation

Change-Id: Ia18c481aeba170f5127877abd875f012bf50bc1b
diff --git a/docker-compose.kafka-broker.yaml b/docker-compose.kafka-broker.yaml
deleted file mode 100644
index d3fc713..0000000
--- a/docker-compose.kafka-broker.yaml
+++ /dev/null
@@ -1,14 +0,0 @@
-version: '3'
-services:
-  zookeeper:
-    image: wurstmeister/zookeeper:latest
-    ports:
-      - "2181:2181"
-  kafka:
-    image: wurstmeister/kafka:2.13-2.6.3
-    ports:
-      - "9092:9092"
-    environment:
-      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
-      KAFKA_CREATE_TOPICS: "gerrit:1:1"
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
diff --git a/e2e-tests/docker-compose-kafka.yaml b/e2e-tests/docker-compose-kafka.yaml
index addb8d5..b02b58d 100644
--- a/e2e-tests/docker-compose-kafka.yaml
+++ b/e2e-tests/docker-compose-kafka.yaml
@@ -1,12 +1,11 @@
 version: '3'
 services:
   kafka:
-    image: wurstmeister/kafka:2.13-2.6.3
+    image: bitnami/kafka:3.6.0
     depends_on:
       - zookeeper
     environment:
-      KAFKA_ADVERTISED_HOST_NAME: kafka
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
 
   gerrit1:
     depends_on:
diff --git a/e2e-tests/docker-compose.yaml b/e2e-tests/docker-compose.yaml
index 3abb757..2ac7b06 100644
--- a/e2e-tests/docker-compose.yaml
+++ b/e2e-tests/docker-compose.yaml
@@ -1,7 +1,9 @@
 version: '3'
 services:
   zookeeper:
-    image: wurstmeister/zookeeper:latest
+    image: bitnami/zookeeper:3.8.3
+    environment:
+      ALLOW_ANONYMOUS_LOGIN: "true"
 
   gerrit1:
     image: gerritcodereview/gerrit:${GERRIT_IMAGE}
diff --git a/e2e-tests/test.sh b/e2e-tests/test.sh
index b069565..58029ba 100755
--- a/e2e-tests/test.sh
+++ b/e2e-tests/test.sh
@@ -16,11 +16,11 @@
 
 LOCATION="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
 LOCAL_ENV="$( cd "${LOCATION}/../setup_local_env" >/dev/null 2>&1 && pwd )"
-GERRIT_BRANCH=stable-3.6
+GERRIT_BRANCH=stable-3.7
 GERRIT_CI=https://gerrit-ci.gerritforge.com/view/Plugins-$GERRIT_BRANCH/job
 LAST_BUILD=lastSuccessfulBuild/artifact/bazel-bin/plugins
 DEF_MULTISITE_LOCATION=${LOCATION}/../../../bazel-bin/plugins/multi-site/multi-site.jar
-DEF_GERRIT_IMAGE=3.4.0-centos8
+DEF_GERRIT_IMAGE=3.7.4
 DEF_GERRIT_HEALTHCHECK_START_PERIOD=60s
 DEF_GERRIT_HEALTHCHECK_INTERVAL=5s
 DEF_GERRIT_HEALTHCHECK_TIMEOUT=5s
diff --git a/setup_local_env/docker-compose-core.yaml b/setup_local_env/docker-compose-core.yaml
index dab168d..f8ef318 100644
--- a/setup_local_env/docker-compose-core.yaml
+++ b/setup_local_env/docker-compose-core.yaml
@@ -1,10 +1,12 @@
 version: '3'
 services:
   zookeeper:
-    image: wurstmeister/zookeeper:latest
+    image: bitnami/zookeeper:3.8.3
     ports:
       - "2181:2181"
     container_name: zk_test_node
+    environment:
+      ALLOW_ANONYMOUS_LOGIN: "true"
   prometheus:
     container_name: prometheus_test_node
     image: prom/prometheus:v2.16.0
diff --git a/setup_local_env/docker-compose-kafka.yaml b/setup_local_env/docker-compose-kafka.yaml
index 2a5b0fc..70fda58 100644
--- a/setup_local_env/docker-compose-kafka.yaml
+++ b/setup_local_env/docker-compose-kafka.yaml
@@ -1,13 +1,13 @@
 version: '3'
 services:
   kafka:
-    image: wurstmeister/kafka:2.13-2.6.3
+    image: bitnami/kafka:3.6.0
     ports:
       - "9092:9092"
     container_name: kafka_test_node
     environment:
-      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
+      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
     networks:
       - setup_local_env_default
 networks:
diff --git a/setup_local_env/setup.sh b/setup_local_env/setup.sh
index 7697f24..d5ded8c 100755
--- a/setup_local_env/setup.sh
+++ b/setup_local_env/setup.sh
@@ -16,7 +16,7 @@
 
 
 SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
-GERRIT_BRANCH=stable-3.6
+GERRIT_BRANCH=stable-3.7
 GERRIT_CI=https://gerrit-ci.gerritforge.com/view/Plugins-$GERRIT_BRANCH/job
 LAST_BUILD=lastSuccessfulBuild/artifact/bazel-bin/plugins
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
index 522a78f..15f7c13 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
@@ -56,34 +56,49 @@
 
   @Override
   protected void doIndex(String id, Optional<ChangeIndexEvent> indexEvent) {
-    attemptToIndex(id, indexEvent, 0);
+    scheduleIndexing(id, indexEvent, this::indexIfConsistent);
+  }
+
+  private void indexIfConsistent(String id) {
+    if (isChangeConsistent(id)) {
+      reindex(id);
+    }
+  }
+
+  private boolean isChangeConsistent(String id) {
+    ChangeChecker checker = changeCheckerFactory.create(id);
+    Optional<ChangeNotes> changeNotes = checker.getChangeNotes();
+    return changeNotes.isPresent() && checker.isChangeConsistent();
   }
 
   @Override
-  protected void attemptToIndex(String id, Optional<ChangeIndexEvent> indexEvent, int retryCount) {
+  protected void attemptToIndex(String id) {
     ChangeChecker checker = changeCheckerFactory.create(id);
     Optional<ChangeNotes> changeNotes = checker.getChangeNotes();
     boolean changeIsPresent = changeNotes.isPresent();
     boolean changeIsConsistent = checker.isChangeConsistent();
     if (changeIsPresent && changeIsConsistent) {
-      reindexAndCheckIsUpToDate(id, indexEvent, checker, retryCount);
+      reindexAndCheckIsUpToDate(id, checker);
     } else {
+      IndexingRetry retry = indexingRetryTaskMap.get(id);
       log.warn(
           "Change {} {} in local Git repository (event={}) after {} attempt(s)",
           id,
           !changeIsPresent
               ? "not present yet"
               : (changeIsConsistent ? "is" : "is not") + " consistent",
-          indexEvent,
-          retryCount);
-      if (!rescheduleIndex(id, indexEvent, retryCount + 1)) {
+          retry.getEvent(),
+          retry.getRetryNumber());
+
+      retry.incrementRetryNumber();
+      if (!rescheduleIndex(id)) {
         log.error(
             "Change {} {} in the local Git repository (event={})",
             id,
             !changeIsPresent
                 ? "could not be found"
                 : (changeIsConsistent ? "was" : "was not") + " consistent",
-            indexEvent);
+            retry.getEvent());
       }
     }
   }
@@ -95,7 +110,7 @@
       Optional<ChangeNotes> changeNotes = checker.getChangeNotes();
       ChangeNotes notes = changeNotes.get();
       notes.reload();
-      indexer.index(notes.getChange());
+      indexer.index(notes);
     }
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java
index c4906a9..d05405f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java
@@ -52,7 +52,7 @@
 
   @Override
   protected void doIndex(String uuid, Optional<GroupIndexEvent> event) {
-    attemptToIndex(uuid, event, 0);
+    scheduleIndexing(uuid, event, this::reindex);
   }
 
   @Override
@@ -66,9 +66,8 @@
   }
 
   @Override
-  protected void attemptToIndex(
-      String uuid, Optional<GroupIndexEvent> groupIndexEvent, int retryCount) {
-    reindexAndCheckIsUpToDate(uuid, groupIndexEvent, groupChecker, retryCount);
+  protected void attemptToIndex(String uuid) {
+    reindexAndCheckIsUpToDate(uuid, groupChecker);
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java
index 3787a80..2054f10 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java
@@ -52,7 +52,7 @@
 
   @Override
   protected void doIndex(String projectName, Optional<ProjectIndexEvent> event) {
-    attemptToIndex(projectName, event, 0);
+    scheduleIndexing(projectName, event, this::reindex);
   }
 
   @Override
@@ -66,8 +66,8 @@
   }
 
   @Override
-  protected void attemptToIndex(String id, Optional<ProjectIndexEvent> indexEvent, int retryCount) {
-    reindexAndCheckIsUpToDate(id, indexEvent, projectChecker, retryCount);
+  protected void attemptToIndex(String id) {
+    reindexAndCheckIsUpToDate(id, projectChecker);
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java
index 5c64431..f3d4e70 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java
@@ -19,10 +19,13 @@
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
 import com.googlesource.gerrit.plugins.multisite.index.UpToDateChecker;
+import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 /**
  * Base class to handle forwarded indexing. This class is meant to be extended by classes used on
@@ -37,6 +40,7 @@
   private final int maxTries;
   private final ScheduledExecutorService indexExecutor;
   protected final OneOffRequestContext oneOffCtx;
+  protected final Map<T, IndexingRetry> indexingRetryTaskMap = new ConcurrentHashMap<>();
 
   ForwardedIndexingHandlerWithRetries(
       ScheduledExecutorService indexExecutor,
@@ -55,22 +59,34 @@
 
   protected abstract String indexName();
 
-  protected abstract void attemptToIndex(T id, Optional<E> indexEvent, int retryCount);
+  protected abstract void attemptToIndex(T id);
 
-  protected boolean rescheduleIndex(T id, Optional<E> indexEvent, int retryCount) {
-    if (retryCount > maxTries) {
+  protected boolean rescheduleIndex(T id) {
+    IndexingRetry retry = indexingRetryTaskMap.get(id);
+    if (retry == null) {
+      log.info(
+          "{} {} successfully indexed by different task, rescheduling isn't needed",
+          indexName(),
+          id);
+      return true;
+    }
+    if (retry.getRetryNumber() > maxTries) {
       log.error(
           "{} {} could not be indexed after {} retries. {} index could be stale.",
           indexName(),
           id,
-          retryCount,
+          retry.getRetryNumber(),
           indexName());
+      if (!indexingRetryTaskMap.remove(id, retry)) {
+        log.debug(
+            "{} {} not removed from retry map because of racy addition of a new retry indexing retry");
+      }
       return false;
     }
 
     log.warn(
         "Retrying for the #{} time to index {} {} after {} msecs",
-        retryCount,
+        retry.getRetryNumber(),
         indexName(),
         id,
         retryInterval);
@@ -80,7 +96,7 @@
             () -> {
               try (ManualRequestContext ctx = oneOffCtx.open()) {
                 Context.setForwardedEvent(true);
-                attemptToIndex(id, indexEvent, retryCount);
+                attemptToIndex(id);
               } catch (Exception e) {
                 log.warn("{} {} could not be indexed", indexName(), id, e);
               }
@@ -90,20 +106,66 @@
     return true;
   }
 
-  public final void reindexAndCheckIsUpToDate(
-      T id, Optional<E> indexEvent, UpToDateChecker<E> upToDateChecker, int retryCount) {
-    reindex(id);
-
-    if (!upToDateChecker.isUpToDate(indexEvent)) {
-      log.warn("{} {} is not up-to-date. Rescheduling", indexName(), id);
-      rescheduleIndex(id, indexEvent, retryCount + 1);
+  public void scheduleIndexing(T id, Optional<E> event, Consumer<T> indexOnce) {
+    IndexingRetry retry = new IndexingRetry(event);
+    if (indexingRetryTaskMap.put(id, retry) != null) {
+      indexOnce.accept(id);
+      log.info(
+          "Skipping indexing because there is already a running task for the specified id. Index name: {}, task id: {}",
+          indexName(),
+          id);
       return;
     }
-    if (retryCount > 0) {
+    attemptToIndex(id);
+  }
+
+  public final void reindexAndCheckIsUpToDate(T id, UpToDateChecker<E> upToDateChecker) {
+    reindex(id);
+    IndexingRetry retry = indexingRetryTaskMap.get(id);
+    if (retry == null) {
+      log.warn("{} {} successfully indexed by different task", indexName(), id);
+      return;
+    }
+    if (!upToDateChecker.isUpToDate(retry.getEvent())) {
+      log.warn("{} {} is not up-to-date. Rescheduling", indexName(), id);
+      retry.incrementRetryNumber();
+      rescheduleIndex(id);
+      return;
+    }
+
+    if (retry.getRetryNumber() > 0) {
       log.warn(
-          "{} {} has been eventually indexed after {} attempt(s)", indexName(), id, retryCount);
+          "{} {} has been eventually indexed after {} attempt(s)",
+          indexName(),
+          id,
+          retry.getRetryNumber());
     } else {
       log.debug("{} {} successfully indexed", indexName(), id);
     }
+    if (!indexingRetryTaskMap.remove(id, retry)) {
+      log.debug(
+          "{} {} not removed from retry map because of racy addition of a new retry indexing retry");
+    }
+  }
+
+  public class IndexingRetry {
+    private final Optional<E> event;
+    private int retryNumber = 0;
+
+    public IndexingRetry(Optional<E> event) {
+      this.event = event;
+    }
+
+    public int getRetryNumber() {
+      return retryNumber;
+    }
+
+    public Optional<E> getEvent() {
+      return event;
+    }
+
+    public void incrementRetryNumber() {
+      ++retryNumber;
+    }
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
index 2c4b157..7133a0d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
@@ -88,7 +88,6 @@
     when(ctxMock.open()).thenReturn(manualRequestContextMock);
     id = Change.id(TEST_CHANGE_NUMBER);
     change = new Change(null, id, null, null, TimeUtil.now());
-    when(changeNotes.getChange()).thenReturn(change);
     when(changeCheckerFactoryMock.create(any())).thenReturn(changeCheckerAbsentMock);
     when(configurationMock.index()).thenReturn(index);
     when(index.numStripedLocks()).thenReturn(10);
@@ -102,7 +101,7 @@
   public void changeIsIndexedWhenUpToDate() throws Exception {
     setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_UP_TO_DATE, CHANGE_CONSISTENT);
     handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty());
-    verify(indexerMock, times(1)).index(any(Change.class));
+    verify(indexerMock, times(1)).index(any(ChangeNotes.class));
   }
 
   @Test
@@ -112,7 +111,7 @@
         TEST_CHANGE_ID,
         Operation.INDEX,
         Optional.of(new ChangeIndexEvent("foo", 1, false, "instance-id")));
-    verify(indexerMock, times(1)).index(any(Change.class));
+    verify(indexerMock, times(1)).index(any(ChangeNotes.class));
   }
 
   @Test
@@ -127,14 +126,14 @@
         TEST_CHANGE_ID,
         Operation.INDEX,
         Optional.of(new ChangeIndexEvent("foo", 1, false, "instance-id")));
-    verify(indexerMock, never()).index(any(Change.class));
+    verify(indexerMock, never()).index(any(ChangeNotes.class));
     verify(indexExecutorMock, times(1)).schedule(any(Runnable.class), anyLong(), any());
 
     handler.index(
         TEST_CHANGE_ID,
         Operation.INDEX,
         Optional.of(new ChangeIndexEvent("foo", 1, false, "instance-id")));
-    verify(indexerMock, times(1)).index(any(Change.class));
+    verify(indexerMock, times(1)).index(any(ChangeNotes.class));
   }
 
   @Test
@@ -172,13 +171,13 @@
                   return null;
                 })
         .when(indexerMock)
-        .index(any(Change.class));
+        .index(any(ChangeNotes.class));
 
     assertThat(Context.isForwardedEvent()).isFalse();
     handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty());
     assertThat(Context.isForwardedEvent()).isFalse();
 
-    verify(indexerMock, times(1)).index(any(Change.class));
+    verify(indexerMock, times(1)).index(any(ChangeNotes.class));
   }
 
   @Test
@@ -191,7 +190,7 @@
                   throw new IOException("someMessage");
                 })
         .when(indexerMock)
-        .index(any(Change.class));
+        .index(any(ChangeNotes.class));
 
     assertThat(Context.isForwardedEvent()).isFalse();
     IOException thrown =
@@ -201,7 +200,7 @@
     assertThat(thrown).hasMessageThat().isEqualTo("someMessage");
     assertThat(Context.isForwardedEvent()).isFalse();
 
-    verify(indexerMock, times(1)).index(any(Change.class));
+    verify(indexerMock, times(1)).index(any(ChangeNotes.class));
   }
 
   private void setupChangeAccessRelatedMocks(boolean changeExist, boolean changeUpToDate)
@@ -226,7 +225,7 @@
       when(changeCheckerFactoryMock.create(TEST_CHANGE_ID)).thenReturn(changeCheckerPresentMock);
       when(changeCheckerPresentMock.getChangeNotes()).thenReturn(Optional.of(changeNotes));
       if (storageException) {
-        doThrow(new StorageException("io-error")).when(indexerMock).index(any(Change.class));
+        doThrow(new StorageException("io-error")).when(indexerMock).index(any(ChangeNotes.class));
       }
     }