Parallelise REST API fetch calls

Allow configure number of threads for fetch calls. Fetch calls can be an
expensive operation because every fetch call is a blocking operation
and is waiting for git fetch to finish. Running fetch calls one by one
will multiply the time of a fetch by the number of remote destinations.
This have significant impacting one the overall performance. By allowing
parallel run of fetch calls we can reduce the overall time spent on
fetching and improve the performance.

Feature: Issue 13340
Change-Id: I5c961af2047365f330a6cad229d9870c533fc0ed
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
index 2158a59..70e5342 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
@@ -32,6 +32,10 @@
 import java.util.HashSet;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.transport.URIish;
 import org.slf4j.Logger;
@@ -43,6 +47,7 @@
   static final String PULL_REPLICATION_LOG_NAME = "pull_replication_log";
   static final Logger repLog = LoggerFactory.getLogger(PULL_REPLICATION_LOG_NAME);
 
+  private static final Integer DEFAULT_FETCH_CALLS_TIMEOUT = 0;
   private final ReplicationStateListener stateLog;
 
   private final WorkQueue workQueue;
@@ -52,6 +57,7 @@
   private volatile boolean replaying;
   private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
   private FetchRestApiClient.Factory fetchClientFactory;
+  private Integer fetchCallsTimeout;
 
   @Inject
   ReplicationQueue(
@@ -72,6 +78,13 @@
   public void start() {
     if (!running) {
       sources.get().startup(workQueue);
+      fetchCallsTimeout =
+          2
+              * sources.get().getAll().stream()
+                  .mapToInt(Source::getConnectionTimeout)
+                  .max()
+                  .orElse(DEFAULT_FETCH_CALLS_TIMEOUT);
+
       running = true;
       fireBeforeStartupEvents();
     }
@@ -116,33 +129,61 @@
       beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(project.get(), refName, objectId));
       return;
     }
+    ForkJoinPool fetchCallsPool = null;
+    try {
+      fetchCallsPool = new ForkJoinPool(sources.get().getAll().size());
+      fetchCallsPool
+          .submit(
+              () ->
+                  sources
+                      .get()
+                      .getAll()
+                      .parallelStream()
+                      .forEach(
+                          source -> {
+                            callFetch(source, project, refName, state);
+                          }))
+          .get(fetchCallsTimeout, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException | ExecutionException | TimeoutException e) {
+      stateLog.error(
+          String.format(
+              "Exception during the pull replication fetch rest api call.  Message:%s",
+              e.getMessage()),
+          e,
+          state);
+    } finally {
+      if (fetchCallsPool != null) {
+        fetchCallsPool.shutdown();
+      }
+    }
+  }
 
-    for (Source cfg : sources.get().getAll()) {
-      if (cfg.wouldFetchProject(project) && cfg.wouldFetchRef(refName)) {
-        for (String apiUrl : cfg.getApis()) {
-          try {
-            URIish uri = new URIish(apiUrl);
-            FetchRestApiClient fetchClient = fetchClientFactory.create(cfg);
+  private void callFetch(
+      Source source, Project.NameKey project, String refName, ReplicationState state) {
+    if (source.wouldFetchProject(project) && source.wouldFetchRef(refName)) {
+      for (String apiUrl : source.getApis()) {
+        try {
+          URIish uri = new URIish(apiUrl);
+          FetchRestApiClient fetchClient = fetchClientFactory.create(source);
 
-            HttpResult result = fetchClient.callFetch(project, refName, uri);
+          HttpResult result = fetchClient.callFetch(project, refName, uri);
 
-            if (!result.isSuccessful()) {
-              stateLog.warn(
-                  String.format(
-                      "Pull replication rest api fetch call failed. Endpoint url: %s, reason:%s",
-                      apiUrl, result.getMessage().orElse("unknown")),
-                  state);
-            }
-          } catch (URISyntaxException e) {
-            stateLog.warn(String.format("Cannot parse pull replication api url:%s", apiUrl), state);
-          } catch (Exception e) {
-            stateLog.error(
+          if (!result.isSuccessful()) {
+            stateLog.warn(
                 String.format(
-                    "Exception during the pull replication fetch rest api call. Endpoint url:%s, message:%s",
-                    apiUrl, e.getMessage()),
-                e,
+                    "Pull replication rest api fetch call failed. Endpoint url: %s, reason:%s",
+                    apiUrl, result.getMessage().orElse("unknown")),
                 state);
           }
+        } catch (URISyntaxException e) {
+          stateLog.warn(String.format("Cannot parse pull replication api url:%s", apiUrl), state);
+        } catch (Exception e) {
+          stateLog.error(
+              String.format(
+                  "Exception during the pull replication fetch rest api call. Endpoint url:%s, message:%s",
+                  apiUrl, e.getMessage()),
+              e,
+              state);
         }
       }
     }