Merge branch 'stable-3.11'
* stable-3.11:
Do not lock refs when not executing a fetch
Allow locking refs whilst filtering for fetch requests
Remove unused Transport in FetchOneTest
Avoid mocking ReplicationFetchFilter
Remove redundant public modifier in ReplicationFetchFilter
Record replication metrics for retrying tasks
Change-Id: I63317ad5f0f98e988844bf40f81522fec014641a
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index 939a05b..9f94a6a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -73,6 +73,8 @@
public class FetchOne implements ProjectRunnable, CanceledWhileRunning, Completable {
private final ReplicationStateListener stateLog;
public static final String ALL_REFS = "..all..";
+ public static final boolean FILTER_AND_LOCK = true;
+ public static final boolean FILTER_ONLY = false;
static final String ID_KEY = "fetchOneId";
private final DeleteRefCommand deleteRefCommand;
@@ -109,6 +111,7 @@
private final Optional<PullReplicationApiRequestMetrics> apiRequestMetrics;
private DynamicItem<ReplicationFetchFilter> replicationFetchFilter;
private boolean succeeded;
+ private Map<String, AutoCloseable> fetchLocks;
@Inject
FetchOne(
@@ -473,42 +476,46 @@
}
private List<FetchRefSpec> runImpl() throws IOException {
- Fetch fetch = fetchFactory.create(taskIdHex, uri, git);
- List<FetchRefSpec> fetchRefSpecs = getFetchRefSpecs();
+ while (true) {
+ Fetch fetch = fetchFactory.create(taskIdHex, uri, git);
+ List<FetchRefSpec> fetchRefSpecs = getFetchRefSpecs(FILTER_AND_LOCK);
- try {
- List<FetchRefSpec> toFetch =
- fetchRefSpecs.stream().filter(rs -> rs.getSource() != null).toList();
- Set<String> toDelete = refsToDelete(fetchRefSpecs);
- updateStates(fetch.fetch(toFetch));
+ try {
+ repLog.info("[{}] Running fetch from {} on {} ...", taskIdHex, uri, fetchRefSpecs);
- // JGit doesn't support a fetch of <empty> to a ref (e.g. :refs/to/delete) therefore we have
- // manage them separately and remove them one by one.
- if (!toDelete.isEmpty()) {
- updateStates(
- deleteRefCommand.deleteRefsSync(taskIdHex, projectName, toDelete, getRemoteName()));
+ List<FetchRefSpec> toFetch =
+ fetchRefSpecs.stream().filter(rs -> rs.getSource() != null).toList();
+ Set<String> toDelete = refsToDelete(fetchRefSpecs);
+ updateStates(fetch.fetch(toFetch));
+
+ // JGit doesn't support a fetch of <empty> to a ref (e.g. :refs/to/delete) therefore we have
+ // manage them separately and remove them one by one.
+ if (!toDelete.isEmpty()) {
+ updateStates(
+ deleteRefCommand.deleteRefsSync(taskIdHex, projectName, toDelete, getRemoteName()));
+ }
+ return fetchRefSpecs;
+ } catch (InexistentRefTransportException e) {
+ String inexistentRef = e.getInexistentRef();
+ repLog.info(
+ "[{}] Remote {} does not have ref {} in replication task, flagging as failed and"
+ + " removing from the replication task",
+ taskIdHex,
+ uri,
+ inexistentRef);
+ fetchFailures.add(e);
+ delta.remove(FetchRefSpec.fromRef(inexistentRef));
+ if (delta.isEmpty()) {
+ repLog.warn("[{}] Empty replication task, skipping.", taskIdHex);
+ return Collections.emptyList();
+ }
+ } catch (IOException e) {
+ notifyRefReplicatedIOException();
+ throw e;
+ } finally {
+ unlockRefSpecs(fetchLocks);
}
- } catch (InexistentRefTransportException e) {
- String inexistentRef = e.getInexistentRef();
- repLog.info(
- "[{}] Remote {} does not have ref {} in replication task, flagging as failed and removing"
- + " from the replication task",
- taskIdHex,
- uri,
- inexistentRef);
- fetchFailures.add(e);
- delta.remove(FetchRefSpec.fromRef(inexistentRef));
- if (delta.isEmpty()) {
- repLog.warn("[{}] Empty replication task, skipping.", taskIdHex);
- return Collections.emptyList();
- }
-
- runImpl();
- } catch (IOException e) {
- notifyRefReplicatedIOException();
- throw e;
}
- return fetchRefSpecs;
}
@VisibleForTesting
@@ -539,9 +546,10 @@
* <p>When {@link FetchOne#delta} is not empty and {@link FetchOne#replicationFetchFilter} was
* provided, the filtered refsSpecs are returned.
*
+ * @param lock true if the refs should also be locally locked upon filtering.
* @return The list of refSpecs to fetch
*/
- public List<FetchRefSpec> getFetchRefSpecs() throws IOException {
+ public List<FetchRefSpec> getFetchRefSpecs(boolean lock) throws IOException {
List<FetchRefSpec> configRefSpecs =
config.getFetchRefSpecs().stream().map(FetchRefSpec::fromRefSpec).toList();
@@ -549,16 +557,35 @@
return configRefSpecs;
}
- return runRefsFilter(computeDeltaIfNeeded()).stream()
+ return runRefsFilter(computeDeltaIfNeeded(), lock).stream()
.map(ref -> refToFetchRefSpec(ref, configRefSpecs))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
}
+ public void unlockRefSpecs(Map<String, AutoCloseable> locks) {
+ if (locks == null) {
+ return;
+ }
+
+ locks.forEach(
+ (key, value) -> {
+ try {
+ value.close();
+ } catch (Exception e) {
+ repLog.error(
+ String.format(
+ "[%s] Error when unlocking ref %s:%s", taskIdHex, projectName.get(), key),
+ e);
+ }
+ });
+ locks.clear();
+ }
+
public List<FetchRefSpec> safeGetFetchRefSpecs() {
try {
- return getFetchRefSpecs();
+ return getFetchRefSpecs(FILTER_ONLY);
} catch (IOException e) {
repLog.error("[{}] Error when evaluating refsSpecs: {}", taskIdHex, e.getMessage());
return Collections.emptyList();
@@ -605,13 +632,22 @@
.flatMap(filter -> Optional.ofNullable(filter.get()));
}
- private Set<FetchRefSpec> runRefsFilter(Set<FetchRefSpec> refs) {
+ private Set<FetchRefSpec> runRefsFilter(Set<FetchRefSpec> refs, boolean lock)
+ throws com.google.gerrit.git.LockFailureException {
Set<String> refsNames =
refs.stream().map(FetchRefSpec::refName).collect(Collectors.toUnmodifiableSet());
- Set<String> filteredRefNames =
- replicationFetchFilter()
- .map(f -> f.filter(this.projectName.get(), refsNames))
- .orElse(refsNames);
+ Optional<ReplicationFetchFilter> maybeFilter = replicationFetchFilter();
+ Set<String> filteredRefNames;
+ if (maybeFilter.isPresent()) {
+ if (lock) {
+ fetchLocks = maybeFilter.get().filterAndLock(this.projectName.get(), refsNames);
+ filteredRefNames = fetchLocks.keySet();
+ } else {
+ filteredRefNames = maybeFilter.get().filter(this.projectName.get(), refsNames);
+ }
+ } else {
+ filteredRefNames = refsNames;
+ }
return refs.stream()
.filter(refSpec -> filteredRefNames.contains(refSpec.refName()))
.collect(Collectors.toUnmodifiableSet());
@@ -754,7 +790,7 @@
@Override
public boolean hasSucceeded() {
try {
- return succeeded || getFetchRefSpecs().isEmpty();
+ return succeeded || getFetchRefSpecs(FILTER_ONLY).isEmpty();
} catch (IOException e) {
repLog.error("[{}] Error when evaluating refsSpecs: {}", taskIdHex, e.getMessage());
return false;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationFetchFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationFetchFilter.java
index dfe6fbd..19d4a5b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationFetchFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationFetchFilter.java
@@ -15,7 +15,10 @@
package com.googlesource.gerrit.plugins.replication.pull;
import com.google.gerrit.extensions.annotations.ExtensionPoint;
+import com.google.gerrit.git.LockFailureException;
+import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
/**
* Filter that is invoked before a set of remote refs are fetched from a remote instance.
@@ -25,5 +28,11 @@
@ExtensionPoint
public interface ReplicationFetchFilter {
- public Set<String> filter(String projectName, Set<String> fetchRefs);
+ Set<String> filter(String projectName, Set<String> fetchRefs);
+
+ default Map<String, AutoCloseable> filterAndLock(String projectName, Set<String> fetchRefs)
+ throws LockFailureException {
+ return filter(projectName, fetchRefs).stream()
+ .collect(Collectors.toMap(ref -> ref, ref -> () -> {}));
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 320e9ec..b1d553d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -710,7 +710,10 @@
if (fetchOp.setToRetry()) {
postReplicationScheduledEvent(fetchOp);
- pool.schedule(fetchOp, config.getRetryDelay(), TimeUnit.MINUTES);
+ pool.schedule(
+ queueMetrics.runWithMetrics(this, fetchOp),
+ config.getRetryDelay(),
+ TimeUnit.MINUTES);
queueMetrics.incrementTaskRetrying(this);
} else {
fetchOp.canceledByReplication();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
index 6ce3dbe..1104ff4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
@@ -15,6 +15,7 @@
package com.googlesource.gerrit.plugins.replication.pull;
import static com.google.common.truth.Truth.assertThat;
+import static com.googlesource.gerrit.plugins.replication.pull.FetchOne.FILTER_ONLY;
import static com.googlesource.gerrit.plugins.replication.pull.FetchOne.refsToDelete;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.anyList;
@@ -48,6 +49,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import junit.framework.AssertionFailedError;
import org.eclipse.jgit.errors.PackProtocolException;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Ref;
@@ -55,7 +57,6 @@
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.transport.RefSpec;
import org.eclipse.jgit.transport.RemoteConfig;
-import org.eclipse.jgit.transport.Transport;
import org.eclipse.jgit.transport.URIish;
import org.junit.Before;
import org.junit.Test;
@@ -91,8 +92,6 @@
@Mock private FetchRefsDatabase fetchRefsDatabase;
@Mock private DeleteRefCommand deleteRefCommand;
- @Mock private Transport transport;
-
private URIish urIish;
private FetchOne objectUnderTest;
@@ -184,7 +183,7 @@
objectUnderTest.addRefs(refSpecsSetOf(TEST_REF));
objectUnderTest.addRefs(refSpecsSetOf(TEST_DELETE_REF));
- assertThat(objectUnderTest.getFetchRefSpecs())
+ assertThat(objectUnderTest.getFetchRefSpecs(FILTER_ONLY))
.isEqualTo(List.of(FetchRefSpec.fromRef(TEST_DELETE_REF)));
}
@@ -194,7 +193,7 @@
objectUnderTest.addRefs(refSpecsSetOf(TEST_DELETE_REF));
objectUnderTest.addRefs(refSpecsSetOf(TEST_REF));
- assertThat(objectUnderTest.getFetchRefSpecs())
+ assertThat(objectUnderTest.getFetchRefSpecs(FetchOne.FILTER_ONLY))
.isEqualTo(List.of(FetchRefSpec.fromRef(TEST_REF + ":" + TEST_REF)));
}
@@ -347,9 +346,7 @@
Optional.of(List.of(TEST_REF)));
objectUnderTest.addRefs(refSpecs);
objectUnderTest.setReplicationFetchFilter(replicationFilter);
- ReplicationFetchFilter mockFilter = mock(ReplicationFetchFilter.class);
- when(replicationFilter.get()).thenReturn(mockFilter);
- when(mockFilter.filter(TEST_PROJECT_NAME, refs)).thenReturn(Set.of(TEST_REF));
+ when(replicationFilter.get()).thenReturn((projectName, fetchRefs) -> Set.of(TEST_REF));
objectUnderTest.run();
@@ -379,11 +376,9 @@
Map<String, Ref> remoteRefsMap = Map.of(REMOTE_REF, mock(Ref.class));
setupRemoteConfigMock(List.of(ALL_REFS_SPEC));
setupFetchRefsDatabaseMock(localRefsMap, remoteRefsMap);
- ReplicationFetchFilter mockFilter = setupReplicationFilterMock(remoteRefs);
+ setupReplicationFilter(remoteRefs, remoteRefs);
objectUnderTest.run();
-
- verify(mockFilter).filter(TEST_PROJECT_NAME, remoteRefs);
}
@Test
@@ -397,11 +392,9 @@
Map.of(REF, mockRef("badc0feebadc0feebadc0feebadc0feebadc0fee"));
setupRemoteConfigMock(List.of(ALL_REFS_SPEC));
setupFetchRefsDatabaseMock(localRefsMap, remoteRefsMap);
- ReplicationFetchFilter mockFilter = setupReplicationFilterMock(remoteRefs);
+ setupReplicationFilter(remoteRefs, remoteRefs);
objectUnderTest.run();
-
- verify(mockFilter).filter(TEST_PROJECT_NAME, remoteRefs);
}
@Test
@@ -414,11 +407,9 @@
Map<String, Ref> remoteRefsMap = Map.of(REF, refValue);
setupRemoteConfigMock(List.of(ALL_REFS_SPEC));
setupFetchRefsDatabaseMock(localRefsMap, remoteRefsMap);
- ReplicationFetchFilter mockFilter = setupReplicationFilterMock(remoteRefs);
+ setupReplicationFilter(Set.of(), remoteRefs);
objectUnderTest.run();
-
- verify(mockFilter).filter(TEST_PROJECT_NAME, Set.of());
}
@Test
@@ -430,11 +421,9 @@
setupRemoteConfigMock(List.of(DEV_REFS_SPEC));
setupFetchRefsDatabaseMock(Map.of(), Map.of(REF, mock(Ref.class)));
- ReplicationFetchFilter mockFilter = setupReplicationFilterMock(remoteRefs);
+ setupReplicationFilter(Set.of(), remoteRefs);
objectUnderTest.run();
-
- verify(mockFilter).filter(TEST_PROJECT_NAME, Set.of());
}
@Test
@@ -840,7 +829,7 @@
List.of(new FetchFactoryEntry.Builder().refSpecNameWithDefaults(TEST_REF).build()),
Optional.of(List.of(TEST_REF)));
objectUnderTest.addRefs(refSpecs);
- setupReplicationFilterMock(Collections.emptySet());
+ setupReplicationFilter(Set.of(), Set.of());
objectUnderTest.run();
@@ -862,9 +851,7 @@
Optional.of(List.of(TEST_REF)));
objectUnderTest.addRefs(refSpecs);
objectUnderTest.setReplicationFetchFilter(replicationFilter);
- ReplicationFetchFilter mockFilter = mock(ReplicationFetchFilter.class);
- when(replicationFilter.get()).thenReturn(mockFilter);
- when(mockFilter.filter(TEST_PROJECT_NAME, refs)).thenReturn(Set.of(TEST_REF));
+ when(replicationFilter.get()).thenReturn((projectName, refNames) -> Set.of(TEST_REF));
objectUnderTest.run();
@@ -942,12 +929,17 @@
when(fetchRefsDatabase.getRemoteRefsMap(repository, urIish)).thenReturn(remote);
}
- private ReplicationFetchFilter setupReplicationFilterMock(Set<String> inRefs) {
+ private void setupReplicationFilter(Set<String> expectedRefs, Set<String> filteredRefs) {
objectUnderTest.setReplicationFetchFilter(replicationFilter);
- ReplicationFetchFilter mockFilter = mock(ReplicationFetchFilter.class);
- when(replicationFilter.get()).thenReturn(mockFilter);
- when(mockFilter.filter(TEST_PROJECT_NAME, inRefs)).thenReturn(inRefs);
- return mockFilter;
+ when(replicationFilter.get())
+ .thenReturn(
+ (projectName, refs) -> {
+ if (refs.equals(expectedRefs)) {
+ return filteredRefs;
+ }
+ throw new AssertionFailedError(
+ "Expected to filter " + expectedRefs + " but received " + refs);
+ });
}
private List<ReplicationState> createTestStates(FetchRefSpec refSpec, int numberOfStates) {