Prevent concurrent fetches on the same repository

Running multiple fetches concurrently on the same repository
is unnecessary and often counterproductive, as one or more of
them are likely to fail.

To avoid this, ensure that only one fetch is executed at a time
per repository.

Bug: Issue 437805590
Change-Id: I7883e680e018705fcc0d307356b1c541fe8ecc93
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index 4b3749d..11f78fb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -357,7 +357,7 @@
                 + " [{}].",
             taskIdHex,
             uri,
-            pool.getInFlight(getURI()).map(FetchOne::getTaskIdHex).orElse("<unknown>"));
+            pool.getInFlight(projectName).map(FetchOne::getTaskIdHex).orElse("<unknown>"));
         pool.reschedule(this, Source.RetryReason.COLLISION);
         isCollision = true;
       }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 0a75288..097f058 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -110,8 +110,8 @@
   private final ReplicationStateListener stateLog;
   private final UpdateHeadTask.Factory updateHeadFactory;
   private final Object stateLock = new Object();
-  private final Map<URIish, FetchOne> pending = new HashMap<>();
-  private final Map<URIish, FetchOne> inFlight = new HashMap<>();
+  private final Map<Project.NameKey, FetchOne> pending = new HashMap<>();
+  private final Map<Project.NameKey, FetchOne> inFlight = new HashMap<>();
   private final FetchOne.Factory opFactory;
   private final GitRepositoryManager gitManager;
   private final PermissionBackend permissionBackend;
@@ -134,10 +134,11 @@
   }
 
   public static class QueueInfo {
-    public final Map<URIish, FetchOne> pending;
-    public final Map<URIish, FetchOne> inFlight;
+    public final Map<Project.NameKey, FetchOne> pending;
+    public final Map<Project.NameKey, FetchOne> inFlight;
 
-    public QueueInfo(Map<URIish, FetchOne> pending, Map<URIish, FetchOne> inFlight) {
+    public QueueInfo(
+        Map<Project.NameKey, FetchOne> pending, Map<Project.NameKey, FetchOne> inFlight) {
       this.pending = ImmutableMap.copyOf(pending);
       this.inFlight = ImmutableMap.copyOf(inFlight);
     }
@@ -493,7 +494,7 @@
     if (!config.replicatePermissions()) {
       FetchOne e;
       synchronized (stateLock) {
-        e = pending.get(uri);
+        e = pending.get(project);
       }
       if (e == null) {
         try (Repository git = gitManager.openRepository(project)) {
@@ -519,13 +520,13 @@
     }
 
     synchronized (stateLock) {
-      FetchOne e = pending.get(uri);
+      FetchOne e = pending.get(project);
       Future<?> f = CompletableFuture.completedFuture(null);
       if (e == null || e.isRetrying()) {
         e = opFactory.create(project, uri, apiRequestMetrics);
         addRef(e, refSpec);
         e.addState(refSpec, state);
-        pending.put(uri, e);
+        pending.put(project, e);
         f =
             pool.schedule(
                 queueMetrics.runWithMetrics(this, e),
@@ -578,8 +579,7 @@
 
   void fetchWasCanceled(FetchOne fetchOp) {
     synchronized (stateLock) {
-      URIish uri = fetchOp.getURI();
-      pending.remove(uri);
+      pending.remove(fetchOp.getProjectNameKey());
       queueMetrics.incrementTaskCancelled(this);
     }
   }
@@ -612,11 +612,11 @@
    */
   void reschedule(FetchOne fetchOp, RetryReason reason) {
     synchronized (stateLock) {
-      URIish uri = fetchOp.getURI();
-      FetchOne pendingFetchOp = pending.get(uri);
+      Project.NameKey projectName = fetchOp.getProjectNameKey();
+      FetchOne pendingFetchOp = pending.get(projectName);
 
       if (pendingFetchOp != null) {
-        // There is one FetchOp instance already pending to same URI.
+        // There is one FetchOp instance already pending for the same project.
 
         if (pendingFetchOp.isRetrying()) {
           // The one pending is one already retrying, so it should
@@ -625,10 +625,10 @@
 
           // This scenario would happen if a FetchOp has started running
           // and then before it failed due transport exception, another
-          // one to same URI started. The first one would fail and would
+          // one to same project started. The first one would fail and would
           // be rescheduled, being present in pending list. When the
           // second one fails, it will also be rescheduled and then,
-          // here, find out replication to its URI is already pending
+          // here, find out replication to the project is already pending
           // for retry (blocking).
           pendingFetchOp.addRefs(fetchOp.getRefSpecs());
           pendingFetchOp.addStates(fetchOp.getStates());
@@ -654,7 +654,7 @@
           // it will see it was canceled and then it will do nothing with
           // pending list and it will not execute its run implementation.
           pendingFetchOp.canceledByReplication();
-          pending.remove(uri);
+          pending.remove(projectName);
 
           Set<FetchRefSpec> fetchOpRefSpecs = fetchOp.getRefSpecs();
           fetchOp.addRefs(pendingFetchOp.getRefSpecs());
@@ -685,7 +685,7 @@
       }
 
       if (pendingFetchOp == null || !pendingFetchOp.isRetrying()) {
-        pending.put(uri, fetchOp);
+        pending.put(projectName, fetchOp);
         switch (reason) {
           case COLLISION:
             queueMetrics.incrementTaskRescheduled(this);
@@ -712,7 +712,7 @@
               queueMetrics.incrementTaskRetrying(this);
             } else {
               fetchOp.canceledByReplication();
-              pending.remove(uri);
+              pending.remove(projectName);
               stateLog.error(
                   "Fetch from " + fetchOp.getURI() + " cancelled after maximum number of retries",
                   fetchOp.getStatesAsArray());
@@ -729,22 +729,22 @@
       if (op.wasCanceled()) {
         return false;
       }
-      pending.remove(op.getURI());
-      if (inFlight.containsKey(op.getURI())) {
+      pending.remove(op.getProjectNameKey());
+      if (inFlight.containsKey(op.getProjectNameKey())) {
         return false;
       }
-      inFlight.put(op.getURI(), op);
+      inFlight.put(op.getProjectNameKey(), op);
     }
     return true;
   }
 
-  Optional<FetchOne> getInFlight(URIish uri) {
-    return Optional.ofNullable(inFlight.get(uri));
+  Optional<FetchOne> getInFlight(Project.NameKey projectName) {
+    return Optional.ofNullable(inFlight.get(projectName));
   }
 
   void notifyFinished(FetchOne op) {
     synchronized (stateLock) {
-      inFlight.remove(op.getURI());
+      inFlight.remove(op.getProjectNameKey());
     }
 
     Set<TransportException> fetchFailures = op.getFetchFailures();