Add replication taskId during the execution of the fetch

The replication taskId was missing during the actual
execution of the fetch operation and in other minor places.
Inject and propagate the replication task id everywhere so
that the Gerrit admin can understand and troubleshoot replication
issues.

Change-Id: I55c996744ae33b7168337be8c2ee019c3ec8d312
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index a99f237..24da012 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -135,14 +135,15 @@
 
   @Override
   public void cancel() {
-    repLog.info("Replication {} was canceled", getURI());
+    repLog.info("[{}] Replication task from {} was canceled", taskIdHex, getURI());
     canceledByReplication();
     pool.fetchWasCanceled(this);
   }
 
   @Override
   public void setCanceledWhileRunning() {
-    repLog.info("Replication {} was canceled while being executed", getURI());
+    repLog.info(
+        "[{}] Replication task from {} was canceled while being executed", taskIdHex, getURI());
     canceledWhileRunning.set(true);
   }
 
@@ -201,10 +202,10 @@
     if (ALL_REFS.equals(ref)) {
       delta.clear();
       fetchAllRefs = true;
-      repLog.trace("Added all refs for replication from {}", uri);
+      repLog.trace("[{}] Added all refs for replication from {}", taskIdHex, uri);
     } else if (!fetchAllRefs) {
       delta.add(ref);
-      repLog.trace("Added ref {} for replication from {}", ref, uri);
+      repLog.trace("[{}] Added ref {} for replication from {}", taskIdHex, ref, uri);
     }
   }
 
@@ -301,7 +302,7 @@
     if (!pool.requestRunway(this)) {
       if (!canceled) {
         repLog.info(
-            "Rescheduling [{}] replication to {} to avoid collision with an in-flight fetch.",
+            "[{}] Rescheduling replication to {} to avoid collision with an in-flight fetch.",
             taskIdHex,
             uri);
         pool.reschedule(this, Source.RetryReason.COLLISION);
@@ -310,7 +311,7 @@
     }
 
     repLog.info(
-        "Replication [{}] from {} started for refs [{}] ...",
+        "[{}] Replication from {} started for refs [{}] ...",
         taskIdHex,
         uri,
         String.join(",", getRefs()));
@@ -327,7 +328,7 @@
               .flatMap(metrics -> metrics.stop(config.getName()))
               .map(NANOSECONDS::toMillis);
       repLog.info(
-          "Replication [{}] from {} completed in {}ms, {}ms delay, {} retries{}",
+          "[{}] Replication from {} completed in {}ms, {}ms delay, {} retries{}",
           taskIdHex,
           uri,
           elapsed,
@@ -336,7 +337,12 @@
           elapsedEnd2End.map(el -> String.format(", E2E %dms", el)).orElse(""));
     } catch (RepositoryNotFoundException e) {
       stateLog.error(
-          "Cannot replicate " + projectName + "; Local repository error: " + e.getMessage(),
+          "["
+              + taskIdHex
+              + "] Cannot replicate "
+              + projectName
+              + "; Local repository error: "
+              + e.getMessage(),
           getStatesAsArray());
 
     } catch (NoRemoteRepositoryException | RemoteRepositoryException e) {
@@ -345,9 +351,9 @@
       // raised.
       String msg = e.getMessage();
       repLog.error(
-          "Cannot replicate [{}] {}; Remote repository error: {}", taskIdHex, projectName, msg);
+          "[{}] Cannot replicate {}; Remote repository error: {}", taskIdHex, projectName, msg);
     } catch (NotSupportedException e) {
-      stateLog.error("Cannot replicate from " + uri, e, getStatesAsArray());
+      stateLog.error("[" + taskIdHex + "] Cannot replicate  from " + uri, e, getStatesAsArray());
     } catch (PermanentTransportException e) {
       repLog.error(
           String.format("Terminal failure. Cannot replicate [%s] from %s", taskIdHex, uri), e);
@@ -356,7 +362,7 @@
         lockRetryCount++;
         // The LockFailureException message contains both URI and reason
         // for this failure.
-        repLog.error("Cannot replicate [{}] from {}: {}", taskIdHex, uri, e.getMessage());
+        repLog.error("[{}] Cannot replicate from {}: {}", taskIdHex, uri, e.getMessage());
 
         // The remote fetch operation should be retried.
         if (lockRetryCount <= maxLockRetries) {
@@ -367,7 +373,8 @@
           }
         } else {
           repLog.error(
-              "Giving up after {} occurrences of this error: {} during replication from [{}] {}",
+              "[{}] Giving up after {} occurrences of this error: {} during replication from [{}] {}",
+              taskIdHex,
               lockRetryCount,
               e.getMessage(),
               taskIdHex,
@@ -383,9 +390,12 @@
         }
       }
     } catch (IOException e) {
-      stateLog.error("Cannot replicate from " + uri, e, getStatesAsArray());
+      stateLog.error("[" + taskIdHex + "] Cannot replicate from " + uri, e, getStatesAsArray());
     } catch (RuntimeException | Error e) {
-      stateLog.error("Unexpected error during replication from " + uri, e, getStatesAsArray());
+      stateLog.error(
+          "[" + taskIdHex + "] Unexpected error during replication from " + uri,
+          e,
+          getStatesAsArray());
     } finally {
       if (git != null) {
         git.close();
@@ -395,7 +405,7 @@
   }
 
   private void logCanceledWhileRunningException(TransportException e) {
-    repLog.info("Cannot replicate [{}] from {}. It was canceled while running", taskIdHex, uri, e);
+    repLog.info("[{}] Cannot replicate from {}. It was canceled while running", taskIdHex, uri, e);
   }
 
   private void runImpl() throws IOException {
@@ -404,7 +414,7 @@
       return;
     }
 
-    Fetch fetch = fetchFactory.create(uri, git);
+    Fetch fetch = fetchFactory.create(taskIdHex, uri, git);
     List<RefSpec> fetchRefSpecs = getFetchRefSpecs();
 
     try {
@@ -412,10 +422,10 @@
     } catch (InexistentRefTransportException e) {
       String inexistentRef = e.getInexistentRef();
       repLog.info(
-          "Remote {} does not have ref {} in replication task [{}], flagging as failed and removing from the replication task",
+          "[{}] Remote {} does not have ref {} in replication task, flagging as failed and removing from the replication task",
+          taskIdHex,
           uri,
-          inexistentRef,
-          taskIdHex);
+          inexistentRef);
       fetchFailures.add(e);
       delta.remove(inexistentRef);
       runImpl();
@@ -472,7 +482,8 @@
         case REJECTED_MISSING_OBJECT:
           stateLog.error(
               String.format(
-                  "Failed replicate %s from %s: result %s", uri, u.getRemoteName(), u.getResult()),
+                  "[%s] Failed replicate %s from %s: result %s",
+                  taskIdHex, uri, u.getRemoteName(), u.getResult()),
               logStatesArray);
           fetchStatus = ReplicationState.RefFetchResult.FAILED;
           anyRefFailed = true;
@@ -486,7 +497,8 @@
         case REJECTED_OTHER_REASON:
           stateLog.error(
               String.format(
-                  "Failed replicate %s from %s, reason: %s", uri, u.getRemoteName(), u.toString()),
+                  "[%s] Failed replicate %s from %s, reason: %s",
+                  taskIdHex, uri, u.getRemoteName(), u.toString()),
               logStatesArray);
 
           fetchStatus = ReplicationState.RefFetchResult.FAILED;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/BatchFetchClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/BatchFetchClient.java
index f2fbcd1..188f69b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/BatchFetchClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/BatchFetchClient.java
@@ -32,10 +32,11 @@
   public BatchFetchClient(
       SourceConfiguration config,
       FetchFactory factory,
+      @Assisted String taskHexId,
       @Assisted URIish uri,
       @Assisted Repository git) {
     this.batchSize = config.getRefsBatchSize();
-    this.fetchClient = factory.createPlainImpl(uri, git);
+    this.fetchClient = factory.createPlainImpl(taskHexId, uri, git);
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java
index 13d1a28..24898e6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java
@@ -41,14 +41,17 @@
   private File localProjectDirectory;
   private URIish uri;
   private int timeout;
+  private final String taskIdHex;
 
   @Inject
   public CGitFetch(
       SourceConfiguration config,
       CredentialsFactory cpFactory,
+      @Assisted String taskIdHex,
       @Assisted URIish uri,
       @Assisted Repository git) {
     this.localProjectDirectory = git.getDirectory();
+    this.taskIdHex = taskIdHex;
     this.uri = appendCredentials(uri, cpFactory.create(config.getRemoteConfig().getName()));
     this.timeout = config.getRemoteConfig().getTimeout();
   }
@@ -59,7 +62,7 @@
     List<String> command = Lists.newArrayList("git", "fetch", uri.toPrivateASCIIString());
     command.addAll(refs);
     ProcessBuilder pb = new ProcessBuilder().command(command).directory(localProjectDirectory);
-    repLog.info("Fetch references {} from {}", refs, uri);
+    repLog.info("[{}] Fetch references {} from {}", taskIdHex, refs, uri);
     Process process = pb.start();
 
     try {
@@ -86,7 +89,8 @@
     } catch (TransportException e) {
       throw PermanentTransportException.wrapIfPermanentTransportException(e);
     } catch (InterruptedException e) {
-      repLog.error("Thread interrupted during the fetch from: {}, refs: {}", uri, refs);
+      repLog.error(
+          "[{}] Thread interrupted during the fetch from: {}, refs: {}", taskIdHex, uri, refs);
       throw new IllegalStateException(e);
     }
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/FetchFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/FetchFactory.java
index d356477..55f8f01 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/FetchFactory.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/FetchFactory.java
@@ -18,8 +18,8 @@
 import org.eclipse.jgit.transport.URIish;
 
 public interface FetchFactory {
-  Fetch create(URIish uri, Repository git);
+  Fetch create(String taskIdHex, URIish uri, Repository git);
   // Return implementation without any decorators
   @FetchClientImplementation
-  Fetch createPlainImpl(URIish uri, Repository git);
+  Fetch createPlainImpl(String taskIdHex, URIish uri, Repository git);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java
index 3504280..816d101 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java
@@ -30,11 +30,16 @@
   URIish uri;
   Repository git;
   private final TransportProvider transportProvider;
+  private final String taskIdHex;
 
   @Inject
   public JGitFetch(
-      TransportProvider transportProvider, @Assisted URIish uri, @Assisted Repository git) {
+      TransportProvider transportProvider,
+      @Assisted String taskIdHex,
+      @Assisted URIish uri,
+      @Assisted Repository git) {
     this.transportProvider = transportProvider;
+    this.taskIdHex = taskIdHex;
     this.uri = uri;
     this.git = git;
   }
@@ -51,7 +56,7 @@
   }
 
   private FetchResult fetchVia(Transport tn, List<RefSpec> fetchRefSpecs) throws IOException {
-    repLog.info("Fetch references {} from {}", fetchRefSpecs, uri);
+    repLog.info("[{}] Fetch references {} from {}", taskIdHex, fetchRefSpecs, uri);
     try {
       return tn.fetch(NullProgressMonitor.INSTANCE, fetchRefSpecs);
     } catch (TransportException e) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/CGitFetchIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/CGitFetchIT.java
index 3f40848..d81a253 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/CGitFetchIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/CGitFetchIT.java
@@ -60,6 +60,7 @@
     sysModule = "com.googlesource.gerrit.plugins.replication.pull.CGitFetchIT$TestModule")
 public class CGitFetchIT extends FetchITBase {
   private static final String TEST_REPLICATION_SUFFIX = "suffix1";
+  private static final String TEST_TASK_ID = "taskid";
 
   @Test
   public void shouldFetchRef() throws Exception {
@@ -71,7 +72,8 @@
       RevCommit sourceCommit = pushResult.getCommit();
       String sourceRef = pushResult.getPatchSet().refName();
 
-      Fetch objectUnderTest = fetchFactory.create(new URIish(testRepoPath.toString()), repo);
+      Fetch objectUnderTest =
+          fetchFactory.create(TEST_TASK_ID, new URIish(testRepoPath.toString()), repo);
 
       objectUnderTest.fetch(Lists.newArrayList(new RefSpec(sourceRef + ":" + sourceRef)));
 
@@ -91,7 +93,8 @@
 
       createChange();
 
-      Fetch objectUnderTest = fetchFactory.create(new URIish(testRepoPath.toString()), repo);
+      Fetch objectUnderTest =
+          fetchFactory.create(TEST_TASK_ID, new URIish(testRepoPath.toString()), repo);
 
       objectUnderTest.fetch(Lists.newArrayList(new RefSpec(nonExistingRef)));
     }
@@ -105,7 +108,8 @@
       Result pushResult = createChange();
       String sourceRef = pushResult.getPatchSet().refName();
 
-      Fetch objectUnderTest = fetchFactory.create(new URIish("/not_existing_path/"), repo);
+      Fetch objectUnderTest =
+          fetchFactory.create(TEST_TASK_ID, new URIish("/not_existing_path/"), repo);
 
       objectUnderTest.fetch(Lists.newArrayList(new RefSpec(sourceRef + ":" + sourceRef)));
     }
@@ -122,7 +126,8 @@
       Result pushResultTwo = createChange();
       String sourceRefTwo = pushResultTwo.getPatchSet().refName();
 
-      Fetch objectUnderTest = fetchFactory.create(new URIish(testRepoPath.toString()), repo);
+      Fetch objectUnderTest =
+          fetchFactory.create(TEST_TASK_ID, new URIish(testRepoPath.toString()), repo);
 
       objectUnderTest.fetch(
           Lists.newArrayList(
@@ -158,11 +163,12 @@
     Repository repo = mock(Repository.class);
     FetchFactory fetchFactory = mock(FetchFactory.class);
     Fetch fetchClient = mock(Fetch.class);
-    when(fetchFactory.createPlainImpl(uri, repo)).thenReturn(fetchClient);
+    when(fetchFactory.createPlainImpl(TEST_TASK_ID, uri, repo)).thenReturn(fetchClient);
     when(fetchClient.fetch(any())).thenReturn(fetchResultList);
 
     Fetch objectUnderTest =
-        new BatchFetchClient(sourceConfig, fetchFactory, new URIish(testRepoPath.toString()), repo);
+        new BatchFetchClient(
+            sourceConfig, fetchFactory, TEST_TASK_ID, new URIish(testRepoPath.toString()), repo);
 
     objectUnderTest.fetch(
         Lists.newArrayList(
@@ -185,7 +191,8 @@
     String branchRevision = gApi.projects().name(testProjectName).branch(newBranch).get().revision;
 
     try (Repository repo = repoManager.openRepository(project)) {
-      Fetch objectUnderTest = fetchFactory.create(new URIish(testRepoPath.toString()), repo);
+      Fetch objectUnderTest =
+          fetchFactory.create(TEST_TASK_ID, new URIish(testRepoPath.toString()), repo);
 
       objectUnderTest.fetch(Lists.newArrayList(new RefSpec(newBranch + ":" + newBranch)));
 
@@ -209,7 +216,8 @@
     gApi.projects().name(testProjectName).branch(newBranch).create(input);
 
     try (Repository repo = repoManager.openRepository(project)) {
-      Fetch objectUnderTest = fetchFactory.create(new URIish(testRepoPath.toString()), repo);
+      Fetch objectUnderTest =
+          fetchFactory.create(TEST_TASK_ID, new URIish(testRepoPath.toString()), repo);
 
       objectUnderTest.fetch(
           Lists.newArrayList(new RefSpec("non_existing_branch" + ":" + "non_existing_branch")));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/JGitFetchIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/JGitFetchIT.java
index ec695a6..76ff02b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/JGitFetchIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/JGitFetchIT.java
@@ -45,6 +45,7 @@
     sysModule = "com.googlesource.gerrit.plugins.replication.pull.JGitFetchIT$TestModule")
 public class JGitFetchIT extends FetchITBase {
   private static final String TEST_REPLICATION_SUFFIX = "suffix1";
+  private static final String TEST_TASK_ID = "taskid";
 
   @Test(expected = PermanentTransportException.class)
   public void shouldThrowPermanentTransportExceptionWhenRefDoesNotExists() throws Exception {
@@ -52,7 +53,8 @@
     testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
     String nonExistingRef = "refs/changes/02/20000/1:refs/changes/02/20000/1";
     try (Repository repo = repoManager.openRepository(project)) {
-      Fetch objectUnderTest = fetchFactory.create(new URIish(testRepoPath.toString()), repo);
+      Fetch objectUnderTest =
+          fetchFactory.create(TEST_TASK_ID, new URIish(testRepoPath.toString()), repo);
       objectUnderTest.fetch(Lists.newArrayList(new RefSpec(nonExistingRef)));
     }
   }