RebuildNotedb: batch changes by project

If we batch changes by project before rebuilding them and writing them
to the notedb, we can use a single BatchRefUpdate for all of the
changes in a project, which reduces overhead for writing every change
to the notedb.

Additionally, within the code that rebuilds each change, I synchronized
on the BatchRefUpdate object because it is not thread safe. Since all
changes in a project will be using the same one, we can just
synchronize the function calls that modify that BatchRefUpdate.

Change-Id: I4af196fa720180b0846e9a6e7cc6d9083a75f695
diff --git a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/RebuildNotedb.java b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/RebuildNotedb.java
index db5058e..f497444 100644
--- a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/RebuildNotedb.java
+++ b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/RebuildNotedb.java
@@ -17,7 +17,9 @@
 import static com.google.gerrit.server.schema.DataSourceProvider.Context.MULTI_USER;
 
 import com.google.common.base.Stopwatch;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -29,7 +31,9 @@
 import com.google.gerrit.pgm.util.SiteProgram;
 import com.google.gerrit.pgm.util.ThreadLimiter;
 import com.google.gerrit.reviewdb.client.Change;
+import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.reviewdb.server.ReviewDb;
+import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.git.MultiProgressMonitor;
 import com.google.gerrit.server.git.MultiProgressMonitor.Task;
 import com.google.gerrit.server.git.WorkQueue;
@@ -43,6 +47,9 @@
 import com.google.inject.Key;
 import com.google.inject.TypeLiteral;
 
+import org.eclipse.jgit.lib.BatchRefUpdate;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevWalk;
 import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,71 +85,105 @@
     sysManager.start();
 
     ListeningExecutorService executor = newExecutor();
-    final MultiProgressMonitor mpm =
-        new MultiProgressMonitor(System.out, "Rebuilding notedb");
-    final Task doneTask =
-        mpm.beginSubTask("changes", MultiProgressMonitor.UNKNOWN);
-    final Task failedTask =
-        mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
+    System.out.println("Rebuilding the notedb");
     ChangeRebuilder rebuilder = sysInjector.getInstance(ChangeRebuilder.class);
 
-    List<Change> allChanges = getAllChanges();
-    final List<ListenableFuture<?>> futures = Lists.newArrayList();
+    Multimap<Project.NameKey, Change> changesByProject = getChangesByProject();
     final AtomicBoolean ok = new AtomicBoolean(true);
     Stopwatch sw = Stopwatch.createStarted();
-    for (final Change c : allChanges) {
-      final ListenableFuture<?> future = rebuilder.rebuildAsync(c, executor);
-      futures.add(future);
-      future.addListener(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            future.get();
-            doneTask.update(1);
-          } catch (ExecutionException | InterruptedException e) {
-            fail(e);
-          } catch (RuntimeException e) {
-            failAndThrow(e);
-          } catch (Error e) {
-            // Can't join with RuntimeException because "RuntimeException |
-            // Error" becomes Throwable, which messes with signatures.
-            failAndThrow(e);
-          }
-        }
+    GitRepositoryManager repoManager =
+        sysInjector.getInstance(GitRepositoryManager.class);
 
-        private void fail(Throwable t) {
-          log.error("Failed to rebuild change " + c.getId(), t);
-          ok.set(false);
-          failedTask.update(1);
-        }
+    for (final Project.NameKey project : changesByProject.keySet()) {
+      final Repository repo = repoManager.openRepository(project);
+      try {
+        final BatchRefUpdate bru = repo.getRefDatabase().newBatchUpdate();
+        List<ListenableFuture<?>> futures = Lists.newArrayList();
 
-        private void failAndThrow(RuntimeException e) {
-          fail(e);
-          throw e;
+        // Here, we truncate the project name to 50 characters to ensure that
+        // the whole monitor line for a project fits on one line (<80 chars).
+        int monitorStringMaxLength = 50;
+        String projectString = project.toString();
+        String monitorString = (projectString.length() > monitorStringMaxLength)
+            ? projectString.substring(0, monitorStringMaxLength)
+            : projectString;
+        if (projectString.length() > monitorString.length()) {
+          monitorString = monitorString + "...";
         }
+        final MultiProgressMonitor mpm = new MultiProgressMonitor(System.out,
+            monitorString);
+        final Task doneTask =
+            mpm.beginSubTask("done", changesByProject.get(project).size());
+        final Task failedTask = mpm.beginSubTask("failed",
+            MultiProgressMonitor.UNKNOWN);
 
-        private void failAndThrow(Error e) {
-          fail(e);
-          throw e;
-        }
-      }, MoreExecutors.sameThreadExecutor());
-    }
-    try {
-      mpm.waitFor(Futures.transform(Futures.successfulAsList(futures),
-          new AsyncFunction<List<?>, Void>() {
+        for (final Change c : changesByProject.get(project)) {
+          final ListenableFuture<?> future =
+              rebuilder.rebuildAsync(c, executor, bru);
+          futures.add(future);
+          future.addListener(new Runnable() {
             @Override
-            public ListenableFuture<Void> apply(List<?> input) {
-              mpm.end();
-              return Futures.immediateFuture(null);
+            public void run() {
+              try {
+                future.get();
+                doneTask.update(1);
+              } catch (ExecutionException | InterruptedException e) {
+                fail(e);
+              } catch (RuntimeException e) {
+                failAndThrow(e);
+              } catch (Error e) {
+                // Can't join with RuntimeException because "RuntimeException |
+                // Error" becomes Throwable, which messes with signatures.
+                failAndThrow(e);
+              }
             }
-      }));
-    } catch (ExecutionException e) {
-      log.error("Error rebuilding notedb", e);
-      ok.set(false);
+
+            private void fail(Throwable t) {
+              log.error("Failed to rebuild change " + c.getId(), t);
+              ok.set(false);
+              failedTask.update(1);
+            }
+
+            private void failAndThrow(RuntimeException e) {
+              fail(e);
+              throw e;
+            }
+
+            private void failAndThrow(Error e) {
+              fail(e);
+              throw e;
+            }
+          }, MoreExecutors.sameThreadExecutor());
+        }
+
+        mpm.waitFor(Futures.transform(Futures.successfulAsList(futures),
+            new AsyncFunction<List<?>, Void>() {
+                @Override
+              public ListenableFuture<Void> apply(List<?> input)
+                  throws Exception {
+                Task t = mpm.beginSubTask("update refs",
+                    MultiProgressMonitor.UNKNOWN);
+                RevWalk walk = new RevWalk(repo);
+                try {
+                  bru.execute(walk, t);
+                  mpm.end();
+                  return Futures.immediateFuture(null);
+                } finally {
+                  walk.release();
+                }
+              }
+            }));
+      } catch (Exception e) {
+        log.error("Error rebuilding notedb", e);
+        ok.set(false);
+        break;
+      } finally {
+        repo.close();
+      }
     }
     double t = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d;
     System.out.format("Rebuild %d changes in %.01fs (%.01f/s)\n",
-        allChanges.size(), t, allChanges.size() / t);
+        changesByProject.size(), t, changesByProject.size() / t);
     return ok.get() ? 0 : 1;
   }
 
@@ -168,17 +209,20 @@
     }
   }
 
-  private List<Change> getAllChanges() throws OrmException {
-    // Memoize all changes to a list so we can close the db connection and allow
+  private Multimap<Project.NameKey, Change> getChangesByProject()
+      throws OrmException {
+    // Memoize all changes so we can close the db connection and allow
     // rebuilder threads to use the full connection pool.
-    // TODO(dborowitz): May need to batch changes, e.g. by project (though note
-    // that unlike Reindex, we don't think there is an inherent benefit to
-    // grouping by project), to avoid wasting too much memory here.
     SchemaFactory<ReviewDb> schemaFactory = sysInjector.getInstance(Key.get(
         new TypeLiteral<SchemaFactory<ReviewDb>>() {}));
     ReviewDb db = schemaFactory.open();
+    Multimap<Project.NameKey, Change> changesByProject =
+        ArrayListMultimap.create();
     try {
-      return db.changes().all().toList();
+      for (Change c : db.changes().all()) {
+        changesByProject.put(c.getProject(), c);
+      }
+      return changesByProject;
     } finally {
       db.close();
     }
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeRebuilder.java b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeRebuilder.java
index c4a7c07..910edf9 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeRebuilder.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeRebuilder.java
@@ -72,12 +72,12 @@
     this.updateFactory = updateFactory;
   }
 
-  public ListenableFuture<?> rebuildAsync(
-      final Change change, ListeningExecutorService executor) {
+  public ListenableFuture<?> rebuildAsync(final Change change,
+      ListeningExecutorService executor, final BatchRefUpdate bru) {
     return executor.submit(new Callable<Void>() {
-      @Override
+        @Override
       public Void call() throws Exception {
-        rebuild(change, null);
+        rebuild(change, bru);
         return null;
       }
     });
@@ -109,7 +109,7 @@
             controlFactory.controlFor(change, user), e.when);
         update.setPatchSetId(e.psId);
         if (batch == null) {
-          batch = update.openUpdate();
+          batch = update.openUpdateInBatch(bru);
         }
       }
       e.apply(update);
@@ -118,7 +118,15 @@
       if (update != null) {
         writeToBatch(batch, update);
       }
-      batch.commit();
+
+      // Since the BatchMetaDataUpdates generated by all ChangeRebuilders on a
+      // given project are backed by the same BatchRefUpdate, we need to
+      // synchronize on the BatchRefUpdate. Therefore, since commit on a
+      // BatchMetaDataUpdate is the only method that modifies a BatchRefUpdate,
+      // we can just synchronize this call.
+      synchronized (bru) {
+        batch.commit();
+      }
     }
   }