When reindexing changes, use multiple threads per project

Each project's changes were reindexed on a single thread. This can
leave most threads of the pool idle when reindexing a site with one
big and many small projects: at first, all CPUs are busy reindexing
projects, but once the small projects have been reindexed, a single
thread keeps working alone on the big project while the other
threads sit idle.

To avoid this idling, we split big projects into smaller slices and
let the thread pool index those slices. This way, reindexing a big
project can also take advantage of multiple CPUs, as sketched below.
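
For illustration, a minimal, self-contained sketch of the slicing
scheme (the constant and the modulo predicate mirror this change;
the class and method names are made up for the sketch):

  // Sketch only: how a project's estimated change count is split
  // into slices, and which change IDs a given slice picks up.
  class SlicingSketch {
    static final int PROJECT_SLICE_MAX_REFS = 1000;

    // A project with `size` estimated changes gets this many slices.
    static int slicesFor(int size) {
      return 1 + size / PROJECT_SLICE_MAX_REFS;
    }

    // Slice `slice` of `slices` indexes exactly its own bucket of
    // change IDs; together the slices cover every change once.
    static boolean inSlice(int changeId, int slice, int slices) {
      return (changeId % slices) == slice;
    }
  }

For example, a project with ~3500 change refs is indexed in
slicesFor(3500) == 4 slices, and change 4711 is picked up by
slice 3, since 4711 % 4 == 3.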

Change-Id: Ic7b36b5b8badab502370d79085f329f9b8c70d9d
(cherry picked from commit 3679947c7f7de39183a0f5b8d2c16d5e6a3cec4e)
diff --git a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
index 2f23ad8..7267ae2 100644
--- a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
+++ b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
@@ -21,8 +21,8 @@
 import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH;
 
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.ComparisonChain;
 import com.google.common.flogger.FluentLogger;
+import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.gerrit.index.SiteIndexer;
@@ -43,10 +43,9 @@
 import com.google.inject.Inject;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
-import java.util.SortedSet;
-import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
@@ -58,6 +57,7 @@
 
 public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, ChangeIndex> {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final int PROJECT_SLICE_MAX_REFS = 1000;
 
   private final SchemaFactory<ReviewDb> schemaFactory;
   private final ChangeData.Factory changeDataFactory;
@@ -85,22 +85,27 @@
     this.projectCache = projectCache;
   }
 
-  private static class ProjectHolder implements Comparable<ProjectHolder> {
-    final Project.NameKey name;
-    private final long size;
+  private static class ProjectSlice {
+    private final Project.NameKey name;
+    private final int slice;
+    private final int slices;
 
-    ProjectHolder(Project.NameKey name, long size) {
+    ProjectSlice(Project.NameKey name, int slice, int slices) {
       this.name = name;
-      this.size = size;
+      this.slice = slice;
+      this.slices = slices;
     }
 
-    @Override
-    public int compareTo(ProjectHolder other) {
-      // Sort projects based on size first to maximize utilization of threads early on.
-      return ComparisonChain.start()
-          .compare(other.size, size)
-          .compare(other.name.get(), name.get())
-          .result();
+    public Project.NameKey getName() {
+      return name;
+    }
+
+    public int getSlice() {
+      return slice;
+    }
+
+    public int getSlices() {
+      return slices;
     }
   }
 
@@ -108,19 +113,39 @@
   public Result indexAll(ChangeIndex index) {
     ProgressMonitor pm = new TextProgressMonitor();
     pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN);
-    SortedSet<ProjectHolder> projects = new TreeSet<>();
+    List<ProjectSlice> projectSlices = new ArrayList<>();
     int changeCount = 0;
     Stopwatch sw = Stopwatch.createStarted();
     int projectsFailed = 0;
     for (Project.NameKey name : projectCache.all()) {
       try (Repository repo = repoManager.openRepository(name)) {
-        long size = estimateSize(repo);
+        // The simplest approach to distribute indexing would be to let each thread grab a project
+        // and index it fully. But if a site has one big project and 100s of small projects, then
+        // in the beginning all CPUs would be busy reindexing projects. But soon enough all small
+        // projects have been reindexed, and only the thread that reindexes the big project is
+        // still working. The other threads would idle. Reindexing the big project on a single
+        // thread becomes the critical path. Bringing in more CPUs would not speed up things.
+        //
+        // To avoid such situations, we split big repos into smaller parts and let
+        // the thread pool index these smaller parts. This splitting introduces an overhead in the
+        // workload setup and there might be additional slow-downs from multiple threads
+        // concurrently working on different parts of the same project. But for Wikimedia's Gerrit,
+        // which had 2 big projects, many middle sized ones, and lots of smaller ones, the
+        // splitting of repos into smaller parts reduced indexing time from 1.5 hours to 55 minutes
+        // in 2020.
+        int size = estimateSize(repo);
         changeCount += size;
-        projects.add(new ProjectHolder(name, size));
+        int slices = 1 + size / PROJECT_SLICE_MAX_REFS;
+        if (slices > 1) {
+          verboseWriter.println("Submitting " + name + " for indexing in " + slices + " slices");
+        }
+        for (int slice = 0; slice < slices; slice++) {
+          projectSlices.add(new ProjectSlice(name, slice, slices));
+        }
       } catch (IOException e) {
         logger.atSevere().withCause(e).log("Error collecting project %s", name);
         projectsFailed++;
-        if (projectsFailed > projects.size() / 2) {
+        if (projectsFailed > projectCache.all().size() / 2) {
           logger.atSevere().log("Over 50%% of the projects could not be collected: aborted");
           return new Result(sw, false, 0, 0);
         }
@@ -129,24 +154,34 @@
     }
     pm.endTask();
     setTotalWork(changeCount);
-    return indexAll(index, projects);
+
+    // projectSlices are currently grouped by projects. First all slices for project1, followed
+    // by all slices for project2, and so on. As workers pick tasks sequentially, multiple threads
+    // would typically work concurrently on different slices of the same project. While this is not
+    // a big issue, shuffling the list beforehand helps with ungrouping the project slices, so
+    // different slices are less likely to be worked on concurrently.
+    // This shuffling gave a 6% runtime reduction for Wikimedia's Gerrit in 2020.
+    Collections.shuffle(projectSlices);
+    return indexAll(index, projectSlices);
   }
 
-  private long estimateSize(Repository repo) throws IOException {
+  private int estimateSize(Repository repo) throws IOException {
     // Estimate size based on IDs that show up in ref names. This is not perfect, since patch set
     // refs may exist for changes whose metadata was never successfully stored. But that's ok, as
-    // the estimate is just used as a heuristic for sorting projects.
+    // the estimate is just used as a heuristic for slicing the work among threads.
-    return repo.getRefDatabase().getRefsByPrefix(RefNames.REFS_CHANGES).stream()
-        .map(r -> Change.Id.fromRef(r.getName()))
-        .filter(Objects::nonNull)
-        .distinct()
-        .count();
+    long size =
+        repo.getRefDatabase().getRefsByPrefix(RefNames.REFS_CHANGES).stream()
+            .map(r -> Change.Id.fromRef(r.getName()))
+            .filter(Objects::nonNull)
+            .distinct()
+            .count();
+    return Ints.saturatedCast(size);
   }
 
-  private SiteIndexer.Result indexAll(ChangeIndex index, SortedSet<ProjectHolder> projects) {
+  private SiteIndexer.Result indexAll(ChangeIndex index, List<ProjectSlice> projectSlices) {
     Stopwatch sw = Stopwatch.createStarted();
     MultiProgressMonitor mpm = new MultiProgressMonitor(progressOut, "Reindexing changes");
-    Task projTask = mpm.beginSubTask("projects", projects.size());
+    Task projTask = mpm.beginSubTask("project-slices", projectSlices.size());
     checkState(totalWork >= 0);
     Task doneTask = mpm.beginSubTask(null, totalWork);
     Task failedTask = mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
@@ -154,12 +189,21 @@
     List<ListenableFuture<?>> futures = new ArrayList<>();
     AtomicBoolean ok = new AtomicBoolean(true);
 
-    for (ProjectHolder project : projects) {
+    for (ProjectSlice projectSlice : projectSlices) {
+      Project.NameKey name = projectSlice.getName();
+      int slice = projectSlice.getSlice();
+      int slices = projectSlice.getSlices();
       ListenableFuture<?> future =
           executor.submit(
               reindexProject(
-                  indexerFactory.create(executor, index), project.name, doneTask, failedTask));
-      addErrorListener(future, "project " + project.name, projTask, ok);
+                  indexerFactory.create(executor, index),
+                  name,
+                  slice,
+                  slices,
+                  doneTask,
+                  failedTask));
+      String description = "project " + name + " (" + slice + "/" + slices + ")";
+      addErrorListener(future, description, projTask, ok);
       futures.add(future);
     }
 
@@ -194,22 +238,38 @@
 
   public Callable<Void> reindexProject(
       ChangeIndexer indexer, Project.NameKey project, Task done, Task failed) {
-    return new ProjectIndexer(indexer, project, done, failed);
+    return reindexProject(indexer, project, 0, 1, done, failed);
+  }
+
+  public Callable<Void> reindexProject(
+      ChangeIndexer indexer,
+      Project.NameKey project,
+      int slice,
+      int slices,
+      Task done,
+      Task failed) {
+    return new ProjectIndexer(indexer, project, slice, slices, done, failed);
   }
 
   private class ProjectIndexer implements Callable<Void> {
     private final ChangeIndexer indexer;
     private final Project.NameKey project;
+    private final int slice;
+    private final int slices;
     private final ProgressMonitor done;
     private final ProgressMonitor failed;
 
     private ProjectIndexer(
         ChangeIndexer indexer,
         Project.NameKey project,
+        int slice,
+        int slices,
         ProgressMonitor done,
         ProgressMonitor failed) {
       this.indexer = indexer;
       this.project = project;
+      this.slice = slice;
+      this.slices = slices;
       this.done = done;
       this.failed = failed;
     }
@@ -225,7 +285,9 @@
         // It does mean that reindexing after invalidating the DiffSummary cache will be expensive,
         // but the goal is to invalidate that cache as infrequently as we possibly can. And besides,
         // we don't have concrete proof that improving packfile locality would help.
-        notesFactory.scan(repo, db, project).forEach(r -> index(db, r));
+        notesFactory
+            .scan(repo, db, project, id -> (id.get() % slices) == slice)
+            .forEach(r -> index(db, r));
       } catch (RepositoryNotFoundException rnfe) {
         logger.atSevere().log(rnfe.getMessage());
       } finally {
diff --git a/java/com/google/gerrit/server/notedb/ChangeNotes.java b/java/com/google/gerrit/server/notedb/ChangeNotes.java
index 086b2e2..d548bf3 100644
--- a/java/com/google/gerrit/server/notedb/ChangeNotes.java
+++ b/java/com/google/gerrit/server/notedb/ChangeNotes.java
@@ -305,19 +305,35 @@
 
     public Stream<ChangeNotesResult> scan(Repository repo, ReviewDb db, Project.NameKey project)
         throws IOException {
-      return args.migration.readChanges() ? scanNoteDb(repo, db, project) : scanReviewDb(repo, db);
+      return scan(repo, db, project, null);
     }
 
-    private Stream<ChangeNotesResult> scanReviewDb(Repository repo, ReviewDb db)
+    public Stream<ChangeNotesResult> scan(
+        Repository repo,
+        ReviewDb db,
+        Project.NameKey project,
+        Predicate<Change.Id> changeIdPredicate)
         throws IOException {
+      return args.migration.readChanges()
+          ? scanNoteDb(repo, db, project, changeIdPredicate)
+          : scanReviewDb(repo, db, changeIdPredicate);
+    }
+
+    private Stream<ChangeNotesResult> scanReviewDb(
+        Repository repo, ReviewDb db, Predicate<Change.Id> changeIdPredicate) throws IOException {
       // Scan IDs that might exist in ReviewDb, assuming that each change has at least one patch set
       // ref. Not all changes might exist: some patch set refs might have been written where the
       // corresponding ReviewDb write failed. These will be silently filtered out by the batch get
       // call below, which is intended.
       Set<Change.Id> ids = scanChangeIds(repo).fromPatchSetRefs();
 
+      Stream<Change.Id> idStream = ids.stream();
+      if (changeIdPredicate != null) {
+        idStream = idStream.filter(changeIdPredicate);
+      }
+
       // A batch size of N may overload get(Iterable), so use something smaller, but still >1.
-      return Streams.stream(Iterators.partition(ids.iterator(), 30))
+      return Streams.stream(Iterators.partition(idStream.iterator(), 30))
           .flatMap(
               batch -> {
                 try {
@@ -333,10 +349,23 @@
 
     private Stream<ChangeNotesResult> scanNoteDb(
         Repository repo, ReviewDb db, Project.NameKey project) throws IOException {
+      return scanNoteDb(repo, db, project, null);
+    }
+
+    private Stream<ChangeNotesResult> scanNoteDb(
+        Repository repo,
+        ReviewDb db,
+        Project.NameKey project,
+        Predicate<Change.Id> changeIdPredicate)
+        throws IOException {
       ScanResult sr = scanChangeIds(repo);
       PrimaryStorage defaultStorage = args.migration.changePrimaryStorage();
 
-      return sr.all().stream()
+      Stream<Change.Id> idStream = sr.all().stream();
+      if (changeIdPredicate != null) {
+        idStream = idStream.filter(changeIdPredicate);
+      }
+      return idStream
           .map(id -> scanOneNoteDbChange(db, project, sr, defaultStorage, id))
           .filter(Objects::nonNull);
     }