Merge branch 'stable-3.4' into stable-3.5

* stable-3.4:
  Increase FetchOne coverage with unit tests
  Display the in-flight taskId when a runway conflict is detected
  Restore the fetching of all refs with empty delta
  Allow to filter out refs from the fetch-replication deltas
  Do not accumulate replication events with retrying tasks
  Add replication taskId during the execution of the fetch
  Allow FetchOne to retry the failed ref only

Change-Id: Ic3962ffbf8a91b768a4c7984236d06096dcdb15e
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index 43db907..42bea3e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Sets;
 import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.metrics.Timer1;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.git.PerThreadRequestScope;
@@ -35,6 +36,7 @@
 import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiRequestMetrics;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.Fetch;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.FetchFactory;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.InexistentRefTransportException;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.PermanentTransportException;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.RefUpdateState;
 import java.io.IOException;
@@ -82,6 +84,7 @@
   private final Project.NameKey projectName;
   private final URIish uri;
   private final Set<String> delta = Sets.newHashSetWithExpectedSize(4);
+  private final Set<TransportException> fetchFailures = Sets.newHashSetWithExpectedSize(4);
   private boolean fetchAllRefs;
   private Repository git;
   private boolean retrying;
@@ -98,6 +101,7 @@
   private final AtomicBoolean canceledWhileRunning;
   private final FetchFactory fetchFactory;
   private final Optional<PullReplicationApiRequestMetrics> apiRequestMetrics;
+  private DynamicItem<ReplicationFetchFilter> replicationFetchFilter;
 
   @Inject
   FetchOne(
@@ -131,16 +135,23 @@
     this.apiRequestMetrics = apiRequestMetrics;
   }
 
+  @Inject(optional = true)
+  public void setReplicationFetchFilter(
+      DynamicItem<ReplicationFetchFilter> replicationFetchFilter) {
+    this.replicationFetchFilter = replicationFetchFilter;
+  }
+
   @Override
   public void cancel() {
-    repLog.info("Replication {} was canceled", getURI());
+    repLog.info("[{}] Replication task from {} was canceled", taskIdHex, getURI());
     canceledByReplication();
     pool.fetchWasCanceled(this);
   }
 
   @Override
   public void setCanceledWhileRunning() {
-    repLog.info("Replication {} was canceled while being executed", getURI());
+    repLog.info(
+        "[{}] Replication task from {} was canceled while being executed", taskIdHex, getURI());
     canceledWhileRunning.set(true);
   }
 
@@ -159,6 +170,10 @@
     return true;
   }
 
+  public String getTaskIdHex() {
+    return taskIdHex;
+  }
+
   @Override
   public String toString() {
     String print = "[" + taskIdHex + "] fetch " + uri;
@@ -195,10 +210,10 @@
     if (ALL_REFS.equals(ref)) {
       delta.clear();
       fetchAllRefs = true;
-      repLog.trace("Added all refs for replication from {}", uri);
+      repLog.trace("[{}] Added all refs for replication from {}", taskIdHex, uri);
     } else if (!fetchAllRefs) {
       delta.add(ref);
-      repLog.trace("Added ref {} for replication from {}", ref, uri);
+      repLog.trace("[{}] Added ref {} for replication from {}", taskIdHex, ref, uri);
     }
   }
 
@@ -277,6 +292,10 @@
     }
   }
 
+  public Set<TransportException> getFetchFailures() {
+    return fetchFailures;
+  }
+
   private void runFetchOperation() {
     try (TraceContext ctx = TraceContext.open().addTag(ID_KEY, HexFormat.fromInt(id))) {
       doRunFetchOperation();
@@ -291,16 +310,17 @@
     if (!pool.requestRunway(this)) {
       if (!canceled) {
         repLog.info(
-            "Rescheduling [{}] replication to {} to avoid collision with an in-flight fetch.",
+            "[{}] Rescheduling replication from {} to avoid collision with an in-flight fetch task [{}].",
             taskIdHex,
-            uri);
+            uri,
+            pool.getInFlight(getURI()).map(FetchOne::getTaskIdHex).orElse("<unknown>"));
         pool.reschedule(this, Source.RetryReason.COLLISION);
       }
       return;
     }
 
     repLog.info(
-        "Replication [{}] from {} started for refs [{}] ...",
+        "[{}] Replication from {} started for refs [{}] ...",
         taskIdHex,
         uri,
         String.join(",", getRefs()));
@@ -317,7 +337,7 @@
               .flatMap(metrics -> metrics.stop(config.getName()))
               .map(NANOSECONDS::toMillis);
       repLog.info(
-          "Replication [{}] from {} completed in {}ms, {}ms delay, {} retries{}",
+          "[{}] Replication from {} completed in {}ms, {}ms delay, {} retries{}",
           taskIdHex,
           uri,
           elapsed,
@@ -326,7 +346,12 @@
           elapsedEnd2End.map(el -> String.format(", E2E %dms", el)).orElse(""));
     } catch (RepositoryNotFoundException e) {
       stateLog.error(
-          "Cannot replicate " + projectName + "; Local repository error: " + e.getMessage(),
+          "["
+              + taskIdHex
+              + "] Cannot replicate "
+              + projectName
+              + "; Local repository error: "
+              + e.getMessage(),
           getStatesAsArray());
 
     } catch (NoRemoteRepositoryException | RemoteRepositoryException e) {
@@ -335,9 +360,9 @@
       // raised.
       String msg = e.getMessage();
       repLog.error(
-          "Cannot replicate [{}] {}; Remote repository error: {}", taskIdHex, projectName, msg);
+          "[{}] Cannot replicate {}; Remote repository error: {}", taskIdHex, projectName, msg);
     } catch (NotSupportedException e) {
-      stateLog.error("Cannot replicate from " + uri, e, getStatesAsArray());
+      stateLog.error("[" + taskIdHex + "] Cannot replicate  from " + uri, e, getStatesAsArray());
     } catch (PermanentTransportException e) {
       repLog.error(
           String.format("Terminal failure. Cannot replicate [%s] from %s", taskIdHex, uri), e);
@@ -346,7 +371,7 @@
         lockRetryCount++;
         // The LockFailureException message contains both URI and reason
         // for this failure.
-        repLog.error("Cannot replicate [{}] from {}: {}", taskIdHex, uri, e.getMessage());
+        repLog.error("[{}] Cannot replicate from {}: {}", taskIdHex, uri, e.getMessage());
 
         // The remote fetch operation should be retried.
         if (lockRetryCount <= maxLockRetries) {
@@ -357,7 +382,8 @@
           }
         } else {
           repLog.error(
-              "Giving up after {} occurrences of this error: {} during replication from [{}] {}",
+              "[{}] Giving up after {} occurrences of this error: {} during replication from [{}] {}",
+              taskIdHex,
               lockRetryCount,
               e.getMessage(),
               taskIdHex,
@@ -373,9 +399,12 @@
         }
       }
     } catch (IOException e) {
-      stateLog.error("Cannot replicate from " + uri, e, getStatesAsArray());
+      stateLog.error("[" + taskIdHex + "] Cannot replicate from " + uri, e, getStatesAsArray());
     } catch (RuntimeException | Error e) {
-      stateLog.error("Unexpected error during replication from " + uri, e, getStatesAsArray());
+      stateLog.error(
+          "[" + taskIdHex + "] Unexpected error during replication from " + uri,
+          e,
+          getStatesAsArray());
     } finally {
       if (git != null) {
         git.close();
@@ -385,13 +414,31 @@
   }
 
   private void logCanceledWhileRunningException(TransportException e) {
-    repLog.info("Cannot replicate [{}] from {}. It was canceled while running", taskIdHex, uri, e);
+    repLog.info("[{}] Cannot replicate from {}. It was canceled while running", taskIdHex, uri, e);
   }
 
   private void runImpl() throws IOException {
-    Fetch fetch = fetchFactory.create(uri, git);
+    Fetch fetch = fetchFactory.create(taskIdHex, uri, git);
     List<RefSpec> fetchRefSpecs = getFetchRefSpecs();
-    updateStates(fetch.fetch(fetchRefSpecs));
+
+    try {
+      updateStates(fetch.fetch(fetchRefSpecs));
+    } catch (InexistentRefTransportException e) {
+      String inexistentRef = e.getInexistentRef();
+      repLog.info(
+          "[{}] Remote {} does not have ref {} in replication task, flagging as failed and removing from the replication task",
+          taskIdHex,
+          uri,
+          inexistentRef);
+      fetchFailures.add(e);
+      delta.remove(inexistentRef);
+      if (delta.isEmpty()) {
+        repLog.warn("[{}] Empty replication task, skipping.", taskIdHex);
+        return;
+      }
+
+      runImpl();
+    }
   }
 
   private List<RefSpec> getFetchRefSpecs() {
@@ -400,13 +447,20 @@
       return configRefSpecs;
     }
 
-    return delta.stream()
+    return runRefsFilter(delta).stream()
         .map(ref -> refToFetchRefSpec(ref, configRefSpecs))
         .filter(Optional::isPresent)
         .map(Optional::get)
         .collect(Collectors.toList());
   }
 
+  private Set<String> runRefsFilter(Set<String> refs) {
+    return Optional.ofNullable(replicationFetchFilter)
+        .flatMap(filter -> Optional.ofNullable(filter.get()))
+        .map(f -> f.filter(this.projectName.get(), refs))
+        .orElse(refs);
+  }
+
   private Optional<RefSpec> refToFetchRefSpec(String ref, List<RefSpec> configRefSpecs) {
     for (RefSpec refSpec : configRefSpecs) {
       if (refSpec.matchSource(ref)) {
@@ -444,7 +498,8 @@
         case REJECTED_MISSING_OBJECT:
           stateLog.error(
               String.format(
-                  "Failed replicate %s from %s: result %s", uri, u.getRemoteName(), u.getResult()),
+                  "[%s] Failed replicate %s from %s: result %s",
+                  taskIdHex, uri, u.getRemoteName(), u.getResult()),
               logStatesArray);
           fetchStatus = ReplicationState.RefFetchResult.FAILED;
           anyRefFailed = true;
@@ -458,7 +513,8 @@
         case REJECTED_OTHER_REASON:
           stateLog.error(
               String.format(
-                  "Failed replicate %s from %s, reason: %s", uri, u.getRemoteName(), u.toString()),
+                  "[%s] Failed replicate %s from %s, reason: %s",
+                  taskIdHex, uri, u.getRemoteName(), u.toString()),
               logStatesArray);
 
           fetchStatus = ReplicationState.RefFetchResult.FAILED;
@@ -495,7 +551,10 @@
                 null);
       }
     }
-    stateMap.clear();
+
+    for (String doneRef : doneRefs) {
+      stateMap.removeAll(doneRef);
+    }
   }
 
   public static class LockFailureException extends TransportException {
@@ -505,4 +564,8 @@
       super(uri, message);
     }
   }
+
+  public Optional<PullReplicationApiRequestMetrics> getRequestMetrics() {
+    return apiRequestMetrics;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationExtensionPointModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationExtensionPointModule.java
new file mode 100644
index 0000000..7b35668
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationExtensionPointModule.java
@@ -0,0 +1,32 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.inject.AbstractModule;
+
+/**
+ * Gerrit libModule for applying a fetch-filter for pull replications.
+ *
+ * <p>It should be used only when an actual filter is defined; otherwise the plugin's default
+ * behaviour is to fetch refs without any filtering.
+ */
+public class ReplicationExtensionPointModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    DynamicItem.itemOf(binder(), ReplicationFetchFilter.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationFetchFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationFetchFilter.java
new file mode 100644
index 0000000..dfe6fbd
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationFetchFilter.java
@@ -0,0 +1,29 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.extensions.annotations.ExtensionPoint;
+import java.util.Set;
+
+/**
+ * Filter that is invoked before a set of remote refs is fetched from a remote instance.
+ *
+ * <p>It can be used to filter out unwanted fetches.
+ */
+@ExtensionPoint
+public interface ReplicationFetchFilter {
+
+  public Set<String> filter(String projectName, Set<String> fetchRefs);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 3d9f5c3..ff1664e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -86,6 +86,7 @@
 import java.util.function.Supplier;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.http.impl.client.CloseableHttpClient;
+import org.eclipse.jgit.errors.TransportException;
 import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.RefUpdate;
@@ -439,7 +440,7 @@
     synchronized (stateLock) {
       FetchOne e = pending.get(uri);
       Future<?> f = CompletableFuture.completedFuture(null);
-      if (e == null) {
+      if (e == null || e.isRetrying()) {
         e = opFactory.create(project, uri, apiRequestMetrics);
         addRef(e, ref);
         e.addState(ref, state);
@@ -587,10 +588,20 @@
     return true;
   }
 
+  Optional<FetchOne> getInFlight(URIish uri) {
+    return Optional.ofNullable(inFlight.get(uri));
+  }
+
   void notifyFinished(FetchOne op) {
     synchronized (stateLock) {
       inFlight.remove(op.getURI());
     }
+
+    Set<TransportException> fetchFailures = op.getFetchFailures();
+    fetchFailures.forEach(
+        e ->
+            repLog.warn(
+                "Replication task [" + op.getTaskIdHex() + "] completed with partial failure", e));
   }
 
   public boolean wouldFetchRef(String ref) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/BatchFetchClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/BatchFetchClient.java
index f2fbcd1..188f69b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/BatchFetchClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/BatchFetchClient.java
@@ -32,10 +32,11 @@
   public BatchFetchClient(
       SourceConfiguration config,
       FetchFactory factory,
+      @Assisted String taskHexId,
       @Assisted URIish uri,
       @Assisted Repository git) {
     this.batchSize = config.getRefsBatchSize();
-    this.fetchClient = factory.createPlainImpl(uri, git);
+    this.fetchClient = factory.createPlainImpl(taskHexId, uri, git);
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java
index c404e30..24898e6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java
@@ -41,14 +41,17 @@
   private File localProjectDirectory;
   private URIish uri;
   private int timeout;
+  private final String taskIdHex;
 
   @Inject
   public CGitFetch(
       SourceConfiguration config,
       CredentialsFactory cpFactory,
+      @Assisted String taskIdHex,
       @Assisted URIish uri,
       @Assisted Repository git) {
     this.localProjectDirectory = git.getDirectory();
+    this.taskIdHex = taskIdHex;
     this.uri = appendCredentials(uri, cpFactory.create(config.getRemoteConfig().getName()));
     this.timeout = config.getRemoteConfig().getTimeout();
   }
@@ -59,7 +62,7 @@
     List<String> command = Lists.newArrayList("git", "fetch", uri.toPrivateASCIIString());
     command.addAll(refs);
     ProcessBuilder pb = new ProcessBuilder().command(command).directory(localProjectDirectory);
-    repLog.info("Fetch references {} from {}", refs, uri);
+    repLog.info("[{}] Fetch references {} from {}", taskIdHex, refs, uri);
     Process process = pb.start();
 
     try {
@@ -74,7 +77,7 @@
                 .lines()
                 .collect(Collectors.joining("\n"));
         throw new TransportException(
-            String.format("Cannot fetch from %s, error message: %s}", uri, errorMessage));
+            String.format("Cannot fetch from %s, error message: %s", uri, errorMessage));
       }
 
       return refsSpec.stream()
@@ -83,8 +86,11 @@
                 return new RefUpdateState(value.getSource(), RefUpdate.Result.NEW);
               })
           .collect(Collectors.toList());
+    } catch (TransportException e) {
+      throw PermanentTransportException.wrapIfPermanentTransportException(e);
     } catch (InterruptedException e) {
-      repLog.error("Thread interrupted during the fetch from: {}, refs: {}", uri, refs);
+      repLog.error(
+          "[{}] Thread interrupted during the fetch from: {}, refs: {}", taskIdHex, uri, refs);
       throw new IllegalStateException(e);
     }
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/FetchFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/FetchFactory.java
index d356477..55f8f01 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/FetchFactory.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/FetchFactory.java
@@ -18,8 +18,8 @@
 import org.eclipse.jgit.transport.URIish;
 
 public interface FetchFactory {
-  Fetch create(URIish uri, Repository git);
+  Fetch create(String taskIdHex, URIish uri, Repository git);
   // Return implementation without any decorators
   @FetchClientImplementation
-  Fetch createPlainImpl(URIish uri, Repository git);
+  Fetch createPlainImpl(String taskIdHex, URIish uri, Repository git);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/InexistentRefTransportException.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/InexistentRefTransportException.java
new file mode 100644
index 0000000..7c2e637
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/InexistentRefTransportException.java
@@ -0,0 +1,55 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.fetch;
+
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.internal.JGitText;
+
+public class InexistentRefTransportException extends PermanentTransportException {
+  private static final Pattern JGIT_INEXISTENT_REF_PATTERN =
+      Pattern.compile(JGitText.get().remoteDoesNotHaveSpec.replaceAll("\\{0\\}", "([^\\\\s]+)"));
+  private static final Pattern CGIT_INEXISTENT_REF_PATTERN =
+      Pattern.compile(".*fatal.*couldn't find remote ref (.*)");
+
+  private static final long serialVersionUID = 1L;
+  private final String inexistentRef;
+
+  public String getInexistentRef() {
+    return inexistentRef;
+  }
+
+  public InexistentRefTransportException(String inexistentRef, Throwable cause) {
+    super("Ref " + inexistentRef + " does not exist on remote", cause);
+
+    this.inexistentRef = inexistentRef;
+  }
+
+  public static Optional<TransportException> getOptionalPermanentFailure(TransportException e) {
+    return wrapException(JGIT_INEXISTENT_REF_PATTERN, e)
+        .or(() -> wrapException(CGIT_INEXISTENT_REF_PATTERN, e));
+  }
+
+  private static Optional<TransportException> wrapException(
+      Pattern exceptionPattern, TransportException exception) {
+    Matcher exceptionMatcher = exceptionPattern.matcher(exception.getMessage());
+    if (exceptionMatcher.matches()) {
+      return Optional.of(new InexistentRefTransportException(exceptionMatcher.group(1), exception));
+    }
+    return Optional.empty();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java
index 74ad9fc..816d101 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java
@@ -30,11 +30,16 @@
   URIish uri;
   Repository git;
   private final TransportProvider transportProvider;
+  private final String taskIdHex;
 
   @Inject
   public JGitFetch(
-      TransportProvider transportProvider, @Assisted URIish uri, @Assisted Repository git) {
+      TransportProvider transportProvider,
+      @Assisted String taskIdHex,
+      @Assisted URIish uri,
+      @Assisted Repository git) {
     this.transportProvider = transportProvider;
+    this.taskIdHex = taskIdHex;
     this.uri = uri;
     this.git = git;
   }
@@ -51,14 +56,11 @@
   }
 
   private FetchResult fetchVia(Transport tn, List<RefSpec> fetchRefSpecs) throws IOException {
-    repLog.info("Fetch references {} from {}", fetchRefSpecs, uri);
+    repLog.info("[{}] Fetch references {} from {}", taskIdHex, fetchRefSpecs, uri);
     try {
       return tn.fetch(NullProgressMonitor.INSTANCE, fetchRefSpecs);
     } catch (TransportException e) {
-      if (PermanentTransportException.isPermanentFailure(e)) {
-        throw new PermanentTransportException("Terminal fetch failure", e);
-      }
-      throw e;
+      throw PermanentTransportException.wrapIfPermanentTransportException(e);
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/PermanentTransportException.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/PermanentTransportException.java
index 1d96a02..acb68cf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/PermanentTransportException.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/PermanentTransportException.java
@@ -16,7 +16,6 @@
 
 import com.jcraft.jsch.JSchException;
 import org.eclipse.jgit.errors.TransportException;
-import org.eclipse.jgit.internal.JGitText;
 
 public class PermanentTransportException extends TransportException {
   private static final long serialVersionUID = 1L;
@@ -25,10 +24,12 @@
     super(msg, cause);
   }
 
-  public static boolean isPermanentFailure(TransportException e) {
+  public static TransportException wrapIfPermanentTransportException(TransportException e) {
     Throwable cause = e.getCause();
-    String message = e.getMessage();
-    return (cause instanceof JSchException && cause.getMessage().startsWith("UnknownHostKey:"))
-        || message.matches(JGitText.get().remoteDoesNotHaveSpec.replaceAll("\\{0\\}", ".+"));
+    if (cause instanceof JSchException && cause.getMessage().startsWith("UnknownHostKey:")) {
+      return new PermanentTransportException("Terminal fetch failure", e);
+    }
+
+    return InexistentRefTransportException.getOptionalPermanentFailure(e).orElse(e);
   }
 }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index b76585e..eab5fe6 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -126,7 +126,7 @@
 
 	- UnknownHostKey: thrown by Jsch when establishing an SSH connection for an
 	unknown host.
-	- Jgit transport exception when the remote ref does not exist. The assumption
+	- cGit / JGit transport exception when the remote ref does not exist. The assumption
 	here is that the remote ref does not exist so it is not worth retrying. If the
 	exception arisen as a consequence of some ACLs (mis)configuration, then after
 	fixing the ACLs, an explicit replication must be manually triggered.
diff --git a/src/main/resources/Documentation/extension-point.md b/src/main/resources/Documentation/extension-point.md
new file mode 100644
index 0000000..33127c5
--- /dev/null
+++ b/src/main/resources/Documentation/extension-point.md
@@ -0,0 +1,47 @@
+@PLUGIN@ extension points
+==============
+
+The @PLUGIN@ plugin exposes an extension point that allows another plugin or
+a script to influence its behaviour.
+Extension points are available only when the plugin's extension-point module
+is loaded as a [libModule](/config-gerrit.html#gerrit.installModule) and they
+are implemented by another plugin that depends on this one as a `provided`
+dependency.
+
+### Install extension libModule
+
+The @PLUGIN@ plugin's extension points are defined in
+`c.g.g.p.r.p.ReplicationExtensionPointModule`, which needs to be configured
+as a libModule.
+
+Create a symbolic link from `$GERRIT_SITE/plugins/@PLUGIN@.jar` into
+`$GERRIT_SITE/lib` and then add the @PLUGIN@ extension module to the
+`gerrit.config`.
+
+Example:
+
+```
+[gerrit]
+  installModule = com.googlesource.gerrit.plugins.replication.pull.ReplicationExtensionPointModule
+```
+
+> **NOTE**: Using and configuring the @PLUGIN@ plugin as a library module
+> requires a Gerrit server restart and does not support hot plugin install or
+> upgrade.
+
+
+### Extension points
+
+* `com.googlesource.gerrit.plugins.replication.pull.ReplicationFetchFilter`
+
+  Filters out refs before they are fetched from a remote instance.
+  Only one filter at a time is supported. The filter implementation needs to
+  bind a `DynamicItem`.
+
+  Default: no filtering
+
+  Example:
+
+  ```
+  DynamicItem.bind(binder(), ReplicationFetchFilter.class).to(ReplicationFetchFilterImpl.class);
+  ```
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/CGitFetchIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/CGitFetchIT.java
index 3f40848..d81a253 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/CGitFetchIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/CGitFetchIT.java
@@ -60,6 +60,7 @@
     sysModule = "com.googlesource.gerrit.plugins.replication.pull.CGitFetchIT$TestModule")
 public class CGitFetchIT extends FetchITBase {
   private static final String TEST_REPLICATION_SUFFIX = "suffix1";
+  private static final String TEST_TASK_ID = "taskid";
 
   @Test
   public void shouldFetchRef() throws Exception {
@@ -71,7 +72,8 @@
       RevCommit sourceCommit = pushResult.getCommit();
       String sourceRef = pushResult.getPatchSet().refName();
 
-      Fetch objectUnderTest = fetchFactory.create(new URIish(testRepoPath.toString()), repo);
+      Fetch objectUnderTest =
+          fetchFactory.create(TEST_TASK_ID, new URIish(testRepoPath.toString()), repo);
 
       objectUnderTest.fetch(Lists.newArrayList(new RefSpec(sourceRef + ":" + sourceRef)));
 
@@ -91,7 +93,8 @@
 
       createChange();
 
-      Fetch objectUnderTest = fetchFactory.create(new URIish(testRepoPath.toString()), repo);
+      Fetch objectUnderTest =
+          fetchFactory.create(TEST_TASK_ID, new URIish(testRepoPath.toString()), repo);
 
       objectUnderTest.fetch(Lists.newArrayList(new RefSpec(nonExistingRef)));
     }
@@ -105,7 +108,8 @@
       Result pushResult = createChange();
       String sourceRef = pushResult.getPatchSet().refName();
 
-      Fetch objectUnderTest = fetchFactory.create(new URIish("/not_existing_path/"), repo);
+      Fetch objectUnderTest =
+          fetchFactory.create(TEST_TASK_ID, new URIish("/not_existing_path/"), repo);
 
       objectUnderTest.fetch(Lists.newArrayList(new RefSpec(sourceRef + ":" + sourceRef)));
     }
@@ -122,7 +126,8 @@
       Result pushResultTwo = createChange();
       String sourceRefTwo = pushResultTwo.getPatchSet().refName();
 
-      Fetch objectUnderTest = fetchFactory.create(new URIish(testRepoPath.toString()), repo);
+      Fetch objectUnderTest =
+          fetchFactory.create(TEST_TASK_ID, new URIish(testRepoPath.toString()), repo);
 
       objectUnderTest.fetch(
           Lists.newArrayList(
@@ -158,11 +163,12 @@
     Repository repo = mock(Repository.class);
     FetchFactory fetchFactory = mock(FetchFactory.class);
     Fetch fetchClient = mock(Fetch.class);
-    when(fetchFactory.createPlainImpl(uri, repo)).thenReturn(fetchClient);
+    when(fetchFactory.createPlainImpl(TEST_TASK_ID, uri, repo)).thenReturn(fetchClient);
     when(fetchClient.fetch(any())).thenReturn(fetchResultList);
 
     Fetch objectUnderTest =
-        new BatchFetchClient(sourceConfig, fetchFactory, new URIish(testRepoPath.toString()), repo);
+        new BatchFetchClient(
+            sourceConfig, fetchFactory, TEST_TASK_ID, new URIish(testRepoPath.toString()), repo);
 
     objectUnderTest.fetch(
         Lists.newArrayList(
@@ -185,7 +191,8 @@
     String branchRevision = gApi.projects().name(testProjectName).branch(newBranch).get().revision;
 
     try (Repository repo = repoManager.openRepository(project)) {
-      Fetch objectUnderTest = fetchFactory.create(new URIish(testRepoPath.toString()), repo);
+      Fetch objectUnderTest =
+          fetchFactory.create(TEST_TASK_ID, new URIish(testRepoPath.toString()), repo);
 
       objectUnderTest.fetch(Lists.newArrayList(new RefSpec(newBranch + ":" + newBranch)));
 
@@ -209,7 +216,8 @@
     gApi.projects().name(testProjectName).branch(newBranch).create(input);
 
     try (Repository repo = repoManager.openRepository(project)) {
-      Fetch objectUnderTest = fetchFactory.create(new URIish(testRepoPath.toString()), repo);
+      Fetch objectUnderTest =
+          fetchFactory.create(TEST_TASK_ID, new URIish(testRepoPath.toString()), repo);
 
       objectUnderTest.fetch(
           Lists.newArrayList(new RefSpec("non_existing_branch" + ":" + "non_existing_branch")));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
new file mode 100644
index 0000000..3980f93
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchOneTest.java
@@ -0,0 +1,848 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.metrics.DisabledMetricMaker;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.gerrit.server.git.PerThreadRequestScope;
+import com.google.gerrit.server.util.IdGenerator;
+import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiRequestMetrics;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.Fetch;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.FetchFactory;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.InexistentRefTransportException;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.RefUpdateState;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.eclipse.jgit.errors.PackProtocolException;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class FetchOneTest {
+  private static final String TEST_PROJECT_NAME = "FetchOneTest"; // project under replication
+  private static final Project.NameKey PROJECT_NAME = Project.NameKey.parse(TEST_PROJECT_NAME);
+  private static final String TEST_REF = "refs/heads/refForReplicationTask"; // default ref
+  private static final String URI_PATTERN = "http://test.com/" + TEST_PROJECT_NAME + ".git";
+
+  @Mock private GitRepositoryManager grm;
+  @Mock private Repository repository;
+  @Mock private Source source;
+  @Mock private SourceConfiguration sourceConfiguration;
+  @Mock private PerThreadRequestScope.Scoper scoper;
+  @Mock private IdGenerator idGenerator;
+  @Mock private ReplicationStateListeners replicationStateListeners;
+  @Mock private FetchFactory fetchFactory;
+  @Mock private PullReplicationApiRequestMetrics pullReplicationApiRequestMetrics;
+  @Mock private RemoteConfig remoteConfig;
+  @Mock private DynamicItem<ReplicationFetchFilter> replicationFilter;
+
+  private URIish urIish;
+  private FetchOne objectUnderTest;
+
+  @Before
+  public void setup() throws Exception { // builds the FetchOne under test, wired to mocks
+    FetchReplicationMetrics fetchReplicationMetrics =
+        new FetchReplicationMetrics("pull-replication", new DisabledMetricMaker());
+    urIish = new URIish(URI_PATTERN);
+
+    grm = mock(GitRepositoryManager.class); // the @Mock fields are re-created by hand here
+    source = mock(Source.class);
+    sourceConfiguration = mock(SourceConfiguration.class);
+    scoper = mock(PerThreadRequestScope.Scoper.class);
+    idGenerator = mock(IdGenerator.class);
+    replicationStateListeners = mock(ReplicationStateListeners.class);
+    fetchFactory = mock(FetchFactory.class);
+    pullReplicationApiRequestMetrics = mock(PullReplicationApiRequestMetrics.class);
+    remoteConfig = mock(RemoteConfig.class);
+    replicationFilter = mock(DynamicItem.class); // raw class literal: unchecked assignment
+
+    when(sourceConfiguration.getRemoteConfig()).thenReturn(remoteConfig);
+    when(idGenerator.next()).thenReturn(1); // every generated id is 1
+    int maxLockRetries = 1; // lock-error retry limit reported by the mocked source
+    when(source.getLockErrorMaxRetries()).thenReturn(maxLockRetries);
+
+    objectUnderTest =
+        new FetchOne(
+            grm,
+            source,
+            sourceConfiguration,
+            scoper,
+            idGenerator,
+            replicationStateListeners,
+            fetchReplicationMetrics,
+            fetchFactory,
+            PROJECT_NAME,
+            urIish,
+            Optional.of(pullReplicationApiRequestMetrics));
+  }
+
+  @Test
+  public void shouldIncludeTheTaskIndexInItsStringRepresentation() {
+    String prefix = "[" + objectUnderTest.getTaskIdHex();
+
+    assertThat(objectUnderTest.toString()).isEqualTo(prefix + "] fetch " + URI_PATTERN);
+  }
+
+  @Test
+  public void shouldIncludeTheRetryCountInItsStringRepresentationWhenATaskIsRetried() {
+    objectUnderTest.setToRetry();
+    String prefix = "(retry 1) [" + objectUnderTest.getTaskIdHex();
+
+    assertThat(objectUnderTest.toString()).isEqualTo(prefix + "] fetch " + URI_PATTERN);
+  }
+
+  @Test
+  public void shouldAddARefToTheDeltaIfItsNotTheAllRefs() {
+    Set<String> refs = Set.of(TEST_REF); // a single ordinary ref, not ALL_REFS
+    objectUnderTest.addRefs(refs);
+
+    assertThat(objectUnderTest.getRefs()).isEqualTo(refs); // subject is the value under test
+  }
+
+  @Test
+  public void shouldIgnoreEveryRefButTheAllRefsWhenAddingARef() {
+    objectUnderTest.addRefs(Set.of(TEST_REF, FetchOne.ALL_REFS)); // ordinary ref + ALL_REFS
+
+    assertThat(objectUnderTest.getRefs()).isEqualTo(Set.of(FetchOne.ALL_REFS)); // actual first
+  }
+
+  @Test
+  public void shouldReturnExistingStates() { // NOTE(review): subject/expected look swapped
+    assertThat(createTestStates(TEST_REF, 1)).isEqualTo(objectUnderTest.getStates().get(TEST_REF));
+  }
+
+  @Test
+  public void shouldKeepMultipleStatesInInsertionOrderForARef() {
+    List<ReplicationState> inserted = createTestStates(TEST_REF, 2);
+
+    List<ReplicationState> stored = objectUnderTest.getStates().get(TEST_REF);
+
+    assertThat(stored).containsExactlyElementsIn(inserted).inOrder();
+  }
+
+  @Test
+  public void shouldReturnStatesInAnArray() {
+    List<ReplicationState> registered = createTestStates(TEST_REF, 2);
+
+    ReplicationState[] asArray = objectUnderTest.getStatesAsArray();
+
+    assertThat(asArray).asList().containsExactly(registered.toArray());
+  }
+
+  @Test
+  public void shouldClearTheStates() {
+    createTestStates(TEST_REF, 2);
+
+    objectUnderTest.removeStates();
+
+    assertThat(objectUnderTest.getStates().isEmpty()).isTrue();
+  }
+
+  @Test
+  public void shouldNotifyTheSourceWhenTaskIsCancelled() {
+    objectUnderTest.cancel();
+
+    assertThat(objectUnderTest.wasCanceled()).isTrue();
+    verify(source).fetchWasCanceled(objectUnderTest);
+  }
+
+  @Test
+  public void shouldRunAReplicationTaskForAllRefsIfDeltaIsEmpty() throws Exception {
+    setupMocks(true);
+    List<ReplicationState> states = createTestStates(FetchOne.ALL_REFS, 1);
+    setupFetchFactoryMock(Collections.emptyList());
+
+    objectUnderTest.run();
+
+    assertFinishedWithEmptyStateAndNoFailures();
+    verify(states.get(0))
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME,
+            FetchOne.ALL_REFS,
+            urIish,
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.NO_CHANGE);
+  }
+
+  @Test
+  public void shouldRescheduleReplicationTaskAndExitIfTheQueueLockCantBeObtained()
+      throws Exception {
+    setupMocks(false); // the source refuses the runway
+
+    objectUnderTest.run();
+
+    verify(source).reschedule(objectUnderTest, Source.RetryReason.COLLISION);
+    verify(source, never()).notifyFinished(objectUnderTest);
+  }
+
+  @Test
+  public void shouldNotRescheduleAnAlreadyCancelledReplicationTaskIfTheQueueLockCantBeObtained()
+      throws Exception {
+    setupMocks(false); // the source refuses the runway
+    objectUnderTest.canceledByReplication();
+
+    objectUnderTest.run();
+
+    verify(source, never()).reschedule(objectUnderTest, Source.RetryReason.COLLISION);
+    verify(source, never()).notifyFinished(objectUnderTest);
+  }
+
+  @Test
+  public void shouldRunTheFetchOperationEvenWhenStateIsEmpty() throws Exception {
+    setupMocks(true);
+    // No ReplicationState is registered for TEST_REF before the task runs.
+    FetchFactoryEntry newRef =
+        new FetchFactoryEntry.Builder()
+            .withRefSpecName(TEST_REF)
+            .withRemoteName("testRemote")
+            .withResult(RefUpdate.Result.NEW)
+            .build();
+    Fetch fetcher = setupFetchFactoryMock(List.of(newRef));
+    objectUnderTest.addRefs(Set.of(TEST_REF));
+
+    objectUnderTest.run();
+
+    verify(fetcher).fetch(List.of(new RefSpec(TEST_REF)));
+  }
+
+  @Test
+  public void
+      shouldSetTheReplicationFetchResultStatusToNotAttemptedAndThenFailedForARefForWhichThereIsNoState()
+          throws Exception {
+    setupMocks(true);
+    String someRef = "refs/heads/someRef";
+    ReplicationState someRefState = createTestStates(someRef, 1).get(0);
+    setupFetchFactoryMock(
+        List.of(new FetchFactoryEntry.Builder().refSpecNameWithDefaults(TEST_REF).build()));
+    objectUnderTest.addRefs(Set.of(TEST_REF)); // only TEST_REF is fetched, someRef is not
+
+    objectUnderTest.run();
+
+    assertFinishedWithNonEmptyStateAndNoFailures();
+    verify(someRefState)
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME,
+            someRef,
+            urIish,
+            ReplicationState.RefFetchResult.NOT_ATTEMPTED,
+            null);
+    verify(someRefState)
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME, someRef, urIish, ReplicationState.RefFetchResult.FAILED, null);
+  }
+
+  @Test(expected = InternalError.class)
+  public void shouldThrowAnExceptionForUnrecoverableErrors() {
+    setupFailingScopeMock(); // scoper.scope(...) is stubbed to throw InternalError
+
+    objectUnderTest.run(); // the error has to escape run() for the test to pass
+  }
+
+  @Test
+  public void shouldFilterOutRefsFromFetchReplicationDelta() throws Exception {
+    setupMocks(true);
+    String filteredRef = "refs/heads/filteredRef";
+    Set<String> requestedRefs = Set.of(TEST_REF, filteredRef);
+    // One state for the ref that passes the filter, one for the ref that is dropped.
+    ReplicationState keptRefState = createTestStates(TEST_REF, 1).get(0);
+    ReplicationState filteredRefState = createTestStates(filteredRef, 1).get(0);
+    setupFetchFactoryMock(
+        List.of(new FetchFactoryEntry.Builder().refSpecNameWithDefaults(TEST_REF).build()),
+        Optional.of(List.of(TEST_REF)));
+    objectUnderTest.addRefs(requestedRefs);
+    objectUnderTest.setReplicationFetchFilter(replicationFilter);
+    // The stubbed filter lets only TEST_REF through.
+    ReplicationFetchFilter refFilter = mock(ReplicationFetchFilter.class);
+    when(replicationFilter.get()).thenReturn(refFilter);
+    when(refFilter.filter(TEST_PROJECT_NAME, requestedRefs)).thenReturn(Set.of(TEST_REF));
+
+    objectUnderTest.run();
+
+    assertFinishedWithNonEmptyStateAndNoFailures();
+    verify(keptRefState)
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME,
+            TEST_REF,
+            urIish,
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.NEW);
+    verify(filteredRefState)
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME, filteredRef, urIish, ReplicationState.RefFetchResult.FAILED, null);
+  }
+
+  @Test
+  public void shouldMarkTheReplicationStatusAsSucceededOnSuccessfulReplicationOfARef()
+      throws Exception {
+    setupMocks(true);
+    ReplicationState refState = createTestStates(TEST_REF, 1).get(0);
+    setupFetchFactoryMock(
+        List.of(new FetchFactoryEntry.Builder().refSpecNameWithDefaults(TEST_REF).build()));
+    objectUnderTest.addRefs(Set.of(TEST_REF));
+
+    objectUnderTest.run();
+    assertFinishedWithEmptyStateAndNoFailures();
+    verify(refState)
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME,
+            TEST_REF,
+            urIish,
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.NEW);
+  }
+
+  @Test
+  public void shouldMarkAllTheStatesOfARefAsReplicatedSuccessfullyOnASuccessfulReplication()
+      throws Exception {
+    setupMocks(true);
+    List<ReplicationState> bothStates = createTestStates(TEST_REF, 2); // two states, one ref
+    setupFetchFactoryMock(
+        List.of(new FetchFactoryEntry.Builder().refSpecNameWithDefaults(TEST_REF).build()));
+    objectUnderTest.addRefs(Set.of(TEST_REF));
+
+    objectUnderTest.run();
+
+    assertFinishedWithEmptyStateAndNoFailures();
+    verify(bothStates.get(0)) // first registered state
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME,
+            TEST_REF,
+            urIish,
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.NEW);
+    verify(bothStates.get(1)) // second registered state, same expectation
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME,
+            TEST_REF,
+            urIish,
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.NEW);
+  }
+
+  @Test
+  public void shouldUpdateTheStateOfAllRefsOnSuccessfulReplication() throws Exception {
+    setupMocks(true);
+    List<ReplicationState> states =
+        Stream.concat(
+                createTestStates(TEST_REF, 1).stream(),
+                createTestStates(FetchOne.ALL_REFS, 1).stream())
+            .collect(Collectors.toList());
+    setupFetchFactoryMock(
+        List.of(new FetchFactoryEntry.Builder().refSpecNameWithDefaults(TEST_REF).build()));
+    objectUnderTest.addRefs(Set.of(TEST_REF));
+
+    objectUnderTest.run();
+
+    assertFinishedWithEmptyStateAndNoFailures();
+    verify(states.get(0))
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME,
+            TEST_REF,
+            urIish,
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.NEW);
+    verify(states.get(1))
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME,
+            FetchOne.ALL_REFS,
+            urIish,
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.NEW);
+  }
+
+  @Test
+  public void shouldMarkReplicationStateAsRejectedWhenTheObjectIsNotInRepository()
+      throws Exception {
+    setupMocks(true);
+    ReplicationState firstState = createTestStates(TEST_REF, 2).get(0);
+    FetchFactoryEntry rejectedRef =
+        new FetchFactoryEntry.Builder()
+            .withRefNames(TEST_REF)
+            .withResult(RefUpdate.Result.REJECTED_MISSING_OBJECT)
+            .build();
+    setupFetchFactoryMock(List.of(rejectedRef));
+    objectUnderTest.addRefs(Set.of(TEST_REF));
+
+    objectUnderTest.run();
+
+    assertFinishedWithEmptyStateAndNoFailures();
+    verify(firstState)
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME,
+            TEST_REF,
+            urIish,
+            ReplicationState.RefFetchResult.FAILED,
+            RefUpdate.Result.REJECTED_MISSING_OBJECT);
+  }
+
+  @Test
+  public void shouldMarkReplicationStateAsRejectedWhenFailedForUnknownReason() throws Exception {
+    setupMocks(true);
+    ReplicationState refState = createTestStates(TEST_REF, 1).get(0);
+    FetchFactoryEntry rejectedRef =
+        new FetchFactoryEntry.Builder()
+            .withRefNames(TEST_REF)
+            .withResult(RefUpdate.Result.REJECTED_OTHER_REASON)
+            .build();
+    setupFetchFactoryMock(List.of(rejectedRef));
+    objectUnderTest.addRefs(Set.of(TEST_REF));
+
+    objectUnderTest.run();
+
+    assertFinishedWithEmptyStateAndNoFailures();
+    verify(refState)
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME,
+            TEST_REF,
+            urIish,
+            ReplicationState.RefFetchResult.FAILED,
+            RefUpdate.Result.REJECTED_OTHER_REASON);
+  }
+
+  @Test
+  public void shouldMarkReplicationStateOfAllRefsAsRejectedForAnyFailedTask() throws Exception {
+    setupMocks(true);
+    String failingRef = "refs/heads/failingRef";
+    String forcedRef = "refs/heads/forcedRef";
+    // One state per fetched ref plus one bound to ALL_REFS, registered in this order.
+    ReplicationState succeededState = createTestStates(TEST_REF, 1).get(0);
+    ReplicationState failedState = createTestStates(failingRef, 1).get(0);
+    ReplicationState forcedState = createTestStates(forcedRef, 1).get(0);
+    ReplicationState allRefsState = createTestStates(FetchOne.ALL_REFS, 1).get(0);
+    // Stubbed fetch result per ref: NEW, REJECTED_MISSING_OBJECT and FORCED respectively.
+    FetchFactoryEntry newEntry =
+        new FetchFactoryEntry.Builder()
+            .withRefNames(TEST_REF)
+            .withResult(RefUpdate.Result.NEW)
+            .build();
+    FetchFactoryEntry rejectedEntry =
+        new FetchFactoryEntry.Builder()
+            .withRefNames(failingRef)
+            .withResult(RefUpdate.Result.REJECTED_MISSING_OBJECT)
+            .build();
+    FetchFactoryEntry forcedEntry =
+        new FetchFactoryEntry.Builder()
+            .withRefNames(forcedRef)
+            .withResult(RefUpdate.Result.FORCED)
+            .build();
+    setupFetchFactoryMock(List.of(newEntry, rejectedEntry, forcedEntry));
+    objectUnderTest.addRefs(Set.of(TEST_REF, failingRef, forcedRef));
+
+    objectUnderTest.run();
+
+    assertFinishedWithEmptyStateAndNoFailures();
+    verify(succeededState)
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME,
+            TEST_REF,
+            urIish,
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.NEW);
+    verify(failedState)
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME,
+            failingRef,
+            urIish,
+            ReplicationState.RefFetchResult.FAILED,
+            RefUpdate.Result.REJECTED_MISSING_OBJECT);
+    verify(forcedState)
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME,
+            forcedRef,
+            urIish,
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.FORCED);
+    verify(allRefsState)
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME,
+            FetchOne.ALL_REFS,
+            urIish,
+            ReplicationState.RefFetchResult.FAILED,
+            RefUpdate.Result.FORCED);
+  }
+
+  @Test
+  public void shouldRetryOnLockingFailures() throws Exception {
+    setupMocks(true);
+    ReplicationState lockedRefState = createTestStates(TEST_REF, 1).get(0);
+    FetchFactoryEntry lockFailure =
+        new FetchFactoryEntry.Builder()
+            .withRefNames(TEST_REF)
+            .withResult(RefUpdate.Result.LOCK_FAILURE)
+            .build();
+    setupFetchFactoryMock(List.of(lockFailure));
+    objectUnderTest.addRefs(Set.of(TEST_REF));
+
+    objectUnderTest.run();
+
+    assertFinishedWithNonEmptyStateAndNoFailures();
+    verify(lockedRefState)
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME, TEST_REF, urIish, ReplicationState.RefFetchResult.FAILED, null);
+    verify(source).reschedule(objectUnderTest, Source.RetryReason.TRANSPORT_ERROR);
+  }
+
+  @Test
+  public void shouldNotRetryWhenMaxLockRetriesLimitIsReached() throws Exception {
+    setupMocks(true);
+    ReplicationState lockedRefState = createTestStates(TEST_REF, 1).get(0);
+    FetchFactoryEntry lockFailure =
+        new FetchFactoryEntry.Builder()
+            .withRefNames(TEST_REF)
+            .withResult(RefUpdate.Result.LOCK_FAILURE)
+            .build();
+    setupFetchFactoryMock(List.of(lockFailure));
+    objectUnderTest.addRefs(Set.of(TEST_REF));
+
+    IntStream.range(0, 2).forEach(attempt -> objectUnderTest.run()); // run the task twice
+
+    verify(source, times(2)).notifyFinished(objectUnderTest);
+    verify(lockedRefState, times(2))
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME, TEST_REF, urIish, ReplicationState.RefFetchResult.FAILED, null);
+    verify(source).reschedule(objectUnderTest, Source.RetryReason.TRANSPORT_ERROR);
+  }
+
+  @Test
+  public void shouldNotRetryOnLockingFailuresIfTheTaskWasCancelledWhileRunning() throws Exception {
+    setupMocks(true);
+    ReplicationState lockedRefState = createTestStates(TEST_REF, 1).get(0);
+    FetchFactoryEntry lockFailure =
+        new FetchFactoryEntry.Builder()
+            .withRefNames(TEST_REF)
+            .withResult(RefUpdate.Result.LOCK_FAILURE)
+            .build();
+    setupFetchFactoryMock(List.of(lockFailure));
+    objectUnderTest.addRefs(Set.of(TEST_REF));
+    objectUnderTest.setCanceledWhileRunning();
+
+    objectUnderTest.run();
+
+    assertFinishedWithNonEmptyStateAndNoFailures();
+    verify(lockedRefState)
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME, TEST_REF, urIish, ReplicationState.RefFetchResult.FAILED, null);
+    verify(source, never()).reschedule(any(), any());
+  }
+
+  @Test
+  public void shouldNotRetryForUnexpectedIOErrors() throws Exception {
+    setupMocks(true);
+    ReplicationState refState = createTestStates(TEST_REF, 1).get(0);
+    FetchFactoryEntry ioFailure =
+        new FetchFactoryEntry.Builder()
+            .withRefNames(TEST_REF)
+            .withResult(RefUpdate.Result.IO_FAILURE)
+            .build();
+    setupFetchFactoryMock(List.of(ioFailure));
+    objectUnderTest.addRefs(Set.of(TEST_REF));
+
+    objectUnderTest.run();
+
+    assertFinishedWithNonEmptyStateAndNoFailures();
+    verify(refState)
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME, TEST_REF, urIish, ReplicationState.RefFetchResult.FAILED, null);
+    verify(source, never()).reschedule(any(), any());
+  }
+
+  @Test
+  public void shouldTreatInexistentRefsAsFailures() throws Exception {
+    setupMocks(true);
+    ReplicationState refState = createTestStates(TEST_REF, 1).get(0);
+    FetchFactoryEntry newRef =
+        new FetchFactoryEntry.Builder()
+            .withRefNames(TEST_REF)
+            .withResult(RefUpdate.Result.NEW)
+            .build();
+    Fetch fetcher = setupFetchFactoryMock(List.of(newRef));
+    InexistentRefTransportException missingRef =
+        new InexistentRefTransportException(TEST_REF, new Throwable("boom"));
+    when(fetcher.fetch(anyList())).thenThrow(missingRef);
+    objectUnderTest.addRefs(Set.of(TEST_REF));
+
+    objectUnderTest.run();
+
+    assertFinishedWithNonEmptyStateAndFailures();
+    verify(refState)
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME, TEST_REF, urIish, ReplicationState.RefFetchResult.FAILED, null);
+  }
+
+  @Test
+  public void shouldRemoveAnInexistentRefFromTheDeltaAndCarryOn() throws Exception {
+    setupMocks(true);
+    String inexistentRef = "refs/heads/inexistentRef";
+    List<ReplicationState> states = // index 0: inexistentRef state, index 1: TEST_REF state
+        Stream.of(createTestStates(inexistentRef, 1), createTestStates(TEST_REF, 1))
+            .flatMap(Collection::stream)
+            .collect(Collectors.toList());
+    Fetch fetch =
+        setupFetchFactoryMock(
+            List.of(
+                new FetchFactoryEntry.Builder()
+                    .withRefNames(inexistentRef)
+                    .withResult(RefUpdate.Result.NEW)
+                    .build(),
+                new FetchFactoryEntry.Builder()
+                    .withRefNames(TEST_REF)
+                    .withResult(RefUpdate.Result.NEW)
+                    .build()));
+    when(fetch.fetch(anyList())) // NOTE(review): exception names TEST_REF, not inexistentRef
+        .thenThrow(new InexistentRefTransportException(TEST_REF, new Throwable("boom")))
+        .thenReturn(List.of(new RefUpdateState(TEST_REF, RefUpdate.Result.NEW))); // later calls
+    objectUnderTest.addRefs(Set.of(inexistentRef, TEST_REF));
+
+    objectUnderTest.run();
+
+    assertFinishedWithNonEmptyStateAndFailures();
+    verify(states.get(0))
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME,
+            inexistentRef,
+            urIish,
+            ReplicationState.RefFetchResult.NOT_ATTEMPTED,
+            null);
+    verify(states.get(0))
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME, inexistentRef, urIish, ReplicationState.RefFetchResult.FAILED, null);
+    verify(states.get(1))
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME,
+            TEST_REF,
+            urIish,
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.NEW);
+  }
+
+  @Test
+  public void shouldRescheduleCertainTypesOfTransportException() throws Exception {
+    setupMocks(true);
+    ReplicationState refState = createTestStates(TEST_REF, 1).get(0);
+    FetchFactoryEntry newRef =
+        new FetchFactoryEntry.Builder()
+            .withRefNames(TEST_REF)
+            .withResult(RefUpdate.Result.NEW)
+            .build();
+    Fetch fetcher = setupFetchFactoryMock(List.of(newRef));
+    PackProtocolException protocolError = new PackProtocolException(urIish, "boom");
+    when(fetcher.fetch(anyList())).thenThrow(protocolError);
+    objectUnderTest.addRefs(Set.of(TEST_REF));
+
+    objectUnderTest.run();
+
+    assertFinishedWithNonEmptyStateAndNoFailures();
+    verify(refState)
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME, TEST_REF, urIish, ReplicationState.RefFetchResult.FAILED, null);
+    verify(source).reschedule(objectUnderTest, Source.RetryReason.TRANSPORT_ERROR);
+  }
+
+  @Test
+  public void shouldNotMarkReplicationTaskAsFailedIfItIsBeingRetried() throws Exception {
+    setupMocks(true);
+    ReplicationState refState = createTestStates(TEST_REF, 1).get(0);
+    FetchFactoryEntry newRef =
+        new FetchFactoryEntry.Builder()
+            .withRefNames(TEST_REF)
+            .withResult(RefUpdate.Result.NEW)
+            .build();
+    Fetch fetcher = setupFetchFactoryMock(List.of(newRef));
+    PackProtocolException protocolError = new PackProtocolException(urIish, "boom");
+    when(fetcher.fetch(anyList())).thenThrow(protocolError);
+    objectUnderTest.addRefs(Set.of(TEST_REF));
+    objectUnderTest.setToRetry();
+
+    objectUnderTest.run();
+
+    assertFinishedWithNonEmptyStateAndNoFailures();
+    verify(refState, never())
+        .notifyRefReplicated(
+            TEST_PROJECT_NAME, TEST_REF, urIish, ReplicationState.RefFetchResult.FAILED, null);
+  }
+
+  private void setupRequestScopeMock() {
+    Answer<Callable<Object>> returnCallableUnchanged =
+        invocation -> {
+          // Hand the first argument of scope(...) straight back to the caller.
+          Callable<Object> scoped = (Callable<Object>) invocation.getArguments()[0];
+          return scoped;
+        };
+    when(scoper.scope(any())).thenAnswer(returnCallableUnchanged);
+  }
+
+  private void setupFailingScopeMock() { // makes every scoper.scope(...) call throw
+    when(scoper.scope(any())).thenThrow(new InternalError());
+  }
+
+  private void setupMocks(boolean runawayAllowed) throws Exception {
+    setupGitRepoManagerMock(); // repository lookup
+    setupSourceMock(runawayAllowed); // runway answer
+    setupRequestScopeMock(); // pass-through request scope
+  }
+
+  private void setupSourceMock(boolean allowed) { // stubs the answer to requestRunway(...)
+    when(source.requestRunway(any())).thenReturn(allowed);
+  }
+
+  private void setupGitRepoManagerMock() throws IOException { // PROJECT_NAME -> mocked repo
+    when(grm.openRepository(PROJECT_NAME)).thenReturn(repository);
+  }
+
+  private List<ReplicationState> createTestStates(String ref, int numberOfStates) {
+    List<ReplicationState> created = new ArrayList<>();
+    for (int i = 0; i < numberOfStates; i++) {
+      created.add(Mockito.mock(ReplicationState.class));
+    }
+    // Register the mocks for the ref only after all of them exist, in creation order.
+    created.forEach(state -> objectUnderTest.addState(ref, state));
+    return created;
+  }
+
+  private void setupRemoteConfigMock(List<RefSpec> refSpecs) {
+    when(remoteConfig.getName()).thenReturn(PROJECT_NAME.get());
+    when(remoteConfig.getFetchRefSpecs()).thenReturn(refSpecs);
+  }
+
+  private Fetch setupFetchFactoryMock(List<FetchFactoryEntry> fetchFactoryEntries)
+      throws Exception {
+    return setupFetchFactoryMock(fetchFactoryEntries, Optional.empty()); // no filtered refs
+  }
+
+  private Fetch setupFetchFactoryMock(
+      List<FetchFactoryEntry> fetchFactoryEntries, Optional<List<String>> filteredRefs)
+      throws Exception {
+    // Each entry contributes one RefSpec for the remote config and one RefUpdateState
+    // to the list the mocked fetch returns.
+    List<RefSpec> configuredSpecs = new ArrayList<>();
+    List<RefUpdateState> fetchOutcome = new ArrayList<>();
+    for (FetchFactoryEntry entry : fetchFactoryEntries) {
+      configuredSpecs.add(new RefSpec(entry.getRefSpecName()));
+      fetchOutcome.add(new RefUpdateState(entry.getRemoteName(), entry.getResult()));
+    }
+    List<RefSpec> expectedSpecs =
+        filteredRefs.isPresent()
+            ? filteredRefs.get().stream().map(RefSpec::new).collect(Collectors.toList())
+            : configuredSpecs;
+
+    setupRemoteConfigMock(configuredSpecs);
+    Fetch fetcher = mock(Fetch.class);
+    when(fetchFactory.create(objectUnderTest.getTaskIdHex(), urIish, repository))
+        .thenReturn(fetcher);
+    when(fetcher.fetch(argThat(specs -> specs.containsAll(expectedSpecs))))
+        .thenReturn(fetchOutcome);
+    return fetcher;
+  }
+
+  private void assertFinishedWithEmptyStateAndNoFailures() {
+    assertFinishedWithStateAndFailures(true, true); // emptyState = true, noFailures = true
+  }
+
+  private void assertFinishedWithNonEmptyStateAndNoFailures() {
+    assertFinishedWithStateAndFailures(false, true); // emptyState = false, noFailures = true
+  }
+
+  private void assertFinishedWithNonEmptyStateAndFailures() {
+    assertFinishedWithStateAndFailures(false, false); // emptyState = false, noFailures = false
+  }
+
+  private void assertFinishedWithStateAndFailures(boolean emptyState, boolean noFailures) {
+    assertThat(objectUnderTest.getStates().isEmpty()).isEqualTo(emptyState);
+    verify(source).notifyFinished(objectUnderTest); // checked regardless of the two flags
+    assertThat(objectUnderTest.getFetchFailures().isEmpty()).isEqualTo(noFailures);
+  }
+}
+
+class FetchFactoryEntry { // test fixture: ref spec name, remote ref name and fetch result
+  private final String refSpecName;
+  private final String remoteName;
+  private final RefUpdate.Result result;
+
+  public String getRefSpecName() {
+    return refSpecName;
+  }
+
+  public String getRemoteName() {
+    return remoteName;
+  }
+
+  public RefUpdate.Result getResult() {
+    return result;
+  }
+
+  private FetchFactoryEntry(Builder builder) { // instances come from Builder.build() only
+    this.refSpecName = builder.refSpecName;
+    this.remoteName = builder.remoteName;
+    this.result = builder.result;
+  }
+
+  public static class Builder { // fields that are never set stay null
+    private String refSpecName;
+    private String remoteName;
+    private RefUpdate.Result result;
+
+    public Builder withRefSpecName(String refSpecName) {
+      this.refSpecName = refSpecName;
+      return this;
+    }
+
+    public Builder withRemoteName(String remoteName) {
+      this.remoteName = remoteName;
+      return this;
+    }
+
+    public Builder withResult(RefUpdate.Result result) {
+      this.result = result;
+      return this;
+    }
+
+    public Builder refSpecNameWithDefaults(String refSpecName) { // names + Result.NEW
+      this.refSpecName = refSpecName;
+      this.remoteName = refSpecName;
+      this.result = RefUpdate.Result.NEW;
+      return this;
+    }
+
+    public Builder withRefNames(String refSpecName) { // spec name and remote name share it
+      this.refSpecName = refSpecName;
+      this.remoteName = refSpecName;
+      return this;
+    }
+
+    public FetchFactoryEntry build() {
+      return new FetchFactoryEntry(this);
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/JGitFetchIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/JGitFetchIT.java
index ec695a6..76ff02b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/JGitFetchIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/JGitFetchIT.java
@@ -45,6 +45,7 @@
     sysModule = "com.googlesource.gerrit.plugins.replication.pull.JGitFetchIT$TestModule")
 public class JGitFetchIT extends FetchITBase {
   private static final String TEST_REPLICATION_SUFFIX = "suffix1";
+  private static final String TEST_TASK_ID = "taskid";
 
   @Test(expected = PermanentTransportException.class)
   public void shouldThrowPermanentTransportExceptionWhenRefDoesNotExists() throws Exception {
@@ -52,7 +53,8 @@
     testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
     String nonExistingRef = "refs/changes/02/20000/1:refs/changes/02/20000/1";
     try (Repository repo = repoManager.openRepository(project)) {
-      Fetch objectUnderTest = fetchFactory.create(new URIish(testRepoPath.toString()), repo);
+      Fetch objectUnderTest =
+          fetchFactory.create(TEST_TASK_ID, new URIish(testRepoPath.toString()), repo);
       objectUnderTest.fetch(Lists.newArrayList(new RefSpec(nonExistingRef)));
     }
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PermanentFailureExceptionTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PermanentFailureExceptionTest.java
index fb4fb04..09a465c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PermanentFailureExceptionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PermanentFailureExceptionTest.java
@@ -16,6 +16,7 @@
 
 import static com.google.common.truth.Truth.assertThat;
 
+import com.googlesource.gerrit.plugins.replication.pull.fetch.InexistentRefTransportException;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.PermanentTransportException;
 import com.jcraft.jsch.JSchException;
 import org.eclipse.jgit.errors.TransportException;
@@ -26,17 +27,26 @@
   @Test
   public void shouldConsiderSchUnknownHostAsPermanent() {
     assertThat(
-            PermanentTransportException.isPermanentFailure(
+            PermanentTransportException.wrapIfPermanentTransportException(
                 new TransportException(
                     "SSH error", new JSchException("UnknownHostKey: some.place"))))
-        .isTrue();
+        .isInstanceOf(PermanentTransportException.class);
   }
 
   @Test
-  public void shouldConsiderNotExistingRefsAsPermanent() {
+  public void shouldConsiderNotExistingRefsFromJGitAsPermanent() {
     assertThat(
-            PermanentTransportException.isPermanentFailure(
+            PermanentTransportException.wrapIfPermanentTransportException(
                 new TransportException("Remote does not have refs/heads/foo available for fetch.")))
-        .isTrue();
+        .isInstanceOf(InexistentRefTransportException.class);
+  }
+
+  @Test
+  public void shouldConsiderNotExistingRefsFromCGitAsPermanent() { // "couldn't find remote ref"
+    assertThat(
+            PermanentTransportException.wrapIfPermanentTransportException(
+                new TransportException(
+                    "Cannot fetch from repo, error message: fatal: couldn't find remote ref refs/heads/foobranch")))
+        .isInstanceOf(InexistentRefTransportException.class); // Expects the inexistent-ref type.
+  }
 }