Retry upon consuming project-index messages

This change fixes an issue whereby project creation wouldn't be
propagated to consuming nodes.

When consuming a project-index message, the ProjectIndexerImpl.index
method is eventuall called.
The latter only writes the project in index when the project has
already been inserted into the ProjectCache.

At indexing time, if the project is not in the ProjectCache, then the
Project Indexer will assume it has been deleted and thus it will remove
it from the index, rather than adding it.

The fix in this change is to wait for the project to be in cache before
attemping to call the Project Indexer.

Change-Id: I4db4ce8add56626343195b91ba79742b88668143
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java
index df3ac1f..7ac9307 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java
@@ -20,8 +20,12 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.index.ForwardedIndexExecutor;
+import com.googlesource.gerrit.plugins.multisite.index.ProjectChecker;
 import java.io.IOException;
 import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Index a project using {@link ProjectIndexer}. This class is meant to be used on the receiving
@@ -33,17 +37,81 @@
 public class ForwardedIndexProjectHandler
     extends ForwardedIndexingHandler<String, ProjectIndexEvent> {
   private final ProjectIndexer indexer;
+  private final int retryInterval;
+  private final int maxTries;
+  private final ProjectChecker projectChecker;
+  private final ScheduledExecutorService indexExecutor;
 
   @Inject
-  ForwardedIndexProjectHandler(ProjectIndexer indexer, Configuration config) {
+  ForwardedIndexProjectHandler(
+      ProjectIndexer indexer,
+      ProjectChecker projectChecker,
+      @ForwardedIndexExecutor ScheduledExecutorService indexExecutor,
+      Configuration config) {
     super(config.index().numStripedLocks());
     this.indexer = indexer;
+    Configuration.Index indexConfig = config.index();
+    this.retryInterval = indexConfig != null ? indexConfig.retryInterval() : 0;
+    this.maxTries = indexConfig != null ? indexConfig.maxTries() : 0;
+    this.indexExecutor = indexExecutor;
+    this.projectChecker = projectChecker;
   }
 
   @Override
   protected void doIndex(String projectName, Optional<ProjectIndexEvent> event) throws IOException {
-    indexer.index(new Project.NameKey(projectName));
-    log.debug("Project {} successfully indexed", projectName);
+    if (!attemptIndex(projectName, event)) {
+      log.warn("First Attempt failed, scheduling again after {} msecs", retryInterval);
+      rescheduleIndex(projectName, event, 1);
+    }
+  }
+
+  public boolean attemptIndex(String projectName, Optional<ProjectIndexEvent> event)
+      throws IOException {
+    log.debug("Attempt to index project {}, event: [{}]", projectName, event);
+    final Project.NameKey projectNameKey = new Project.NameKey(projectName);
+    if (projectChecker.isProjectUpToDate(projectNameKey)) {
+      indexer.index(projectNameKey);
+      log.debug("Project {} successfully indexed", projectName);
+      return true;
+    }
+    return false;
+  }
+
+  public void rescheduleIndex(
+      String projectName, Optional<ProjectIndexEvent> event, int retryCount) {
+    if (retryCount > maxTries) {
+      log.error(
+          "Project {} could not be indexed after {} retries. index could be stale.",
+          projectName,
+          retryCount);
+
+      return;
+    }
+
+    log.warn(
+        "Retrying for the #{} time to index {} project {} after {} msecs",
+        retryCount,
+        projectName,
+        retryInterval);
+
+    indexExecutor.schedule(
+        () -> {
+          Context.setForwardedEvent(true);
+          try {
+            if (!attemptIndex(projectName, event)) {
+              log.warn(
+                  "Attempt {} to index project {} failed, scheduling again after {} msecs",
+                  retryCount,
+                  projectName,
+                  retryInterval);
+              rescheduleIndex(projectName, event, retryCount + 1);
+            }
+          } catch (IOException e) {
+            log.warn("Project {} could not be indexed", projectName, e);
+          }
+        },
+        retryInterval,
+        TimeUnit.MILLISECONDS);
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexModule.java
index a9183d1..1f54385 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexModule.java
@@ -38,6 +38,8 @@
     DynamicSet.bind(binder(), GroupIndexedListener.class).to(IndexEventHandler.class);
     DynamicSet.bind(binder(), ProjectIndexedListener.class).to(IndexEventHandler.class);
 
+    bind(ProjectChecker.class).to(ProjectCheckerImpl.class);
+
     install(
         new FactoryModuleBuilder()
             .implement(ChangeChecker.class, ChangeCheckerImpl.class)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ProjectChecker.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ProjectChecker.java
new file mode 100644
index 0000000..3fabbbd
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ProjectChecker.java
@@ -0,0 +1,22 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.index;
+
+import com.google.gerrit.reviewdb.client.Project;
+
+/** Encapsulates the logic of verifying the up-to-date status of a project. */
+public interface ProjectChecker {
+  boolean isProjectUpToDate(Project.NameKey projectName);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ProjectCheckerImpl.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ProjectCheckerImpl.java
new file mode 100644
index 0000000..606ff9d
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ProjectCheckerImpl.java
@@ -0,0 +1,33 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.index;
+
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.inject.Inject;
+
+public class ProjectCheckerImpl implements ProjectChecker {
+  private final ProjectCache projectCache;
+
+  @Inject
+  ProjectCheckerImpl(ProjectCache projectCache) {
+    this.projectCache = projectCache;
+  }
+
+  @Override
+  public boolean isProjectUpToDate(Project.NameKey projectName) {
+    return projectCache.get(projectName) != null;
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandlerTest.java
index 35ff4d3..9f94d5f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandlerTest.java
@@ -16,6 +16,7 @@
 
 import static com.google.common.truth.Truth.assertThat;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -24,8 +25,10 @@
 import com.google.gerrit.reviewdb.client.Project;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexingHandler.Operation;
+import com.googlesource.gerrit.plugins.multisite.index.ProjectChecker;
 import java.io.IOException;
 import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -41,7 +44,9 @@
   @Rule public ExpectedException exception = ExpectedException.none();
   @Mock private ProjectIndexer indexerMock;
   @Mock private Configuration configMock;
+  @Mock private ProjectChecker projectCheckerMock;
   @Mock private Configuration.Index indexMock;
+  @Mock private ScheduledExecutorService indexExecutorMock;
   private ForwardedIndexProjectHandler handler;
   private String nameKey;
 
@@ -49,7 +54,12 @@
   public void setUp() {
     when(configMock.index()).thenReturn(indexMock);
     when(indexMock.numStripedLocks()).thenReturn(10);
-    handler = new ForwardedIndexProjectHandler(indexerMock, configMock);
+    when(indexMock.retryInterval()).thenReturn(0);
+    when(indexMock.maxTries()).thenReturn(2);
+    when(projectCheckerMock.isProjectUpToDate(any())).thenReturn(true);
+    handler =
+        new ForwardedIndexProjectHandler(
+            indexerMock, projectCheckerMock, indexExecutorMock, configMock);
     nameKey = "project/name";
   }
 
@@ -108,4 +118,13 @@
 
     verify(indexerMock).index(new Project.NameKey(nameKey));
   }
+
+  @Test
+  public void indexAttemptShouldFailWhenCheckerFails() throws Exception {
+    handler =
+        new ForwardedIndexProjectHandler(
+            indexerMock, (projectName) -> false, indexExecutorMock, configMock);
+
+    assertThat(handler.attemptIndex(nameKey, Optional.empty())).isFalse();
+  }
 }