Do not rely on async/wait for synchronous fetch replication

Before this change, the request for a sync FetchAction was triggering
an async schedule of a replication task, with a replication delay of
zero.

The consequence was the blocking of the client thread for an async
replication that, potentially, could have been stuck in the queue
waiting for other async replication to complete.
Scheduling a replication task does not guarantee that the fetch will
start immediately because it is influenced by the whole logic behind the
replication queue mechanism:
- waiting for the runway
- being aggregated with an existing pending task
- appended to the bottom of the replication queue

When the client requests a sync replication, it typically has other
client resources associated and locked that won't tolerate the async
execution. Faking a sync execution with async/wait for a Future to
complete would not allow the client resources to be released.

Skip the replication queue altogether and rely on the direct
FetchOp execution when the client REST-API FetchAction call has
requested synchronous execution.

Bug: Issue 304123378
Change-Id: Ia9171dc9525f2543ba16b1eb78308b0580839cbf
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index 52f5422..50d6e80 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -293,22 +293,28 @@
     }
   }
 
+  public void runSync() {
+    try (TraceContext ctx = TraceContext.open().addTag(ID_KEY, HexFormat.fromInt(id))) {
+      doRunFetchOperation(ReplicationType.SYNC);
+    }
+  }
+
   public Set<TransportException> getFetchFailures() {
     return fetchFailures;
   }
 
   private void runFetchOperation() {
     try (TraceContext ctx = TraceContext.open().addTag(ID_KEY, HexFormat.fromInt(id))) {
-      doRunFetchOperation();
+      doRunFetchOperation(ReplicationType.ASYNC);
     }
   }
 
-  private void doRunFetchOperation() {
+  private void doRunFetchOperation(ReplicationType replicationType) {
     // Lock the queue, and remove ourselves, so we can't be modified once
     // we start replication (instead a new instance, with the same URI, is
     // created and scheduled for a future point in time.)
     //
-    if (!pool.requestRunway(this)) {
+    if (replicationType == ReplicationType.ASYNC && !pool.requestRunway(this)) {
       if (!canceled) {
         repLog.info(
             "[{}] Rescheduling replication from {} to avoid collision with an in-flight fetch task [{}].",
@@ -321,7 +327,8 @@
     }
 
     repLog.info(
-        "[{}] Replication from {} started for refs [{}] ...",
+        "[{}] {} replication from {} started for refs [{}] ...",
+        replicationType,
         taskIdHex,
         uri,
         String.join(",", getRefs()));
@@ -338,7 +345,8 @@
               .flatMap(metrics -> metrics.stop(config.getName()))
               .map(NANOSECONDS::toMillis);
       repLog.info(
-          "[{}] Replication from {} completed in {}ms, {}ms delay, {} retries{}",
+          "[{}] {} replication from {} completed in {}ms, {}ms delay, {} retries{}",
+          replicationType,
           taskIdHex,
           uri,
           elapsed,
@@ -368,7 +376,7 @@
       repLog.error(
           String.format("Terminal failure. Cannot replicate [%s] from %s", taskIdHex, uri), e);
     } catch (TransportException e) {
-      if (e instanceof LockFailureException) {
+      if (replicationType == ReplicationType.ASYNC && e instanceof LockFailureException) {
         lockRetryCount++;
         // The LockFailureException message contains both URI and reason
         // for this failure.
@@ -390,7 +398,7 @@
               taskIdHex,
               uri);
         }
-      } else {
+      } else if (replicationType == ReplicationType.ASYNC) {
         if (canceledWhileRunning.get()) {
           logCanceledWhileRunningException(e);
         } else {
@@ -410,7 +418,10 @@
       if (git != null) {
         git.close();
       }
-      pool.notifyFinished(this);
+
+      if (replicationType == ReplicationType.ASYNC) {
+        pool.notifyFinished(this);
+      }
     }
   }
 
@@ -442,7 +453,7 @@
     }
   }
 
-  private List<RefSpec> getFetchRefSpecs() {
+  public List<RefSpec> getFetchRefSpecs() {
     List<RefSpec> configRefSpecs = config.getFetchRefSpecs();
     if (delta.isEmpty()) {
       return configRefSpecs;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationStateLogger.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationStateLogger.java
index a62f369..6fa96b4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationStateLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationStateLogger.java
@@ -47,6 +47,10 @@
   }
 
   private void stateWriteErr(String msg, ReplicationState[] states) {
+    if (states == null) {
+      return;
+    }
+
     for (ReplicationState rs : states) {
       if (rs != null) {
         rs.writeStdErr(msg);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 5e4314d..7e2e578 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -521,6 +521,22 @@
     }
   }
 
+  public Optional<FetchOne> fetchSync(
+      Project.NameKey project,
+      String ref,
+      Optional<PullReplicationApiRequestMetrics> apiRequestMetrics) {
+    if (shouldReplicate(project, ref)
+        && (config.replicatePermissions() || !ref.equals(RefNames.REFS_CONFIG))) {
+
+      FetchOne e = opFactory.create(project, getURI(project), apiRequestMetrics);
+      e.addRef(ref);
+      e.runSync();
+      return Optional.of(e);
+    }
+
+    return Optional.empty();
+  }
+
   void scheduleDeleteProject(String uri, Project.NameKey project) {
     @SuppressWarnings("unused")
     ScheduledFuture<?> ignored =
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
index fdb4f8f..04797bd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
@@ -36,6 +36,7 @@
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
+import org.eclipse.jgit.errors.TransportException;
 
 public class FetchAction implements RestModifyView<ProjectResource, Input> {
   private final FetchCommand command;
@@ -86,7 +87,8 @@
     } catch (InterruptedException
         | ExecutionException
         | IllegalStateException
-        | TimeoutException e) {
+        | TimeoutException
+        | TransportException e) {
       throw RestApiException.wrap(e.getMessage(), e);
     } catch (RemoteConfigurationMissingException e) {
       throw new UnprocessableEntityException(e.getMessage());
@@ -95,7 +97,7 @@
 
   private Response<?> applySync(Project.NameKey project, Input input)
       throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
-          TimeoutException {
+          TimeoutException, TransportException {
     command.fetchSync(project, input.label, input.refName);
     return Response.created(input);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
index 3a502ef..e89dc29 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
@@ -22,6 +22,7 @@
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.pull.Command;
+import com.googlesource.gerrit.plugins.replication.pull.FetchOne;
 import com.googlesource.gerrit.plugins.replication.pull.FetchResultProcessing;
 import com.googlesource.gerrit.plugins.replication.pull.PullReplicationStateLogger;
 import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
@@ -29,11 +30,14 @@
 import com.googlesource.gerrit.plugins.replication.pull.Source;
 import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.RemoteConfigurationMissingException;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.transport.RefSpec;
 
 public class FetchCommand implements Command {
 
@@ -60,13 +64,13 @@
       String refName,
       PullReplicationApiRequestMetrics apiRequestMetrics)
       throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
-          TimeoutException {
+          TimeoutException, TransportException {
     fetch(name, label, refName, ASYNC, Optional.of(apiRequestMetrics));
   }
 
   public void fetchSync(Project.NameKey name, String label, String refName)
       throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
-          TimeoutException {
+          TimeoutException, TransportException {
     fetch(name, label, refName, SYNC, Optional.empty());
   }
 
@@ -77,10 +81,13 @@
       ReplicationType fetchType,
       Optional<PullReplicationApiRequestMetrics> apiRequestMetrics)
       throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
-          TimeoutException {
+          TimeoutException, TransportException {
     ReplicationState state =
-        fetchReplicationStateFactory.create(
-            new FetchResultProcessing.CommandProcessing(this, eventDispatcher.get()));
+        fetchType == ReplicationType.ASYNC
+            ? fetchReplicationStateFactory.create(
+                new FetchResultProcessing.CommandProcessing(this, eventDispatcher.get()))
+            : null;
+
     Optional<Source> source =
         sources.getAll().stream().filter(s -> s.getRemoteConfigName().equals(label)).findFirst();
     if (!source.isPresent()) {
@@ -90,9 +97,21 @@
     }
 
     try {
-      state.markAllFetchTasksScheduled();
-      Future<?> future = source.get().schedule(name, refName, state, fetchType, apiRequestMetrics);
-      future.get(source.get().getTimeout(), TimeUnit.SECONDS);
+      if (fetchType == ReplicationType.ASYNC) {
+        state.markAllFetchTasksScheduled();
+        Future<?> future =
+            source.get().schedule(name, refName, state, fetchType, apiRequestMetrics);
+        future.get(source.get().getTimeout(), TimeUnit.SECONDS);
+      } else {
+        Optional<FetchOne> maybeFetch = source.get().fetchSync(name, refName, apiRequestMetrics);
+        if (maybeFetch.map(FetchOne::getFetchRefSpecs).filter(List::isEmpty).isPresent()) {
+          fetchStateLog.error(
+              String.format(
+                  "[%s] Nothing to fetch, ref-specs is empty", maybeFetch.get().getTaskIdHex()));
+        } else if (maybeFetch.map(fetch -> !fetch.hasSucceeded()).orElse(false)) {
+          throw newTransportException(maybeFetch.get());
+        }
+      }
     } catch (ExecutionException
         | IllegalStateException
         | TimeoutException
@@ -102,13 +121,24 @@
     }
 
     try {
-      state.waitForReplication(source.get().getTimeout());
+      if (state != null) {
+        state.waitForReplication(source.get().getTimeout());
+      }
     } catch (InterruptedException e) {
       writeStdErrSync("We are interrupted while waiting replication to complete");
       throw e;
     }
   }
 
+  private TransportException newTransportException(FetchOne fetchOne) {
+    List<RefSpec> fetchRefSpecs = fetchOne.getFetchRefSpecs();
+    String combinedErrorMessage =
+        fetchOne.getFetchFailures().stream()
+            .map(TransportException::getMessage)
+            .reduce("", (e1, e2) -> e1 + "\n" + e2);
+    return new TransportException(combinedErrorMessage + " trying to fetch " + fetchRefSpecs);
+  }
+
   @Override
   public void writeStdOutSync(String message) {}
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchJob.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchJob.java
index e15dd68..a613c0e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchJob.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchJob.java
@@ -21,6 +21,7 @@
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.RemoteConfigurationMissingException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
+import org.eclipse.jgit.errors.TransportException;
 
 public class FetchJob implements Runnable {
   private static final FluentLogger log = FluentLogger.forEnclosingClass();
@@ -54,7 +55,8 @@
     } catch (InterruptedException
         | ExecutionException
         | RemoteConfigurationMissingException
-        | TimeoutException e) {
+        | TimeoutException
+        | TransportException e) {
       log.atSevere().withCause(e).log(
           "Exception during the async fetch call for project %s, label %s and ref name %s",
           project.get(), input.label, input.refName);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
index 7d8a164..22b3c86 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
@@ -21,6 +21,7 @@
 import static com.google.gerrit.acceptance.testsuite.project.TestProjectUpdate.allow;
 import static com.google.gerrit.server.group.SystemGroupBackend.REGISTERED_USERS;
 
+import com.google.common.base.Strings;
 import com.google.gerrit.acceptance.PushOneCommit.Result;
 import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.acceptance.config.GerritConfig;
@@ -40,15 +41,18 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.internal.storage.dfs.InMemoryRepository;
 import org.eclipse.jgit.junit.TestRepository;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.transport.PushResult;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
 import org.eclipse.jgit.transport.RemoteRefUpdate.Status;
+import org.eclipse.jgit.util.FS;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -62,6 +66,17 @@
     }
   }
 
+  protected boolean isAsyncReplication() {
+    FileBasedConfig config =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+    try {
+      config.load();
+    } catch (IOException | ConfigInvalidException e) {
+      throw new IllegalStateException(e);
+    }
+    return !Strings.isNullOrEmpty(config.getString("replication", null, "syncRefs"));
+  }
+
   @Override
   protected void setReplicationSource(
       String remoteName, List<String> replicaSuffixes, Optional<String> project)
@@ -113,10 +128,12 @@
   }
 
   private void assertTasksMetricScheduledAndCompleted(int numTasks) {
-    assertTasksMetric("scheduled", numTasks);
-    assertTasksMetric("started", numTasks);
-    assertTasksMetric("completed", numTasks);
-    assertEmptyTasksMetric("failed");
+    if (isAsyncReplication()) {
+      assertTasksMetric("scheduled", numTasks);
+      assertTasksMetric("started", numTasks);
+      assertTasksMetric("completed", numTasks);
+      assertEmptyTasksMetric("failed");
+    }
   }
 
   @Test
@@ -448,12 +465,14 @@
   }
 
   private void waitUntilReplicationTask(String status, int expected) throws Exception {
-    waitUntil(
-        () ->
-            inMemoryMetrics()
-                .counterValue("tasks/" + status, TEST_REPLICATION_REMOTE)
-                .filter(counter -> counter == expected)
-                .isPresent());
+    if (isAsyncReplication()) {
+      waitUntil(
+          () ->
+              inMemoryMetrics()
+                  .counterValue("tasks/" + status, TEST_REPLICATION_REMOTE)
+                  .filter(counter -> counter == expected)
+                  .isPresent());
+    }
   }
 
   private InMemoryMetricMaker inMemoryMetrics() {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java
index 156481b..d4cf52f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java
@@ -91,6 +91,6 @@
     assertThrows(
         RemoteConfigurationMissingException.class,
         () -> objectUnderTest.fetchSync(projectName, "unknownLabel", REF_NAME_TO_FETCH));
-    verify(fetchStateLog, times(1)).error(anyString(), eq(state));
+    verify(fetchStateLog, times(1)).error(anyString(), eq(null));
   }
 }