Merge branch 'stable-3.12'
* stable-3.12:
Log cancelled fetch collisions without reschedule
Add per-project shared lock to avoid fetch collisions
Revert "Prevent concurrent fetches on the same repository"
Revert "Extract QueueInfo into its own class"
Extract QueueInfo into its own class
Throw LockFailureException from filterAndLock interface
Expose new constructor for LockFailureException
Prevent concurrent fetches on the same repository
Change-Id: I2b8240bb840056d1f836d8a98d651f7115e1d96b
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index b194f2c..a43ce4a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -113,6 +113,7 @@
private DynamicItem<ReplicationFetchFilter> replicationFetchFilter;
private boolean succeeded;
private Map<String, AutoCloseable> fetchLocks;
+ private final ProjectsLock projectsLock;
@Inject
FetchOne(
@@ -126,6 +127,7 @@
FetchFactory fetchFactory,
FetchRefsDatabase fetchRefsDatabase,
DeleteRefCommand deleteRefCommand,
+ ProjectsLock projectsLock,
@Assisted Project.NameKey d,
@Assisted URIish u,
@Assisted Optional<PullReplicationApiRequestMetrics> apiRequestMetrics) {
@@ -148,6 +150,7 @@
this.fetchFactory = fetchFactory;
maxRetries = s.getMaxRetries();
this.apiRequestMetrics = apiRequestMetrics;
+ this.projectsLock = projectsLock;
}
@Inject(optional = true)
@@ -350,126 +353,150 @@
// created and scheduled for a future point in time.)
//
isCollision = false;
- if (replicationType == ReplicationType.ASYNC && !pool.requestRunway(this)) {
- if (!canceled) {
- repLog.info(
- "[{}] Rescheduling replication from {} to avoid collision with an in-flight fetch task"
- + " [{}].",
- taskIdHex,
- uri,
- pool.getInFlight(getURI()).map(FetchOne::getTaskIdHex).orElse("<unknown>"));
- pool.reschedule(this, Source.RetryReason.COLLISION);
- isCollision = true;
+
+ try (ProjectsLock.LockToken unused = projectsLock.tryLock(projectName, getTaskIdHex())) {
+ if (replicationType == ReplicationType.ASYNC && !pool.requestRunway(this)) {
+ if (!canceled) {
+ repLog.info(
+ "[{}] Rescheduling replication from {} to avoid collision with an in-flight fetch"
+ + " task [{}].",
+ taskIdHex,
+ uri,
+ pool.getInFlight(getURI()).map(FetchOne::getTaskIdHex).orElse("<unknown>"));
+ pool.reschedule(this, Source.RetryReason.COLLISION);
+ isCollision = true;
+ }
+ return;
}
- return;
- }
- repLog.info(
- "[{}] {} replication from {} started for refs [{}] ...",
- taskIdHex,
- replicationType,
- uri,
- getRefSpecs());
- Timer1.Context<String> context = metrics.start(config.getName());
- try {
- long startedAt = context.getStartTime();
- long delay = NANOSECONDS.toMillis(startedAt - createdAt);
- git = gitManager.openRepository(projectName);
- List<FetchRefSpec> fetchRefSpecs = runImpl();
+ repLog.info(
+ "[{}] {} replication from {} started for refs [{}] ...",
+ taskIdHex,
+ replicationType,
+ uri,
+ getRefSpecs());
+ Timer1.Context<String> context = metrics.start(config.getName());
+ try {
+ long startedAt = context.getStartTime();
+ long delay = NANOSECONDS.toMillis(startedAt - createdAt);
+ git = gitManager.openRepository(projectName);
+ List<FetchRefSpec> fetchRefSpecs = runImpl();
- if (fetchRefSpecs.isEmpty()) {
- repLog.info(
- "[{}] {} replication from {} finished but no refs were replicated, {}ms delay, {}"
- + " retries",
- taskIdHex,
- replicationType,
- uri,
- delay,
- retryCount);
- } else {
- metrics.record(config.getName(), delay, retryCount);
- long elapsed = NANOSECONDS.toMillis(context.stop());
- Optional<Long> elapsedEnd2End =
- apiRequestMetrics
- .flatMap(metrics -> metrics.stop(config.getName()))
- .map(NANOSECONDS::toMillis);
- repLog.info(
- "[{}] Replication from {} completed in {}ms, {}ms delay, {} retries{}",
- taskIdHex,
- uri,
- elapsed,
- delay,
- retryCount,
- elapsedEnd2End.map(el -> String.format(", E2E %dms", el)).orElse(""));
- }
- } catch (RepositoryNotFoundException e) {
- stateLog.error(
- "["
- + taskIdHex
- + "] Cannot replicate "
- + projectName
- + "; Local repository error: "
- + e.getMessage(),
- getStatesAsArray());
+ if (fetchRefSpecs.isEmpty()) {
+ repLog.info(
+ "[{}] {} replication from {} finished but no refs were replicated, {}ms delay, {}"
+ + " retries",
+ taskIdHex,
+ replicationType,
+ uri,
+ delay,
+ retryCount);
+ } else {
+ metrics.record(config.getName(), delay, retryCount);
+ long elapsed = NANOSECONDS.toMillis(context.stop());
+ Optional<Long> elapsedEnd2End =
+ apiRequestMetrics
+ .flatMap(metrics -> metrics.stop(config.getName()))
+ .map(NANOSECONDS::toMillis);
+ repLog.info(
+ "[{}] Replication from {} completed in {}ms, {}ms delay, {} retries{}",
+ taskIdHex,
+ uri,
+ elapsed,
+ delay,
+ retryCount,
+ elapsedEnd2End.map(el -> String.format(", E2E %dms", el)).orElse(""));
+ }
+ } catch (RepositoryNotFoundException e) {
+ stateLog.error(
+ "["
+ + taskIdHex
+ + "] Cannot replicate "
+ + projectName
+ + "; Local repository error: "
+ + e.getMessage(),
+ getStatesAsArray());
- } catch (NoRemoteRepositoryException | RemoteRepositoryException e) {
- // Tried to replicate to a remote via anonymous git:// but the repository
- // does not exist. In this case NoRemoteRepositoryException is not
- // raised.
- String msg = e.getMessage();
- repLog.error(
- "[{}] Cannot replicate {}; Remote repository error: {}", taskIdHex, projectName, msg);
- } catch (NotSupportedException e) {
- stateLog.error("[" + taskIdHex + "] Cannot replicate from " + uri, e, getStatesAsArray());
- } catch (PermanentTransportException e) {
- repLog.error(
- String.format("Terminal failure. Cannot replicate [%s] from %s", taskIdHex, uri), e);
- } catch (TransportException e) {
- repLog.error(String.format("[%s] Cannot replicate from %s", taskIdHex, uri), e);
- if (replicationType == ReplicationType.ASYNC && e instanceof LockFailureException) {
- lockRetryCount++;
- // The LockFailureException message contains both URI and reason
- // for this failure.
+ } catch (NoRemoteRepositoryException | RemoteRepositoryException e) {
+ // Tried to replicate to a remote via anonymous git:// but the repository
+ // does not exist. In this case NoRemoteRepositoryException is not
+ // raised.
+ String msg = e.getMessage();
+ repLog.error(
+ "[{}] Cannot replicate {}; Remote repository error: {}", taskIdHex, projectName, msg);
+ } catch (NotSupportedException e) {
+ stateLog.error("[" + taskIdHex + "] Cannot replicate from " + uri, e, getStatesAsArray());
+ } catch (PermanentTransportException e) {
+ repLog.error(
+ String.format("Terminal failure. Cannot replicate [%s] from %s", taskIdHex, uri), e);
+ } catch (TransportException e) {
+ repLog.error(String.format("[%s] Cannot replicate from %s", taskIdHex, uri), e);
+ if (replicationType == ReplicationType.ASYNC && e instanceof LockFailureException) {
+ lockRetryCount++;
+ // The LockFailureException message contains both URI and reason
+ // for this failure.
- // The remote fetch operation should be retried.
- if (lockRetryCount <= maxLockRetries) {
+ // The remote fetch operation should be retried.
+ if (lockRetryCount <= maxLockRetries) {
+ if (canceledWhileRunning.get()) {
+ logCanceledWhileRunningException(e);
+ } else {
+ pool.reschedule(this, Source.RetryReason.TRANSPORT_ERROR);
+ }
+ } else {
+ repLog.error(
+ "[{}] Giving up after {} occurrences of this error: {} during replication from [{}]"
+ + " {}",
+ taskIdHex,
+ lockRetryCount,
+ e.getMessage(),
+ taskIdHex,
+ uri);
+ }
+ } else if (replicationType == ReplicationType.ASYNC) {
if (canceledWhileRunning.get()) {
logCanceledWhileRunningException(e);
} else {
+ // The remote fetch operation should be retried.
pool.reschedule(this, Source.RetryReason.TRANSPORT_ERROR);
}
- } else {
- repLog.error(
- "[{}] Giving up after {} occurrences of this error: {} during replication from [{}]"
- + " {}",
- taskIdHex,
- lockRetryCount,
- e.getMessage(),
- taskIdHex,
- uri);
}
- } else if (replicationType == ReplicationType.ASYNC) {
- if (canceledWhileRunning.get()) {
- logCanceledWhileRunningException(e);
- } else {
- // The remote fetch operation should be retried.
- pool.reschedule(this, Source.RetryReason.TRANSPORT_ERROR);
+ } catch (IOException e) {
+ stateLog.error("[" + taskIdHex + "] Cannot replicate from " + uri, e, getStatesAsArray());
+ } catch (RuntimeException | Error e) {
+ stateLog.error(
+ "[" + taskIdHex + "] Unexpected error during replication from " + uri,
+ e,
+ getStatesAsArray());
+ } finally {
+ if (git != null) {
+ git.close();
}
- }
- } catch (IOException e) {
- stateLog.error("[" + taskIdHex + "] Cannot replicate from " + uri, e, getStatesAsArray());
- } catch (RuntimeException | Error e) {
- stateLog.error(
- "[" + taskIdHex + "] Unexpected error during replication from " + uri,
- e,
- getStatesAsArray());
- } finally {
- if (git != null) {
- git.close();
- }
- if (replicationType == ReplicationType.ASYNC) {
- pool.notifyFinished(this);
+ if (replicationType == ReplicationType.ASYNC) {
+ pool.notifyFinished(this);
+ }
+ }
+ } catch (ProjectsLock.UnableToLockProjectException e) {
+ pool.fetchWasCanceled(this);
+ if (!canceled) {
+ repLog.info(
+ "[{}] Project {} is locked. Rescheduling replication from {} to avoid collision with an"
+ + " in-flight fetch task [{}].",
+ taskIdHex,
+ projectName,
+ uri,
+ e.getConflictingTaskId());
+ pool.reschedule(this, Source.RetryReason.COLLISION);
+ isCollision = true;
+ } else {
+ repLog.info(
+ "[{}] Project {} is locked. Task is cancelled and conflicting with in-flight fetch task"
+ + " [{}]. NOT rescheduling replication from {}.",
+ taskIdHex,
+ projectName,
+ e.getConflictingTaskId(),
+ uri);
}
}
}
@@ -636,7 +663,7 @@
}
private Set<FetchRefSpec> runRefsFilter(Set<FetchRefSpec> refs, boolean lock)
- throws com.google.gerrit.git.LockFailureException {
+ throws LockFailureException {
Set<String> refsNames =
refs.stream().map(FetchRefSpec::refName).collect(Collectors.toUnmodifiableSet());
Optional<ReplicationFetchFilter> maybeFilter = replicationFetchFilter();
@@ -784,6 +811,10 @@
LockFailureException(URIish uri, String message) {
super(uri, message);
}
+
+ public LockFailureException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
public Optional<PullReplicationApiRequestMetrics> getRequestMetrics() {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLock.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLock.java
new file mode 100644
index 0000000..bfcc396
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLock.java
@@ -0,0 +1,119 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.entities.Project;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Registry of per-project locks, each held by at most one task at a time.
+ *
+ * <p>The lock state of a project is the ID of the task holding it, or {@link #EMPTY_TASK_ID}
+ * when the project is free. Acquisition never blocks: {@link #tryLock} either succeeds at once
+ * or throws. Task IDs are compared by reference, as {@link AtomicReference} does, so a lock must
+ * be released with the same {@code String} instance that acquired it, as {@link LockToken} does.
+ */
+class ProjectsLock {
+  /** Marker stored for a project that is not locked by any task. */
+  public static final String EMPTY_TASK_ID = "";
+
+  /** Current holder per project; entries are created lazily on first use and never removed. */
+  private final Map<Project.NameKey, AtomicReference<String>> projectLocksHeldByTasks =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Attempts to lock {@code project} on behalf of {@code taskId} without blocking.
+   *
+   * @param project the project to lock
+   * @param taskId the ID of the task requesting the lock
+   * @return a token that releases the lock when closed
+   * @throws UnableToLockProjectException if the project is already locked; the exception carries
+   *     the ID of the task that held the lock when this attempt failed
+   */
+  @SuppressWarnings("ReferenceEquality")
+  LockToken tryLock(Project.NameKey project, String taskId) throws UnableToLockProjectException {
+    // compareAndExchange returns the value it actually observed, so the reported holder is the one
+    // that made this attempt fail. Re-reading the reference after a failed compareAndSet could
+    // instead observe an already released lock or a different, newer holder.
+    String holder = getProjectAtomicReference(project).compareAndExchange(EMPTY_TASK_ID, taskId);
+    // The exchange succeeded iff the witness is the expected instance; AtomicReference compares
+    // by identity, hence the deliberate use of != rather than equals().
+    if (holder != EMPTY_TASK_ID) {
+      throw new UnableToLockProjectException(project, holder);
+    }
+    return new LockToken(this, project, taskId);
+  }
+
+  /**
+   * Releases the lock on {@code project} if it is currently held by {@code taskId}.
+   *
+   * @param project the project to unlock
+   * @param taskId the ID of the task that acquired the lock
+   * @return {@code true} if the lock was released, {@code false} if {@code taskId} did not hold it
+   */
+  boolean unlock(Project.NameKey project, String taskId) {
+    return getProjectAtomicReference(project).compareAndSet(taskId, EMPTY_TASK_ID);
+  }
+
+  /** Returns the holder reference for {@code project}, creating an unlocked one if absent. */
+  private AtomicReference<String> getProjectAtomicReference(Project.NameKey project) {
+    return projectLocksHeldByTasks.computeIfAbsent(
+        project, k -> new AtomicReference<>(EMPTY_TASK_ID));
+  }
+
+  /** Handle for an acquired project lock; closing it releases the lock. */
+  static final class LockToken implements AutoCloseable {
+    final Project.NameKey project;
+    final ProjectsLock projectsLock;
+    private final String taskId;
+    boolean closed;
+
+    private LockToken(ProjectsLock projectsLock, Project.NameKey project, String taskId) {
+      this.project = project;
+      this.projectsLock = projectsLock;
+      this.taskId = taskId;
+    }
+
+    /** Releases the lock; does nothing once a previous call has released it. */
+    @Override
+    public void close() {
+      if (!closed && projectsLock.unlock(project, taskId)) {
+        closed = true;
+      }
+    }
+  }
+
+  /** Thrown by {@link #tryLock} when the project is already locked by a task. */
+  static class UnableToLockProjectException extends Exception {
+    private final String conflictingTaskId;
+
+    public UnableToLockProjectException(Project.NameKey project, String conflictingTaskId) {
+      super(
+          "Unable to lock project "
+              + project
+              + " because it is already locked by task "
+              + conflictingTaskId);
+
+      this.conflictingTaskId = conflictingTaskId;
+    }
+
+    /** Returns the ID of the task that held the lock when the attempt failed. */
+    public String getConflictingTaskId() {
+      return conflictingTaskId;
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index a01a6d8..d4eb9b8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -94,6 +94,7 @@
.implement(HttpClient.class, SourceHttpClient.class)
.build(SourceHttpClient.Factory.class));
+ bind(ProjectsLock.class).in(Scopes.SINGLETON);
install(new FactoryModuleBuilder().build(Source.Factory.class));
install(
new FactoryModuleBuilder()
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationFetchFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationFetchFilter.java
index 19d4a5b..fca499d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationFetchFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationFetchFilter.java
@@ -15,7 +15,7 @@
package com.googlesource.gerrit.plugins.replication.pull;
import com.google.gerrit.extensions.annotations.ExtensionPoint;
-import com.google.gerrit.git.LockFailureException;
+import com.googlesource.gerrit.plugins.replication.pull.FetchOne.LockFailureException;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
index 1104ff4..50d91ff 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
@@ -130,6 +130,7 @@
fetchFactory,
fetchRefsDatabase,
deleteRefCommand,
+ new ProjectsLock(),
PROJECT_NAME,
urIish,
Optional.of(pullReplicationApiRequestMetrics));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLockTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLockTest.java
new file mode 100644
index 0000000..89d1295
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ProjectsLockTest.java
@@ -0,0 +1,72 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+
+import com.google.gerrit.entities.Project;
+import org.junit.Test;
+
+public class ProjectsLockTest {
+  private static final Project.NameKey FIRST_PROJECT = Project.nameKey("project1");
+  private static final Project.NameKey SECOND_PROJECT = Project.nameKey("project2");
+  private static final String FIRST_TASK = "task1";
+  private static final String SECOND_TASK = "task2";
+
+  private final ProjectsLock projectsLock = new ProjectsLock();
+
+  /** A project released by its holder can be locked again by a different task. */
+  @Test
+  public void shouldSuccessfullyLockAndUnlockProject() throws Exception {
+    ProjectsLock.LockToken firstToken = projectsLock.tryLock(FIRST_PROJECT, FIRST_TASK);
+    assertThatIsLocked(firstToken, FIRST_PROJECT);
+
+    boolean released = projectsLock.unlock(FIRST_PROJECT, FIRST_TASK);
+    assertThat(released).isTrue();
+
+    try (ProjectsLock.LockToken secondToken = projectsLock.tryLock(FIRST_PROJECT, SECOND_TASK)) {
+      assertThatIsLocked(secondToken, FIRST_PROJECT);
+    }
+  }
+
+  /** A second task is rejected while the project is locked, and learns who holds the lock. */
+  @Test
+  public void shouldFailToLockFromAnotherTaskOnLockedProject() throws Exception {
+    try (ProjectsLock.LockToken unused = projectsLock.tryLock(FIRST_PROJECT, FIRST_TASK)) {
+      ProjectsLock.UnableToLockProjectException conflict =
+          assertThrows(
+              ProjectsLock.UnableToLockProjectException.class,
+              () -> projectsLock.tryLock(FIRST_PROJECT, SECOND_TASK).close());
+      assertThat(conflict.getConflictingTaskId()).isEqualTo(FIRST_TASK);
+    }
+  }
+
+  /** Locks on distinct projects can be held at the same time. */
+  @Test
+  public void shouldHandleMultipleProjectsIndependently() throws Exception {
+    try (ProjectsLock.LockToken firstToken = projectsLock.tryLock(FIRST_PROJECT, FIRST_TASK);
+        ProjectsLock.LockToken secondToken = projectsLock.tryLock(SECOND_PROJECT, SECOND_TASK)) {
+      assertThatIsLocked(firstToken, FIRST_PROJECT);
+      assertThatIsLocked(secondToken, SECOND_PROJECT);
+    }
+  }
+
+  /** Asserts that {@code token} exists and refers to the {@code expected} project. */
+  private static void assertThatIsLocked(ProjectsLock.LockToken token, Project.NameKey expected) {
+    assertThat(token).isNotNull();
+    assertThat(token.project).isEqualTo(expected);
+  }
+}