Allow refs filtering for non-delta fetches

Change I19d58638 introduced the ability to filter out refs from the
fetch-replication deltas. That is, the pull-replication plugin could
delegate other plugins to influence which refs should be fetched from
the remote source.

This mechanism however, was not working when receiving a request to
fetch from a source, without an explicit list of refs (for example via
the `pull-replication start` SSH command).

In this case the pull-replication was blindly fetching all matching refs
from the source.

When fetching from source without any explicit refs (i.e. no delta),
compare the remote refs that match the configured refSpecs against the
local refs and select only the ones needing to be created or updated.

The resulting list can then be passed on, as for the delta-case, to the
provided fetch-filter implementation.

In a multi-site setup, this allows to check the resulting refs against
the global-refdb so that no already-in-sync refs are fetched (details on
such implementation can be found at If60f5f5ea).

Note that this change does nothing to validate fetching of refs that do
not currently exist locally or that have been removed from the remote.

These concerns are captured as separate issues in [1] and [2],
respectively.

[1] https://issues.gerritcodereview.com/issues/316844804
[2] https://issues.gerritcodereview.com/issues/316844806

Bug: Issue 40014518
Change-Id: I29507376e9e4339d8a0517fcabd566014963a8ad
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index d02cb02..66aa8d4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -17,6 +17,7 @@
 import static com.googlesource.gerrit.plugins.replication.pull.PullReplicationLogger.repLog;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
+import com.google.common.base.Suppliers;
 import com.google.common.base.Throwables;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.ListMultimap;
@@ -49,12 +50,14 @@
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.eclipse.jgit.errors.NoRemoteRepositoryException;
 import org.eclipse.jgit.errors.NotSupportedException;
 import org.eclipse.jgit.errors.RemoteRepositoryException;
 import org.eclipse.jgit.errors.RepositoryNotFoundException;
 import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.RefUpdate;
 import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.transport.RefSpec;
@@ -82,6 +85,7 @@
   private final RemoteConfig config;
   private final PerThreadRequestScope.Scoper threadScoper;
 
+  private final FetchRefsDatabase fetchRefsDatabase;
   private final Project.NameKey projectName;
   private final URIish uri;
   private final Set<String> delta = Sets.newHashSetWithExpectedSize(4);
@@ -105,6 +109,16 @@
   private DynamicItem<ReplicationFetchFilter> replicationFetchFilter;
   private boolean succeeded;
 
+  private final Supplier<List<RefSpec>> fetchRefSpecsSupplier =
+      Suppliers.memoize(
+          () -> {
+            try {
+              return computeFetchRefSpecs();
+            } catch (IOException e) {
+              throw new RuntimeException("Could not compute refs specs to fetch", e);
+            }
+          });
+
   @Inject
   FetchOne(
       GitRepositoryManager grm,
@@ -115,6 +129,7 @@
       ReplicationStateListeners sl,
       FetchReplicationMetrics m,
       FetchFactory fetchFactory,
+      FetchRefsDatabase fetchRefsDatabase,
       @Assisted Project.NameKey d,
       @Assisted URIish u,
       @Assisted Optional<PullReplicationApiRequestMetrics> apiRequestMetrics) {
@@ -122,6 +137,7 @@
     pool = s;
     config = c.getRemoteConfig();
     threadScoper = ts;
+    this.fetchRefsDatabase = fetchRefsDatabase;
     projectName = d;
     uri = u;
     lockRetryCount = 0;
@@ -441,7 +457,7 @@
 
   private List<RefSpec> runImpl() throws IOException {
     Fetch fetch = fetchFactory.create(taskIdHex, uri, git);
-    List<RefSpec> fetchRefSpecs = getFetchRefSpecs();
+    List<RefSpec> fetchRefSpecs = fetchRefSpecsSupplier.get();
 
     try {
       updateStates(fetch.fetch(fetchRefSpecs));
@@ -467,24 +483,76 @@
     return fetchRefSpecs;
   }
 
+  /**
+   * Return the list of refSpecs to fetch, possibly after having been filtered.
+   *
+   * <p>When {@link FetchOne#delta} is empty and no {@link FetchOne#replicationFetchFilter} was
+   * provided, the configured refsSpecs is returned.
+   *
+   * <p>When {@link FetchOne#delta} is empty and {@link FetchOne#replicationFetchFilter} was
+   * provided, the configured refsSpecs is expanded to the delta of refs that requires fetching:
+   * that is, refs that are not already up-to-date. The result is then passed to the filter.
+   *
+   * <p>When {@link FetchOne#delta} is not empty and {@link FetchOne#replicationFetchFilter} was
+   * provided, the filtered refsSpecs are returned.
+   *
+   * @return The list of refSpecs to fetch
+   */
   public List<RefSpec> getFetchRefSpecs() {
+    return fetchRefSpecsSupplier.get();
+  }
+
+  private List<RefSpec> computeFetchRefSpecs() throws IOException {
     List<RefSpec> configRefSpecs = config.getFetchRefSpecs();
-    if (delta.isEmpty()) {
+
+    if (delta.isEmpty() && replicationFetchFilter().isEmpty()) {
       return configRefSpecs;
     }
 
-    return runRefsFilter(delta).stream()
+    return runRefsFilter(computeDelta(configRefSpecs)).stream()
         .map(ref -> refToFetchRefSpec(ref, configRefSpecs))
         .filter(Optional::isPresent)
         .map(Optional::get)
         .collect(Collectors.toList());
   }
 
-  private Set<String> runRefsFilter(Set<String> refs) {
+  private Set<String> computeDelta(List<RefSpec> configRefSpecs) throws IOException {
+    if (!delta.isEmpty()) {
+      return delta;
+    }
+    Map<String, Ref> localRefsMap = fetchRefsDatabase.getLocalRefsMap(git);
+    Map<String, Ref> remoteRefsMap = fetchRefsDatabase.getRemoteRefsMap(git, uri);
+
+    return remoteRefsMap.keySet().stream()
+        .filter(
+            srcRef -> {
+              // that match our configured refSpecs
+              return refToFetchRefSpec(srcRef, configRefSpecs)
+                  .flatMap(
+                      spec ->
+                          shouldBeFetched(srcRef, localRefsMap, remoteRefsMap)
+                              ? Optional.of(srcRef)
+                              : Optional.empty())
+                  .isPresent();
+            })
+        .collect(Collectors.toSet());
+  }
+
+  private boolean shouldBeFetched(
+      String srcRef, Map<String, Ref> localRefsMap, Map<String, Ref> remoteRefsMap) {
+    // If we don't have it locally
+    return localRefsMap.get(srcRef) == null
+        // OR we have it, but with a different localRefsMap value
+        || !localRefsMap.get(srcRef).getObjectId().equals(remoteRefsMap.get(srcRef).getObjectId());
+  }
+
+  private Optional<ReplicationFetchFilter> replicationFetchFilter() {
     return Optional.ofNullable(replicationFetchFilter)
-        .flatMap(filter -> Optional.ofNullable(filter.get()))
-        .map(f -> f.filter(this.projectName.get(), refs))
-        .orElse(refs);
+        .flatMap(filter -> Optional.ofNullable(filter.get()));
+  }
+
+  private Set<String> runRefsFilter(Set<String> refs) {
+    return replicationFetchFilter().map(f -> f.filter(this.projectName.get(), refs)).orElse(refs);
   }
 
   private Optional<RefSpec> refToFetchRefSpec(String ref, List<RefSpec> configRefSpecs) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefsDatabase.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefsDatabase.java
new file mode 100644
index 0000000..e536aad
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefsDatabase.java
@@ -0,0 +1,49 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static java.util.stream.Collectors.toMap;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.replication.pull.transport.TransportProvider;
+import java.io.IOException;
+import java.util.Map;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.FetchConnection;
+import org.eclipse.jgit.transport.Transport;
+import org.eclipse.jgit.transport.URIish;
+
+@Singleton
+public class FetchRefsDatabase {
+  private final TransportProvider transportProvider;
+
+  @Inject
+  public FetchRefsDatabase(TransportProvider transportProvider) {
+    this.transportProvider = transportProvider;
+  }
+
+  public Map<String, Ref> getRemoteRefsMap(Repository repository, URIish uri) throws IOException {
+    try (Transport tn = transportProvider.open(repository, uri);
+        FetchConnection fc = tn.openFetch()) {
+      return fc.getRefsMap();
+    }
+  }
+
+  public Map<String, Ref> getLocalRefsMap(Repository repository) throws IOException {
+    return repository.getRefDatabase().getRefs().stream().collect(toMap(Ref::getName, r -> r));
+  }
+}
diff --git a/src/main/resources/Documentation/cmd-start.md b/src/main/resources/Documentation/cmd-start.md
index 51d0a53..057567f 100644
--- a/src/main/resources/Documentation/cmd-start.md
+++ b/src/main/resources/Documentation/cmd-start.md
@@ -39,6 +39,28 @@
 with a `*`. If the pattern starts with `^` and ends with `*`, it is
 treated as a regular expression.
 
+FILTERING
+---------
+If the `fetch-filter` is enabled, this command will compare all remote refs
+that match the configured refSpecs against the local refs and select only
+the ones that are not already up-to-date.
+
+The configured refsSpecs is effectively expanded into
+an explicit set of refs that need fetching, meaning that only new refs, or
+refs whose sha1 differs from the remote one will be fetched.
+
+For example: `refs/*:refs/*` might be expanded to `refs/heads/master` and `refs/tags/v1`).
+
+The resulting refs list will then be passed to the provided `fetch-filter`
+implementation (see [extension-point.md](./extension-point.md))
+documentation for more information on this.
+
+*Note* This ref expansion-strategy prevents the `mirror`ing option from
+being honoured, since local refs that no longer exist at the source repository
+are effectively ignored.
+
+This behaviour has been captured in issue [319395646](https://issues.gerritcodereview.com/issues/319395646).
+
 ACCESS
 ------
 Caller must be a member of the privileged 'Administrators' group,
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
index 3445604..86b0792 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
@@ -35,9 +35,11 @@
 import com.googlesource.gerrit.plugins.replication.pull.fetch.FetchFactory;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.InexistentRefTransportException;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.RefUpdateState;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -45,10 +47,13 @@
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.eclipse.jgit.errors.PackProtocolException;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.RefUpdate;
 import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.transport.RefSpec;
 import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.Transport;
 import org.eclipse.jgit.transport.URIish;
 import org.junit.Before;
 import org.junit.Test;
@@ -66,6 +71,8 @@
   private final String URI_PATTERN = "http://test.com/" + TEST_PROJECT_NAME + ".git";
   private final TestMetricMaker testMetricMaker = new TestMetricMaker();
 
+  private final RefSpec ALL_REFS_SPEC = new RefSpec("refs/*:refs/*");
+
   @Mock private GitRepositoryManager grm;
   @Mock private Repository repository;
   @Mock private Source source;
@@ -77,6 +84,9 @@
   @Mock private PullReplicationApiRequestMetrics pullReplicationApiRequestMetrics;
   @Mock private RemoteConfig remoteConfig;
   @Mock private DynamicItem<ReplicationFetchFilter> replicationFilter;
+  @Mock private FetchRefsDatabase fetchRefsDatabase;
+
+  @Mock private Transport transport;
 
   private URIish urIish;
   private FetchOne objectUnderTest;
@@ -98,7 +108,7 @@
     pullReplicationApiRequestMetrics = mock(PullReplicationApiRequestMetrics.class);
     remoteConfig = mock(RemoteConfig.class);
     replicationFilter = mock(DynamicItem.class);
-
+    fetchRefsDatabase = mock(FetchRefsDatabase.class);
     when(sourceConfiguration.getRemoteConfig()).thenReturn(remoteConfig);
     when(idGenerator.next()).thenReturn(1);
     int maxLockRetries = 1;
@@ -114,6 +124,7 @@
             replicationStateListeners,
             fetchReplicationMetrics,
             fetchFactory,
+            fetchRefsDatabase,
             PROJECT_NAME,
             urIish,
             Optional.of(pullReplicationApiRequestMetrics));
@@ -327,6 +338,73 @@
   }
 
   @Test
+  public void fetchWithoutDelta_shouldPassNewRefsToFilter() throws Exception {
+    setupMocks(true);
+    String REMOTE_REF = "refs/heads/remote";
+    Set<String> remoteRefs = Set.of(REMOTE_REF);
+    Map<String, Ref> localRefsMap = Map.of();
+    Map<String, Ref> remoteRefsMap = Map.of(REMOTE_REF, mock(Ref.class));
+    setupRemoteConfigMock(List.of(ALL_REFS_SPEC));
+    setupFetchRefsDatabaseMock(localRefsMap, remoteRefsMap);
+    ReplicationFetchFilter mockFilter = setupReplicationFilterMock(remoteRefs);
+
+    objectUnderTest.run();
+
+    verify(mockFilter).filter(TEST_PROJECT_NAME, remoteRefs);
+  }
+
+  @Test
+  public void fetchWithoutDelta_shouldPassUpdatedRefsToFilter() throws Exception {
+    setupMocks(true);
+    String REF = "refs/heads/someRef";
+    Set<String> remoteRefs = Set.of(REF);
+    Map<String, Ref> localRefsMap =
+        Map.of(REF, mockRef("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef"));
+    Map<String, Ref> remoteRefsMap =
+        Map.of(REF, mockRef("badc0feebadc0feebadc0feebadc0feebadc0fee"));
+    setupRemoteConfigMock(List.of(ALL_REFS_SPEC));
+    setupFetchRefsDatabaseMock(localRefsMap, remoteRefsMap);
+    ReplicationFetchFilter mockFilter = setupReplicationFilterMock(remoteRefs);
+
+    objectUnderTest.run();
+
+    verify(mockFilter).filter(TEST_PROJECT_NAME, remoteRefs);
+  }
+
+  @Test
+  public void fetchWithoutDelta_shouldNotPassUpToDateRefsToFilter() throws Exception {
+    setupMocks(true);
+    String REF = "refs/heads/someRef";
+    Set<String> remoteRefs = Set.of(REF);
+    Ref refValue = mockRef("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef");
+    Map<String, Ref> localRefsMap = Map.of(REF, refValue);
+    Map<String, Ref> remoteRefsMap = Map.of(REF, refValue);
+    setupRemoteConfigMock(List.of(ALL_REFS_SPEC));
+    setupFetchRefsDatabaseMock(localRefsMap, remoteRefsMap);
+    ReplicationFetchFilter mockFilter = setupReplicationFilterMock(remoteRefs);
+
+    objectUnderTest.run();
+
+    verify(mockFilter).filter(TEST_PROJECT_NAME, Set.of());
+  }
+
+  @Test
+  public void fetchWithoutDelta_shouldNotPassRefsNonMatchingConfigToFilter() throws Exception {
+    setupMocks(true);
+    String REF = "refs/non-dev/someRef";
+    Set<String> remoteRefs = Set.of(REF);
+    RefSpec DEV_REFS_SPEC = new RefSpec("refs/dev/*:refs/dev/*");
+
+    setupRemoteConfigMock(List.of(DEV_REFS_SPEC));
+    setupFetchRefsDatabaseMock(Map.of(), Map.of(REF, mock(Ref.class)));
+    ReplicationFetchFilter mockFilter = setupReplicationFilterMock(remoteRefs);
+
+    objectUnderTest.run();
+
+    verify(mockFilter).filter(TEST_PROJECT_NAME, Set.of());
+  }
+
+  @Test
   public void shouldMarkTheReplicationStatusAsSucceededOnSuccessfulReplicationOfARef()
       throws Exception {
     setupMocks(true);
@@ -727,10 +805,7 @@
         List.of(new FetchFactoryEntry.Builder().refSpecNameWithDefaults(TEST_REF).build()),
         Optional.of(List.of(TEST_REF)));
     objectUnderTest.addRefs(refSpecs);
-    objectUnderTest.setReplicationFetchFilter(replicationFilter);
-    ReplicationFetchFilter mockFilter = mock(ReplicationFetchFilter.class);
-    when(replicationFilter.get()).thenReturn(mockFilter);
-    when(mockFilter.filter(TEST_PROJECT_NAME, refSpecs)).thenReturn(Collections.emptySet());
+    setupReplicationFilterMock(Collections.emptySet());
 
     objectUnderTest.run();
 
@@ -789,6 +864,26 @@
     when(grm.openRepository(PROJECT_NAME)).thenReturn(repository);
   }
 
+  private Ref mockRef(String objectIdStr) {
+    Ref r = mock(Ref.class);
+    when(r.getObjectId()).thenReturn(ObjectId.fromString(objectIdStr));
+    return r;
+  }
+
+  private void setupFetchRefsDatabaseMock(Map<String, Ref> local, Map<String, Ref> remote)
+      throws IOException {
+    when(fetchRefsDatabase.getLocalRefsMap(repository)).thenReturn(local);
+    when(fetchRefsDatabase.getRemoteRefsMap(repository, urIish)).thenReturn(remote);
+  }
+
+  private ReplicationFetchFilter setupReplicationFilterMock(Set<String> inRefs) {
+    objectUnderTest.setReplicationFetchFilter(replicationFilter);
+    ReplicationFetchFilter mockFilter = mock(ReplicationFetchFilter.class);
+    when(replicationFilter.get()).thenReturn(mockFilter);
+    when(mockFilter.filter(TEST_PROJECT_NAME, inRefs)).thenReturn(inRefs);
+    return mockFilter;
+  }
+
   private List<ReplicationState> createTestStates(String ref, int numberOfStates) {
     List<ReplicationState> states =
         IntStream.rangeClosed(1, numberOfStates)