Add replication delay for asynchronous fetch calls

Usually single change contains multiple ref updates. Adding replication
delay ensure that all refs are fetch together. This allows to avoid
issue with reindexing change with partially fetched refs.

Bug: Issue 13340
Change-Id: I4b25b940ab8b55faae2a5a712d6d697d8388c85e
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java
index baffff7..bcd86d7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java
@@ -29,7 +29,11 @@
   private final ReplicationStateListener stateLog;
 
   public interface Factory {
-    FetchAll create(String urlMatch, ReplicationFilter filter, ReplicationState state, boolean now);
+    FetchAll create(
+        String urlMatch,
+        ReplicationFilter filter,
+        ReplicationState state,
+        ReplicationType replicationType);
   }
 
   private final WorkQueue workQueue;
@@ -37,7 +41,7 @@
   private final String urlMatch;
   private final ReplicationFilter filter;
   private final ReplicationState state;
-  private final boolean now;
+  private final ReplicationType replicationType;
   private final SourcesCollection sources;
 
   @Inject
@@ -49,7 +53,7 @@
       @Assisted @Nullable String urlMatch,
       @Assisted ReplicationFilter filter,
       @Assisted ReplicationState state,
-      @Assisted boolean now) {
+      @Assisted ReplicationType replicationType) {
     this.workQueue = wq;
     this.projectCache = projectCache;
     this.stateLog = stateLog;
@@ -57,7 +61,7 @@
     this.urlMatch = urlMatch;
     this.filter = filter;
     this.state = state;
-    this.now = now;
+    this.replicationType = replicationType;
   }
 
   Future<?> schedule(long delay, TimeUnit unit) {
@@ -69,7 +73,7 @@
     try {
       for (Project.NameKey nameKey : projectCache.all()) {
         if (filter.matches(nameKey)) {
-          scheduleFullSync(nameKey, urlMatch, state, now);
+          scheduleFullSync(nameKey, urlMatch, state, replicationType);
         }
       }
     } catch (Exception e) {
@@ -79,12 +83,15 @@
   }
 
   private void scheduleFullSync(
-      Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
+      Project.NameKey project,
+      String urlMatch,
+      ReplicationState state,
+      ReplicationType replicationType) {
 
     for (Source cfg : sources.getAll()) {
       if (cfg.wouldFetchProject(project)) {
         for (URIish uri : cfg.getURIs(project, urlMatch)) {
-          cfg.schedule(project, FetchOne.ALL_REFS, uri, state, now);
+          cfg.schedule(project, FetchOne.ALL_REFS, uri, state, replicationType);
         }
       }
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
index d8c4a8d..5cf8bb6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.replication.pull;
 
+import static com.googlesource.gerrit.plugins.replication.pull.ReplicationType.ASYNC;
+
 import com.google.common.util.concurrent.Atomics;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
@@ -65,7 +67,7 @@
               new FetchResultProcessing.GitUpdateProcessing(eventDispatcher.get()));
       fetchAllFuture.set(
           fetchAll
-              .create(null, ReplicationFilter.all(), state, false)
+              .create(null, ReplicationFilter.all(), state, ASYNC)
               .schedule(30, TimeUnit.SECONDS));
     }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationType.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationType.java
new file mode 100644
index 0000000..308a90b
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationType.java
@@ -0,0 +1,20 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+public enum ReplicationType {
+  SYNC,
+  ASYNC
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 0d4c57d..3847426 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -16,6 +16,7 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName;
 import static com.googlesource.gerrit.plugins.replication.pull.FetchResultProcessing.resolveNodeName;
+import static com.googlesource.gerrit.plugins.replication.pull.ReplicationType.SYNC;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
@@ -383,13 +384,20 @@
   }
 
   public Future<?> schedule(
-      Project.NameKey project, String ref, ReplicationState state, boolean now) {
+      Project.NameKey project,
+      String ref,
+      ReplicationState state,
+      ReplicationType replicationType) {
     URIish uri = getURI(project);
-    return schedule(project, ref, uri, state, now);
+    return schedule(project, ref, uri, state, replicationType);
   }
 
   public Future<?> schedule(
-      Project.NameKey project, String ref, URIish uri, ReplicationState state, boolean now) {
+      Project.NameKey project,
+      String ref,
+      URIish uri,
+      ReplicationState state,
+      ReplicationType replicationType) {
 
     repLog.info("scheduling replication {}:{} => {}", uri, ref, project);
     if (!shouldReplicate(project, ref, state)) {
@@ -429,7 +437,7 @@
         addRef(e, ref);
         e.addState(ref, state);
         pending.put(uri, e);
-        f = pool.schedule(e, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
+        f = pool.schedule(e, isSyncCall(replicationType) ? 0 : config.getDelay(), TimeUnit.SECONDS);
       } else if (!e.getRefs().contains(ref)) {
         addRef(e, ref);
         e.addState(ref, state);
@@ -452,6 +460,10 @@
     postReplicationScheduledEvent(e, ref);
   }
 
+  private boolean isSyncCall(ReplicationType replicationType) {
+    return SYNC.equals(replicationType);
+  }
+
   /**
    * It schedules again a FetchOp instance.
    *
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
index e50461d..8046d49 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
@@ -23,7 +23,7 @@
 import org.eclipse.jgit.transport.RemoteConfig;
 
 public class SourceConfiguration implements RemoteConfiguration {
-  static final int DEFAULT_REPLICATION_DELAY = 0;
+  static final int DEFAULT_REPLICATION_DELAY = 4;
   static final int DEFAULT_RESCHEDULE_DELAY = 3;
   static final int DEFAULT_SLOW_LATENCY_THRESHOLD_SECS = 900;
   static final int DEFAULT_MAX_CONNECTION_INACTIVITY_MS = 10000;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/StartFetchCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/StartFetchCommand.java
index 6ababe9..97f8e9e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/StartFetchCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/StartFetchCommand.java
@@ -14,6 +14,9 @@
 
 package com.googlesource.gerrit.plugins.replication.pull;
 
+import static com.googlesource.gerrit.plugins.replication.pull.ReplicationType.ASYNC;
+import static com.googlesource.gerrit.plugins.replication.pull.ReplicationType.SYNC;
+
 import com.google.gerrit.extensions.annotations.RequiresCapability;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.EventDispatcher;
@@ -77,7 +80,10 @@
       projectFilter = new ReplicationFilter(projectPatterns);
     }
 
-    future = fetchFactory.create(urlMatch, projectFilter, state, now).schedule(0, TimeUnit.SECONDS);
+    future =
+        fetchFactory
+            .create(urlMatch, projectFilter, state, replicationType(now))
+            .schedule(0, TimeUnit.SECONDS);
 
     if (wait) {
       if (future != null) {
@@ -105,6 +111,10 @@
     }
   }
 
+  private ReplicationType replicationType(Boolean now) {
+    return now ? SYNC : ASYNC;
+  }
+
   @Override
   public void writeStdOutSync(String message) {
     if (wait) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
index 6970299..92cc180 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
@@ -93,7 +93,7 @@
   private Response<?> applySync(Project.NameKey project, Input input)
       throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
           TimeoutException {
-    command.fetch(project, input.label, input.refName);
+    command.fetchSync(project, input.label, input.refName);
     return Response.created(input);
   }
 
@@ -127,7 +127,7 @@
     @Override
     public void run() {
       try {
-        command.fetch(project, input.label, input.refName);
+        command.fetchAsync(project, input.label, input.refName);
       } catch (InterruptedException
           | ExecutionException
           | RemoteConfigurationMissingException
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
index 1088d9b..e1ac9ac 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
@@ -14,6 +14,9 @@
 
 package com.googlesource.gerrit.plugins.replication.pull.api;
 
+import static com.googlesource.gerrit.plugins.replication.pull.ReplicationType.ASYNC;
+import static com.googlesource.gerrit.plugins.replication.pull.ReplicationType.SYNC;
+
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.EventDispatcher;
@@ -22,6 +25,7 @@
 import com.googlesource.gerrit.plugins.replication.pull.FetchResultProcessing;
 import com.googlesource.gerrit.plugins.replication.pull.PullReplicationStateLogger;
 import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
+import com.googlesource.gerrit.plugins.replication.pull.ReplicationType;
 import com.googlesource.gerrit.plugins.replication.pull.Source;
 import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.RemoteConfigurationMissingException;
@@ -50,7 +54,19 @@
     this.eventDispatcher = eventDispatcher;
   }
 
-  public void fetch(Project.NameKey name, String label, String refName)
+  public void fetchAsync(Project.NameKey name, String label, String refName)
+      throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
+          TimeoutException {
+    fetch(name, label, refName, ASYNC);
+  }
+
+  public void fetchSync(Project.NameKey name, String label, String refName)
+      throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
+          TimeoutException {
+    fetch(name, label, refName, SYNC);
+  }
+
+  private void fetch(Project.NameKey name, String label, String refName, ReplicationType fetchType)
       throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
           TimeoutException {
     ReplicationState state =
@@ -66,7 +82,7 @@
 
     try {
       state.markAllFetchTasksScheduled();
-      Future<?> future = source.get().schedule(name, refName, state, true);
+      Future<?> future = source.get().schedule(name, refName, state, fetchType);
       future.get(source.get().getTimeout(), TimeUnit.SECONDS);
     } catch (ExecutionException
         | IllegalStateException
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index affd094..126cd1b 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -292,13 +292,14 @@
 	Defaults to 0 seconds, wait indefinitely.
 
 remote.NAME.replicationDelay
-:	Time to wait before scheduling a remote fetch operation. Setting
-	the delay to 0 effectively disables the delay, causing the fetch
-	to start as soon as possible.
+:	Time to wait before scheduling an asynchronous remote fetch
+	operation. Setting the delay to 0 effectively disables the delay,
+	causing the fetch to start as soon as possible.
 
 	This is a Gerrit specific extension to the Git remote block.
 
-	By default, 0 seconds.
+	By default for asynchronous fetch, 4 seconds. For a synchronous fetch
+	replicationDelay is zero.
 
 remote.NAME.rescheduleDelay
 :	Delay when rescheduling a fetch operation due to an in-flight fetch
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java
index 2c888c8..fb7f3d1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java
@@ -147,7 +147,7 @@
     inputParams.label = label;
     inputParams.refName = refName;
 
-    doThrow(new InterruptedException()).when(fetchCommand).fetch(any(), any(), any());
+    doThrow(new InterruptedException()).when(fetchCommand).fetchSync(any(), any(), any());
 
     fetchAction.apply(projectResource, inputParams);
   }
@@ -162,7 +162,7 @@
 
     doThrow(new RemoteConfigurationMissingException(""))
         .when(fetchCommand)
-        .fetch(any(), any(), any());
+        .fetchSync(any(), any(), any());
 
     fetchAction.apply(projectResource, inputParams);
   }
@@ -177,7 +177,7 @@
 
     doThrow(new ExecutionException(new RuntimeException()))
         .when(fetchCommand)
-        .fetch(any(), any(), any());
+        .fetchSync(any(), any(), any());
 
     fetchAction.apply(projectResource, inputParams);
   }
@@ -190,7 +190,7 @@
     inputParams.label = label;
     inputParams.refName = refName;
 
-    doThrow(new IllegalStateException()).when(fetchCommand).fetch(any(), any(), any());
+    doThrow(new IllegalStateException()).when(fetchCommand).fetchSync(any(), any(), any());
 
     fetchAction.apply(projectResource, inputParams);
   }
@@ -203,7 +203,7 @@
     inputParams.label = label;
     inputParams.refName = refName;
 
-    doThrow(new TimeoutException()).when(fetchCommand).fetch(any(), any(), any());
+    doThrow(new TimeoutException()).when(fetchCommand).fetchSync(any(), any(), any());
 
     fetchAction.apply(projectResource, inputParams);
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java
index 4cdad48..e1ad565 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java
@@ -15,6 +15,8 @@
 package com.googlesource.gerrit.plugins.replication.pull.api;
 
 import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static com.googlesource.gerrit.plugins.replication.pull.ReplicationType.ASYNC;
+import static com.googlesource.gerrit.plugins.replication.pull.ReplicationType.SYNC;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.anyString;
@@ -74,7 +76,7 @@
     when(fetchReplicationStateFactory.create(any())).thenReturn(state);
     when(source.getRemoteConfigName()).thenReturn(label);
     when(sources.getAll()).thenReturn(Lists.newArrayList(source));
-    when(source.schedule(projectName, REF_NAME_TO_FETCH, state, true))
+    when(source.schedule(eq(projectName), eq(REF_NAME_TO_FETCH), eq(state), any()))
         .thenReturn(CompletableFuture.completedFuture(null));
     objectUnderTest =
         new FetchCommand(fetchReplicationStateFactory, fetchStateLog, sources, eventDispatcher);
@@ -84,18 +86,27 @@
   public void shouldScheduleRefFetch()
       throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
           TimeoutException {
-    objectUnderTest.fetch(projectName, label, REF_NAME_TO_FETCH);
+    objectUnderTest.fetchSync(projectName, label, REF_NAME_TO_FETCH);
 
-    verify(source, times(1)).schedule(projectName, REF_NAME_TO_FETCH, state, true);
+    verify(source, times(1)).schedule(projectName, REF_NAME_TO_FETCH, state, SYNC);
+  }
+
+  @Test
+  public void shouldScheduleRefFetchWithDelay()
+      throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
+          TimeoutException {
+    objectUnderTest.fetchAsync(projectName, label, REF_NAME_TO_FETCH);
+
+    verify(source, times(1)).schedule(projectName, REF_NAME_TO_FETCH, state, ASYNC);
   }
 
   @Test
   public void shouldMarkAllFetchTasksScheduled()
       throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
           TimeoutException {
-    objectUnderTest.fetch(projectName, label, REF_NAME_TO_FETCH);
+    objectUnderTest.fetchSync(projectName, label, REF_NAME_TO_FETCH);
 
-    verify(source, times(1)).schedule(projectName, REF_NAME_TO_FETCH, state, true);
+    verify(source, times(1)).schedule(projectName, REF_NAME_TO_FETCH, state, SYNC);
     verify(state, times(1)).markAllFetchTasksScheduled();
   }
 
@@ -103,7 +114,7 @@
   public void shouldUpdateStateWhenRemoteConfigNameIsMissing() {
     assertThrows(
         RemoteConfigurationMissingException.class,
-        () -> objectUnderTest.fetch(projectName, "unknownLabel", REF_NAME_TO_FETCH));
+        () -> objectUnderTest.fetchSync(projectName, "unknownLabel", REF_NAME_TO_FETCH));
     verify(fetchStateLog, times(1)).error(anyString(), eq(state));
   }
 
@@ -112,12 +123,12 @@
   public void shouldUpdateStateWhenInterruptedException()
       throws InterruptedException, ExecutionException, TimeoutException {
     when(future.get(anyLong(), eq(TimeUnit.SECONDS))).thenThrow(new InterruptedException());
-    when(source.schedule(projectName, REF_NAME_TO_FETCH, state, true)).thenReturn(future);
+    when(source.schedule(projectName, REF_NAME_TO_FETCH, state, SYNC)).thenReturn(future);
 
     InterruptedException e =
         assertThrows(
             InterruptedException.class,
-            () -> objectUnderTest.fetch(projectName, label, REF_NAME_TO_FETCH));
+            () -> objectUnderTest.fetchSync(projectName, label, REF_NAME_TO_FETCH));
     verify(fetchStateLog, times(1)).error(anyString(), eq(e), eq(state));
   }
 
@@ -127,12 +138,12 @@
       throws InterruptedException, ExecutionException, TimeoutException {
     when(future.get(anyLong(), eq(TimeUnit.SECONDS)))
         .thenThrow(new ExecutionException(new Exception()));
-    when(source.schedule(projectName, REF_NAME_TO_FETCH, state, true)).thenReturn(future);
+    when(source.schedule(projectName, REF_NAME_TO_FETCH, state, SYNC)).thenReturn(future);
 
     ExecutionException e =
         assertThrows(
             ExecutionException.class,
-            () -> objectUnderTest.fetch(projectName, label, REF_NAME_TO_FETCH));
+            () -> objectUnderTest.fetchSync(projectName, label, REF_NAME_TO_FETCH));
     verify(fetchStateLog, times(1)).error(anyString(), eq(e), eq(state));
   }
 
@@ -141,12 +152,12 @@
   public void shouldUpdateStateWhenTimeoutException()
       throws InterruptedException, ExecutionException, TimeoutException {
     when(future.get(anyLong(), eq(TimeUnit.SECONDS))).thenThrow(new TimeoutException());
-    when(source.schedule(projectName, REF_NAME_TO_FETCH, state, true)).thenReturn(future);
+    when(source.schedule(projectName, REF_NAME_TO_FETCH, state, SYNC)).thenReturn(future);
 
     TimeoutException e =
         assertThrows(
             TimeoutException.class,
-            () -> objectUnderTest.fetch(projectName, label, REF_NAME_TO_FETCH));
+            () -> objectUnderTest.fetchSync(projectName, label, REF_NAME_TO_FETCH));
     verify(fetchStateLog, times(1)).error(anyString(), eq(e), eq(state));
   }
 }