Parallelize Schema 108

This schema iterates through all open changes by repository and updates
project groups, parallelizing it would be efficient. Default number of
migration threads is equal to number of processors or can be manually
configured through cache.projects.loadThreads property.

This reduced runtime for migration 108 on a gerrit site with ~16k repos
from ~1800secs to ~850secs on a machine with 24 processors.

Change-Id: Ie1b847c95f9021d50bfabe6c90954b84fc4528ff
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/schema/Schema_108.java b/gerrit-server/src/main/java/com/google/gerrit/server/schema/Schema_108.java
index dc88f8d..66e0d3a 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/schema/Schema_108.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/schema/Schema_108.java
@@ -27,6 +27,7 @@
 import com.google.gerrit.reviewdb.client.Project.NameKey;
 import com.google.gerrit.reviewdb.client.RefNames;
 import com.google.gerrit.reviewdb.server.ReviewDb;
+import com.google.gerrit.server.config.GerritServerConfig;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.git.GroupCollector;
 import com.google.gerrit.server.project.NoSuchChangeException;
@@ -39,6 +40,7 @@
 import java.util.Set;
 import java.util.SortedSet;
 import org.eclipse.jgit.errors.MissingObjectException;
+import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.Ref;
@@ -51,37 +53,54 @@
 
 public class Schema_108 extends SchemaVersion {
   private final GitRepositoryManager repoManager;
+  private final Config cfg;
+  private ReviewDb db;
+  private UpdateUI ui;
 
   @Inject
-  Schema_108(Provider<Schema_107> prior, GitRepositoryManager repoManager) {
+  Schema_108(
+      Provider<Schema_107> prior,
+      GitRepositoryManager repoManager,
+      @GerritServerConfig Config cfg) {
     super(prior);
     this.repoManager = repoManager;
+    this.cfg = cfg;
   }
 
   @Override
   protected void migrateData(ReviewDb db, UpdateUI ui) throws OrmException {
+    this.db = db;
+    this.ui = ui;
     ui.message("Listing all changes ...");
     SetMultimap<Project.NameKey, Change.Id> openByProject = getOpenChangesByProject(db, ui);
     ui.message("done");
 
     ui.message("Updating groups for open changes ...");
-    int i = 0;
-    for (Map.Entry<Project.NameKey, Collection<Change.Id>> e : openByProject.asMap().entrySet()) {
-      try (Repository repo = repoManager.openRepository(e.getKey());
-          RevWalk rw = new RevWalk(repo)) {
-        updateProjectGroups(db, repo, rw, (Set<Change.Id>) e.getValue(), ui);
-      } catch (IOException | NoSuchChangeException err) {
-        throw new OrmException(err);
-      }
-      if (++i % 100 == 0) {
-        ui.message("  done " + i + " projects ...");
-      }
-    }
+    runParallelTasks(
+        createExecutor(ui),
+        openByProject.asMap().entrySet(),
+        (batch) -> processProjectBatch((Map.Entry<NameKey, Collection<Change.Id>>) batch),
+        ui);
     ui.message("done");
   }
 
-  private void updateProjectGroups(
-      ReviewDb db, Repository repo, RevWalk rw, Set<Change.Id> changes, UpdateUI ui)
+  private Void processProjectBatch(
+      Map.Entry<Project.NameKey, Collection<Change.Id>> changesByProject) throws OrmException {
+    try (Repository repo = repoManager.openRepository(changesByProject.getKey());
+        RevWalk rw = new RevWalk(repo)) {
+      updateProjectGroups(repo, rw, (Set<Change.Id>) changesByProject.getValue());
+    } catch (IOException | NoSuchChangeException err) {
+      throw new OrmException(err);
+    }
+    return null;
+  }
+
+  @Override
+  protected int getThreads() {
+    return cfg.getInt("cache", "projects", "loadThreads", super.getThreads());
+  }
+
+  private void updateProjectGroups(Repository repo, RevWalk rw, Set<Change.Id> changes)
       throws OrmException, IOException {
     // Match sorting in ReceiveCommits.
     rw.reset();