Add per-project shared lock to avoid fetch collisions

Introduce a singleton ProjectsLock to coordinate fetches across sources
on a per-project basis without blocking. FetchOne now tries to acquire a
non-blocking lock for the project before proceeding; if the lock is busy
we reschedule the task to avoid colliding with an in-flight fetch. When
acquired, the lock is held via a try-with- resources token and we still
honor the async runway check.

Previous changes (I3c3d7b97, I7883e680) attempted to centralize
coordination with a shared projects queue, but it dropped the fetch
origin, which led to fetch events being lost. The new lock-based
approach keeps the source context intact and guarantees at most one
concurrent fetch per project across sources.

Bug: Issue 437805590
Change-Id: I2ce2f5a8532a7282f03f245f3670e8c318de0a61
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index d60c2b7..76ecd80 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -113,6 +113,7 @@
   private DynamicItem<ReplicationFetchFilter> replicationFetchFilter;
   private boolean succeeded;
   private Map<String, AutoCloseable> fetchLocks;
+  private final ProjectsLock projectsLock;
 
   @Inject
   FetchOne(
@@ -126,6 +127,7 @@
       FetchFactory fetchFactory,
       FetchRefsDatabase fetchRefsDatabase,
       DeleteRefCommand deleteRefCommand,
+      ProjectsLock projectsLock,
       @Assisted Project.NameKey d,
       @Assisted URIish u,
       @Assisted Optional<PullReplicationApiRequestMetrics> apiRequestMetrics) {
@@ -148,6 +150,7 @@
     this.fetchFactory = fetchFactory;
     maxRetries = s.getMaxRetries();
     this.apiRequestMetrics = apiRequestMetrics;
+    this.projectsLock = projectsLock;
   }
 
   @Inject(optional = true)
@@ -350,126 +353,142 @@
     // created and scheduled for a future point in time.)
     //
     isCollision = false;
-    if (replicationType == ReplicationType.ASYNC && !pool.requestRunway(this)) {
-      if (!canceled) {
-        repLog.info(
-            "[{}] Rescheduling replication from {} to avoid collision with an in-flight fetch task"
-                + " [{}].",
-            taskIdHex,
-            uri,
-            pool.getInFlight(getURI()).map(FetchOne::getTaskIdHex).orElse("<unknown>"));
-        pool.reschedule(this, Source.RetryReason.COLLISION);
-        isCollision = true;
+
+    try (ProjectsLock.LockToken unused = projectsLock.tryLock(projectName, getTaskIdHex())) {
+      if (replicationType == ReplicationType.ASYNC && !pool.requestRunway(this)) {
+        if (!canceled) {
+          repLog.info(
+              "[{}] Rescheduling replication from {} to avoid collision with an in-flight fetch"
+                  + " task [{}].",
+              taskIdHex,
+              uri,
+              pool.getInFlight(getURI()).map(FetchOne::getTaskIdHex).orElse("<unknown>"));
+          pool.reschedule(this, Source.RetryReason.COLLISION);
+          isCollision = true;
+        }
+        return;
       }
-      return;
-    }
 
-    repLog.info(
-        "[{}] {} replication from {} started for refs [{}] ...",
-        taskIdHex,
-        replicationType,
-        uri,
-        getRefSpecs());
-    Timer1.Context<String> context = metrics.start(config.getName());
-    try {
-      long startedAt = context.getStartTime();
-      long delay = NANOSECONDS.toMillis(startedAt - createdAt);
-      git = gitManager.openRepository(projectName);
-      List<FetchRefSpec> fetchRefSpecs = runImpl();
+      repLog.info(
+          "[{}] {} replication from {} started for refs [{}] ...",
+          taskIdHex,
+          replicationType,
+          uri,
+          getRefSpecs());
+      Timer1.Context<String> context = metrics.start(config.getName());
+      try {
+        long startedAt = context.getStartTime();
+        long delay = NANOSECONDS.toMillis(startedAt - createdAt);
+        git = gitManager.openRepository(projectName);
+        List<FetchRefSpec> fetchRefSpecs = runImpl();
 
-      if (fetchRefSpecs.isEmpty()) {
-        repLog.info(
-            "[{}] {} replication from {} finished but no refs were replicated, {}ms delay, {}"
-                + " retries",
-            taskIdHex,
-            replicationType,
-            uri,
-            delay,
-            retryCount);
-      } else {
-        metrics.record(config.getName(), delay, retryCount);
-        long elapsed = NANOSECONDS.toMillis(context.stop());
-        Optional<Long> elapsedEnd2End =
-            apiRequestMetrics
-                .flatMap(metrics -> metrics.stop(config.getName()))
-                .map(NANOSECONDS::toMillis);
-        repLog.info(
-            "[{}] Replication from {} completed in {}ms, {}ms delay, {} retries{}",
-            taskIdHex,
-            uri,
-            elapsed,
-            delay,
-            retryCount,
-            elapsedEnd2End.map(el -> String.format(", E2E %dms", el)).orElse(""));
-      }
-    } catch (RepositoryNotFoundException e) {
-      stateLog.error(
-          "["
-              + taskIdHex
-              + "] Cannot replicate "
-              + projectName
-              + "; Local repository error: "
-              + e.getMessage(),
-          getStatesAsArray());
+        if (fetchRefSpecs.isEmpty()) {
+          repLog.info(
+              "[{}] {} replication from {} finished but no refs were replicated, {}ms delay, {}"
+                  + " retries",
+              taskIdHex,
+              replicationType,
+              uri,
+              delay,
+              retryCount);
+        } else {
+          metrics.record(config.getName(), delay, retryCount);
+          long elapsed = NANOSECONDS.toMillis(context.stop());
+          Optional<Long> elapsedEnd2End =
+              apiRequestMetrics
+                  .flatMap(metrics -> metrics.stop(config.getName()))
+                  .map(NANOSECONDS::toMillis);
+          repLog.info(
+              "[{}] Replication from {} completed in {}ms, {}ms delay, {} retries{}",
+              taskIdHex,
+              uri,
+              elapsed,
+              delay,
+              retryCount,
+              elapsedEnd2End.map(el -> String.format(", E2E %dms", el)).orElse(""));
+        }
+      } catch (RepositoryNotFoundException e) {
+        stateLog.error(
+            "["
+                + taskIdHex
+                + "] Cannot replicate "
+                + projectName
+                + "; Local repository error: "
+                + e.getMessage(),
+            getStatesAsArray());
 
-    } catch (NoRemoteRepositoryException | RemoteRepositoryException e) {
-      // Tried to replicate to a remote via anonymous git:// but the repository
-      // does not exist. In this case NoRemoteRepositoryException is not
-      // raised.
-      String msg = e.getMessage();
-      repLog.error(
-          "[{}] Cannot replicate {}; Remote repository error: {}", taskIdHex, projectName, msg);
-    } catch (NotSupportedException e) {
-      stateLog.error("[" + taskIdHex + "] Cannot replicate  from " + uri, e, getStatesAsArray());
-    } catch (PermanentTransportException e) {
-      repLog.error(
-          String.format("Terminal failure. Cannot replicate [%s] from %s", taskIdHex, uri), e);
-    } catch (TransportException e) {
-      repLog.error(String.format("[%s] Cannot replicate from %s", taskIdHex, uri), e);
-      if (replicationType == ReplicationType.ASYNC && e instanceof LockFailureException) {
-        lockRetryCount++;
-        // The LockFailureException message contains both URI and reason
-        // for this failure.
+      } catch (NoRemoteRepositoryException | RemoteRepositoryException e) {
+        // Tried to replicate to a remote via anonymous git:// but the repository
+        // does not exist. In this case NoRemoteRepositoryException is not
+        // raised.
+        String msg = e.getMessage();
+        repLog.error(
+            "[{}] Cannot replicate {}; Remote repository error: {}", taskIdHex, projectName, msg);
+      } catch (NotSupportedException e) {
+        stateLog.error("[" + taskIdHex + "] Cannot replicate  from " + uri, e, getStatesAsArray());
+      } catch (PermanentTransportException e) {
+        repLog.error(
+            String.format("Terminal failure. Cannot replicate [%s] from %s", taskIdHex, uri), e);
+      } catch (TransportException e) {
+        repLog.error(String.format("[%s] Cannot replicate from %s", taskIdHex, uri), e);
+        if (replicationType == ReplicationType.ASYNC && e instanceof LockFailureException) {
+          lockRetryCount++;
+          // The LockFailureException message contains both URI and reason
+          // for this failure.
 
-        // The remote fetch operation should be retried.
-        if (lockRetryCount <= maxLockRetries) {
+          // The remote fetch operation should be retried.
+          if (lockRetryCount <= maxLockRetries) {
+            if (canceledWhileRunning.get()) {
+              logCanceledWhileRunningException(e);
+            } else {
+              pool.reschedule(this, Source.RetryReason.TRANSPORT_ERROR);
+            }
+          } else {
+            repLog.error(
+                "[{}] Giving up after {} occurrences of this error: {} during replication from [{}]"
+                    + " {}",
+                taskIdHex,
+                lockRetryCount,
+                e.getMessage(),
+                taskIdHex,
+                uri);
+          }
+        } else if (replicationType == ReplicationType.ASYNC) {
           if (canceledWhileRunning.get()) {
             logCanceledWhileRunningException(e);
           } else {
+            // The remote fetch operation should be retried.
             pool.reschedule(this, Source.RetryReason.TRANSPORT_ERROR);
           }
-        } else {
-          repLog.error(
-              "[{}] Giving up after {} occurrences of this error: {} during replication from [{}]"
-                  + " {}",
-              taskIdHex,
-              lockRetryCount,
-              e.getMessage(),
-              taskIdHex,
-              uri);
         }
-      } else if (replicationType == ReplicationType.ASYNC) {
-        if (canceledWhileRunning.get()) {
-          logCanceledWhileRunningException(e);
-        } else {
-          // The remote fetch operation should be retried.
-          pool.reschedule(this, Source.RetryReason.TRANSPORT_ERROR);
+      } catch (IOException e) {
+        stateLog.error("[" + taskIdHex + "] Cannot replicate from " + uri, e, getStatesAsArray());
+      } catch (RuntimeException | Error e) {
+        stateLog.error(
+            "[" + taskIdHex + "] Unexpected error during replication from " + uri,
+            e,
+            getStatesAsArray());
+      } finally {
+        if (git != null) {
+          git.close();
         }
-      }
-    } catch (IOException e) {
-      stateLog.error("[" + taskIdHex + "] Cannot replicate from " + uri, e, getStatesAsArray());
-    } catch (RuntimeException | Error e) {
-      stateLog.error(
-          "[" + taskIdHex + "] Unexpected error during replication from " + uri,
-          e,
-          getStatesAsArray());
-    } finally {
-      if (git != null) {
-        git.close();
-      }
 
-      if (replicationType == ReplicationType.ASYNC) {
-        pool.notifyFinished(this);
+        if (replicationType == ReplicationType.ASYNC) {
+          pool.notifyFinished(this);
+        }
+      }
+    } catch (ProjectsLock.UnableToLockProjectException e) {
+      pool.fetchWasCanceled(this);
+      if (!canceled) {
+        repLog.info(
+            "[{}] Project {} is locked. Rescheduling replication from {} to avoid collision with an"
+                + " in-flight fetch task [{}].",
+            taskIdHex,
+            projectName,
+            uri,
+            e.getConflictingTaskId());
+        pool.reschedule(this, Source.RetryReason.COLLISION);
+        isCollision = true;
       }
     }
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLock.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLock.java
new file mode 100644
index 0000000..bfcc396
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLock.java
@@ -0,0 +1,80 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.entities.Project;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+class ProjectsLock {
+  public static final String EMPTY_TASK_ID = "";
+  private final Map<Project.NameKey, AtomicReference<String>> projectLocksHeldByTasks =
+      new ConcurrentHashMap<>();
+
+  LockToken tryLock(Project.NameKey project, String taskId) throws UnableToLockProjectException {
+    if (!getProjectAtomicReference(project).compareAndSet(EMPTY_TASK_ID, taskId)) {
+      throw new UnableToLockProjectException(project, projectLocksHeldByTasks.get(project).get());
+    }
+    return new LockToken(this, project, taskId);
+  }
+
+  boolean unlock(Project.NameKey project, String taskId) {
+    return getProjectAtomicReference(project).compareAndSet(taskId, EMPTY_TASK_ID);
+  }
+
+  private AtomicReference<String> getProjectAtomicReference(Project.NameKey project) {
+    return projectLocksHeldByTasks.computeIfAbsent(
+        project, k -> new AtomicReference<>(EMPTY_TASK_ID));
+  }
+
+  static final class LockToken implements AutoCloseable {
+    final Project.NameKey project;
+    final ProjectsLock projectsLock;
+    private final String taskId;
+    boolean closed;
+
+    private LockToken(ProjectsLock projectsLock, Project.NameKey project, String taskId) {
+      this.project = project;
+      this.projectsLock = projectsLock;
+      this.taskId = taskId;
+    }
+
+    @Override
+    public void close() {
+      if (!closed && projectsLock.unlock(project, taskId)) {
+        closed = true;
+      }
+    }
+  }
+
+  static class UnableToLockProjectException extends Exception {
+    private final String conflictingTaskId;
+
+    public UnableToLockProjectException(Project.NameKey project, String conflictingTaskId) {
+      super(
+          "Unable to lock project "
+              + project
+              + " because it is already locked by task "
+              + conflictingTaskId);
+
+      this.conflictingTaskId = conflictingTaskId;
+    }
+
+    public String getConflictingTaskId() {
+      return conflictingTaskId;
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index 251f39f..4f32f26 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -89,6 +89,7 @@
             .implement(HttpClient.class, SourceHttpClient.class)
             .build(SourceHttpClient.Factory.class));
 
+    bind(ProjectsLock.class).in(Scopes.SINGLETON);
     install(new FactoryModuleBuilder().build(Source.Factory.class));
     install(
         new FactoryModuleBuilder()
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
index 1104ff4..50d91ff 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
@@ -130,6 +130,7 @@
             fetchFactory,
             fetchRefsDatabase,
             deleteRefCommand,
+            new ProjectsLock(),
             PROJECT_NAME,
             urIish,
             Optional.of(pullReplicationApiRequestMetrics));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLockTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLockTest.java
new file mode 100644
index 0000000..89d1295
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLockTest.java
@@ -0,0 +1,65 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+
+import com.google.gerrit.entities.Project;
+import org.junit.Test;
+
+public class ProjectsLockTest {
+  private final ProjectsLock projectsLock = new ProjectsLock();
+  private final Project.NameKey project1 = Project.nameKey("project1");
+  private final Project.NameKey project2 = Project.nameKey("project2");
+  private final String task1 = "task1";
+  private final String task2 = "task2";
+
+  @Test
+  public void shouldSuccessfullyLockAndUnlockProject() throws Exception {
+    ProjectsLock.LockToken lockToken = projectsLock.tryLock(project1, task1);
+    assertThatIsLocked(lockToken, project1);
+    assertThat(projectsLock.unlock(project1, task1)).isTrue();
+
+    try (ProjectsLock.LockToken newLockToken = projectsLock.tryLock(project1, task2)) {
+      assertThatIsLocked(newLockToken, project1);
+    }
+  }
+
+  @Test
+  public void shouldFailToLockFromAnotherTaskOnLockedProject() throws Exception {
+    try (ProjectsLock.LockToken unused = projectsLock.tryLock(project1, task1)) {
+      ProjectsLock.UnableToLockProjectException e =
+          assertThrows(
+              ProjectsLock.UnableToLockProjectException.class,
+              () -> projectsLock.tryLock(project1, task2).close());
+      assertThat(e.getConflictingTaskId()).isEqualTo(task1);
+    }
+  }
+
+  @Test
+  public void shouldHandleMultipleProjectsIndependently() throws Exception {
+    try (ProjectsLock.LockToken lockToken1 = projectsLock.tryLock(project1, task1);
+        ProjectsLock.LockToken lockToken2 = projectsLock.tryLock(project2, task2)) {
+      assertThatIsLocked(lockToken1, project1);
+      assertThatIsLocked(lockToken2, project2);
+    }
+  }
+
+  private void assertThatIsLocked(ProjectsLock.LockToken lockToken, Project.NameKey project1) {
+    assertThat(lockToken).isNotNull();
+    assertThat(lockToken.project).isEqualTo(project1);
+  }
+}