Log fetch tasks when graceful shutdown fails

When shutting down the pull-replication, an orderly shutdown is
initiated and the plugin waits for the fetch tasks to be completely
drained, for a period of time.

If an graceful shutdown fails however, we know nothing about which fetch
tasks were interrupted.

It would be useful to know which tasks fail, in which status they were
and which refs they were fetching.

Log pending, in-flight and never executed fetch tasks in the error_log
file, to help troubleshooting as well as facilitating admins to
understand which refs might need manual fetching.

Change-Id: Iafe33158959005f1a2e4a44e95f818a7e417ca53
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 5e65de7..a60dda3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -89,6 +89,7 @@
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.eclipse.jgit.errors.TransportException;
@@ -271,7 +272,9 @@
         cnt = pool.shutdownNow().size();
       } catch (InterruptedException e) {
         logger.atSevere().withCause(e).log("Interrupted during termination.");
-        cnt = pool.shutdownNow().size();
+        List<Runnable> fetchTasks = pool.shutdownNow();
+        logInterruptedShutdownStatus(fetchTasks);
+        cnt = fetchTasks.size();
       }
       pool = null;
     }
@@ -287,6 +290,19 @@
     return cnt;
   }
 
+  private void logInterruptedShutdownStatus(List<Runnable> fetchTasks) {
+    String neverExecutedTasks =
+        fetchTasks.stream().map(r -> r.toString()).collect(Collectors.joining(","));
+    String pendingTasks =
+        pending.values().stream().map(FetchOne::toString).collect(Collectors.joining(","));
+    String inFlightTasks =
+        inFlight.values().stream().map(FetchOne::toString).collect(Collectors.joining(","));
+
+    repLog.error("Never executed tasks: {}", neverExecutedTasks);
+    repLog.error("Pending tasks: {}", pendingTasks);
+    repLog.error("In-flight tasks: {}", inFlightTasks);
+  }
+
   private boolean isDrained() {
     int numberOfPending = pending.size();
     int numberOfInFlight = inFlight.size();
@@ -927,14 +943,22 @@
   }
 
   private Runnable runWithMetrics(Runnable runnableTask) {
-    return () -> {
-      queueMetrics.incrementTaskStarted(Source.this);
-      runnableTask.run();
-      if (runnableTask instanceof Completable) {
-        if (((Completable) runnableTask).hasSucceeded()) {
-          queueMetrics.incrementTaskCompleted(Source.this);
+    return new Runnable() {
+      @Override
+      public void run() {
+        queueMetrics.incrementTaskStarted(Source.this);
+        runnableTask.run();
+        if (runnableTask instanceof Completable) {
+          if (((Completable) runnableTask).hasSucceeded()) {
+            queueMetrics.incrementTaskCompleted(Source.this);
+          }
         }
       }
+
+      @Override
+      public String toString() {
+        return runnableTask.toString();
+      }
     };
   }
 }