Add per-project shared lock to avoid fetch collisions Introduce a singleton ProjectsLock to coordinate fetches across sources on a per-project basis without blocking. FetchOne now tries to acquire a non-blocking lock for the project before proceeding; if the lock is busy we reschedule the task to avoid colliding with an in-flight fetch. When acquired, the lock is held via a try-with- resources token and we still honor the async runway check. Previous changes (I3c3d7b97, I7883e680) attempted to centralize coordination with a shared projects queue, but it dropped the fetch origin, which led to fetch events being lost. The new lock-based approach keeps the source context intact and guarantees at most one concurrent fetch per project across sources. Bug: Issue 437805590 Change-Id: I2ce2f5a8532a7282f03f245f3670e8c318de0a61
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java index d60c2b7..76ecd80 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -113,6 +113,7 @@ private DynamicItem<ReplicationFetchFilter> replicationFetchFilter; private boolean succeeded; private Map<String, AutoCloseable> fetchLocks; + private final ProjectsLock projectsLock; @Inject FetchOne( @@ -126,6 +127,7 @@ FetchFactory fetchFactory, FetchRefsDatabase fetchRefsDatabase, DeleteRefCommand deleteRefCommand, + ProjectsLock projectsLock, @Assisted Project.NameKey d, @Assisted URIish u, @Assisted Optional<PullReplicationApiRequestMetrics> apiRequestMetrics) { @@ -148,6 +150,7 @@ this.fetchFactory = fetchFactory; maxRetries = s.getMaxRetries(); this.apiRequestMetrics = apiRequestMetrics; + this.projectsLock = projectsLock; } @Inject(optional = true) @@ -350,126 +353,142 @@ // created and scheduled for a future point in time.) // isCollision = false; - if (replicationType == ReplicationType.ASYNC && !pool.requestRunway(this)) { - if (!canceled) { - repLog.info( - "[{}] Rescheduling replication from {} to avoid collision with an in-flight fetch task" - + " [{}].", - taskIdHex, - uri, - pool.getInFlight(getURI()).map(FetchOne::getTaskIdHex).orElse("<unknown>")); - pool.reschedule(this, Source.RetryReason.COLLISION); - isCollision = true; + + try (ProjectsLock.LockToken unused = projectsLock.tryLock(projectName, getTaskIdHex())) { + if (replicationType == ReplicationType.ASYNC && !pool.requestRunway(this)) { + if (!canceled) { + repLog.info( + "[{}] Rescheduling replication from {} to avoid collision with an in-flight fetch" + + " task [{}].", + taskIdHex, + uri, + pool.getInFlight(getURI()).map(FetchOne::getTaskIdHex).orElse("<unknown>")); + pool.reschedule(this, Source.RetryReason.COLLISION); + isCollision = true; + } + return; } - return; - } - repLog.info( - "[{}] {} replication from {} started for refs [{}] ...", - taskIdHex, - replicationType, - uri, - getRefSpecs()); - Timer1.Context<String> context = metrics.start(config.getName()); - try { - long startedAt = context.getStartTime(); - long delay = NANOSECONDS.toMillis(startedAt - createdAt); - git = gitManager.openRepository(projectName); - List<FetchRefSpec> fetchRefSpecs = runImpl(); + repLog.info( + "[{}] {} replication from {} started for refs [{}] ...", + taskIdHex, + replicationType, + uri, + getRefSpecs()); + Timer1.Context<String> context = metrics.start(config.getName()); + try { + long startedAt = context.getStartTime(); + long delay = NANOSECONDS.toMillis(startedAt - createdAt); + git = gitManager.openRepository(projectName); + List<FetchRefSpec> fetchRefSpecs = runImpl(); - if (fetchRefSpecs.isEmpty()) { - repLog.info( - "[{}] {} replication from {} finished but no refs were replicated, {}ms delay, {}" - + " retries", - taskIdHex, - replicationType, - uri, - delay, - retryCount); - } else { - metrics.record(config.getName(), delay, retryCount); - long elapsed = NANOSECONDS.toMillis(context.stop()); - Optional<Long> elapsedEnd2End = - apiRequestMetrics - .flatMap(metrics -> metrics.stop(config.getName())) - .map(NANOSECONDS::toMillis); - repLog.info( - "[{}] Replication from {} completed in {}ms, {}ms delay, {} retries{}", - taskIdHex, - uri, - elapsed, - delay, - retryCount, - elapsedEnd2End.map(el -> String.format(", E2E %dms", el)).orElse("")); - } - } catch (RepositoryNotFoundException e) { - stateLog.error( - "[" - + taskIdHex - + "] Cannot replicate " - + projectName - + "; Local repository error: " - + e.getMessage(), - getStatesAsArray()); + if (fetchRefSpecs.isEmpty()) { + repLog.info( + "[{}] {} replication from {} finished but no refs were replicated, {}ms delay, {}" + + " retries", + taskIdHex, + replicationType, + uri, + delay, + retryCount); + } else { + metrics.record(config.getName(), delay, retryCount); + long elapsed = NANOSECONDS.toMillis(context.stop()); + Optional<Long> elapsedEnd2End = + apiRequestMetrics + .flatMap(metrics -> metrics.stop(config.getName())) + .map(NANOSECONDS::toMillis); + repLog.info( + "[{}] Replication from {} completed in {}ms, {}ms delay, {} retries{}", + taskIdHex, + uri, + elapsed, + delay, + retryCount, + elapsedEnd2End.map(el -> String.format(", E2E %dms", el)).orElse("")); + } + } catch (RepositoryNotFoundException e) { + stateLog.error( + "[" + + taskIdHex + + "] Cannot replicate " + + projectName + + "; Local repository error: " + + e.getMessage(), + getStatesAsArray()); - } catch (NoRemoteRepositoryException | RemoteRepositoryException e) { - // Tried to replicate to a remote via anonymous git:// but the repository - // does not exist. In this case NoRemoteRepositoryException is not - // raised. - String msg = e.getMessage(); - repLog.error( - "[{}] Cannot replicate {}; Remote repository error: {}", taskIdHex, projectName, msg); - } catch (NotSupportedException e) { - stateLog.error("[" + taskIdHex + "] Cannot replicate from " + uri, e, getStatesAsArray()); - } catch (PermanentTransportException e) { - repLog.error( - String.format("Terminal failure. Cannot replicate [%s] from %s", taskIdHex, uri), e); - } catch (TransportException e) { - repLog.error(String.format("[%s] Cannot replicate from %s", taskIdHex, uri), e); - if (replicationType == ReplicationType.ASYNC && e instanceof LockFailureException) { - lockRetryCount++; - // The LockFailureException message contains both URI and reason - // for this failure. + } catch (NoRemoteRepositoryException | RemoteRepositoryException e) { + // Tried to replicate to a remote via anonymous git:// but the repository + // does not exist. In this case NoRemoteRepositoryException is not + // raised. + String msg = e.getMessage(); + repLog.error( + "[{}] Cannot replicate {}; Remote repository error: {}", taskIdHex, projectName, msg); + } catch (NotSupportedException e) { + stateLog.error("[" + taskIdHex + "] Cannot replicate from " + uri, e, getStatesAsArray()); + } catch (PermanentTransportException e) { + repLog.error( + String.format("Terminal failure. Cannot replicate [%s] from %s", taskIdHex, uri), e); + } catch (TransportException e) { + repLog.error(String.format("[%s] Cannot replicate from %s", taskIdHex, uri), e); + if (replicationType == ReplicationType.ASYNC && e instanceof LockFailureException) { + lockRetryCount++; + // The LockFailureException message contains both URI and reason + // for this failure. - // The remote fetch operation should be retried. - if (lockRetryCount <= maxLockRetries) { + // The remote fetch operation should be retried. + if (lockRetryCount <= maxLockRetries) { + if (canceledWhileRunning.get()) { + logCanceledWhileRunningException(e); + } else { + pool.reschedule(this, Source.RetryReason.TRANSPORT_ERROR); + } + } else { + repLog.error( + "[{}] Giving up after {} occurrences of this error: {} during replication from [{}]" + + " {}", + taskIdHex, + lockRetryCount, + e.getMessage(), + taskIdHex, + uri); + } + } else if (replicationType == ReplicationType.ASYNC) { if (canceledWhileRunning.get()) { logCanceledWhileRunningException(e); } else { + // The remote fetch operation should be retried. pool.reschedule(this, Source.RetryReason.TRANSPORT_ERROR); } - } else { - repLog.error( - "[{}] Giving up after {} occurrences of this error: {} during replication from [{}]" - + " {}", - taskIdHex, - lockRetryCount, - e.getMessage(), - taskIdHex, - uri); } - } else if (replicationType == ReplicationType.ASYNC) { - if (canceledWhileRunning.get()) { - logCanceledWhileRunningException(e); - } else { - // The remote fetch operation should be retried. - pool.reschedule(this, Source.RetryReason.TRANSPORT_ERROR); + } catch (IOException e) { + stateLog.error("[" + taskIdHex + "] Cannot replicate from " + uri, e, getStatesAsArray()); + } catch (RuntimeException | Error e) { + stateLog.error( + "[" + taskIdHex + "] Unexpected error during replication from " + uri, + e, + getStatesAsArray()); + } finally { + if (git != null) { + git.close(); } - } - } catch (IOException e) { - stateLog.error("[" + taskIdHex + "] Cannot replicate from " + uri, e, getStatesAsArray()); - } catch (RuntimeException | Error e) { - stateLog.error( - "[" + taskIdHex + "] Unexpected error during replication from " + uri, - e, - getStatesAsArray()); - } finally { - if (git != null) { - git.close(); - } - if (replicationType == ReplicationType.ASYNC) { - pool.notifyFinished(this); + if (replicationType == ReplicationType.ASYNC) { + pool.notifyFinished(this); + } + } + } catch (ProjectsLock.UnableToLockProjectException e) { + pool.fetchWasCanceled(this); + if (!canceled) { + repLog.info( + "[{}] Project {} is locked. Rescheduling replication from {} to avoid collision with an" + + " in-flight fetch task [{}].", + taskIdHex, + projectName, + uri, + e.getConflictingTaskId()); + pool.reschedule(this, Source.RetryReason.COLLISION); + isCollision = true; } } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLock.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLock.java new file mode 100644 index 0000000..bfcc396 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLock.java
@@ -0,0 +1,80 @@ +// Copyright (C) 2025 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.pull; + +import com.google.gerrit.entities.Project; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +class ProjectsLock { + public static final String EMPTY_TASK_ID = ""; + private final Map<Project.NameKey, AtomicReference<String>> projectLocksHeldByTasks = + new ConcurrentHashMap<>(); + + LockToken tryLock(Project.NameKey project, String taskId) throws UnableToLockProjectException { + if (!getProjectAtomicReference(project).compareAndSet(EMPTY_TASK_ID, taskId)) { + throw new UnableToLockProjectException(project, projectLocksHeldByTasks.get(project).get()); + } + return new LockToken(this, project, taskId); + } + + boolean unlock(Project.NameKey project, String taskId) { + return getProjectAtomicReference(project).compareAndSet(taskId, EMPTY_TASK_ID); + } + + private AtomicReference<String> getProjectAtomicReference(Project.NameKey project) { + return projectLocksHeldByTasks.computeIfAbsent( + project, k -> new AtomicReference<>(EMPTY_TASK_ID)); + } + + static final class LockToken implements AutoCloseable { + final Project.NameKey project; + final ProjectsLock projectsLock; + private final String taskId; + boolean closed; + + private LockToken(ProjectsLock projectsLock, Project.NameKey project, String taskId) { + this.project = project; + this.projectsLock = projectsLock; + this.taskId = taskId; + } + + @Override + public void close() { + if (!closed && projectsLock.unlock(project, taskId)) { + closed = true; + } + } + } + + static class UnableToLockProjectException extends Exception { + private final String conflictingTaskId; + + public UnableToLockProjectException(Project.NameKey project, String conflictingTaskId) { + super( + "Unable to lock project " + + project + + " because it is already locked by task " + + conflictingTaskId); + + this.conflictingTaskId = conflictingTaskId; + } + + public String getConflictingTaskId() { + return conflictingTaskId; + } + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java index 251f39f..4f32f26 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -89,6 +89,7 @@ .implement(HttpClient.class, SourceHttpClient.class) .build(SourceHttpClient.Factory.class)); + bind(ProjectsLock.class).in(Scopes.SINGLETON); install(new FactoryModuleBuilder().build(Source.Factory.class)); install( new FactoryModuleBuilder()
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java index 1104ff4..50d91ff 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
@@ -130,6 +130,7 @@ fetchFactory, fetchRefsDatabase, deleteRefCommand, + new ProjectsLock(), PROJECT_NAME, urIish, Optional.of(pullReplicationApiRequestMetrics));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLockTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLockTest.java new file mode 100644 index 0000000..89d1295 --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLockTest.java
@@ -0,0 +1,65 @@ +// Copyright (C) 2025 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.pull; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.gerrit.testing.GerritJUnit.assertThrows; + +import com.google.gerrit.entities.Project; +import org.junit.Test; + +public class ProjectsLockTest { + private final ProjectsLock projectsLock = new ProjectsLock(); + private final Project.NameKey project1 = Project.nameKey("project1"); + private final Project.NameKey project2 = Project.nameKey("project2"); + private final String task1 = "task1"; + private final String task2 = "task2"; + + @Test + public void shouldSuccessfullyLockAndUnlockProject() throws Exception { + ProjectsLock.LockToken lockToken = projectsLock.tryLock(project1, task1); + assertThatIsLocked(lockToken, project1); + assertThat(projectsLock.unlock(project1, task1)).isTrue(); + + try (ProjectsLock.LockToken newLockToken = projectsLock.tryLock(project1, task2)) { + assertThatIsLocked(newLockToken, project1); + } + } + + @Test + public void shouldFailToLockFromAnotherTaskOnLockedProject() throws Exception { + try (ProjectsLock.LockToken unused = projectsLock.tryLock(project1, task1)) { + ProjectsLock.UnableToLockProjectException e = + assertThrows( + ProjectsLock.UnableToLockProjectException.class, + () -> projectsLock.tryLock(project1, task2).close()); + assertThat(e.getConflictingTaskId()).isEqualTo(task1); + } + } + + @Test + public void shouldHandleMultipleProjectsIndependently() throws Exception { + try (ProjectsLock.LockToken lockToken1 = projectsLock.tryLock(project1, task1); + ProjectsLock.LockToken lockToken2 = projectsLock.tryLock(project2, task2)) { + assertThatIsLocked(lockToken1, project1); + assertThatIsLocked(lockToken2, project2); + } + } + + private void assertThatIsLocked(ProjectsLock.LockToken lockToken, Project.NameKey project1) { + assertThat(lockToken).isNotNull(); + assertThat(lockToken.project).isEqualTo(project1); + } +}