Introduce common index with retries and move group handler

Indexing should be attempted multiple times, until the result is
validated by an UpToDateChecker, which will assess whether the entity
being indexed can now be considered up-to-date.

Because of that, all index handlers share the same structure, they all
need a way of checking whether data is up-to-date, they all need a
scheduler to retry, they all need to read the same [index]
configuration.

Create a base class for index handler and start by moving group index
handler to use it.

Subsequent changes will address other index handlers.

Bug: Issue 14332
Change-Id: Ic46bef39bc72004dc2f5a39eac882db8aa414c55
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java
index edc2364..c4906a9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java
@@ -16,6 +16,7 @@
 
 import com.google.gerrit.entities.AccountGroup;
 import com.google.gerrit.server.index.group.GroupIndexer;
+import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
@@ -23,9 +24,7 @@
 import com.googlesource.gerrit.plugins.multisite.index.ForwardedIndexExecutor;
 import com.googlesource.gerrit.plugins.multisite.index.GroupChecker;
 import java.util.Optional;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Index a group using {@link GroupIndexer}. This class is meant to be used on the receiving side of
@@ -34,76 +33,42 @@
  * done for the same group uuid
  */
 @Singleton
-public class ForwardedIndexGroupHandler extends ForwardedIndexingHandler<String, GroupIndexEvent> {
+public class ForwardedIndexGroupHandler
+    extends ForwardedIndexingHandlerWithRetries<String, GroupIndexEvent> {
   private final GroupIndexer indexer;
   private final GroupChecker groupChecker;
-  private final ScheduledExecutorService indexExecutor;
-  private final int retryInterval;
-  private final int maxTries;
 
   @Inject
   ForwardedIndexGroupHandler(
       GroupIndexer indexer,
       Configuration config,
       GroupChecker groupChecker,
+      OneOffRequestContext oneOffRequestContext,
       @ForwardedIndexExecutor ScheduledExecutorService indexExecutor) {
-    super(config.index().numStripedLocks());
+    super(indexExecutor, config, oneOffRequestContext);
     this.indexer = indexer;
     this.groupChecker = groupChecker;
-    this.indexExecutor = indexExecutor;
-    Configuration.Index indexConfig = config.index();
-    this.retryInterval = indexConfig != null ? indexConfig.retryInterval() : 0;
-    this.maxTries = indexConfig != null ? indexConfig.maxTries() : 0;
   }
 
   @Override
   protected void doIndex(String uuid, Optional<GroupIndexEvent> event) {
-    doIndex(uuid, event, 0);
+    attemptToIndex(uuid, event, 0);
   }
 
-  protected void doIndex(String uuid, Optional<GroupIndexEvent> groupIndexEvent, int retryCount) {
-    indexer.index(AccountGroup.uuid(uuid));
-    if (groupChecker.isGroupUpToDate(groupIndexEvent)) {
-      if (retryCount > 0) {
-        log.warn("Group '{}' has been eventually indexed after {} attempt(s)", uuid, retryCount);
-      } else {
-        log.debug("Group '{}' successfully indexed", uuid);
-      }
-    } else {
-      log.debug("Group '{}' rescheduling indexing", uuid);
-      rescheduleIndex(uuid, groupIndexEvent, retryCount + 1);
-    }
+  @Override
+  protected void reindex(String id) {
+    indexer.index(AccountGroup.uuid(id));
   }
 
-  private boolean rescheduleIndex(
-      String uuid, Optional<GroupIndexEvent> indexEvent, int retryCount) {
-    if (retryCount > maxTries) {
-      log.error(
-          "Group '{}' could not be indexed after {} retries. Group index could be stale.",
-          uuid,
-          retryCount);
-      return false;
-    }
+  @Override
+  protected String indexName() {
+    return "group";
+  }
 
-    log.warn(
-        "Retrying for the #{} time to index Group {} after {} msecs",
-        retryCount,
-        uuid,
-        retryInterval);
-    @SuppressWarnings("unused")
-    Future<?> possiblyIgnoredError =
-        indexExecutor.schedule(
-            () -> {
-              try {
-                Context.setForwardedEvent(true);
-                doIndex(uuid, indexEvent, retryCount);
-              } catch (Exception e) {
-                log.warn("Group {} could not be indexed", uuid, e);
-              }
-            },
-            retryInterval,
-            TimeUnit.MILLISECONDS);
-    return true;
+  @Override
+  protected void attemptToIndex(
+      String uuid, Optional<GroupIndexEvent> groupIndexEvent, int retryCount) {
+    reindexAndCheckIsUpToDate(uuid, groupIndexEvent, groupChecker, retryCount);
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java
new file mode 100644
index 0000000..5c64431
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java
@@ -0,0 +1,109 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.forwarder;
+
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
+import com.googlesource.gerrit.plugins.multisite.index.UpToDateChecker;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class to handle forwarded indexing. This class is meant to be extended by classes used on
+ * the receiving side of the {@link IndexEvent} since it will prevent indexing to be forwarded again
+ * causing an infinite forwarding loop between the 2 nodes. It will also make sure no concurrent
+ * indexing is done for the same id.
+ */
+public abstract class ForwardedIndexingHandlerWithRetries<T, E extends IndexEvent>
+    extends ForwardedIndexingHandler<T, E> {
+
+  private final int retryInterval;
+  private final int maxTries;
+  private final ScheduledExecutorService indexExecutor;
+  protected final OneOffRequestContext oneOffCtx;
+
+  ForwardedIndexingHandlerWithRetries(
+      ScheduledExecutorService indexExecutor,
+      Configuration configuration,
+      OneOffRequestContext oneOffCtx) {
+    super(configuration.index().numStripedLocks());
+
+    Configuration.Index indexConfig = configuration.index();
+    this.oneOffCtx = oneOffCtx;
+    this.indexExecutor = indexExecutor;
+    this.retryInterval = indexConfig != null ? indexConfig.retryInterval() : 0;
+    this.maxTries = indexConfig != null ? indexConfig.maxTries() : 0;
+  }
+
+  protected abstract void reindex(T id);
+
+  protected abstract String indexName();
+
+  protected abstract void attemptToIndex(T id, Optional<E> indexEvent, int retryCount);
+
+  protected boolean rescheduleIndex(T id, Optional<E> indexEvent, int retryCount) {
+    if (retryCount > maxTries) {
+      log.error(
+          "{} {} could not be indexed after {} retries. {} index could be stale.",
+          indexName(),
+          id,
+          retryCount,
+          indexName());
+      return false;
+    }
+
+    log.warn(
+        "Retrying for the #{} time to index {} {} after {} msecs",
+        retryCount,
+        indexName(),
+        id,
+        retryInterval);
+    @SuppressWarnings("unused")
+    Future<?> possiblyIgnoredError =
+        indexExecutor.schedule(
+            () -> {
+              try (ManualRequestContext ctx = oneOffCtx.open()) {
+                Context.setForwardedEvent(true);
+                attemptToIndex(id, indexEvent, retryCount);
+              } catch (Exception e) {
+                log.warn("{} {} could not be indexed", indexName(), id, e);
+              }
+            },
+            retryInterval,
+            TimeUnit.MILLISECONDS);
+    return true;
+  }
+
+  public final void reindexAndCheckIsUpToDate(
+      T id, Optional<E> indexEvent, UpToDateChecker<E> upToDateChecker, int retryCount) {
+    reindex(id);
+
+    if (!upToDateChecker.isUpToDate(indexEvent)) {
+      log.warn("{} {} is not up-to-date. Rescheduling", indexName(), id);
+      rescheduleIndex(id, indexEvent, retryCount + 1);
+      return;
+    }
+    if (retryCount > 0) {
+      log.warn(
+          "{} {} has been eventually indexed after {} attempt(s)", indexName(), id, retryCount);
+    } else {
+      log.debug("{} {} successfully indexed", indexName(), id);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/GroupChecker.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/GroupChecker.java
index 03ac30c..c19d04a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/GroupChecker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/GroupChecker.java
@@ -18,8 +18,10 @@
 import java.util.Optional;
 import org.eclipse.jgit.lib.ObjectId;
 
-public interface GroupChecker {
-  boolean isGroupUpToDate(Optional<GroupIndexEvent> groupIndexEvent);
+public interface GroupChecker extends UpToDateChecker<GroupIndexEvent> {
+
+  @Override
+  boolean isUpToDate(Optional<GroupIndexEvent> groupIndexEvent);
 
   ObjectId getGroupHead(String groupUUID);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImpl.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImpl.java
index f167e51..e9e40f3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImpl.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImpl.java
@@ -44,7 +44,7 @@
   }
 
   @Override
-  public boolean isGroupUpToDate(Optional<GroupIndexEvent> groupIndexEvent) {
+  public boolean isUpToDate(Optional<GroupIndexEvent> groupIndexEvent) {
     if (!groupIndexEvent.isPresent()) {
       logger.atWarning().log("Group Index empty, considering this group up-to-date");
       return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/UpToDateChecker.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/UpToDateChecker.java
new file mode 100644
index 0000000..8a7a231
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/UpToDateChecker.java
@@ -0,0 +1,28 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.index;
+
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
+import java.util.Optional;
+
+public interface UpToDateChecker<E extends IndexEvent> {
+  /**
+   * Check if the local Change is aligned with the indexEvent received.
+   *
+   * @param indexEvent indexing event
+   * @return true if the local Change is up-to-date, false otherwise.
+   */
+  boolean isUpToDate(Optional<E> indexEvent);
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java
index 3c0a3cf..982ac52 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java
@@ -24,6 +24,7 @@
 
 import com.google.gerrit.entities.AccountGroup;
 import com.google.gerrit.server.index.group.GroupIndexer;
+import com.google.gerrit.server.util.OneOffRequestContext;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexingHandler.Operation;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
@@ -46,6 +47,7 @@
 
   @Rule public ExpectedException exception = ExpectedException.none();
   @Mock private GroupIndexer indexerMock;
+  @Mock private OneOffRequestContext ctxMock;
   @Mock private ScheduledExecutorService indexExecutorMock;
   @Mock private Configuration config;
   @Mock private Configuration.Index index;
@@ -135,7 +137,7 @@
 
   private ForwardedIndexGroupHandler groupHandler(boolean checkIsUpToDate) {
     return new ForwardedIndexGroupHandler(
-        indexerMock, config, new TestGroupChecker(checkIsUpToDate), indexExecutorMock);
+        indexerMock, config, new TestGroupChecker(checkIsUpToDate), ctxMock, indexExecutorMock);
   }
 
   private Optional<GroupIndexEvent> groupIndexEvent(String uuid) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java
index 13aeb9c..6403cce 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java
@@ -57,15 +57,14 @@
 
   @Test
   public void isGroupUpToDate_shouldReturnTrueWhenEventIsEmpty() {
-    assertThat(objectUnderTest.isGroupUpToDate(Optional.empty())).isTrue();
+    assertThat(objectUnderTest.isUpToDate(Optional.empty())).isTrue();
   }
 
   @Test
   public void isGroupUpToDate_shouldReturnFalseWhenSha1DoesNotExistInAllUsers() {
     setCommitExistsInRepo(false);
     assertThat(
-            objectUnderTest.isGroupUpToDate(
-                groupIndexEvent(UUID.randomUUID().toString(), AN_OBJECT_ID)))
+            objectUnderTest.isUpToDate(groupIndexEvent(UUID.randomUUID().toString(), AN_OBJECT_ID)))
         .isFalse();
   }
 
@@ -73,8 +72,7 @@
   public void isGroupUpToDate_shouldReturnFalseWhenSha1ExistsInAllUsers() {
     setCommitExistsInRepo(true);
     assertThat(
-            objectUnderTest.isGroupUpToDate(
-                groupIndexEvent(UUID.randomUUID().toString(), AN_OBJECT_ID)))
+            objectUnderTest.isUpToDate(groupIndexEvent(UUID.randomUUID().toString(), AN_OBJECT_ID)))
         .isTrue();
   }
 
@@ -83,8 +81,7 @@
     UUID groupUUID = UUID.randomUUID();
     setCommitExistsInRepo(true);
 
-    assertThat(objectUnderTest.isGroupUpToDate(groupIndexEvent(groupUUID.toString(), null)))
-        .isTrue();
+    assertThat(objectUnderTest.isUpToDate(groupIndexEvent(groupUUID.toString(), null))).isTrue();
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/TestGroupChecker.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/TestGroupChecker.java
index a6c2675..0af7e9e 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/TestGroupChecker.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/TestGroupChecker.java
@@ -31,7 +31,7 @@
   private static final String someObjectId = "deadbeefdeadbeefdeadbeefdeadbeefdeadbeef";
 
   @Override
-  public boolean isGroupUpToDate(Optional<GroupIndexEvent> groupIndexEvent) {
+  public boolean isUpToDate(Optional<GroupIndexEvent> groupIndexEvent) {
     return isUpToDate;
   }