Collect metrics asynchronously

Submit metrics collection tasks to the UpdateGitMetricsExecutor.
This will allow the parallelization of metrics collection
by tuning the pool size dedicated to the plugin.

The thread pool is currently hardcoded but it will become
configurable [1].

With this implementation the only parallelization allowed
is at MetricsCollector level.

Currently we have 2 MetricsCollectors, hence `2 * numberOfUpdatedProjects`
tasks will concur on the same thread pool.

We could introduce more concurrency on the File System metrics
collection by processing the `Stream<Path>` in parallel.
However, since these metrics are not real-time by design, it is
probably better to avoid being too aggressive on the file system.

[1]: https://bugs.chromium.org/p/gerrit/issues/detail?id=16248

Change-Id: I781a56a27d2abf3cda7a68967ab805d2a46b5349
diff --git a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/UpdateGitMetricsExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/UpdateGitMetricsExecutor.java
index 82e7585..be5ed72 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/UpdateGitMetricsExecutor.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/UpdateGitMetricsExecutor.java
@@ -21,4 +21,4 @@
 
 @Retention(RUNTIME)
 @BindingAnnotation
-@interface UpdateGitMetricsExecutor {}
+public @interface UpdateGitMetricsExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/UpdateGitMetricsTask.java b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/UpdateGitMetricsTask.java
index 7bdfb95..6163a42 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/UpdateGitMetricsTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/UpdateGitMetricsTask.java
@@ -66,9 +66,21 @@
       gitRepoMetricsCache.getCollectors().stream()
           .forEach(
               metricsCollector -> {
-                HashMap<GitRepoMetric, Long> metrics =
-                    metricsCollector.collect((FileRepository) unwrappedRepo, projectName);
-                gitRepoMetricsCache.setMetrics(metrics, projectName);
+                metricsCollector.collect(
+                    (FileRepository) unwrappedRepo,
+                    projectName,
+                    metrics -> {
+                      Map<GitRepoMetric, Long> newMetrics = new HashMap<>();
+                      metrics.forEach(
+                          (repoMetric, value) -> {
+                            logger.atFine().log(
+                                String.format(
+                                    "Collected %s for project %s: %d",
+                                    repoMetric.getName(), projectName, value));
+                            newMetrics.put(repoMetric, value);
+                          });
+                      gitRepoMetricsCache.setMetrics(newMetrics, projectName);
+                    });
               });
     } catch (RepositoryNotFoundException e) {
       logger.atSevere().withCause(e).log("Cannot find repository for %s", projectName);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/collectors/FSMetricsCollector.java b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/collectors/FSMetricsCollector.java
index 2142830..b2a66c7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/collectors/FSMetricsCollector.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/collectors/FSMetricsCollector.java
@@ -16,11 +16,15 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.flogger.FluentLogger;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.gitrepometrics.UpdateGitMetricsExecutor;
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
 import org.eclipse.jgit.internal.storage.file.FileRepository;
 
 public class FSMetricsCollector implements MetricsCollector {
@@ -40,64 +44,81 @@
       ImmutableList.of(
           numberOfKeepFiles, numberOfEmptyDirectories, numberOfFiles, numberOfDirectories);
 
-  @Override
-  public HashMap<GitRepoMetric, Long> collect(FileRepository repository, String projectName) {
-    HashMap<GitRepoMetric, Long> metrics = new HashMap<>();
-    HashMap<String, AtomicInteger> partialMetrics =
-        filesAndDirectoriesCount(repository, projectName);
+  private final ExecutorService executorService;
 
-    metrics.put(
-        numberOfEmptyDirectories,
-        partialMetrics.get(numberOfEmptyDirectories.getName()).longValue());
-    metrics.put(numberOfDirectories, partialMetrics.get(numberOfDirectories.getName()).longValue());
-    metrics.put(numberOfFiles, partialMetrics.get(numberOfFiles.getName()).longValue());
-    metrics.put(numberOfKeepFiles, partialMetrics.get(numberOfKeepFiles.getName()).longValue());
+  @Inject
+  public FSMetricsCollector(@UpdateGitMetricsExecutor ExecutorService executorService) {
+    this.executorService = executorService;
+  }
+
+  @Override
+  public void collect(
+      FileRepository repository,
+      String projectName,
+      Consumer<HashMap<GitRepoMetric, Long>> populateMetrics) {
+
+    executorService.submit(
+        () -> {
+          populateMetrics.accept(filesAndDirectoriesCount(repository, projectName));
+        });
+  }
+
+  private HashMap<GitRepoMetric, Long> filesAndDirectoriesCount(
+      FileRepository repository, String projectName) {
+
+    HashMap<GitRepoMetric, Long> metrics = new HashMap<GitRepoMetric, Long>();
+    try {
+      metrics =
+          Files.walk(repository.getObjectsDirectory().toPath())
+              .map(
+                  path -> {
+                    File f = path.toFile();
+                    HashMap<GitRepoMetric, Long> m = getInitializedMetrics();
+                    if (f.isFile()) {
+                      m.put(numberOfFiles, 1L);
+                      if (f.getName().endsWith(".keep")) {
+                        m.put(numberOfKeepFiles, 1L);
+                      }
+                    } else {
+                      m.put(numberOfDirectories, 1L);
+                      if (Objects.requireNonNull(f.listFiles()).length == 0) {
+                        m.put(numberOfEmptyDirectories, 1L);
+                      }
+                    }
+                    return m;
+                  })
+              .reduce(
+                  getInitializedMetrics(),
+                  (acc, lastMetric) ->
+                      new HashMap<GitRepoMetric, Long>() {
+                        {
+                          putMetric(numberOfFiles);
+                          putMetric(numberOfDirectories);
+                          putMetric(numberOfEmptyDirectories);
+                          putMetric(numberOfKeepFiles);
+                        }
+
+                        private void putMetric(GitRepoMetric metric) {
+                          put(metric, acc.get(metric) + lastMetric.get(metric));
+                        }
+                      });
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log(
+          "Error reading from file system for project " + projectName);
+    }
 
     return metrics;
   }
 
-  private HashMap<String, AtomicInteger> filesAndDirectoriesCount(
-      FileRepository repository, String projectName) {
-    HashMap<String, AtomicInteger> counter =
-        new HashMap<String, AtomicInteger>() {
-          {
-            put(numberOfFiles.getName(), new AtomicInteger(0));
-            put(numberOfDirectories.getName(), new AtomicInteger(0));
-            put(numberOfEmptyDirectories.getName(), new AtomicInteger(0));
-            put(numberOfKeepFiles.getName(), new AtomicInteger(0));
-          }
-        };
-    try {
-      Files.walk(repository.getObjectsDirectory().toPath())
-          .parallel()
-          .forEach(
-              path -> {
-                if (path.toFile().isFile()) {
-                  counter
-                      .get(numberOfFiles.getName())
-                      .updateAndGet(metricCounter -> metricCounter + 1);
-                  if (path.toFile().getName().endsWith(".keep")) {
-                    counter
-                        .get(numberOfKeepFiles.getName())
-                        .updateAndGet(metricCounter -> metricCounter + 1);
-                  }
-                }
-                if (path.toFile().isDirectory()) {
-                  counter
-                      .get(numberOfDirectories.getName())
-                      .updateAndGet(metricCounter -> metricCounter + 1);
-                  if (Objects.requireNonNull(path.toFile().listFiles()).length == 0) {
-                    counter
-                        .get(numberOfEmptyDirectories.getName())
-                        .updateAndGet(metricCounter -> metricCounter + 1);
-                  }
-                }
-              });
-    } catch (IOException e) {
-      logger.atSevere().withCause(e).log("Can't open object directory for project " + projectName);
-    }
-
-    return counter;
+  private HashMap<GitRepoMetric, Long> getInitializedMetrics() {
+    return new HashMap<GitRepoMetric, Long>() {
+      {
+        put(numberOfFiles, 0L);
+        put(numberOfDirectories, 0L);
+        put(numberOfEmptyDirectories, 0L);
+        put(numberOfKeepFiles, 0L);
+      }
+    };
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/collectors/GitStatsMetricsCollector.java b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/collectors/GitStatsMetricsCollector.java
index 6e65fb5..fe6b700 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/collectors/GitStatsMetricsCollector.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/collectors/GitStatsMetricsCollector.java
@@ -16,8 +16,12 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.flogger.FluentLogger;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.gitrepometrics.UpdateGitMetricsExecutor;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
 import org.eclipse.jgit.internal.storage.file.FileRepository;
 import org.eclipse.jgit.internal.storage.file.GC;
 
@@ -52,24 +56,39 @@
           sizeOfPackedObjects,
           numberOfBitmaps);
 
+  private final ExecutorService executorService;
+
+  @Inject
+  public GitStatsMetricsCollector(@UpdateGitMetricsExecutor ExecutorService executorService) {
+    this.executorService = executorService;
+  }
+
   @Override
-  public HashMap<GitRepoMetric, Long> collect(FileRepository repository, String projectName) {
-    HashMap<GitRepoMetric, Long> metrics = new HashMap<>(availableMetrics().size());
-    try {
-      GC.RepoStatistics statistics = new GC(repository).getStatistics();
-      metrics.put(numberOfPackedObjects, statistics.numberOfPackedObjects);
-      metrics.put(numberOfPackFiles, statistics.numberOfPackFiles);
-      metrics.put(numberOfLooseObjects, statistics.numberOfLooseObjects);
-      metrics.put(numberOfLooseRefs, statistics.numberOfLooseRefs);
-      metrics.put(numberOfPackedRefs, statistics.numberOfPackedRefs);
-      metrics.put(sizeOfLooseObjects, statistics.sizeOfLooseObjects);
-      metrics.put(sizeOfPackedObjects, statistics.sizeOfPackedObjects);
-      metrics.put(numberOfBitmaps, statistics.numberOfBitmaps);
-      logger.atInfo().log("New Git Statistics metrics collected: %s", statistics.toString());
-    } catch (IOException e) {
-      logger.atSevere().log("Something went wrong: %s", e.getMessage());
-    }
-    return metrics;
+  public void collect(
+      FileRepository repository,
+      String projectName,
+      Consumer<HashMap<GitRepoMetric, Long>> populateMetrics) {
+
+    executorService.submit(
+        () -> {
+          HashMap<GitRepoMetric, Long> metrics = new HashMap<>();
+          try {
+            GC.RepoStatistics statistics = new GC(repository).getStatistics();
+            metrics.put(numberOfPackedObjects, statistics.numberOfPackedObjects);
+            metrics.put(numberOfPackFiles, statistics.numberOfPackFiles);
+            metrics.put(numberOfLooseObjects, statistics.numberOfLooseObjects);
+            metrics.put(numberOfLooseRefs, statistics.numberOfLooseRefs);
+            metrics.put(numberOfPackedRefs, statistics.numberOfPackedRefs);
+            metrics.put(sizeOfLooseObjects, statistics.sizeOfLooseObjects);
+            metrics.put(sizeOfPackedObjects, statistics.sizeOfPackedObjects);
+            metrics.put(numberOfBitmaps, statistics.numberOfBitmaps);
+            logger.atInfo().log("New Git Statistics metrics collected: %s", statistics.toString());
+          } catch (IOException e) {
+            logger.atSevere().log("Something went wrong: %s", e.getMessage());
+          }
+
+          populateMetrics.accept(metrics);
+        });
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/collectors/MetricsCollector.java b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/collectors/MetricsCollector.java
index b4114f5..dcff8e8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/collectors/MetricsCollector.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/gitrepometrics/collectors/MetricsCollector.java
@@ -16,6 +16,7 @@
 
 import com.google.common.collect.ImmutableList;
 import java.util.HashMap;
+import java.util.function.Consumer;
 import org.eclipse.jgit.internal.storage.file.FileRepository;
 
 /** This interface is meant to be implemented by Git repository metrics collectors. * */
@@ -26,10 +27,12 @@
    *
    * @param projectName to collect metrics for
    * @param repository {@link FileRepository} to collect metrics from
-   * @return {@code HashMap<GitRepoMetric, Long>} where the key is the {@link GitRepoMetric} and the
-   *     value is the corresponding metric value collected.
+   * @param populateMetrics callback to populate the collected metrics
    */
-  HashMap<GitRepoMetric, Long> collect(FileRepository repository, String projectName);
+  void collect(
+      FileRepository repository,
+      String projectName,
+      Consumer<HashMap<GitRepoMetric, Long>> populateMetrics);
 
   /**
    * Returns the name of the metric collector.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/FakeMetricsCollector.java b/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/FakeMetricsCollector.java
index a24707b..b20c4fe 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/FakeMetricsCollector.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/FakeMetricsCollector.java
@@ -15,11 +15,10 @@
 package com.googlesource.gerrit.plugins.gitrepometrics;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
 import com.googlesource.gerrit.plugins.gitrepometrics.collectors.GitRepoMetric;
 import com.googlesource.gerrit.plugins.gitrepometrics.collectors.MetricsCollector;
 import java.util.HashMap;
+import java.util.function.Consumer;
 import org.eclipse.jgit.internal.storage.file.FileRepository;
 
 class FakeMetricsCollector implements MetricsCollector {
@@ -28,8 +27,17 @@
   private final GitRepoMetric fakeMetric2;
 
   @Override
-  public HashMap<GitRepoMetric, Long> collect(FileRepository repository, String projectName) {
-    return Maps.newHashMap(ImmutableMap.of(fakeMetric1, 1L, fakeMetric2, 2L));
+  public void collect(
+      FileRepository repository,
+      String projectName,
+      Consumer<HashMap<GitRepoMetric, Long>> populateMetrics) {
+    populateMetrics.accept(
+        new HashMap<GitRepoMetric, Long>() {
+          {
+            put(fakeMetric1, 1L);
+            put(fakeMetric2, 2L);
+          }
+        });
   }
 
   @Override
diff --git a/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoMetricsCacheIT.java b/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoMetricsCacheIT.java
index ffa03e5..7397132 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoMetricsCacheIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/GitRepoMetricsCacheIT.java
@@ -14,21 +14,21 @@
 
 package com.googlesource.gerrit.plugins.gitrepometrics;
 
-import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.fail;
 
 import com.codahale.metrics.MetricRegistry;
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.WaitUtil;
 import com.google.gerrit.acceptance.config.GlobalPluginConfig;
 import com.google.gerrit.entities.Project;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.gitrepometrics.collectors.FSMetricsCollector;
 import com.googlesource.gerrit.plugins.gitrepometrics.collectors.GitStatsMetricsCollector;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
-import java.util.stream.Collectors;
-import org.junit.Before;
 import org.junit.Test;
 
 @TestPlugin(
@@ -36,22 +36,25 @@
     sysModule = "com.googlesource.gerrit.plugins.gitrepometrics.Module")
 public class GitRepoMetricsCacheIT extends LightweightPluginDaemonTest {
 
+  private final int MAX_WAIT_TIME_FOR_METRICS_SECS = 5;
+
   @Inject MetricRegistry metricRegistry;
-  @Inject FSMetricsCollector fsMetricsCollector;
-  @Inject GitStatsMetricsCollector gitStatsMetricsCollector;
-  GitRepoMetricsCache gitRepoMetricsCache;
+  private FSMetricsCollector fsMetricsCollector;
+  private GitStatsMetricsCollector gitStatsMetricsCollector;
+  private GitRepoMetricsCache gitRepoMetricsCache;
 
   private final Project.NameKey testProject1 = Project.nameKey("testProject1");
   private final Project.NameKey testProject2 = Project.nameKey("testProject2");
 
   @Override
-  @Before
   public void setUpTestPlugin() throws Exception {
     super.setUpTestPlugin();
 
     repoManager.createRepository(testProject1);
     repoManager.createRepository(testProject2);
     gitRepoMetricsCache = plugin.getSysInjector().getInstance(GitRepoMetricsCache.class);
+    fsMetricsCollector = plugin.getSysInjector().getInstance(FSMetricsCollector.class);
+    gitStatsMetricsCollector = plugin.getSysInjector().getInstance(GitStatsMetricsCollector.class);
   }
 
   @Test
@@ -65,14 +68,25 @@
     new UpdateGitMetricsTask(gitRepoMetricsCache, repoManager, testProject1.get()).run();
     new UpdateGitMetricsTask(gitRepoMetricsCache, repoManager, testProject2.get()).run();
 
-    List<String> repoMetricsCount =
-        metricRegistry.getMetrics().keySet().stream()
-            .filter(metricName -> metricName.contains("git-repo-metrics"))
-            .collect(Collectors.toList());
-
     int expectedMetricsCount =
         fsMetricsCollector.availableMetrics().size()
             + gitStatsMetricsCollector.availableMetrics().size();
-    assertThat(repoMetricsCount.size()).isEqualTo(availableProjects.size() * expectedMetricsCount);
+
+    try {
+      WaitUtil.waitUntil(
+          () -> getPluginMetricsCount() == (long) availableProjects.size() * expectedMetricsCount,
+          Duration.ofSeconds(MAX_WAIT_TIME_FOR_METRICS_SECS));
+    } catch (InterruptedException e) {
+      fail(
+          String.format(
+              "Only %d metrics have been registered, expected %d",
+              getPluginMetricsCount(), availableProjects.size() * expectedMetricsCount));
+    }
+  }
+
+  private long getPluginMetricsCount() {
+    return metricRegistry.getMetrics().keySet().stream()
+        .filter(metricName -> metricName.contains("plugins/git-repo-metrics"))
+        .count();
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/collectors/FSMetricsCollectorTest.java b/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/collectors/FSMetricsCollectorTest.java
index 92fc552..df1f689 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/collectors/FSMetricsCollectorTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/gitrepometrics/collectors/FSMetricsCollectorTest.java
@@ -20,6 +20,8 @@
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
 import org.eclipse.jgit.api.Git;
 import org.eclipse.jgit.internal.storage.file.FileRepository;
 import org.junit.Before;
@@ -39,11 +41,23 @@
   }
 
   @Test
-  public void testCorrectMetricsCollection() throws IOException {
-    File objectDirectory = repository.getObjectsDirectory();
+  public void testCorrectMetricsCollection() throws IOException, InterruptedException {
+    File objectDirectory = ((FileRepository) repository).getObjectsDirectory();
     Files.createFile(new File(objectDirectory, "pack/keep1.keep").toPath());
 
-    HashMap<GitRepoMetric, Long> metrics = new FSMetricsCollector().collect(repository, "testRepo");
+    HashMap<GitRepoMetric, Long> metrics = new HashMap<>();
+
+    CountDownLatch latch = new CountDownLatch(1);
+    new FSMetricsCollector(Executors.newFixedThreadPool(2))
+        .collect(
+            (FileRepository) repository,
+            "testRepo",
+            m -> {
+              metrics.putAll(m);
+              latch.countDown();
+            });
+    latch.await();
+
     // This is the FS structure, from the "objects" directory, metrics are collected from:
     //  .
     //  ├── info