Merge branch 'stable-3.11'

* stable-3.11:
  Do not lock refs when not executing a fetch
  Allow locking refs whilst filtering for fetch requests
  Remove unused Transport in FetchOneTest
  Avoid mocking ReplicationFetchFilter
  Remove redundant public modifier in ReplicationFetchFilter
  Record replication metrics for retrying tasks

Change-Id: I63317ad5f0f98e988844bf40f81522fec014641a
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index 939a05b..9f94a6a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -73,6 +73,8 @@
 public class FetchOne implements ProjectRunnable, CanceledWhileRunning, Completable {
   private final ReplicationStateListener stateLog;
   public static final String ALL_REFS = "..all..";
+  public static final boolean FILTER_AND_LOCK = true;
+  public static final boolean FILTER_ONLY = false;
   static final String ID_KEY = "fetchOneId";
   private final DeleteRefCommand deleteRefCommand;
 
@@ -109,6 +111,7 @@
   private final Optional<PullReplicationApiRequestMetrics> apiRequestMetrics;
   private DynamicItem<ReplicationFetchFilter> replicationFetchFilter;
   private boolean succeeded;
+  private Map<String, AutoCloseable> fetchLocks;
 
   @Inject
   FetchOne(
@@ -473,42 +476,46 @@
   }
 
   private List<FetchRefSpec> runImpl() throws IOException {
-    Fetch fetch = fetchFactory.create(taskIdHex, uri, git);
-    List<FetchRefSpec> fetchRefSpecs = getFetchRefSpecs();
+    while (true) {
+      Fetch fetch = fetchFactory.create(taskIdHex, uri, git);
+      List<FetchRefSpec> fetchRefSpecs = getFetchRefSpecs(FILTER_AND_LOCK);
 
-    try {
-      List<FetchRefSpec> toFetch =
-          fetchRefSpecs.stream().filter(rs -> rs.getSource() != null).toList();
-      Set<String> toDelete = refsToDelete(fetchRefSpecs);
-      updateStates(fetch.fetch(toFetch));
+      try {
+        repLog.info("[{}] Running fetch from {} on {} ...", taskIdHex, uri, fetchRefSpecs);
 
-      // JGit doesn't support a fetch of <empty> to a ref (e.g. :refs/to/delete) therefore we have
-      // manage them separately and remove them one by one.
-      if (!toDelete.isEmpty()) {
-        updateStates(
-            deleteRefCommand.deleteRefsSync(taskIdHex, projectName, toDelete, getRemoteName()));
+        List<FetchRefSpec> toFetch =
+            fetchRefSpecs.stream().filter(rs -> rs.getSource() != null).toList();
+        Set<String> toDelete = refsToDelete(fetchRefSpecs);
+        updateStates(fetch.fetch(toFetch));
+
+        // JGit doesn't support a fetch of <empty> to a ref (e.g. :refs/to/delete) therefore we have
+        // manage them separately and remove them one by one.
+        if (!toDelete.isEmpty()) {
+          updateStates(
+              deleteRefCommand.deleteRefsSync(taskIdHex, projectName, toDelete, getRemoteName()));
+        }
+        return fetchRefSpecs;
+      } catch (InexistentRefTransportException e) {
+        String inexistentRef = e.getInexistentRef();
+        repLog.info(
+            "[{}] Remote {} does not have ref {} in replication task, flagging as failed and"
+                + " removing from the replication task",
+            taskIdHex,
+            uri,
+            inexistentRef);
+        fetchFailures.add(e);
+        delta.remove(FetchRefSpec.fromRef(inexistentRef));
+        if (delta.isEmpty()) {
+          repLog.warn("[{}] Empty replication task, skipping.", taskIdHex);
+          return Collections.emptyList();
+        }
+      } catch (IOException e) {
+        notifyRefReplicatedIOException();
+        throw e;
+      } finally {
+        unlockRefSpecs(fetchLocks);
       }
-    } catch (InexistentRefTransportException e) {
-      String inexistentRef = e.getInexistentRef();
-      repLog.info(
-          "[{}] Remote {} does not have ref {} in replication task, flagging as failed and removing"
-              + " from the replication task",
-          taskIdHex,
-          uri,
-          inexistentRef);
-      fetchFailures.add(e);
-      delta.remove(FetchRefSpec.fromRef(inexistentRef));
-      if (delta.isEmpty()) {
-        repLog.warn("[{}] Empty replication task, skipping.", taskIdHex);
-        return Collections.emptyList();
-      }
-
-      runImpl();
-    } catch (IOException e) {
-      notifyRefReplicatedIOException();
-      throw e;
     }
-    return fetchRefSpecs;
   }
 
   @VisibleForTesting
@@ -539,9 +546,10 @@
    * <p>When {@link FetchOne#delta} is not empty and {@link FetchOne#replicationFetchFilter} was
    * provided, the filtered refsSpecs are returned.
    *
+   * @param lock true if the refs should also be locally locked upon filtering.
    * @return The list of refSpecs to fetch
    */
-  public List<FetchRefSpec> getFetchRefSpecs() throws IOException {
+  public List<FetchRefSpec> getFetchRefSpecs(boolean lock) throws IOException {
     List<FetchRefSpec> configRefSpecs =
         config.getFetchRefSpecs().stream().map(FetchRefSpec::fromRefSpec).toList();
 
@@ -549,16 +557,35 @@
       return configRefSpecs;
     }
 
-    return runRefsFilter(computeDeltaIfNeeded()).stream()
+    return runRefsFilter(computeDeltaIfNeeded(), lock).stream()
         .map(ref -> refToFetchRefSpec(ref, configRefSpecs))
         .filter(Optional::isPresent)
         .map(Optional::get)
         .collect(Collectors.toList());
   }
 
+  public void unlockRefSpecs(Map<String, AutoCloseable> locks) {
+    if (locks == null) {
+      return;
+    }
+
+    locks.forEach(
+        (key, value) -> {
+          try {
+            value.close();
+          } catch (Exception e) {
+            repLog.error(
+                String.format(
+                    "[%s] Error when unlocking ref %s:%s", taskIdHex, projectName.get(), key),
+                e);
+          }
+        });
+    locks.clear();
+  }
+
   public List<FetchRefSpec> safeGetFetchRefSpecs() {
     try {
-      return getFetchRefSpecs();
+      return getFetchRefSpecs(FILTER_ONLY);
     } catch (IOException e) {
       repLog.error("[{}] Error when evaluating refsSpecs: {}", taskIdHex, e.getMessage());
       return Collections.emptyList();
@@ -605,13 +632,22 @@
         .flatMap(filter -> Optional.ofNullable(filter.get()));
   }
 
-  private Set<FetchRefSpec> runRefsFilter(Set<FetchRefSpec> refs) {
+  private Set<FetchRefSpec> runRefsFilter(Set<FetchRefSpec> refs, boolean lock)
+      throws com.google.gerrit.git.LockFailureException {
     Set<String> refsNames =
         refs.stream().map(FetchRefSpec::refName).collect(Collectors.toUnmodifiableSet());
-    Set<String> filteredRefNames =
-        replicationFetchFilter()
-            .map(f -> f.filter(this.projectName.get(), refsNames))
-            .orElse(refsNames);
+    Optional<ReplicationFetchFilter> maybeFilter = replicationFetchFilter();
+    Set<String> filteredRefNames;
+    if (maybeFilter.isPresent()) {
+      if (lock) {
+        fetchLocks = maybeFilter.get().filterAndLock(this.projectName.get(), refsNames);
+        filteredRefNames = fetchLocks.keySet();
+      } else {
+        filteredRefNames = maybeFilter.get().filter(this.projectName.get(), refsNames);
+      }
+    } else {
+      filteredRefNames = refsNames;
+    }
     return refs.stream()
         .filter(refSpec -> filteredRefNames.contains(refSpec.refName()))
         .collect(Collectors.toUnmodifiableSet());
@@ -754,7 +790,7 @@
   @Override
   public boolean hasSucceeded() {
     try {
-      return succeeded || getFetchRefSpecs().isEmpty();
+      return succeeded || getFetchRefSpecs(FILTER_ONLY).isEmpty();
     } catch (IOException e) {
       repLog.error("[{}] Error when evaluating refsSpecs: {}", taskIdHex, e.getMessage());
       return false;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationFetchFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationFetchFilter.java
index dfe6fbd..19d4a5b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationFetchFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationFetchFilter.java
@@ -15,7 +15,10 @@
 package com.googlesource.gerrit.plugins.replication.pull;
 
 import com.google.gerrit.extensions.annotations.ExtensionPoint;
+import com.google.gerrit.git.LockFailureException;
+import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Filter that is invoked before a set of remote refs are fetched from a remote instance.
@@ -25,5 +28,11 @@
 @ExtensionPoint
 public interface ReplicationFetchFilter {
 
-  public Set<String> filter(String projectName, Set<String> fetchRefs);
+  Set<String> filter(String projectName, Set<String> fetchRefs);
+
+  default Map<String, AutoCloseable> filterAndLock(String projectName, Set<String> fetchRefs)
+      throws LockFailureException {
+    return filter(projectName, fetchRefs).stream()
+        .collect(Collectors.toMap(ref -> ref, ref -> () -> {}));
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 320e9ec..b1d553d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -710,7 +710,10 @@
 
             if (fetchOp.setToRetry()) {
               postReplicationScheduledEvent(fetchOp);
-              pool.schedule(fetchOp, config.getRetryDelay(), TimeUnit.MINUTES);
+              pool.schedule(
+                  queueMetrics.runWithMetrics(this, fetchOp),
+                  config.getRetryDelay(),
+                  TimeUnit.MINUTES);
               queueMetrics.incrementTaskRetrying(this);
             } else {
               fetchOp.canceledByReplication();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
index 6ce3dbe..1104ff4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
@@ -15,6 +15,7 @@
 package com.googlesource.gerrit.plugins.replication.pull;
 
 import static com.google.common.truth.Truth.assertThat;
+import static com.googlesource.gerrit.plugins.replication.pull.FetchOne.FILTER_ONLY;
 import static com.googlesource.gerrit.plugins.replication.pull.FetchOne.refsToDelete;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.anyList;
@@ -48,6 +49,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
+import junit.framework.AssertionFailedError;
 import org.eclipse.jgit.errors.PackProtocolException;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.Ref;
@@ -55,7 +57,6 @@
 import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.transport.RefSpec;
 import org.eclipse.jgit.transport.RemoteConfig;
-import org.eclipse.jgit.transport.Transport;
 import org.eclipse.jgit.transport.URIish;
 import org.junit.Before;
 import org.junit.Test;
@@ -91,8 +92,6 @@
   @Mock private FetchRefsDatabase fetchRefsDatabase;
   @Mock private DeleteRefCommand deleteRefCommand;
 
-  @Mock private Transport transport;
-
   private URIish urIish;
   private FetchOne objectUnderTest;
 
@@ -184,7 +183,7 @@
     objectUnderTest.addRefs(refSpecsSetOf(TEST_REF));
     objectUnderTest.addRefs(refSpecsSetOf(TEST_DELETE_REF));
 
-    assertThat(objectUnderTest.getFetchRefSpecs())
+    assertThat(objectUnderTest.getFetchRefSpecs(FILTER_ONLY))
         .isEqualTo(List.of(FetchRefSpec.fromRef(TEST_DELETE_REF)));
   }
 
@@ -194,7 +193,7 @@
     objectUnderTest.addRefs(refSpecsSetOf(TEST_DELETE_REF));
     objectUnderTest.addRefs(refSpecsSetOf(TEST_REF));
 
-    assertThat(objectUnderTest.getFetchRefSpecs())
+    assertThat(objectUnderTest.getFetchRefSpecs(FetchOne.FILTER_ONLY))
         .isEqualTo(List.of(FetchRefSpec.fromRef(TEST_REF + ":" + TEST_REF)));
   }
 
@@ -347,9 +346,7 @@
         Optional.of(List.of(TEST_REF)));
     objectUnderTest.addRefs(refSpecs);
     objectUnderTest.setReplicationFetchFilter(replicationFilter);
-    ReplicationFetchFilter mockFilter = mock(ReplicationFetchFilter.class);
-    when(replicationFilter.get()).thenReturn(mockFilter);
-    when(mockFilter.filter(TEST_PROJECT_NAME, refs)).thenReturn(Set.of(TEST_REF));
+    when(replicationFilter.get()).thenReturn((projectName, fetchRefs) -> Set.of(TEST_REF));
 
     objectUnderTest.run();
 
@@ -379,11 +376,9 @@
     Map<String, Ref> remoteRefsMap = Map.of(REMOTE_REF, mock(Ref.class));
     setupRemoteConfigMock(List.of(ALL_REFS_SPEC));
     setupFetchRefsDatabaseMock(localRefsMap, remoteRefsMap);
-    ReplicationFetchFilter mockFilter = setupReplicationFilterMock(remoteRefs);
+    setupReplicationFilter(remoteRefs, remoteRefs);
 
     objectUnderTest.run();
-
-    verify(mockFilter).filter(TEST_PROJECT_NAME, remoteRefs);
   }
 
   @Test
@@ -397,11 +392,9 @@
         Map.of(REF, mockRef("badc0feebadc0feebadc0feebadc0feebadc0fee"));
     setupRemoteConfigMock(List.of(ALL_REFS_SPEC));
     setupFetchRefsDatabaseMock(localRefsMap, remoteRefsMap);
-    ReplicationFetchFilter mockFilter = setupReplicationFilterMock(remoteRefs);
+    setupReplicationFilter(remoteRefs, remoteRefs);
 
     objectUnderTest.run();
-
-    verify(mockFilter).filter(TEST_PROJECT_NAME, remoteRefs);
   }
 
   @Test
@@ -414,11 +407,9 @@
     Map<String, Ref> remoteRefsMap = Map.of(REF, refValue);
     setupRemoteConfigMock(List.of(ALL_REFS_SPEC));
     setupFetchRefsDatabaseMock(localRefsMap, remoteRefsMap);
-    ReplicationFetchFilter mockFilter = setupReplicationFilterMock(remoteRefs);
+    setupReplicationFilter(Set.of(), remoteRefs);
 
     objectUnderTest.run();
-
-    verify(mockFilter).filter(TEST_PROJECT_NAME, Set.of());
   }
 
   @Test
@@ -430,11 +421,9 @@
 
     setupRemoteConfigMock(List.of(DEV_REFS_SPEC));
     setupFetchRefsDatabaseMock(Map.of(), Map.of(REF, mock(Ref.class)));
-    ReplicationFetchFilter mockFilter = setupReplicationFilterMock(remoteRefs);
+    setupReplicationFilter(Set.of(), remoteRefs);
 
     objectUnderTest.run();
-
-    verify(mockFilter).filter(TEST_PROJECT_NAME, Set.of());
   }
 
   @Test
@@ -840,7 +829,7 @@
         List.of(new FetchFactoryEntry.Builder().refSpecNameWithDefaults(TEST_REF).build()),
         Optional.of(List.of(TEST_REF)));
     objectUnderTest.addRefs(refSpecs);
-    setupReplicationFilterMock(Collections.emptySet());
+    setupReplicationFilter(Set.of(), Set.of());
 
     objectUnderTest.run();
 
@@ -862,9 +851,7 @@
         Optional.of(List.of(TEST_REF)));
     objectUnderTest.addRefs(refSpecs);
     objectUnderTest.setReplicationFetchFilter(replicationFilter);
-    ReplicationFetchFilter mockFilter = mock(ReplicationFetchFilter.class);
-    when(replicationFilter.get()).thenReturn(mockFilter);
-    when(mockFilter.filter(TEST_PROJECT_NAME, refs)).thenReturn(Set.of(TEST_REF));
+    when(replicationFilter.get()).thenReturn((projectName, refNames) -> Set.of(TEST_REF));
 
     objectUnderTest.run();
 
@@ -942,12 +929,17 @@
     when(fetchRefsDatabase.getRemoteRefsMap(repository, urIish)).thenReturn(remote);
   }
 
-  private ReplicationFetchFilter setupReplicationFilterMock(Set<String> inRefs) {
+  private void setupReplicationFilter(Set<String> expectedRefs, Set<String> filteredRefs) {
     objectUnderTest.setReplicationFetchFilter(replicationFilter);
-    ReplicationFetchFilter mockFilter = mock(ReplicationFetchFilter.class);
-    when(replicationFilter.get()).thenReturn(mockFilter);
-    when(mockFilter.filter(TEST_PROJECT_NAME, inRefs)).thenReturn(inRefs);
-    return mockFilter;
+    when(replicationFilter.get())
+        .thenReturn(
+            (projectName, refs) -> {
+              if (refs.equals(expectedRefs)) {
+                return filteredRefs;
+              }
+              throw new AssertionFailedError(
+                  "Expected to filter " + expectedRefs + " but received " + refs);
+            });
   }
 
   private List<ReplicationState> createTestStates(FetchRefSpec refSpec, int numberOfStates) {