Fix completed tasks metrics

The completed tasks metrics was misbehaving when
retrying replication tasks, because the reschedule
was adding the metric task wrapper multiple times.

Check for double-wrapping and avoid duplications.

Change-Id: Ib7a87c667fba8a8ef0d6bb05b8a50303c2d37139
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueMetrics.java
index 9b8134f..2d40264 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueMetrics.java
@@ -52,6 +52,27 @@
   private final Counter1<String> tasksStarted;
   private final Set<RegistrationHandle> metricsHandles;
 
+  public class RunnableWithMetrics implements Runnable {
+    private final Source source;
+    private final Runnable runnable;
+
+    public RunnableWithMetrics(Source source, Runnable runnable) {
+      this.source = source;
+      this.runnable = runnable;
+    }
+
+    @Override
+    public void run() {
+      incrementTaskStarted(source);
+      runnable.run();
+      if (runnable instanceof Completable) {
+        if (((Completable) runnable).hasSucceeded()) {
+          incrementTaskCompleted(source);
+        }
+      }
+    }
+  }
+
   @Inject
   public ReplicationQueueMetrics(
       @PluginName String pluginName, @Named(REPLICATION_QUEUE_METRICS) MetricMaker metricMaker) {
@@ -244,4 +265,12 @@
   public void incrementTaskStarted(Source source) {
     tasksStarted.increment(source.getRemoteConfigName());
   }
+
+  public Runnable runWithMetrics(Source source, Runnable runnableTask) {
+    if (runnableTask instanceof RunnableWithMetrics) {
+      return runnableTask;
+    }
+
+    return new RunnableWithMetrics(source, runnableTask);
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index a60dda3..5e4314d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -504,7 +504,7 @@
         pending.put(uri, e);
         f =
             pool.schedule(
-                runWithMetrics(e),
+                queueMetrics.runWithMetrics(this, e),
                 isSyncCall(replicationType) ? 0 : config.getDelay(),
                 TimeUnit.SECONDS);
         queueMetrics.incrementTaskScheduled(this);
@@ -525,7 +525,9 @@
     @SuppressWarnings("unused")
     ScheduledFuture<?> ignored =
         pool.schedule(
-            runWithMetrics(deleteProjectFactory.create(this, uri, project)), 0, TimeUnit.SECONDS);
+            queueMetrics.runWithMetrics(this, deleteProjectFactory.create(this, uri, project)),
+            0,
+            TimeUnit.SECONDS);
     queueMetrics.incrementTaskScheduled(this);
   }
 
@@ -633,7 +635,10 @@
         switch (reason) {
           case COLLISION:
             queueMetrics.incrementTaskRescheduled(this);
-            pool.schedule(runWithMetrics(fetchOp), config.getRescheduleDelay(), TimeUnit.SECONDS);
+            pool.schedule(
+                queueMetrics.runWithMetrics(this, fetchOp),
+                config.getRescheduleDelay(),
+                TimeUnit.SECONDS);
             break;
           case TRANSPORT_ERROR:
           case REPOSITORY_MISSING:
@@ -878,7 +883,8 @@
       @SuppressWarnings("unused")
       ScheduledFuture<?> ignored =
           pool.schedule(
-              runWithMetrics(updateHeadFactory.create(this, apiURI, project, newHead)),
+              queueMetrics.runWithMetrics(
+                  this, updateHeadFactory.create(this, apiURI, project, newHead)),
               0,
               TimeUnit.SECONDS);
       queueMetrics.incrementTaskScheduled(this);
@@ -941,24 +947,4 @@
       Context.unsetLocalEvent();
     }
   }
-
-  private Runnable runWithMetrics(Runnable runnableTask) {
-    return new Runnable() {
-      @Override
-      public void run() {
-        queueMetrics.incrementTaskStarted(Source.this);
-        runnableTask.run();
-        if (runnableTask instanceof Completable) {
-          if (((Completable) runnableTask).hasSucceeded()) {
-            queueMetrics.incrementTaskCompleted(Source.this);
-          }
-        }
-      }
-
-      @Override
-      public String toString() {
-        return runnableTask.toString();
-      }
-    };
-  }
 }