Add per-project shared lock to avoid fetch collisions
Introduce a singleton ProjectsLock to coordinate fetches across sources
on a per-project basis without blocking. FetchOne now tries to acquire a
non-blocking lock for the project before proceeding; if the lock is busy
we reschedule the task to avoid colliding with an in-flight fetch. When
acquired, the lock is held via a try-with- resources token and we still
honor the async runway check.
Previous changes (I3c3d7b97, I7883e680) attempted to centralize
coordination with a shared projects queue, but it dropped the fetch
origin, which led to fetch events being lost. The new lock-based
approach keeps the source context intact and guarantees at most one
concurrent fetch per project across sources.
Bug: Issue 437805590
Change-Id: I2ce2f5a8532a7282f03f245f3670e8c318de0a61
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index d60c2b7..76ecd80 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -113,6 +113,7 @@
private DynamicItem<ReplicationFetchFilter> replicationFetchFilter;
private boolean succeeded;
private Map<String, AutoCloseable> fetchLocks;
+ private final ProjectsLock projectsLock;
@Inject
FetchOne(
@@ -126,6 +127,7 @@
FetchFactory fetchFactory,
FetchRefsDatabase fetchRefsDatabase,
DeleteRefCommand deleteRefCommand,
+ ProjectsLock projectsLock,
@Assisted Project.NameKey d,
@Assisted URIish u,
@Assisted Optional<PullReplicationApiRequestMetrics> apiRequestMetrics) {
@@ -148,6 +150,7 @@
this.fetchFactory = fetchFactory;
maxRetries = s.getMaxRetries();
this.apiRequestMetrics = apiRequestMetrics;
+ this.projectsLock = projectsLock;
}
@Inject(optional = true)
@@ -350,126 +353,142 @@
// created and scheduled for a future point in time.)
//
isCollision = false;
- if (replicationType == ReplicationType.ASYNC && !pool.requestRunway(this)) {
- if (!canceled) {
- repLog.info(
- "[{}] Rescheduling replication from {} to avoid collision with an in-flight fetch task"
- + " [{}].",
- taskIdHex,
- uri,
- pool.getInFlight(getURI()).map(FetchOne::getTaskIdHex).orElse("<unknown>"));
- pool.reschedule(this, Source.RetryReason.COLLISION);
- isCollision = true;
+
+ try (ProjectsLock.LockToken unused = projectsLock.tryLock(projectName, getTaskIdHex())) {
+ if (replicationType == ReplicationType.ASYNC && !pool.requestRunway(this)) {
+ if (!canceled) {
+ repLog.info(
+ "[{}] Rescheduling replication from {} to avoid collision with an in-flight fetch"
+ + " task [{}].",
+ taskIdHex,
+ uri,
+ pool.getInFlight(getURI()).map(FetchOne::getTaskIdHex).orElse("<unknown>"));
+ pool.reschedule(this, Source.RetryReason.COLLISION);
+ isCollision = true;
+ }
+ return;
}
- return;
- }
- repLog.info(
- "[{}] {} replication from {} started for refs [{}] ...",
- taskIdHex,
- replicationType,
- uri,
- getRefSpecs());
- Timer1.Context<String> context = metrics.start(config.getName());
- try {
- long startedAt = context.getStartTime();
- long delay = NANOSECONDS.toMillis(startedAt - createdAt);
- git = gitManager.openRepository(projectName);
- List<FetchRefSpec> fetchRefSpecs = runImpl();
+ repLog.info(
+ "[{}] {} replication from {} started for refs [{}] ...",
+ taskIdHex,
+ replicationType,
+ uri,
+ getRefSpecs());
+ Timer1.Context<String> context = metrics.start(config.getName());
+ try {
+ long startedAt = context.getStartTime();
+ long delay = NANOSECONDS.toMillis(startedAt - createdAt);
+ git = gitManager.openRepository(projectName);
+ List<FetchRefSpec> fetchRefSpecs = runImpl();
- if (fetchRefSpecs.isEmpty()) {
- repLog.info(
- "[{}] {} replication from {} finished but no refs were replicated, {}ms delay, {}"
- + " retries",
- taskIdHex,
- replicationType,
- uri,
- delay,
- retryCount);
- } else {
- metrics.record(config.getName(), delay, retryCount);
- long elapsed = NANOSECONDS.toMillis(context.stop());
- Optional<Long> elapsedEnd2End =
- apiRequestMetrics
- .flatMap(metrics -> metrics.stop(config.getName()))
- .map(NANOSECONDS::toMillis);
- repLog.info(
- "[{}] Replication from {} completed in {}ms, {}ms delay, {} retries{}",
- taskIdHex,
- uri,
- elapsed,
- delay,
- retryCount,
- elapsedEnd2End.map(el -> String.format(", E2E %dms", el)).orElse(""));
- }
- } catch (RepositoryNotFoundException e) {
- stateLog.error(
- "["
- + taskIdHex
- + "] Cannot replicate "
- + projectName
- + "; Local repository error: "
- + e.getMessage(),
- getStatesAsArray());
+ if (fetchRefSpecs.isEmpty()) {
+ repLog.info(
+ "[{}] {} replication from {} finished but no refs were replicated, {}ms delay, {}"
+ + " retries",
+ taskIdHex,
+ replicationType,
+ uri,
+ delay,
+ retryCount);
+ } else {
+ metrics.record(config.getName(), delay, retryCount);
+ long elapsed = NANOSECONDS.toMillis(context.stop());
+ Optional<Long> elapsedEnd2End =
+ apiRequestMetrics
+ .flatMap(metrics -> metrics.stop(config.getName()))
+ .map(NANOSECONDS::toMillis);
+ repLog.info(
+ "[{}] Replication from {} completed in {}ms, {}ms delay, {} retries{}",
+ taskIdHex,
+ uri,
+ elapsed,
+ delay,
+ retryCount,
+ elapsedEnd2End.map(el -> String.format(", E2E %dms", el)).orElse(""));
+ }
+ } catch (RepositoryNotFoundException e) {
+ stateLog.error(
+ "["
+ + taskIdHex
+ + "] Cannot replicate "
+ + projectName
+ + "; Local repository error: "
+ + e.getMessage(),
+ getStatesAsArray());
- } catch (NoRemoteRepositoryException | RemoteRepositoryException e) {
- // Tried to replicate to a remote via anonymous git:// but the repository
- // does not exist. In this case NoRemoteRepositoryException is not
- // raised.
- String msg = e.getMessage();
- repLog.error(
- "[{}] Cannot replicate {}; Remote repository error: {}", taskIdHex, projectName, msg);
- } catch (NotSupportedException e) {
- stateLog.error("[" + taskIdHex + "] Cannot replicate from " + uri, e, getStatesAsArray());
- } catch (PermanentTransportException e) {
- repLog.error(
- String.format("Terminal failure. Cannot replicate [%s] from %s", taskIdHex, uri), e);
- } catch (TransportException e) {
- repLog.error(String.format("[%s] Cannot replicate from %s", taskIdHex, uri), e);
- if (replicationType == ReplicationType.ASYNC && e instanceof LockFailureException) {
- lockRetryCount++;
- // The LockFailureException message contains both URI and reason
- // for this failure.
+ } catch (NoRemoteRepositoryException | RemoteRepositoryException e) {
+ // Tried to replicate to a remote via anonymous git:// but the repository
+ // does not exist. In this case NoRemoteRepositoryException is not
+ // raised.
+ String msg = e.getMessage();
+ repLog.error(
+ "[{}] Cannot replicate {}; Remote repository error: {}", taskIdHex, projectName, msg);
+ } catch (NotSupportedException e) {
+ stateLog.error("[" + taskIdHex + "] Cannot replicate from " + uri, e, getStatesAsArray());
+ } catch (PermanentTransportException e) {
+ repLog.error(
+ String.format("Terminal failure. Cannot replicate [%s] from %s", taskIdHex, uri), e);
+ } catch (TransportException e) {
+ repLog.error(String.format("[%s] Cannot replicate from %s", taskIdHex, uri), e);
+ if (replicationType == ReplicationType.ASYNC && e instanceof LockFailureException) {
+ lockRetryCount++;
+ // The LockFailureException message contains both URI and reason
+ // for this failure.
- // The remote fetch operation should be retried.
- if (lockRetryCount <= maxLockRetries) {
+ // The remote fetch operation should be retried.
+ if (lockRetryCount <= maxLockRetries) {
+ if (canceledWhileRunning.get()) {
+ logCanceledWhileRunningException(e);
+ } else {
+ pool.reschedule(this, Source.RetryReason.TRANSPORT_ERROR);
+ }
+ } else {
+ repLog.error(
+ "[{}] Giving up after {} occurrences of this error: {} during replication from [{}]"
+ + " {}",
+ taskIdHex,
+ lockRetryCount,
+ e.getMessage(),
+ taskIdHex,
+ uri);
+ }
+ } else if (replicationType == ReplicationType.ASYNC) {
if (canceledWhileRunning.get()) {
logCanceledWhileRunningException(e);
} else {
+ // The remote fetch operation should be retried.
pool.reschedule(this, Source.RetryReason.TRANSPORT_ERROR);
}
- } else {
- repLog.error(
- "[{}] Giving up after {} occurrences of this error: {} during replication from [{}]"
- + " {}",
- taskIdHex,
- lockRetryCount,
- e.getMessage(),
- taskIdHex,
- uri);
}
- } else if (replicationType == ReplicationType.ASYNC) {
- if (canceledWhileRunning.get()) {
- logCanceledWhileRunningException(e);
- } else {
- // The remote fetch operation should be retried.
- pool.reschedule(this, Source.RetryReason.TRANSPORT_ERROR);
+ } catch (IOException e) {
+ stateLog.error("[" + taskIdHex + "] Cannot replicate from " + uri, e, getStatesAsArray());
+ } catch (RuntimeException | Error e) {
+ stateLog.error(
+ "[" + taskIdHex + "] Unexpected error during replication from " + uri,
+ e,
+ getStatesAsArray());
+ } finally {
+ if (git != null) {
+ git.close();
}
- }
- } catch (IOException e) {
- stateLog.error("[" + taskIdHex + "] Cannot replicate from " + uri, e, getStatesAsArray());
- } catch (RuntimeException | Error e) {
- stateLog.error(
- "[" + taskIdHex + "] Unexpected error during replication from " + uri,
- e,
- getStatesAsArray());
- } finally {
- if (git != null) {
- git.close();
- }
- if (replicationType == ReplicationType.ASYNC) {
- pool.notifyFinished(this);
+ if (replicationType == ReplicationType.ASYNC) {
+ pool.notifyFinished(this);
+ }
+ }
+ } catch (ProjectsLock.UnableToLockProjectException e) {
+ pool.fetchWasCanceled(this);
+ if (!canceled) {
+ repLog.info(
+ "[{}] Project {} is locked. Rescheduling replication from {} to avoid collision with an"
+ + " in-flight fetch task [{}].",
+ taskIdHex,
+ projectName,
+ uri,
+ e.getConflictingTaskId());
+ pool.reschedule(this, Source.RetryReason.COLLISION);
+ isCollision = true;
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLock.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLock.java
new file mode 100644
index 0000000..bfcc396
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLock.java
@@ -0,0 +1,80 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.entities.Project;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+class ProjectsLock {
+ public static final String EMPTY_TASK_ID = "";
+ private final Map<Project.NameKey, AtomicReference<String>> projectLocksHeldByTasks =
+ new ConcurrentHashMap<>();
+
+ LockToken tryLock(Project.NameKey project, String taskId) throws UnableToLockProjectException {
+ if (!getProjectAtomicReference(project).compareAndSet(EMPTY_TASK_ID, taskId)) {
+ throw new UnableToLockProjectException(project, projectLocksHeldByTasks.get(project).get());
+ }
+ return new LockToken(this, project, taskId);
+ }
+
+ boolean unlock(Project.NameKey project, String taskId) {
+ return getProjectAtomicReference(project).compareAndSet(taskId, EMPTY_TASK_ID);
+ }
+
+ private AtomicReference<String> getProjectAtomicReference(Project.NameKey project) {
+ return projectLocksHeldByTasks.computeIfAbsent(
+ project, k -> new AtomicReference<>(EMPTY_TASK_ID));
+ }
+
+ static final class LockToken implements AutoCloseable {
+ final Project.NameKey project;
+ final ProjectsLock projectsLock;
+ private final String taskId;
+ boolean closed;
+
+ private LockToken(ProjectsLock projectsLock, Project.NameKey project, String taskId) {
+ this.project = project;
+ this.projectsLock = projectsLock;
+ this.taskId = taskId;
+ }
+
+ @Override
+ public void close() {
+ if (!closed && projectsLock.unlock(project, taskId)) {
+ closed = true;
+ }
+ }
+ }
+
+ static class UnableToLockProjectException extends Exception {
+ private final String conflictingTaskId;
+
+ public UnableToLockProjectException(Project.NameKey project, String conflictingTaskId) {
+ super(
+ "Unable to lock project "
+ + project
+ + " because it is already locked by task "
+ + conflictingTaskId);
+
+ this.conflictingTaskId = conflictingTaskId;
+ }
+
+ public String getConflictingTaskId() {
+ return conflictingTaskId;
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index 251f39f..4f32f26 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -89,6 +89,7 @@
.implement(HttpClient.class, SourceHttpClient.class)
.build(SourceHttpClient.Factory.class));
+ bind(ProjectsLock.class).in(Scopes.SINGLETON);
install(new FactoryModuleBuilder().build(Source.Factory.class));
install(
new FactoryModuleBuilder()
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
index 1104ff4..50d91ff 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
@@ -130,6 +130,7 @@
fetchFactory,
fetchRefsDatabase,
deleteRefCommand,
+ new ProjectsLock(),
PROJECT_NAME,
urIish,
Optional.of(pullReplicationApiRequestMetrics));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLockTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLockTest.java
new file mode 100644
index 0000000..89d1295
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLockTest.java
@@ -0,0 +1,65 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+
+import com.google.gerrit.entities.Project;
+import org.junit.Test;
+
+public class ProjectsLockTest {
+ private final ProjectsLock projectsLock = new ProjectsLock();
+ private final Project.NameKey project1 = Project.nameKey("project1");
+ private final Project.NameKey project2 = Project.nameKey("project2");
+ private final String task1 = "task1";
+ private final String task2 = "task2";
+
+ @Test
+ public void shouldSuccessfullyLockAndUnlockProject() throws Exception {
+ ProjectsLock.LockToken lockToken = projectsLock.tryLock(project1, task1);
+ assertThatIsLocked(lockToken, project1);
+ assertThat(projectsLock.unlock(project1, task1)).isTrue();
+
+ try (ProjectsLock.LockToken newLockToken = projectsLock.tryLock(project1, task2)) {
+ assertThatIsLocked(newLockToken, project1);
+ }
+ }
+
+ @Test
+ public void shouldFailToLockFromAnotherTaskOnLockedProject() throws Exception {
+ try (ProjectsLock.LockToken unused = projectsLock.tryLock(project1, task1)) {
+ ProjectsLock.UnableToLockProjectException e =
+ assertThrows(
+ ProjectsLock.UnableToLockProjectException.class,
+ () -> projectsLock.tryLock(project1, task2).close());
+ assertThat(e.getConflictingTaskId()).isEqualTo(task1);
+ }
+ }
+
+ @Test
+ public void shouldHandleMultipleProjectsIndependently() throws Exception {
+ try (ProjectsLock.LockToken lockToken1 = projectsLock.tryLock(project1, task1);
+ ProjectsLock.LockToken lockToken2 = projectsLock.tryLock(project2, task2)) {
+ assertThatIsLocked(lockToken1, project1);
+ assertThatIsLocked(lockToken2, project2);
+ }
+ }
+
+ private void assertThatIsLocked(ProjectsLock.LockToken lockToken, Project.NameKey project1) {
+ assertThat(lockToken).isNotNull();
+ assertThat(lockToken.project).isEqualTo(project1);
+ }
+}