Parallelize account migration in schema 146 migration

* Run gc --prune=now before the migration, this improves performance
* Default number of migration threads: <number of processors>,
  manual configuration via system property threadcount
* For FileRepositorys pack refs every 1000 accounts, otherwise
  performance degrades continuously. Running full gc takes much longer.

This reduced runtime for migration 146 on an empty gerrit 2.14 site with
10000 accounts from 646 seconds to 98 seconds on a MacBook.

Without this change migration time has quadratic complexity and the
average migration rate degrades from 77 accounts/second for 1000 users
to 15 accounts/second for 10000 accounts.

With this change the migration rate is constant 102 accounts/second up
to 128k accounts.

See https://docs.google.com/spreadsheets/d/1U86RdHZ_Em5vr-NSK8coaYdlZ1VGLzmRcpOPt_1c04w

Change-Id: Iac69acfeb2adba0ff5b63a11843a7aa839b98917
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/schema/Schema_146.java b/gerrit-server/src/main/java/com/google/gerrit/server/schema/Schema_146.java
index 9ca61d9..64e9123 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/schema/Schema_146.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/schema/Schema_146.java
@@ -14,6 +14,9 @@
 
 package com.google.gerrit.server.schema;
 
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 import com.google.gerrit.reviewdb.client.Account;
 import com.google.gerrit.reviewdb.client.RefNames;
 import com.google.gerrit.reviewdb.server.ReviewDb;
@@ -24,21 +27,36 @@
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Timestamp;
+import java.text.ParseException;
+import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import org.eclipse.jgit.internal.storage.file.FileRepository;
+import org.eclipse.jgit.internal.storage.file.GC;
 import org.eclipse.jgit.lib.CommitBuilder;
 import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.ObjectInserter;
 import org.eclipse.jgit.lib.PersonIdent;
+import org.eclipse.jgit.lib.ProgressMonitor;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.RefUpdate;
 import org.eclipse.jgit.lib.RefUpdate.Result;
 import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.lib.TextProgressMonitor;
 import org.eclipse.jgit.revwalk.RevCommit;
 import org.eclipse.jgit.revwalk.RevSort;
 import org.eclipse.jgit.revwalk.RevWalk;
@@ -60,6 +78,10 @@
   private final GitRepositoryManager repoManager;
   private final AllUsersName allUsersName;
   private final PersonIdent serverIdent;
+  private AtomicInteger i = new AtomicInteger();
+  private Stopwatch sw = Stopwatch.createStarted();
+  ReentrantLock gcLock = new ReentrantLock();
+  private int size;
 
   @Inject
   Schema_146(
@@ -75,13 +97,41 @@
 
   @Override
   protected void migrateData(ReviewDb db, UpdateUI ui) throws OrmException, SQLException {
+    ui.message("Migrating accounts");
+    Set<Entry<Account.Id, Timestamp>> accounts = scanAccounts(db, ui).entrySet();
+    gc(ui);
+    Set<List<Entry<Account.Id, Timestamp>>> batches =
+        Sets.newHashSet(Iterables.partition(accounts, 500));
+    ExecutorService pool = createExecutor(ui);
+    try {
+      batches.stream().forEach(batch -> pool.submit(() -> processBatch(batch, ui)));
+      pool.shutdown();
+      pool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+    ui.message(
+        String.format("... (%.3f s) Migrated all %d accounts to schema 146", elapsed(), i.get()));
+  }
+
+  private ExecutorService createExecutor(UpdateUI ui) {
+    int threads;
+    try {
+      threads = Integer.parseInt(System.getProperty("threadcount"));
+    } catch (NumberFormatException e) {
+      threads = Runtime.getRuntime().availableProcessors();
+    }
+    ui.message(String.format("... using %d threads ...", threads));
+    return Executors.newFixedThreadPool(threads);
+  }
+
+  private void processBatch(List<Entry<Account.Id, Timestamp>> batch, UpdateUI ui) {
     try (Repository repo = repoManager.openRepository(allUsersName);
         RevWalk rw = new RevWalk(repo);
         ObjectInserter oi = repo.newObjectInserter()) {
       ObjectId emptyTree = emptyTree(oi);
 
-      int i = 0;
-      for (Map.Entry<Account.Id, Timestamp> e : scanAccounts(db).entrySet()) {
+      for (Map.Entry<Account.Id, Timestamp> e : batch) {
         String refName = RefNames.refsUsers(e.getKey());
         Ref ref = repo.exactRef(refName);
         if (ref != null) {
@@ -89,14 +139,63 @@
         } else {
           createUserBranch(repo, oi, emptyTree, e.getKey(), e.getValue());
         }
-        i++;
-        if (i % 100 == 0) {
-          ui.message(String.format("... migrated %d users", i));
+        int count = i.incrementAndGet();
+        showProgress(ui, count);
+        if (count % 1000 == 0) {
+          gc(repo, true, ui);
         }
       }
-      ui.message(String.format("Migrated all %d users to schema 146", i));
     } catch (IOException e) {
-      throw new OrmException("Failed to rewrite user branches.", e);
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  private double elapsed() {
+    return sw.elapsed(TimeUnit.MILLISECONDS) / 1000d;
+  }
+
+  private void showProgress(UpdateUI ui, int count) {
+    if (count % 100 == 0) {
+      ui.message(
+          String.format(
+              "... (%.3f s) migrated %d%% (%d/%d) accounts",
+              elapsed(), Math.round(100.0 * count / size), count, size));
+    }
+  }
+
+  private void gc(UpdateUI ui) {
+    try (Repository repo = repoManager.openRepository(allUsersName)) {
+      gc(repo, false, ui);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  private void gc(Repository repo, boolean refsOnly, UpdateUI ui) {
+    if (repo instanceof FileRepository && gcLock.tryLock()) {
+      ProgressMonitor pm = null;
+      try {
+        pm = new TextProgressMonitor();
+        FileRepository r = (FileRepository) repo;
+        GC gc = new GC(r);
+        gc.setProgressMonitor(pm);
+        pm.beginTask("gc", ProgressMonitor.UNKNOWN);
+        if (refsOnly) {
+          ui.message(String.format("... (%.3f s) pack refs", elapsed()));
+          gc.packRefs();
+        } else {
+          ui.message(String.format("... (%.3f s) gc --prune=now", elapsed()));
+          gc.setExpire(new Date());
+          gc.gc();
+        }
+      } catch (IOException | ParseException e) {
+        throw new RuntimeException(e);
+      } finally {
+        gcLock.unlock();
+        if (pm != null) {
+          pm.endTask();
+        }
+      }
     }
   }
 
@@ -189,13 +288,15 @@
     return oi.insert(Constants.OBJ_TREE, new byte[] {});
   }
 
-  private Map<Account.Id, Timestamp> scanAccounts(ReviewDb db) throws SQLException {
+  private Map<Account.Id, Timestamp> scanAccounts(ReviewDb db, UpdateUI ui) throws SQLException {
+    ui.message(String.format("... (%.3f s) scan accounts", elapsed()));
     try (Statement stmt = newStatement(db);
         ResultSet rs = stmt.executeQuery("SELECT account_id, registered_on FROM accounts")) {
       HashMap<Account.Id, Timestamp> m = new HashMap<>();
       while (rs.next()) {
         m.put(new Account.Id(rs.getInt(1)), rs.getTimestamp(2));
       }
+      size = m.size();
       return m;
     }
   }