Revert "Prevent concurrent fetches on the same repository" This reverts commit eebcfdd1c3ece403247a37f5a766cb7facdb94c0. Reason for revert: This would break replication consistency when having a single source with multiple URIs Bug: Issue 437805590 Change-Id: I499244441b01ddee42d61655b908576b3af316e4
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java index d5dd6e0..d60c2b7 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -357,7 +357,7 @@ + " [{}].", taskIdHex, uri, - pool.getInFlight(projectName).map(FetchOne::getTaskIdHex).orElse("<unknown>")); + pool.getInFlight(getURI()).map(FetchOne::getTaskIdHex).orElse("<unknown>")); pool.reschedule(this, Source.RetryReason.COLLISION); isCollision = true; }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java index 097f058..0a75288 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -110,8 +110,8 @@ private final ReplicationStateListener stateLog; private final UpdateHeadTask.Factory updateHeadFactory; private final Object stateLock = new Object(); - private final Map<Project.NameKey, FetchOne> pending = new HashMap<>(); - private final Map<Project.NameKey, FetchOne> inFlight = new HashMap<>(); + private final Map<URIish, FetchOne> pending = new HashMap<>(); + private final Map<URIish, FetchOne> inFlight = new HashMap<>(); private final FetchOne.Factory opFactory; private final GitRepositoryManager gitManager; private final PermissionBackend permissionBackend; @@ -134,11 +134,10 @@ } public static class QueueInfo { - public final Map<Project.NameKey, FetchOne> pending; - public final Map<Project.NameKey, FetchOne> inFlight; + public final Map<URIish, FetchOne> pending; + public final Map<URIish, FetchOne> inFlight; - public QueueInfo( - Map<Project.NameKey, FetchOne> pending, Map<Project.NameKey, FetchOne> inFlight) { + public QueueInfo(Map<URIish, FetchOne> pending, Map<URIish, FetchOne> inFlight) { this.pending = ImmutableMap.copyOf(pending); this.inFlight = ImmutableMap.copyOf(inFlight); } @@ -494,7 +493,7 @@ if (!config.replicatePermissions()) { FetchOne e; synchronized (stateLock) { - e = pending.get(project); + e = pending.get(uri); } if (e == null) { try (Repository git = gitManager.openRepository(project)) { @@ -520,13 +519,13 @@ } synchronized (stateLock) { - FetchOne e = pending.get(project); + FetchOne e = pending.get(uri); Future<?> f = CompletableFuture.completedFuture(null); if (e == null || e.isRetrying()) { e = opFactory.create(project, uri, apiRequestMetrics); addRef(e, refSpec); e.addState(refSpec, state); - pending.put(project, e); + pending.put(uri, e); f = pool.schedule( queueMetrics.runWithMetrics(this, e), @@ -579,7 +578,8 @@ void fetchWasCanceled(FetchOne fetchOp) { synchronized (stateLock) { - pending.remove(fetchOp.getProjectNameKey()); + URIish uri = fetchOp.getURI(); + pending.remove(uri); queueMetrics.incrementTaskCancelled(this); } } @@ -612,11 +612,11 @@ */ void reschedule(FetchOne fetchOp, RetryReason reason) { synchronized (stateLock) { - Project.NameKey projectName = fetchOp.getProjectNameKey(); - FetchOne pendingFetchOp = pending.get(projectName); + URIish uri = fetchOp.getURI(); + FetchOne pendingFetchOp = pending.get(uri); if (pendingFetchOp != null) { - // There is one FetchOp instance already pending for the same project. + // There is one FetchOp instance already pending to same URI. if (pendingFetchOp.isRetrying()) { // The one pending is one already retrying, so it should @@ -625,10 +625,10 @@ // This scenario would happen if a FetchOp has started running // and then before it failed due transport exception, another - // one to same project started. The first one would fail and would + // one to same URI started. The first one would fail and would // be rescheduled, being present in pending list. When the // second one fails, it will also be rescheduled and then, - // here, find out replication to the project is already pending + // here, find out replication to its URI is already pending // for retry (blocking). pendingFetchOp.addRefs(fetchOp.getRefSpecs()); pendingFetchOp.addStates(fetchOp.getStates()); @@ -654,7 +654,7 @@ // it will see it was canceled and then it will do nothing with // pending list and it will not execute its run implementation. pendingFetchOp.canceledByReplication(); - pending.remove(projectName); + pending.remove(uri); Set<FetchRefSpec> fetchOpRefSpecs = fetchOp.getRefSpecs(); fetchOp.addRefs(pendingFetchOp.getRefSpecs()); @@ -685,7 +685,7 @@ } if (pendingFetchOp == null || !pendingFetchOp.isRetrying()) { - pending.put(projectName, fetchOp); + pending.put(uri, fetchOp); switch (reason) { case COLLISION: queueMetrics.incrementTaskRescheduled(this); @@ -712,7 +712,7 @@ queueMetrics.incrementTaskRetrying(this); } else { fetchOp.canceledByReplication(); - pending.remove(projectName); + pending.remove(uri); stateLog.error( "Fetch from " + fetchOp.getURI() + " cancelled after maximum number of retries", fetchOp.getStatesAsArray()); @@ -729,22 +729,22 @@ if (op.wasCanceled()) { return false; } - pending.remove(op.getProjectNameKey()); - if (inFlight.containsKey(op.getProjectNameKey())) { + pending.remove(op.getURI()); + if (inFlight.containsKey(op.getURI())) { return false; } - inFlight.put(op.getProjectNameKey(), op); + inFlight.put(op.getURI(), op); } return true; } - Optional<FetchOne> getInFlight(Project.NameKey projectName) { - return Optional.ofNullable(inFlight.get(projectName)); + Optional<FetchOne> getInFlight(URIish uri) { + return Optional.ofNullable(inFlight.get(uri)); } void notifyFinished(FetchOne op) { synchronized (stateLock) { - inFlight.remove(op.getProjectNameKey()); + inFlight.remove(op.getURI()); } Set<TransportException> fetchFailures = op.getFetchFailures();