Allow FetchOne to retry the failed ref only

FetchOne may contain more than one ref to be fetched: the failure
of one of them could block the whole set of fetches, causing an
unwanted replication delay.

Enable the possibility for a FetchOne to partially complete,
leaving a leftover of failed refs that can be rescheduled and
retried individually.

This new behaviour has the immediate advantage of not delaying
the replication of refs that have no replication issues and still
assures that the failed replication events are going to be
rescheduled for retry, if the cause of the failure was not a
permanent exception.

Also fix the raising of the correct exception when executing
FetchOne with a CGit backend.

Tested E2E using CGit and JGit and aggregating a valid ref
and a non-existent one: the valid gets replicated and the invalid
is detected and traced.

Bug: Issue 16742
Change-Id: I093ce3a780313ada18323af7aaca391791b93441
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index 43db907..a99f237 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -35,6 +35,7 @@
 import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiRequestMetrics;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.Fetch;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.FetchFactory;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.InexistentRefTransportException;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.PermanentTransportException;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.RefUpdateState;
 import java.io.IOException;
@@ -82,6 +83,7 @@
   private final Project.NameKey projectName;
   private final URIish uri;
   private final Set<String> delta = Sets.newHashSetWithExpectedSize(4);
+  private final Set<TransportException> fetchFailures = Sets.newHashSetWithExpectedSize(4);
   private boolean fetchAllRefs;
   private Repository git;
   private boolean retrying;
@@ -159,6 +161,10 @@
     return true;
   }
 
+  public String getTaskIdHex() {
+    return taskIdHex;
+  }
+
   @Override
   public String toString() {
     String print = "[" + taskIdHex + "] fetch " + uri;
@@ -277,6 +283,10 @@
     }
   }
 
+  public Set<TransportException> getFetchFailures() {
+    return fetchFailures;
+  }
+
   private void runFetchOperation() {
     try (TraceContext ctx = TraceContext.open().addTag(ID_KEY, HexFormat.fromInt(id))) {
       doRunFetchOperation();
@@ -389,9 +399,27 @@
   }
 
   private void runImpl() throws IOException {
+    if (delta.isEmpty()) {
+      repLog.warn("Empty replication task [{}], skipping.", taskIdHex);
+      return;
+    }
+
     Fetch fetch = fetchFactory.create(uri, git);
     List<RefSpec> fetchRefSpecs = getFetchRefSpecs();
-    updateStates(fetch.fetch(fetchRefSpecs));
+
+    try {
+      updateStates(fetch.fetch(fetchRefSpecs));
+    } catch (InexistentRefTransportException e) {
+      String inexistentRef = e.getInexistentRef();
+      repLog.info(
+          "Remote {} does not have ref {} in replication task [{}], flagging as failed and removing from the replication task",
+          uri,
+          inexistentRef,
+          taskIdHex);
+      fetchFailures.add(e);
+      delta.remove(inexistentRef);
+      runImpl();
+    }
   }
 
   private List<RefSpec> getFetchRefSpecs() {
@@ -495,7 +523,10 @@
                 null);
       }
     }
-    stateMap.clear();
+
+    for (String doneRef : doneRefs) {
+      stateMap.removeAll(doneRef);
+    }
   }
 
   public static class LockFailureException extends TransportException {
@@ -505,4 +536,8 @@
       super(uri, message);
     }
   }
+
+  public Optional<PullReplicationApiRequestMetrics> getRequestMetrics() {
+    return apiRequestMetrics;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index db7728b..0162ad0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -87,6 +87,7 @@
 import java.util.function.Supplier;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.http.impl.client.CloseableHttpClient;
+import org.eclipse.jgit.errors.TransportException;
 import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.RefUpdate;
@@ -592,6 +593,12 @@
     synchronized (stateLock) {
       inFlight.remove(op.getURI());
     }
+
+    Set<TransportException> fetchFailures = op.getFetchFailures();
+    fetchFailures.forEach(
+        e ->
+            repLog.warn(
+                "Replication task [" + op.getTaskIdHex() + "] completed with partial failure", e));
   }
 
   public boolean wouldFetchRef(String ref) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java
index c404e30..13d1a28 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java
@@ -74,7 +74,7 @@
                 .lines()
                 .collect(Collectors.joining("\n"));
         throw new TransportException(
-            String.format("Cannot fetch from %s, error message: %s}", uri, errorMessage));
+            String.format("Cannot fetch from %s, error message: %s", uri, errorMessage));
       }
 
       return refsSpec.stream()
@@ -83,6 +83,8 @@
                 return new RefUpdateState(value.getSource(), RefUpdate.Result.NEW);
               })
           .collect(Collectors.toList());
+    } catch (TransportException e) {
+      throw PermanentTransportException.wrapIfPermanentTransportException(e);
     } catch (InterruptedException e) {
       repLog.error("Thread interrupted during the fetch from: {}, refs: {}", uri, refs);
       throw new IllegalStateException(e);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/InexistentRefTransportException.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/InexistentRefTransportException.java
new file mode 100644
index 0000000..7c2e637
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/InexistentRefTransportException.java
@@ -0,0 +1,55 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.fetch;
+
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.internal.JGitText;
+
+public class InexistentRefTransportException extends PermanentTransportException {
+  private static final Pattern JGIT_INEXISTENT_REF_PATTERN =
+      Pattern.compile(JGitText.get().remoteDoesNotHaveSpec.replaceAll("\\{0\\}", "([^\\\\s]+)"));
+  private static final Pattern CGIT_INEXISTENT_REF_PATTERN =
+      Pattern.compile(".*fatal.*couldn't find remote ref (.*)");
+
+  private static final long serialVersionUID = 1L;
+  private final String inexistentRef;
+
+  public String getInexistentRef() {
+    return inexistentRef;
+  }
+
+  public InexistentRefTransportException(String inexistentRef, Throwable cause) {
+    super("Ref " + inexistentRef + " does not exist on remote", cause);
+
+    this.inexistentRef = inexistentRef;
+  }
+
+  public static Optional<TransportException> getOptionalPermanentFailure(TransportException e) {
+    return wrapException(JGIT_INEXISTENT_REF_PATTERN, e)
+        .or(() -> wrapException(CGIT_INEXISTENT_REF_PATTERN, e));
+  }
+
+  private static Optional<TransportException> wrapException(
+      Pattern exceptionPattern, TransportException exception) {
+    Matcher exceptionMatcher = exceptionPattern.matcher(exception.getMessage());
+    if (exceptionMatcher.matches()) {
+      return Optional.of(new InexistentRefTransportException(exceptionMatcher.group(1), exception));
+    }
+    return Optional.empty();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java
index 74ad9fc..3504280 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java
@@ -55,10 +55,7 @@
     try {
       return tn.fetch(NullProgressMonitor.INSTANCE, fetchRefSpecs);
     } catch (TransportException e) {
-      if (PermanentTransportException.isPermanentFailure(e)) {
-        throw new PermanentTransportException("Terminal fetch failure", e);
-      }
-      throw e;
+      throw PermanentTransportException.wrapIfPermanentTransportException(e);
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/PermanentTransportException.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/PermanentTransportException.java
index 1d96a02..acb68cf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/PermanentTransportException.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/PermanentTransportException.java
@@ -16,7 +16,6 @@
 
 import com.jcraft.jsch.JSchException;
 import org.eclipse.jgit.errors.TransportException;
-import org.eclipse.jgit.internal.JGitText;
 
 public class PermanentTransportException extends TransportException {
   private static final long serialVersionUID = 1L;
@@ -25,10 +24,12 @@
     super(msg, cause);
   }
 
-  public static boolean isPermanentFailure(TransportException e) {
+  public static TransportException wrapIfPermanentTransportException(TransportException e) {
     Throwable cause = e.getCause();
-    String message = e.getMessage();
-    return (cause instanceof JSchException && cause.getMessage().startsWith("UnknownHostKey:"))
-        || message.matches(JGitText.get().remoteDoesNotHaveSpec.replaceAll("\\{0\\}", ".+"));
+    if (cause instanceof JSchException && cause.getMessage().startsWith("UnknownHostKey:")) {
+      return new PermanentTransportException("Terminal fetch failure", e);
+    }
+
+    return InexistentRefTransportException.getOptionalPermanentFailure(e).orElse(e);
   }
 }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index b76585e..eab5fe6 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -126,7 +126,7 @@
 
 	- UnknownHostKey: thrown by Jsch when establishing an SSH connection for an
 	unknown host.
-	- Jgit transport exception when the remote ref does not exist. The assumption
+	- cGit / JGit transport exception when the remote ref does not exist. The assumption
 	here is that the remote ref does not exist so it is not worth retrying. If the
 	exception arisen as a consequence of some ACLs (mis)configuration, then after
 	fixing the ACLs, an explicit replication must be manually triggered.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PermanentFailureExceptionTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PermanentFailureExceptionTest.java
index fb4fb04..09a465c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PermanentFailureExceptionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PermanentFailureExceptionTest.java
@@ -16,6 +16,7 @@
 
 import static com.google.common.truth.Truth.assertThat;
 
+import com.googlesource.gerrit.plugins.replication.pull.fetch.InexistentRefTransportException;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.PermanentTransportException;
 import com.jcraft.jsch.JSchException;
 import org.eclipse.jgit.errors.TransportException;
@@ -26,17 +27,26 @@
   @Test
   public void shouldConsiderSchUnknownHostAsPermanent() {
     assertThat(
-            PermanentTransportException.isPermanentFailure(
+            PermanentTransportException.wrapIfPermanentTransportException(
                 new TransportException(
                     "SSH error", new JSchException("UnknownHostKey: some.place"))))
-        .isTrue();
+        .isInstanceOf(PermanentTransportException.class);
   }
 
   @Test
-  public void shouldConsiderNotExistingRefsAsPermanent() {
+  public void shouldConsiderNotExistingRefsFromJGitAsPermanent() {
     assertThat(
-            PermanentTransportException.isPermanentFailure(
+            PermanentTransportException.wrapIfPermanentTransportException(
                 new TransportException("Remote does not have refs/heads/foo available for fetch.")))
-        .isTrue();
+        .isInstanceOf(InexistentRefTransportException.class);
+  }
+
+  @Test
+  public void shouldConsiderNotExistingRefsFromCGitAsPermanent() {
+    assertThat(
+            PermanentTransportException.wrapIfPermanentTransportException(
+                new TransportException(
+                    "Cannot fetch from repo, error message: fatal: couldn't find remote ref refs/heads/foobranch")))
+        .isInstanceOf(InexistentRefTransportException.class);
   }
 }