Revert "Prevent concurrent fetches on the same repository"

This reverts commit eebcfdd1c3ece403247a37f5a766cb7facdb94c0.

Reason for revert: This would break replication consistency when having a single source with multiple URIs

Bug: Issue 437805590
Change-Id: I499244441b01ddee42d61655b908576b3af316e4
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index d5dd6e0..d60c2b7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -357,7 +357,7 @@
                 + " [{}].",
             taskIdHex,
             uri,
-            pool.getInFlight(projectName).map(FetchOne::getTaskIdHex).orElse("<unknown>"));
+            pool.getInFlight(getURI()).map(FetchOne::getTaskIdHex).orElse("<unknown>"));
         pool.reschedule(this, Source.RetryReason.COLLISION);
         isCollision = true;
       }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 097f058..0a75288 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -110,8 +110,8 @@
   private final ReplicationStateListener stateLog;
   private final UpdateHeadTask.Factory updateHeadFactory;
   private final Object stateLock = new Object();
-  private final Map<Project.NameKey, FetchOne> pending = new HashMap<>();
-  private final Map<Project.NameKey, FetchOne> inFlight = new HashMap<>();
+  private final Map<URIish, FetchOne> pending = new HashMap<>();
+  private final Map<URIish, FetchOne> inFlight = new HashMap<>();
   private final FetchOne.Factory opFactory;
   private final GitRepositoryManager gitManager;
   private final PermissionBackend permissionBackend;
@@ -134,11 +134,10 @@
   }
 
   public static class QueueInfo {
-    public final Map<Project.NameKey, FetchOne> pending;
-    public final Map<Project.NameKey, FetchOne> inFlight;
+    public final Map<URIish, FetchOne> pending;
+    public final Map<URIish, FetchOne> inFlight;
 
-    public QueueInfo(
-        Map<Project.NameKey, FetchOne> pending, Map<Project.NameKey, FetchOne> inFlight) {
+    public QueueInfo(Map<URIish, FetchOne> pending, Map<URIish, FetchOne> inFlight) {
       this.pending = ImmutableMap.copyOf(pending);
       this.inFlight = ImmutableMap.copyOf(inFlight);
     }
@@ -494,7 +493,7 @@
     if (!config.replicatePermissions()) {
       FetchOne e;
       synchronized (stateLock) {
-        e = pending.get(project);
+        e = pending.get(uri);
       }
       if (e == null) {
         try (Repository git = gitManager.openRepository(project)) {
@@ -520,13 +519,13 @@
     }
 
     synchronized (stateLock) {
-      FetchOne e = pending.get(project);
+      FetchOne e = pending.get(uri);
       Future<?> f = CompletableFuture.completedFuture(null);
       if (e == null || e.isRetrying()) {
         e = opFactory.create(project, uri, apiRequestMetrics);
         addRef(e, refSpec);
         e.addState(refSpec, state);
-        pending.put(project, e);
+        pending.put(uri, e);
         f =
             pool.schedule(
                 queueMetrics.runWithMetrics(this, e),
@@ -579,7 +578,8 @@
 
   void fetchWasCanceled(FetchOne fetchOp) {
     synchronized (stateLock) {
-      pending.remove(fetchOp.getProjectNameKey());
+      URIish uri = fetchOp.getURI();
+      pending.remove(uri);
       queueMetrics.incrementTaskCancelled(this);
     }
   }
@@ -612,11 +612,11 @@
    */
   void reschedule(FetchOne fetchOp, RetryReason reason) {
     synchronized (stateLock) {
-      Project.NameKey projectName = fetchOp.getProjectNameKey();
-      FetchOne pendingFetchOp = pending.get(projectName);
+      URIish uri = fetchOp.getURI();
+      FetchOne pendingFetchOp = pending.get(uri);
 
       if (pendingFetchOp != null) {
-        // There is one FetchOp instance already pending for the same project.
+        // There is one FetchOp instance already pending to same URI.
 
         if (pendingFetchOp.isRetrying()) {
           // The one pending is one already retrying, so it should
@@ -625,10 +625,10 @@
 
           // This scenario would happen if a FetchOp has started running
           // and then before it failed due transport exception, another
-          // one to same project started. The first one would fail and would
+          // one to same URI started. The first one would fail and would
           // be rescheduled, being present in pending list. When the
           // second one fails, it will also be rescheduled and then,
-          // here, find out replication to the project is already pending
+          // here, find out replication to its URI is already pending
           // for retry (blocking).
           pendingFetchOp.addRefs(fetchOp.getRefSpecs());
           pendingFetchOp.addStates(fetchOp.getStates());
@@ -654,7 +654,7 @@
           // it will see it was canceled and then it will do nothing with
           // pending list and it will not execute its run implementation.
           pendingFetchOp.canceledByReplication();
-          pending.remove(projectName);
+          pending.remove(uri);
 
           Set<FetchRefSpec> fetchOpRefSpecs = fetchOp.getRefSpecs();
           fetchOp.addRefs(pendingFetchOp.getRefSpecs());
@@ -685,7 +685,7 @@
       }
 
       if (pendingFetchOp == null || !pendingFetchOp.isRetrying()) {
-        pending.put(projectName, fetchOp);
+        pending.put(uri, fetchOp);
         switch (reason) {
           case COLLISION:
             queueMetrics.incrementTaskRescheduled(this);
@@ -712,7 +712,7 @@
               queueMetrics.incrementTaskRetrying(this);
             } else {
               fetchOp.canceledByReplication();
-              pending.remove(projectName);
+              pending.remove(uri);
               stateLog.error(
                   "Fetch from " + fetchOp.getURI() + " cancelled after maximum number of retries",
                   fetchOp.getStatesAsArray());
@@ -729,22 +729,22 @@
       if (op.wasCanceled()) {
         return false;
       }
-      pending.remove(op.getProjectNameKey());
-      if (inFlight.containsKey(op.getProjectNameKey())) {
+      pending.remove(op.getURI());
+      if (inFlight.containsKey(op.getURI())) {
         return false;
       }
-      inFlight.put(op.getProjectNameKey(), op);
+      inFlight.put(op.getURI(), op);
     }
     return true;
   }
 
-  Optional<FetchOne> getInFlight(Project.NameKey projectName) {
-    return Optional.ofNullable(inFlight.get(projectName));
+  Optional<FetchOne> getInFlight(URIish uri) {
+    return Optional.ofNullable(inFlight.get(uri));
   }
 
   void notifyFinished(FetchOne op) {
     synchronized (stateLock) {
-      inFlight.remove(op.getProjectNameKey());
+      inFlight.remove(op.getURI());
     }
 
     Set<TransportException> fetchFailures = op.getFetchFailures();