RebuildNotedb: batch changes by project
If we batch changes by project before rebuilding them and writing them
to the notedb, we can use a single BatchRefUpdate for all of the
changes in a project, which reduces overhead for writing every change
to the notedb.
Additionally, within the code that rebuilds each change, I synchronized
on the BatchRefUpdate object because it is not thread-safe. Since all
changes in a project will be using the same one, we can just
synchronize the function calls that modify that BatchRefUpdate.
Change-Id: I4af196fa720180b0846e9a6e7cc6d9083a75f695
diff --git a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/RebuildNotedb.java b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/RebuildNotedb.java
index db5058e..f497444 100644
--- a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/RebuildNotedb.java
+++ b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/RebuildNotedb.java
@@ -17,7 +17,9 @@
import static com.google.gerrit.server.schema.DataSourceProvider.Context.MULTI_USER;
import com.google.common.base.Stopwatch;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -29,7 +31,9 @@
import com.google.gerrit.pgm.util.SiteProgram;
import com.google.gerrit.pgm.util.ThreadLimiter;
import com.google.gerrit.reviewdb.client.Change;
+import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.reviewdb.server.ReviewDb;
+import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.MultiProgressMonitor;
import com.google.gerrit.server.git.MultiProgressMonitor.Task;
import com.google.gerrit.server.git.WorkQueue;
@@ -43,6 +47,9 @@
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
+import org.eclipse.jgit.lib.BatchRefUpdate;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevWalk;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,71 +85,105 @@
sysManager.start();
ListeningExecutorService executor = newExecutor();
- final MultiProgressMonitor mpm =
- new MultiProgressMonitor(System.out, "Rebuilding notedb");
- final Task doneTask =
- mpm.beginSubTask("changes", MultiProgressMonitor.UNKNOWN);
- final Task failedTask =
- mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
+ System.out.println("Rebuilding the notedb");
ChangeRebuilder rebuilder = sysInjector.getInstance(ChangeRebuilder.class);
- List<Change> allChanges = getAllChanges();
- final List<ListenableFuture<?>> futures = Lists.newArrayList();
+ Multimap<Project.NameKey, Change> changesByProject = getChangesByProject();
final AtomicBoolean ok = new AtomicBoolean(true);
Stopwatch sw = Stopwatch.createStarted();
- for (final Change c : allChanges) {
- final ListenableFuture<?> future = rebuilder.rebuildAsync(c, executor);
- futures.add(future);
- future.addListener(new Runnable() {
- @Override
- public void run() {
- try {
- future.get();
- doneTask.update(1);
- } catch (ExecutionException | InterruptedException e) {
- fail(e);
- } catch (RuntimeException e) {
- failAndThrow(e);
- } catch (Error e) {
- // Can't join with RuntimeException because "RuntimeException |
- // Error" becomes Throwable, which messes with signatures.
- failAndThrow(e);
- }
- }
+ GitRepositoryManager repoManager =
+ sysInjector.getInstance(GitRepositoryManager.class);
- private void fail(Throwable t) {
- log.error("Failed to rebuild change " + c.getId(), t);
- ok.set(false);
- failedTask.update(1);
- }
+ for (final Project.NameKey project : changesByProject.keySet()) {
+ final Repository repo = repoManager.openRepository(project);
+ try {
+ final BatchRefUpdate bru = repo.getRefDatabase().newBatchUpdate();
+ List<ListenableFuture<?>> futures = Lists.newArrayList();
- private void failAndThrow(RuntimeException e) {
- fail(e);
- throw e;
+ // Here, we truncate the project name to 50 characters to ensure that
+ // the whole monitor line for a project fits on one line (<80 chars).
+ int monitorStringMaxLength = 50;
+ String projectString = project.toString();
+ String monitorString = (projectString.length() > monitorStringMaxLength)
+ ? projectString.substring(0, monitorStringMaxLength)
+ : projectString;
+ if (projectString.length() > monitorString.length()) {
+ monitorString = monitorString + "...";
}
+ final MultiProgressMonitor mpm = new MultiProgressMonitor(System.out,
+ monitorString);
+ final Task doneTask =
+ mpm.beginSubTask("done", changesByProject.get(project).size());
+ final Task failedTask = mpm.beginSubTask("failed",
+ MultiProgressMonitor.UNKNOWN);
- private void failAndThrow(Error e) {
- fail(e);
- throw e;
- }
- }, MoreExecutors.sameThreadExecutor());
- }
- try {
- mpm.waitFor(Futures.transform(Futures.successfulAsList(futures),
- new AsyncFunction<List<?>, Void>() {
+ for (final Change c : changesByProject.get(project)) {
+ final ListenableFuture<?> future =
+ rebuilder.rebuildAsync(c, executor, bru);
+ futures.add(future);
+ future.addListener(new Runnable() {
@Override
- public ListenableFuture<Void> apply(List<?> input) {
- mpm.end();
- return Futures.immediateFuture(null);
+ public void run() {
+ try {
+ future.get();
+ doneTask.update(1);
+ } catch (ExecutionException | InterruptedException e) {
+ fail(e);
+ } catch (RuntimeException e) {
+ failAndThrow(e);
+ } catch (Error e) {
+ // Can't join with RuntimeException because "RuntimeException |
+ // Error" becomes Throwable, which messes with signatures.
+ failAndThrow(e);
+ }
}
- }));
- } catch (ExecutionException e) {
- log.error("Error rebuilding notedb", e);
- ok.set(false);
+
+ private void fail(Throwable t) {
+ log.error("Failed to rebuild change " + c.getId(), t);
+ ok.set(false);
+ failedTask.update(1);
+ }
+
+ private void failAndThrow(RuntimeException e) {
+ fail(e);
+ throw e;
+ }
+
+ private void failAndThrow(Error e) {
+ fail(e);
+ throw e;
+ }
+ }, MoreExecutors.sameThreadExecutor());
+ }
+
+ mpm.waitFor(Futures.transform(Futures.successfulAsList(futures),
+ new AsyncFunction<List<?>, Void>() {
+ @Override
+ public ListenableFuture<Void> apply(List<?> input)
+ throws Exception {
+ Task t = mpm.beginSubTask("update refs",
+ MultiProgressMonitor.UNKNOWN);
+ RevWalk walk = new RevWalk(repo);
+ try {
+ bru.execute(walk, t);
+ mpm.end();
+ return Futures.immediateFuture(null);
+ } finally {
+ walk.release();
+ }
+ }
+ }));
+ } catch (Exception e) {
+ log.error("Error rebuilding notedb", e);
+ ok.set(false);
+ break;
+ } finally {
+ repo.close();
+ }
}
double t = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d;
System.out.format("Rebuild %d changes in %.01fs (%.01f/s)\n",
- allChanges.size(), t, allChanges.size() / t);
+ changesByProject.size(), t, changesByProject.size() / t);
return ok.get() ? 0 : 1;
}
@@ -168,17 +209,20 @@
}
}
- private List<Change> getAllChanges() throws OrmException {
- // Memoize all changes to a list so we can close the db connection and allow
+ private Multimap<Project.NameKey, Change> getChangesByProject()
+ throws OrmException {
+ // Load all changes into memory so we can close the db connection and allow
// rebuilder threads to use the full connection pool.
- // TODO(dborowitz): May need to batch changes, e.g. by project (though note
- // that unlike Reindex, we don't think there is an inherent benefit to
- // grouping by project), to avoid wasting too much memory here.
SchemaFactory<ReviewDb> schemaFactory = sysInjector.getInstance(Key.get(
new TypeLiteral<SchemaFactory<ReviewDb>>() {}));
ReviewDb db = schemaFactory.open();
+ Multimap<Project.NameKey, Change> changesByProject =
+ ArrayListMultimap.create();
try {
- return db.changes().all().toList();
+ for (Change c : db.changes().all()) {
+ changesByProject.put(c.getProject(), c);
+ }
+ return changesByProject;
} finally {
db.close();
}
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeRebuilder.java b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeRebuilder.java
index c4a7c07..910edf9 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeRebuilder.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeRebuilder.java
@@ -72,12 +72,12 @@
this.updateFactory = updateFactory;
}
- public ListenableFuture<?> rebuildAsync(
- final Change change, ListeningExecutorService executor) {
+ public ListenableFuture<?> rebuildAsync(final Change change,
+ ListeningExecutorService executor, final BatchRefUpdate bru) {
return executor.submit(new Callable<Void>() {
- @Override
+ @Override
public Void call() throws Exception {
- rebuild(change, null);
+ rebuild(change, bru);
return null;
}
});
@@ -109,7 +109,7 @@
controlFactory.controlFor(change, user), e.when);
update.setPatchSetId(e.psId);
if (batch == null) {
- batch = update.openUpdate();
+ batch = update.openUpdateInBatch(bru);
}
}
e.apply(update);
@@ -118,7 +118,15 @@
if (update != null) {
writeToBatch(batch, update);
}
- batch.commit();
+
+ // Since the BatchMetaDataUpdates generated by all ChangeRebuilders on a
+ // given project are backed by the same BatchRefUpdate, we need to
+ // synchronize on the BatchRefUpdate. Therefore, since commit on a
+ // BatchMetaDataUpdate is the only method that modifies a BatchRefUpdate,
+ // we can just synchronize this call.
+ synchronized (bru) {
+ batch.commit();
+ }
}
}