Revert "Prevent concurrent fetches on the same repository"
This reverts commit eebcfdd1c3ece403247a37f5a766cb7facdb94c0.
Reason for revert: This would break replication consistency when having a single source with multiple URIs
Bug: Issue 437805590
Change-Id: I499244441b01ddee42d61655b908576b3af316e4
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index d5dd6e0..d60c2b7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -357,7 +357,7 @@
+ " [{}].",
taskIdHex,
uri,
- pool.getInFlight(projectName).map(FetchOne::getTaskIdHex).orElse("<unknown>"));
+ pool.getInFlight(getURI()).map(FetchOne::getTaskIdHex).orElse("<unknown>"));
pool.reschedule(this, Source.RetryReason.COLLISION);
isCollision = true;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 097f058..0a75288 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -110,8 +110,8 @@
private final ReplicationStateListener stateLog;
private final UpdateHeadTask.Factory updateHeadFactory;
private final Object stateLock = new Object();
- private final Map<Project.NameKey, FetchOne> pending = new HashMap<>();
- private final Map<Project.NameKey, FetchOne> inFlight = new HashMap<>();
+ private final Map<URIish, FetchOne> pending = new HashMap<>();
+ private final Map<URIish, FetchOne> inFlight = new HashMap<>();
private final FetchOne.Factory opFactory;
private final GitRepositoryManager gitManager;
private final PermissionBackend permissionBackend;
@@ -134,11 +134,10 @@
}
public static class QueueInfo {
- public final Map<Project.NameKey, FetchOne> pending;
- public final Map<Project.NameKey, FetchOne> inFlight;
+ public final Map<URIish, FetchOne> pending;
+ public final Map<URIish, FetchOne> inFlight;
- public QueueInfo(
- Map<Project.NameKey, FetchOne> pending, Map<Project.NameKey, FetchOne> inFlight) {
+ public QueueInfo(Map<URIish, FetchOne> pending, Map<URIish, FetchOne> inFlight) {
this.pending = ImmutableMap.copyOf(pending);
this.inFlight = ImmutableMap.copyOf(inFlight);
}
@@ -494,7 +493,7 @@
if (!config.replicatePermissions()) {
FetchOne e;
synchronized (stateLock) {
- e = pending.get(project);
+ e = pending.get(uri);
}
if (e == null) {
try (Repository git = gitManager.openRepository(project)) {
@@ -520,13 +519,13 @@
}
synchronized (stateLock) {
- FetchOne e = pending.get(project);
+ FetchOne e = pending.get(uri);
Future<?> f = CompletableFuture.completedFuture(null);
if (e == null || e.isRetrying()) {
e = opFactory.create(project, uri, apiRequestMetrics);
addRef(e, refSpec);
e.addState(refSpec, state);
- pending.put(project, e);
+ pending.put(uri, e);
f =
pool.schedule(
queueMetrics.runWithMetrics(this, e),
@@ -579,7 +578,8 @@
void fetchWasCanceled(FetchOne fetchOp) {
synchronized (stateLock) {
- pending.remove(fetchOp.getProjectNameKey());
+ URIish uri = fetchOp.getURI();
+ pending.remove(uri);
queueMetrics.incrementTaskCancelled(this);
}
}
@@ -612,11 +612,11 @@
*/
void reschedule(FetchOne fetchOp, RetryReason reason) {
synchronized (stateLock) {
- Project.NameKey projectName = fetchOp.getProjectNameKey();
- FetchOne pendingFetchOp = pending.get(projectName);
+ URIish uri = fetchOp.getURI();
+ FetchOne pendingFetchOp = pending.get(uri);
if (pendingFetchOp != null) {
- // There is one FetchOp instance already pending for the same project.
+ // There is one FetchOp instance already pending to same URI.
if (pendingFetchOp.isRetrying()) {
// The one pending is one already retrying, so it should
@@ -625,10 +625,10 @@
// This scenario would happen if a FetchOp has started running
// and then before it failed due transport exception, another
- // one to same project started. The first one would fail and would
+ // one to same URI started. The first one would fail and would
// be rescheduled, being present in pending list. When the
// second one fails, it will also be rescheduled and then,
- // here, find out replication to the project is already pending
+ // here, find out replication to its URI is already pending
// for retry (blocking).
pendingFetchOp.addRefs(fetchOp.getRefSpecs());
pendingFetchOp.addStates(fetchOp.getStates());
@@ -654,7 +654,7 @@
// it will see it was canceled and then it will do nothing with
// pending list and it will not execute its run implementation.
pendingFetchOp.canceledByReplication();
- pending.remove(projectName);
+ pending.remove(uri);
Set<FetchRefSpec> fetchOpRefSpecs = fetchOp.getRefSpecs();
fetchOp.addRefs(pendingFetchOp.getRefSpecs());
@@ -685,7 +685,7 @@
}
if (pendingFetchOp == null || !pendingFetchOp.isRetrying()) {
- pending.put(projectName, fetchOp);
+ pending.put(uri, fetchOp);
switch (reason) {
case COLLISION:
queueMetrics.incrementTaskRescheduled(this);
@@ -712,7 +712,7 @@
queueMetrics.incrementTaskRetrying(this);
} else {
fetchOp.canceledByReplication();
- pending.remove(projectName);
+ pending.remove(uri);
stateLog.error(
"Fetch from " + fetchOp.getURI() + " cancelled after maximum number of retries",
fetchOp.getStatesAsArray());
@@ -729,22 +729,22 @@
if (op.wasCanceled()) {
return false;
}
- pending.remove(op.getProjectNameKey());
- if (inFlight.containsKey(op.getProjectNameKey())) {
+ pending.remove(op.getURI());
+ if (inFlight.containsKey(op.getURI())) {
return false;
}
- inFlight.put(op.getProjectNameKey(), op);
+ inFlight.put(op.getURI(), op);
}
return true;
}
- Optional<FetchOne> getInFlight(Project.NameKey projectName) {
- return Optional.ofNullable(inFlight.get(projectName));
+ Optional<FetchOne> getInFlight(URIish uri) {
+ return Optional.ofNullable(inFlight.get(uri));
}
void notifyFinished(FetchOne op) {
synchronized (stateLock) {
- inFlight.remove(op.getProjectNameKey());
+ inFlight.remove(op.getURI());
}
Set<TransportException> fetchFailures = op.getFetchFailures();