Merge branch 'stable-3.10' into stable-3.11 * stable-3.10: Do not lock refs when not executing a fetch Allow locking refs whilst filtering for fetch requests Remove unused Transport in FetchOneTest Avoid mocking ReplicationFetchFilter Remove redundant public modifier in ReplicationFetchFilter Record replication metrics for retrying tasks Change-Id: I4bbbbd665e155205f691041ac0140b68ec041df6
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java index 939a05b..9f94a6a 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -73,6 +73,8 @@ public class FetchOne implements ProjectRunnable, CanceledWhileRunning, Completable { private final ReplicationStateListener stateLog; public static final String ALL_REFS = "..all.."; + public static final boolean FILTER_AND_LOCK = true; + public static final boolean FILTER_ONLY = false; static final String ID_KEY = "fetchOneId"; private final DeleteRefCommand deleteRefCommand; @@ -109,6 +111,7 @@ private final Optional<PullReplicationApiRequestMetrics> apiRequestMetrics; private DynamicItem<ReplicationFetchFilter> replicationFetchFilter; private boolean succeeded; + private Map<String, AutoCloseable> fetchLocks; @Inject FetchOne( @@ -473,42 +476,46 @@ } private List<FetchRefSpec> runImpl() throws IOException { - Fetch fetch = fetchFactory.create(taskIdHex, uri, git); - List<FetchRefSpec> fetchRefSpecs = getFetchRefSpecs(); + while (true) { + Fetch fetch = fetchFactory.create(taskIdHex, uri, git); + List<FetchRefSpec> fetchRefSpecs = getFetchRefSpecs(FILTER_AND_LOCK); - try { - List<FetchRefSpec> toFetch = - fetchRefSpecs.stream().filter(rs -> rs.getSource() != null).toList(); - Set<String> toDelete = refsToDelete(fetchRefSpecs); - updateStates(fetch.fetch(toFetch)); + try { + repLog.info("[{}] Running fetch from {} on {} ...", taskIdHex, uri, fetchRefSpecs); - // JGit doesn't support a fetch of <empty> to a ref (e.g. :refs/to/delete) therefore we have - // manage them separately and remove them one by one. - if (!toDelete.isEmpty()) { - updateStates( - deleteRefCommand.deleteRefsSync(taskIdHex, projectName, toDelete, getRemoteName())); + List<FetchRefSpec> toFetch = + fetchRefSpecs.stream().filter(rs -> rs.getSource() != null).toList(); + Set<String> toDelete = refsToDelete(fetchRefSpecs); + updateStates(fetch.fetch(toFetch)); + + // JGit doesn't support a fetch of <empty> to a ref (e.g. :refs/to/delete) therefore we have + // manage them separately and remove them one by one. + if (!toDelete.isEmpty()) { + updateStates( + deleteRefCommand.deleteRefsSync(taskIdHex, projectName, toDelete, getRemoteName())); + } + return fetchRefSpecs; + } catch (InexistentRefTransportException e) { + String inexistentRef = e.getInexistentRef(); + repLog.info( + "[{}] Remote {} does not have ref {} in replication task, flagging as failed and" + + " removing from the replication task", + taskIdHex, + uri, + inexistentRef); + fetchFailures.add(e); + delta.remove(FetchRefSpec.fromRef(inexistentRef)); + if (delta.isEmpty()) { + repLog.warn("[{}] Empty replication task, skipping.", taskIdHex); + return Collections.emptyList(); + } + } catch (IOException e) { + notifyRefReplicatedIOException(); + throw e; + } finally { + unlockRefSpecs(fetchLocks); } - } catch (InexistentRefTransportException e) { - String inexistentRef = e.getInexistentRef(); - repLog.info( - "[{}] Remote {} does not have ref {} in replication task, flagging as failed and removing" - + " from the replication task", - taskIdHex, - uri, - inexistentRef); - fetchFailures.add(e); - delta.remove(FetchRefSpec.fromRef(inexistentRef)); - if (delta.isEmpty()) { - repLog.warn("[{}] Empty replication task, skipping.", taskIdHex); - return Collections.emptyList(); - } - - runImpl(); - } catch (IOException e) { - notifyRefReplicatedIOException(); - throw e; } - return fetchRefSpecs; } @VisibleForTesting @@ -539,9 +546,10 @@ * <p>When {@link FetchOne#delta} is not empty and {@link FetchOne#replicationFetchFilter} was * provided, the filtered refsSpecs are returned. * + * @param lock true if the refs should also be locally locked upon filtering. * @return The list of refSpecs to fetch */ - public List<FetchRefSpec> getFetchRefSpecs() throws IOException { + public List<FetchRefSpec> getFetchRefSpecs(boolean lock) throws IOException { List<FetchRefSpec> configRefSpecs = config.getFetchRefSpecs().stream().map(FetchRefSpec::fromRefSpec).toList(); @@ -549,16 +557,35 @@ return configRefSpecs; } - return runRefsFilter(computeDeltaIfNeeded()).stream() + return runRefsFilter(computeDeltaIfNeeded(), lock).stream() .map(ref -> refToFetchRefSpec(ref, configRefSpecs)) .filter(Optional::isPresent) .map(Optional::get) .collect(Collectors.toList()); } + public void unlockRefSpecs(Map<String, AutoCloseable> locks) { + if (locks == null) { + return; + } + + locks.forEach( + (key, value) -> { + try { + value.close(); + } catch (Exception e) { + repLog.error( + String.format( + "[%s] Error when unlocking ref %s:%s", taskIdHex, projectName.get(), key), + e); + } + }); + locks.clear(); + } + public List<FetchRefSpec> safeGetFetchRefSpecs() { try { - return getFetchRefSpecs(); + return getFetchRefSpecs(FILTER_ONLY); } catch (IOException e) { repLog.error("[{}] Error when evaluating refsSpecs: {}", taskIdHex, e.getMessage()); return Collections.emptyList(); @@ -605,13 +632,22 @@ .flatMap(filter -> Optional.ofNullable(filter.get())); } - private Set<FetchRefSpec> runRefsFilter(Set<FetchRefSpec> refs) { + private Set<FetchRefSpec> runRefsFilter(Set<FetchRefSpec> refs, boolean lock) + throws com.google.gerrit.git.LockFailureException { Set<String> refsNames = refs.stream().map(FetchRefSpec::refName).collect(Collectors.toUnmodifiableSet()); - Set<String> filteredRefNames = - replicationFetchFilter() - .map(f -> f.filter(this.projectName.get(), refsNames)) - .orElse(refsNames); + Optional<ReplicationFetchFilter> maybeFilter = replicationFetchFilter(); + Set<String> filteredRefNames; + if (maybeFilter.isPresent()) { + if (lock) { + fetchLocks = maybeFilter.get().filterAndLock(this.projectName.get(), refsNames); + filteredRefNames = fetchLocks.keySet(); + } else { + filteredRefNames = maybeFilter.get().filter(this.projectName.get(), refsNames); + } + } else { + filteredRefNames = refsNames; + } return refs.stream() .filter(refSpec -> filteredRefNames.contains(refSpec.refName())) .collect(Collectors.toUnmodifiableSet()); @@ -754,7 +790,7 @@ @Override public boolean hasSucceeded() { try { - return succeeded || getFetchRefSpecs().isEmpty(); + return succeeded || getFetchRefSpecs(FILTER_ONLY).isEmpty(); } catch (IOException e) { repLog.error("[{}] Error when evaluating refsSpecs: {}", taskIdHex, e.getMessage()); return false;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationFetchFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationFetchFilter.java index dfe6fbd..19d4a5b 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationFetchFilter.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationFetchFilter.java
@@ -15,7 +15,10 @@ package com.googlesource.gerrit.plugins.replication.pull; import com.google.gerrit.extensions.annotations.ExtensionPoint; +import com.google.gerrit.git.LockFailureException; +import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** * Filter that is invoked before a set of remote refs are fetched from a remote instance. @@ -25,5 +28,11 @@ @ExtensionPoint public interface ReplicationFetchFilter { - public Set<String> filter(String projectName, Set<String> fetchRefs); + Set<String> filter(String projectName, Set<String> fetchRefs); + + default Map<String, AutoCloseable> filterAndLock(String projectName, Set<String> fetchRefs) + throws LockFailureException { + return filter(projectName, fetchRefs).stream() + .collect(Collectors.toMap(ref -> ref, ref -> () -> {})); + } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java index 320e9ec..b1d553d 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -710,7 +710,10 @@ if (fetchOp.setToRetry()) { postReplicationScheduledEvent(fetchOp); - pool.schedule(fetchOp, config.getRetryDelay(), TimeUnit.MINUTES); + pool.schedule( + queueMetrics.runWithMetrics(this, fetchOp), + config.getRetryDelay(), + TimeUnit.MINUTES); queueMetrics.incrementTaskRetrying(this); } else { fetchOp.canceledByReplication();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java index 6ce3dbe..1104ff4 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
@@ -15,6 +15,7 @@ package com.googlesource.gerrit.plugins.replication.pull; import static com.google.common.truth.Truth.assertThat; +import static com.googlesource.gerrit.plugins.replication.pull.FetchOne.FILTER_ONLY; import static com.googlesource.gerrit.plugins.replication.pull.FetchOne.refsToDelete; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.anyList; @@ -48,6 +49,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import junit.framework.AssertionFailedError; import org.eclipse.jgit.errors.PackProtocolException; import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.lib.Ref; @@ -55,7 +57,6 @@ import org.eclipse.jgit.lib.Repository; import org.eclipse.jgit.transport.RefSpec; import org.eclipse.jgit.transport.RemoteConfig; -import org.eclipse.jgit.transport.Transport; import org.eclipse.jgit.transport.URIish; import org.junit.Before; import org.junit.Test; @@ -91,8 +92,6 @@ @Mock private FetchRefsDatabase fetchRefsDatabase; @Mock private DeleteRefCommand deleteRefCommand; - @Mock private Transport transport; - private URIish urIish; private FetchOne objectUnderTest; @@ -184,7 +183,7 @@ objectUnderTest.addRefs(refSpecsSetOf(TEST_REF)); objectUnderTest.addRefs(refSpecsSetOf(TEST_DELETE_REF)); - assertThat(objectUnderTest.getFetchRefSpecs()) + assertThat(objectUnderTest.getFetchRefSpecs(FILTER_ONLY)) .isEqualTo(List.of(FetchRefSpec.fromRef(TEST_DELETE_REF))); } @@ -194,7 +193,7 @@ objectUnderTest.addRefs(refSpecsSetOf(TEST_DELETE_REF)); objectUnderTest.addRefs(refSpecsSetOf(TEST_REF)); - assertThat(objectUnderTest.getFetchRefSpecs()) + assertThat(objectUnderTest.getFetchRefSpecs(FetchOne.FILTER_ONLY)) .isEqualTo(List.of(FetchRefSpec.fromRef(TEST_REF + ":" + TEST_REF))); } @@ -347,9 +346,7 @@ Optional.of(List.of(TEST_REF))); objectUnderTest.addRefs(refSpecs); objectUnderTest.setReplicationFetchFilter(replicationFilter); - ReplicationFetchFilter mockFilter = mock(ReplicationFetchFilter.class); - when(replicationFilter.get()).thenReturn(mockFilter); - when(mockFilter.filter(TEST_PROJECT_NAME, refs)).thenReturn(Set.of(TEST_REF)); + when(replicationFilter.get()).thenReturn((projectName, fetchRefs) -> Set.of(TEST_REF)); objectUnderTest.run(); @@ -379,11 +376,9 @@ Map<String, Ref> remoteRefsMap = Map.of(REMOTE_REF, mock(Ref.class)); setupRemoteConfigMock(List.of(ALL_REFS_SPEC)); setupFetchRefsDatabaseMock(localRefsMap, remoteRefsMap); - ReplicationFetchFilter mockFilter = setupReplicationFilterMock(remoteRefs); + setupReplicationFilter(remoteRefs, remoteRefs); objectUnderTest.run(); - - verify(mockFilter).filter(TEST_PROJECT_NAME, remoteRefs); } @Test @@ -397,11 +392,9 @@ Map.of(REF, mockRef("badc0feebadc0feebadc0feebadc0feebadc0fee")); setupRemoteConfigMock(List.of(ALL_REFS_SPEC)); setupFetchRefsDatabaseMock(localRefsMap, remoteRefsMap); - ReplicationFetchFilter mockFilter = setupReplicationFilterMock(remoteRefs); + setupReplicationFilter(remoteRefs, remoteRefs); objectUnderTest.run(); - - verify(mockFilter).filter(TEST_PROJECT_NAME, remoteRefs); } @Test @@ -414,11 +407,9 @@ Map<String, Ref> remoteRefsMap = Map.of(REF, refValue); setupRemoteConfigMock(List.of(ALL_REFS_SPEC)); setupFetchRefsDatabaseMock(localRefsMap, remoteRefsMap); - ReplicationFetchFilter mockFilter = setupReplicationFilterMock(remoteRefs); + setupReplicationFilter(Set.of(), remoteRefs); objectUnderTest.run(); - - verify(mockFilter).filter(TEST_PROJECT_NAME, Set.of()); } @Test @@ -430,11 +421,9 @@ setupRemoteConfigMock(List.of(DEV_REFS_SPEC)); setupFetchRefsDatabaseMock(Map.of(), Map.of(REF, mock(Ref.class))); - ReplicationFetchFilter mockFilter = setupReplicationFilterMock(remoteRefs); + setupReplicationFilter(Set.of(), remoteRefs); objectUnderTest.run(); - - verify(mockFilter).filter(TEST_PROJECT_NAME, Set.of()); } @Test @@ -840,7 +829,7 @@ List.of(new FetchFactoryEntry.Builder().refSpecNameWithDefaults(TEST_REF).build()), Optional.of(List.of(TEST_REF))); objectUnderTest.addRefs(refSpecs); - setupReplicationFilterMock(Collections.emptySet()); + setupReplicationFilter(Set.of(), Set.of()); objectUnderTest.run(); @@ -862,9 +851,7 @@ Optional.of(List.of(TEST_REF))); objectUnderTest.addRefs(refSpecs); objectUnderTest.setReplicationFetchFilter(replicationFilter); - ReplicationFetchFilter mockFilter = mock(ReplicationFetchFilter.class); - when(replicationFilter.get()).thenReturn(mockFilter); - when(mockFilter.filter(TEST_PROJECT_NAME, refs)).thenReturn(Set.of(TEST_REF)); + when(replicationFilter.get()).thenReturn((projectName, refNames) -> Set.of(TEST_REF)); objectUnderTest.run(); @@ -942,12 +929,17 @@ when(fetchRefsDatabase.getRemoteRefsMap(repository, urIish)).thenReturn(remote); } - private ReplicationFetchFilter setupReplicationFilterMock(Set<String> inRefs) { + private void setupReplicationFilter(Set<String> expectedRefs, Set<String> filteredRefs) { objectUnderTest.setReplicationFetchFilter(replicationFilter); - ReplicationFetchFilter mockFilter = mock(ReplicationFetchFilter.class); - when(replicationFilter.get()).thenReturn(mockFilter); - when(mockFilter.filter(TEST_PROJECT_NAME, inRefs)).thenReturn(inRefs); - return mockFilter; + when(replicationFilter.get()) + .thenReturn( + (projectName, refs) -> { + if (refs.equals(expectedRefs)) { + return filteredRefs; + } + throw new AssertionFailedError( + "Expected to filter " + expectedRefs + " but received " + refs); + }); } private List<ReplicationState> createTestStates(FetchRefSpec refSpec, int numberOfStates) {