AllChangesIndexer: Schedule slices immediately

If a site has one huge repository and several small/medium sized repos,
then slice creation for the large repo will effectively block other
smaller repos from starting to reindex their changes. With this change,
we schedule slices without waiting for any other slice creation.

With this change, we can no longer shuffle slices. However, there is
some randomness to how slices are processed based on the thread count.

Progress monitors are also updated to report progress despite not
knowing the total work. The monitors add a '+' to the stats when the
total work is not finalized. For example:

project-slices: 32% (2167/6649+)
changes: 18% (591857/3234259+)

On large test-sites with ~3.5m changes and ~15k projects repos on NFS,
with loose refs equal to number of changes(created by notedb migration)
and with caches (change_kind, diff, diff_summary) populated, this change
brings down reindex time from ~165mins to ~125mins.

Change-Id: If3187ed9c9953177c270761da243b186627d8638
diff --git a/java/com/google/gerrit/server/git/MultiProgressMonitor.java b/java/com/google/gerrit/server/git/MultiProgressMonitor.java
index 5cdd504..7e5c99f 100644
--- a/java/com/google/gerrit/server/git/MultiProgressMonitor.java
+++ b/java/com/google/gerrit/server/git/MultiProgressMonitor.java
@@ -28,6 +28,8 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.lib.ProgressMonitor;
 
@@ -123,6 +125,64 @@
         return count;
       }
     }
+
+    public int getTotal() {
+      return total;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public String getTotalDisplay(int total) {
+      return String.valueOf(total);
+    }
+  }
+
+  /** Handle for a sub-task whose total work can be updated while the task is in progress. */
+  public class VolatileTask extends Task {
+    protected AtomicInteger volatileTotal;
+    protected AtomicBoolean isTotalFinalized = new AtomicBoolean(false);
+
+    public VolatileTask(String subTaskName) {
+      super(subTaskName, UNKNOWN);
+      volatileTotal = new AtomicInteger(UNKNOWN);
+    }
+
+    /**
+     * Update the total work for this sub-task.
+     *
+     * <p>Intended to be called from a worker thread.
+     *
+     * @param workUnits number of work units to be added to existing total work.
+     */
+    public void updateTotal(int workUnits) {
+      if (!isTotalFinalized.get()) {
+        volatileTotal.addAndGet(workUnits);
+      } else {
+        logger.atWarning().log(
+            "Total work has been finalized on sub-task " + getName() + " and cannot be updated");
+      }
+    }
+
+    /**
+     * Mark the total on this sub-task as unmodifiable.
+     *
+     * <p>Intended to be called from a worker thread.
+     */
+    public void finalizeTotal() {
+      isTotalFinalized.set(true);
+    }
+
+    @Override
+    public int getTotal() {
+      return volatileTotal.get();
+    }
+
+    @Override
+    public String getTotalDisplay(int total) {
+      return super.getTotalDisplay(total) + (isTotalFinalized.get() ? "" : "+");
+    }
   }
 
   private final OutputStream out;
@@ -305,6 +365,18 @@
   }
 
   /**
+   * Begin a sub-task whose total work can be updated.
+   *
+   * @param subTask sub-task name.
+   * @return sub-task handle.
+   */
+  public VolatileTask beginVolatileSubTask(String subTask) {
+    VolatileTask task = new VolatileTask(subTask);
+    tasks.add(task);
+    return task;
+  }
+
+  /**
    * End the overall task.
    *
    * <p>Must be called from a worker thread.
@@ -347,6 +419,7 @@
       boolean first = true;
       for (Task t : tasks) {
         int count = t.getCount();
+        int total = t.getTotal();
         if (count == 0) {
           continue;
         }
@@ -361,10 +434,11 @@
         if (!Strings.isNullOrEmpty(t.name)) {
           s.append(t.name).append(": ");
         }
-        if (t.total == UNKNOWN) {
+        if (total == UNKNOWN) {
           s.append(count);
         } else {
-          s.append(String.format("%d%% (%d/%d)", count * 100 / t.total, count, t.total));
+          s.append(
+              String.format("%d%% (%d/%s)", count * 100 / total, count, t.getTotalDisplay(total)));
         }
       }
     }
diff --git a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
index f466ad6..d6b8ef9 100644
--- a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
+++ b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
@@ -14,7 +14,6 @@
 
 package com.google.gerrit.server.index.change;
 
-import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.util.concurrent.Futures.successfulAsList;
 import static com.google.common.util.concurrent.Futures.transform;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
@@ -22,9 +21,8 @@
 
 import com.google.auto.value.AutoValue;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.Sets;
+import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.flogger.FluentLogger;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.UncheckedExecutionException;
@@ -34,6 +32,7 @@
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.git.MultiProgressMonitor;
 import com.google.gerrit.server.git.MultiProgressMonitor.Task;
+import com.google.gerrit.server.git.MultiProgressMonitor.VolatileTask;
 import com.google.gerrit.server.index.IndexExecutor;
 import com.google.gerrit.server.index.OnlineReindexMode;
 import com.google.gerrit.server.notedb.ChangeNotes;
@@ -44,18 +43,13 @@
 import com.google.inject.Inject;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 import org.eclipse.jgit.lib.ProgressMonitor;
 import org.eclipse.jgit.lib.Repository;
-import org.eclipse.jgit.lib.TextProgressMonitor;
 
 /**
  * Implementation that can index all changes on a host or within a project. Used by Gerrit's
@@ -64,6 +58,9 @@
  */
 public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, ChangeIndex> {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private MultiProgressMonitor mpm;
+  private VolatileTask doneTask;
+  private Task failedTask;
   private static final int PROJECT_SLICE_MAX_REFS = 1000;
 
   private static class ProjectsCollectionFailure extends Exception {
@@ -130,55 +127,18 @@
     // in 2020.
 
     Stopwatch sw = Stopwatch.createStarted();
-    List<ProjectSlice> projectSlices;
+    AtomicBoolean ok = new AtomicBoolean(true);
+    mpm = new MultiProgressMonitor(progressOut, "Reindexing changes");
+    doneTask = mpm.beginVolatileSubTask("changes");
+    failedTask = mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
+    List<ListenableFuture<?>> futures;
     try {
-      projectSlices = new SliceCreator().create();
-    } catch (ProjectsCollectionFailure | InterruptedException | ExecutionException e) {
+      futures = new SliceScheduler(index, ok).schedule();
+    } catch (ProjectsCollectionFailure e) {
       logger.atSevere().log(e.getMessage());
       return Result.create(sw, false, 0, 0);
     }
 
-    // Since project slices are created in parallel, they are somewhat shuffled already. However,
-    // the number of threads used to create the project slices doesn't guarantee good randomization.
-    // If the slices are not shuffled well, then multiple threads would typically work concurrently
-    // on different slices of the same project. While this is not a big issue, shuffling the list
-    // beforehand helps with ungrouping the project slices, so different slices are less likely to
-    // be worked on concurrently.
-    // This shuffling gave a 6% runtime reduction for Wikimedia's Gerrit in 2020.
-    Collections.shuffle(projectSlices);
-    return indexAll(index, projectSlices);
-  }
-
-  private SiteIndexer.Result indexAll(ChangeIndex index, List<ProjectSlice> projectSlices) {
-    Stopwatch sw = Stopwatch.createStarted();
-    MultiProgressMonitor mpm = new MultiProgressMonitor(progressOut, "Reindexing changes");
-    Task projTask = mpm.beginSubTask("project-slices", projectSlices.size());
-    checkState(totalWork >= 0);
-    Task doneTask = mpm.beginSubTask(null, totalWork);
-    Task failedTask = mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
-
-    List<ListenableFuture<?>> futures = new ArrayList<>();
-    AtomicBoolean ok = new AtomicBoolean(true);
-
-    for (ProjectSlice projectSlice : projectSlices) {
-      Project.NameKey name = projectSlice.name();
-      int slice = projectSlice.slice();
-      int slices = projectSlice.slices();
-      ListenableFuture<?> future =
-          executor.submit(
-              reindexProject(
-                  indexerFactory.create(executor, index),
-                  name,
-                  slice,
-                  slices,
-                  projectSlice.scanResult(),
-                  doneTask,
-                  failedTask));
-      String description = "project " + name + " (" + slice + "/" + slices + ")";
-      addErrorListener(future, description, projTask, ok);
-      futures.add(future);
-    }
-
     try {
       mpm.waitFor(
           transform(
@@ -308,30 +268,53 @@
     }
   }
 
-  private class SliceCreator {
-    final Set<ProjectSlice> projectSlices = Sets.newConcurrentHashSet();
+  private class SliceScheduler {
+    final ChangeIndex index;
+    final AtomicBoolean ok;
     final AtomicInteger changeCount = new AtomicInteger(0);
     final AtomicInteger projectsFailed = new AtomicInteger(0);
-    final ProgressMonitor pm = new TextProgressMonitor();
+    final List<ListenableFuture<?>> sliceIndexerFutures = new ArrayList<>();
+    final List<ListenableFuture<?>> sliceCreationFutures = new ArrayList<>();
+    VolatileTask projTask = mpm.beginVolatileSubTask("project-slices");
+    Task slicingProjects;
 
-    private List<ProjectSlice> create()
-        throws ProjectsCollectionFailure, InterruptedException, ExecutionException {
-      List<ListenableFuture<?>> futures = new ArrayList<>();
-      pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN);
-      for (Project.NameKey name : projectCache.all()) {
-        futures.add(executor.submit(new ProjectSliceCreator(name)));
+    public SliceScheduler(ChangeIndex index, AtomicBoolean ok) {
+      this.index = index;
+      this.ok = ok;
+    }
+
+    private List<ListenableFuture<?>> schedule() throws ProjectsCollectionFailure {
+      ImmutableSortedSet<Project.NameKey> projects = projectCache.all();
+      int projectCount = projects.size();
+      slicingProjects = mpm.beginSubTask("Slicing projects", projectCount);
+      for (Project.NameKey name : projects) {
+        sliceCreationFutures.add(executor.submit(new ProjectSliceCreator(name)));
       }
 
-      Futures.allAsList(futures).get();
+      try {
+        mpm.waitForNonFinalTask(
+            transform(
+                successfulAsList(sliceCreationFutures),
+                x -> {
+                  projTask.finalizeTotal();
+                  doneTask.finalizeTotal();
+                  return null;
+                },
+                directExecutor()));
+      } catch (UncheckedExecutionException e) {
+        logger.atSevere().withCause(e).log("Error project slice creation");
+        ok.set(false);
+      }
 
-      if (projectsFailed.get() > projectCache.all().size() / 2) {
+      if (projectsFailed.get() > projectCount / 2) {
         throw new ProjectsCollectionFailure(
             "Over 50%% of the projects could not be collected: aborted");
       }
 
-      pm.endTask();
+      slicingProjects.endTask();
       setTotalWork(changeCount.get());
-      return projectSlices.stream().collect(Collectors.toList());
+
+      return sliceIndexerFutures;
     }
 
     private class ProjectSliceCreator implements Callable<Void> {
@@ -353,15 +336,32 @@
               verboseWriter.println(
                   "Submitting " + name + " for indexing in " + slices + " slices");
             }
+
+            doneTask.updateTotal(size);
+            projTask.updateTotal(slices);
+
             for (int slice = 0; slice < slices; slice++) {
-              projectSlices.add(ProjectSlice.create(name, slice, slices, sr));
+              ProjectSlice projectSlice = ProjectSlice.create(name, slice, slices, sr);
+              ListenableFuture<?> future =
+                  executor.submit(
+                      reindexProject(
+                          indexerFactory.create(executor, index),
+                          name,
+                          slice,
+                          slices,
+                          projectSlice.scanResult(),
+                          doneTask,
+                          failedTask));
+              String description = "project " + name + " (" + slice + "/" + slices + ")";
+              addErrorListener(future, description, projTask, ok);
+              sliceIndexerFutures.add(future);
             }
           }
         } catch (IOException e) {
           logger.atSevere().withCause(e).log("Error collecting project %s", name);
           projectsFailed.incrementAndGet();
         }
-        pm.update(1);
+        slicingProjects.update(1);
         return null;
       }
     }