Merge changes Ie3972a84,I4bf1d191 into stable-3.2

* changes:
  AllChangesIndexer: Parallelize project slice creation
  AllChangesIndexer: Avoid scanning for change refs in each slice
diff --git a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
index 309c915..494aa84 100644
--- a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
+++ b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
@@ -20,15 +20,16 @@
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
 import com.google.common.flogger.FluentLogger;
-import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import com.google.gerrit.entities.Change;
 import com.google.gerrit.entities.Project;
-import com.google.gerrit.entities.RefNames;
 import com.google.gerrit.index.SiteIndexer;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.git.MultiProgressMonitor;
@@ -37,6 +38,7 @@
 import com.google.gerrit.server.index.OnlineReindexMode;
 import com.google.gerrit.server.notedb.ChangeNotes;
 import com.google.gerrit.server.notedb.ChangeNotes.Factory.ChangeNotesResult;
+import com.google.gerrit.server.notedb.ChangeNotes.Factory.ScanResult;
 import com.google.gerrit.server.project.ProjectCache;
 import com.google.gerrit.server.query.change.ChangeData;
 import com.google.inject.Inject;
@@ -44,11 +46,13 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.eclipse.jgit.errors.RepositoryNotFoundException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import org.eclipse.jgit.lib.ProgressMonitor;
 import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.lib.TextProgressMonitor;
@@ -62,6 +66,14 @@
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private static final int PROJECT_SLICE_MAX_REFS = 1000;
 
+  private static class ProjectsCollectionFailure extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public ProjectsCollectionFailure(String message) {
+      super(message);
+    }
+  }
+
   private final ChangeData.Factory changeDataFactory;
   private final GitRepositoryManager repoManager;
   private final ListeningExecutorService executor;
@@ -85,103 +97,58 @@
     this.projectCache = projectCache;
   }
 
-  private static class ProjectSlice {
-    private final Project.NameKey name;
-    private final int slice;
-    private final int slices;
+  @AutoValue
+  public abstract static class ProjectSlice {
+    public abstract Project.NameKey name();
 
-    ProjectSlice(Project.NameKey name, int slice, int slices) {
-      this.name = name;
-      this.slice = slice;
-      this.slices = slices;
-    }
+    public abstract int slice();
 
-    public Project.NameKey getName() {
-      return name;
-    }
+    public abstract int slices();
 
-    public int getSlice() {
-      return slice;
-    }
+    public abstract ScanResult scanResult();
 
-    public int getSlices() {
-      return slices;
+    private static ProjectSlice create(Project.NameKey name, int slice, int slices, ScanResult sr) {
+      return new AutoValue_AllChangesIndexer_ProjectSlice(name, slice, slices, sr);
     }
   }
 
   @Override
   public Result indexAll(ChangeIndex index) {
-    ProgressMonitor pm = new TextProgressMonitor();
-    pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN);
-    List<ProjectSlice> projectSlices = new ArrayList<>();
-    int changeCount = 0;
-    Stopwatch sw = Stopwatch.createStarted();
-    int projectsFailed = 0;
-    for (Project.NameKey name : projectCache.all()) {
-      try (Repository repo = repoManager.openRepository(name)) {
-        // The simplest approach to distribute indexing would be to let each thread grab a project
-        // and index it fully. But if a site has one big project and 100s of small projects, then
-        // in the beginning all CPUs would be busy reindexing projects. But soon enough all small
-        // projects have been reindexed, and only the thread that reindexes the big project is
-        // still working. The other threads would idle. Reindexing the big project on a single
-        // thread becomes the critical path. Bringing in more CPUs would not speed up things.
-        //
-        // To avoid such situations, we split big repos into smaller parts and let
-        // the thread pool index these smaller parts. This splitting introduces an overhead in the
-        // workload setup and there might be additional slow-downs from multiple threads
-        // concurrently working on different parts of the same project. But for Wikimedia's Gerrit,
-        // which had 2 big projects, many middle sized ones, and lots of smaller ones, the
-        // splitting of repos into smaller parts reduced indexing time from 1.5 hours to 55 minutes
-        // in 2020.
-        int size = estimateSize(repo);
-        if (size == 0) {
-          pm.update(1);
-          continue;
-        }
-        changeCount += size;
-        int slices = 1 + size / PROJECT_SLICE_MAX_REFS;
-        if (slices > 1) {
-          verboseWriter.println("Submitting " + name + " for indexing in " + slices + " slices");
-        }
-        for (int slice = 0; slice < slices; slice++) {
-          projectSlices.add(new ProjectSlice(name, slice, slices));
-        }
-      } catch (IOException e) {
-        logger.atSevere().withCause(e).log("Error collecting project %s", name);
-        projectsFailed++;
-        if (projectsFailed > projectCache.all().size() / 2) {
-          logger.atSevere().log("Over 50%% of the projects could not be collected: aborted");
-          return Result.create(sw, false, 0, 0);
-        }
-      }
-      pm.update(1);
-    }
-    pm.endTask();
-    setTotalWork(changeCount);
+    // The simplest approach to distribute indexing would be to let each thread grab a project
+    // and index it fully. But if a site has one big project and 100s of small projects, then
+    // in the beginning all CPUs would be busy reindexing projects. But soon enough all small
+    // projects have been reindexed, and only the thread that reindexes the big project is
+    // still working. The other threads would idle. Reindexing the big project on a single
+    // thread becomes the critical path. Bringing in more CPUs would not speed up things.
+    //
+    // To avoid such situations, we split big repos into smaller parts and let
+    // the thread pool index these smaller parts. This splitting introduces an overhead in the
+    // workload setup and there might be additional slow-downs from multiple threads
+    // concurrently working on different parts of the same project. But for Wikimedia's Gerrit,
+    // which had 2 big projects, many middle sized ones, and lots of smaller ones, the
+    // splitting of repos into smaller parts reduced indexing time from 1.5 hours to 55 minutes
+    // in 2020.
 
-    // projectSlices are currently grouped by projects. First all slices for project1, followed
-    // by all slices for project2, and so on. As workers pick tasks sequentially, multiple threads
-    // would typically work concurrently on different slices of the same project. While this is not
-    // a big issue, shuffling the list beforehand helps with ungrouping the project slices, so
-    // different slices are less likely to be worked on concurrently.
+    Stopwatch sw = Stopwatch.createStarted();
+    List<ProjectSlice> projectSlices;
+    try {
+      projectSlices = new SliceCreator().create();
+    } catch (ProjectsCollectionFailure | InterruptedException | ExecutionException e) {
+      logger.atSevere().log(e.getMessage());
+      return Result.create(sw, false, 0, 0);
+    }
+
+    // Since project slices are created in parallel, they are somewhat shuffled already. However,
+    // the number of threads used to create the project slices doesn't guarantee good randomization.
+    // If the slices are not shuffled well, then multiple threads would typically work concurrently
+    // on different slices of the same project. While this is not a big issue, shuffling the list
+    // beforehand helps with ungrouping the project slices, so different slices are less likely to
+    // be worked on concurrently.
     // This shuffling gave a 6% runtime reduction for Wikimedia's Gerrit in 2020.
     Collections.shuffle(projectSlices);
     return indexAll(index, projectSlices);
   }
 
-  private int estimateSize(Repository repo) throws IOException {
-    // Estimate size based on IDs that show up in ref names. This is not perfect, since patch set
-    // refs may exist for changes whose metadata was never successfully stored. But that's ok, as
-    // the estimate is just used as a heuristic for sorting projects.
-    long size =
-        repo.getRefDatabase().getRefsByPrefix(RefNames.REFS_CHANGES).stream()
-            .map(r -> Change.Id.fromRef(r.getName()))
-            .filter(Objects::nonNull)
-            .distinct()
-            .count();
-    return Ints.saturatedCast(size);
-  }
-
   private SiteIndexer.Result indexAll(ChangeIndex index, List<ProjectSlice> projectSlices) {
     Stopwatch sw = Stopwatch.createStarted();
     MultiProgressMonitor mpm = new MultiProgressMonitor(progressOut, "Reindexing changes");
@@ -194,9 +161,9 @@
     AtomicBoolean ok = new AtomicBoolean(true);
 
     for (ProjectSlice projectSlice : projectSlices) {
-      Project.NameKey name = projectSlice.getName();
-      int slice = projectSlice.getSlice();
-      int slices = projectSlice.getSlices();
+      Project.NameKey name = projectSlice.name();
+      int slice = projectSlice.slice();
+      int slices = projectSlice.slices();
       ListenableFuture<?> future =
           executor.submit(
               reindexProject(
@@ -204,6 +171,7 @@
                   name,
                   slice,
                   slices,
+                  projectSlice.scanResult(),
                   doneTask,
                   failedTask));
       String description = "project " + name + " (" + slice + "/" + slices + ")";
@@ -242,7 +210,13 @@
 
   public Callable<Void> reindexProject(
       ChangeIndexer indexer, Project.NameKey project, Task done, Task failed) {
-    return reindexProject(indexer, project, 0, 1, done, failed);
+    try (Repository repo = repoManager.openRepository(project)) {
+      return reindexProject(
+          indexer, project, 0, 1, ChangeNotes.Factory.scanChangeIds(repo), done, failed);
+    } catch (IOException e) {
+      logger.atSevere().log(e.getMessage());
+      return null;
+    }
   }
 
   public Callable<Void> reindexProject(
@@ -250,9 +224,10 @@
       Project.NameKey project,
       int slice,
       int slices,
+      ScanResult scanResult,
       Task done,
       Task failed) {
-    return new ProjectIndexer(indexer, project, slice, slices, done, failed);
+    return new ProjectIndexer(indexer, project, slice, slices, scanResult, done, failed);
   }
 
   private class ProjectIndexer implements Callable<Void> {
@@ -260,6 +235,7 @@
     private final Project.NameKey project;
     private final int slice;
     private final int slices;
+    private final ScanResult scanResult;
     private final ProgressMonitor done;
     private final ProgressMonitor failed;
 
@@ -268,32 +244,30 @@
         Project.NameKey project,
         int slice,
         int slices,
+        ScanResult scanResult,
         ProgressMonitor done,
         ProgressMonitor failed) {
       this.indexer = indexer;
       this.project = project;
       this.slice = slice;
       this.slices = slices;
+      this.scanResult = scanResult;
       this.done = done;
       this.failed = failed;
     }
 
     @Override
     public Void call() throws Exception {
-      try (Repository repo = repoManager.openRepository(project)) {
-        OnlineReindexMode.begin();
-
-        // Order of scanning changes is undefined. This is ok if we assume that packfile locality is
-        // not important for indexing, since sites should have a fully populated DiffSummary cache.
-        // It does mean that reindexing after invalidating the DiffSummary cache will be expensive,
-        // but the goal is to invalidate that cache as infrequently as we possibly can. And besides,
-        // we don't have concrete proof that improving packfile locality would help.
-        notesFactory.scan(repo, project, id -> (id.get() % slices) == slice).forEach(r -> index(r));
-      } catch (RepositoryNotFoundException rnfe) {
-        logger.atSevere().log(rnfe.getMessage());
-      } finally {
-        OnlineReindexMode.end();
-      }
+      OnlineReindexMode.begin();
+      // Order of scanning changes is undefined. This is ok if we assume that packfile locality is
+      // not important for indexing, since sites should have a fully populated DiffSummary cache.
+      // It does mean that reindexing after invalidating the DiffSummary cache will be expensive,
+      // but the goal is to invalidate that cache as infrequently as we possibly can. And besides,
+      // we don't have concrete proof that improving packfile locality would help.
+      notesFactory
+          .scan(scanResult, project, id -> (id.get() % slices) == slice)
+          .forEach(r -> index(r));
+      OnlineReindexMode.end();
       return null;
     }
 
@@ -333,4 +307,63 @@
       return "Index all changes of project " + project.get();
     }
   }
+
+  private class SliceCreator {
+    final Set<ProjectSlice> projectSlices = Sets.newConcurrentHashSet();
+    final AtomicInteger changeCount = new AtomicInteger(0);
+    final AtomicInteger projectsFailed = new AtomicInteger(0);
+    final ProgressMonitor pm = new TextProgressMonitor();
+
+    private List<ProjectSlice> create()
+        throws ProjectsCollectionFailure, InterruptedException, ExecutionException {
+      List<ListenableFuture<?>> futures = new ArrayList<>();
+      pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN);
+      for (Project.NameKey name : projectCache.all()) {
+        futures.add(executor.submit(new ProjectSliceCreator(name)));
+      }
+
+      Futures.allAsList(futures).get();
+
+      if (projectsFailed.get() > projectCache.all().size() / 2) {
+        throw new ProjectsCollectionFailure(
+            "Over 50%% of the projects could not be collected: aborted");
+      }
+
+      pm.endTask();
+      setTotalWork(changeCount.get());
+      return projectSlices.stream().collect(Collectors.toList());
+    }
+
+    private class ProjectSliceCreator implements Callable<Void> {
+      final Project.NameKey name;
+
+      public ProjectSliceCreator(Project.NameKey name) {
+        this.name = name;
+      }
+
+      @Override
+      public Void call() throws IOException {
+        try (Repository repo = repoManager.openRepository(name)) {
+          ScanResult sr = ChangeNotes.Factory.scanChangeIds(repo);
+          int size = sr.all().size();
+          if (size > 0) {
+            changeCount.addAndGet(size);
+            int slices = 1 + size / PROJECT_SLICE_MAX_REFS;
+            if (slices > 1) {
+              verboseWriter.println(
+                  "Submitting " + name + " for indexing in " + slices + " slices");
+            }
+            for (int slice = 0; slice < slices; slice++) {
+              projectSlices.add(ProjectSlice.create(name, slice, slices, sr));
+            }
+          }
+        } catch (IOException e) {
+          logger.atSevere().withCause(e).log("Error collecting project %s", name);
+          projectsFailed.incrementAndGet();
+        }
+        pm.update(1);
+        return null;
+      }
+    }
+  }
 }
diff --git a/java/com/google/gerrit/server/notedb/ChangeNotes.java b/java/com/google/gerrit/server/notedb/ChangeNotes.java
index 36a61cc0..51acf16e 100644
--- a/java/com/google/gerrit/server/notedb/ChangeNotes.java
+++ b/java/com/google/gerrit/server/notedb/ChangeNotes.java
@@ -106,6 +106,29 @@
       this.projectCache = projectCache;
     }
 
+    @AutoValue
+    public abstract static class ScanResult {
+      abstract ImmutableSet<Change.Id> fromPatchSetRefs();
+
+      abstract ImmutableSet<Change.Id> fromMetaRefs();
+
+      public SetView<Change.Id> all() {
+        return Sets.union(fromPatchSetRefs(), fromMetaRefs());
+      }
+    }
+
+    public static ScanResult scanChangeIds(Repository repo) throws IOException {
+      ImmutableSet.Builder<Change.Id> fromPs = ImmutableSet.builder();
+      ImmutableSet.Builder<Change.Id> fromMeta = ImmutableSet.builder();
+      for (Ref r : repo.getRefDatabase().getRefsByPrefix(RefNames.REFS_CHANGES)) {
+        Change.Id id = Change.Id.fromRef(r.getName());
+        if (id != null) {
+          (r.getName().endsWith(RefNames.META_SUFFIX) ? fromMeta : fromPs).add(id);
+        }
+      }
+      return new AutoValue_ChangeNotes_Factory_ScanResult(fromPs.build(), fromMeta.build());
+    }
+
     public ChangeNotes createChecked(Change c) {
       return createChecked(c.getProject(), c.getId());
     }
@@ -213,8 +236,11 @@
     public Stream<ChangeNotesResult> scan(
         Repository repo, Project.NameKey project, Predicate<Change.Id> changeIdPredicate)
         throws IOException {
-      ScanResult sr = scanChangeIds(repo);
+      return scan(scanChangeIds(repo), project, changeIdPredicate);
+    }
 
+    public Stream<ChangeNotesResult> scan(
+        ScanResult sr, Project.NameKey project, Predicate<Change.Id> changeIdPredicate) {
       Stream<Change.Id> idStream = sr.all().stream();
       if (changeIdPredicate != null) {
         idStream = idStream.filter(changeIdPredicate);
@@ -286,29 +312,6 @@
       @Nullable
       abstract ChangeNotes maybeNotes();
     }
-
-    @AutoValue
-    abstract static class ScanResult {
-      abstract ImmutableSet<Change.Id> fromPatchSetRefs();
-
-      abstract ImmutableSet<Change.Id> fromMetaRefs();
-
-      SetView<Change.Id> all() {
-        return Sets.union(fromPatchSetRefs(), fromMetaRefs());
-      }
-    }
-
-    private static ScanResult scanChangeIds(Repository repo) throws IOException {
-      ImmutableSet.Builder<Change.Id> fromPs = ImmutableSet.builder();
-      ImmutableSet.Builder<Change.Id> fromMeta = ImmutableSet.builder();
-      for (Ref r : repo.getRefDatabase().getRefsByPrefix(RefNames.REFS_CHANGES)) {
-        Change.Id id = Change.Id.fromRef(r.getName());
-        if (id != null) {
-          (r.getName().endsWith(RefNames.META_SUFFIX) ? fromMeta : fromPs).add(id);
-        }
-      }
-      return new AutoValue_ChangeNotes_Factory_ScanResult(fromPs.build(), fromMeta.build());
-    }
   }
 
   private final boolean shouldExist;