Merge "Bump Jetty version to 9.4.30.v20200611" into stable-2.16
diff --git a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
index 2f23ad8..7267ae2 100644
--- a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
+++ b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
@@ -21,8 +21,8 @@
 import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH;
 
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.ComparisonChain;
 import com.google.common.flogger.FluentLogger;
+import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.gerrit.index.SiteIndexer;
@@ -43,10 +43,9 @@
 import com.google.inject.Inject;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
-import java.util.SortedSet;
-import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
@@ -58,6 +57,7 @@
 
 public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, ChangeIndex> {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final int PROJECT_SLICE_MAX_REFS = 1000;
 
   private final SchemaFactory<ReviewDb> schemaFactory;
   private final ChangeData.Factory changeDataFactory;
@@ -85,22 +85,27 @@
     this.projectCache = projectCache;
   }
 
-  private static class ProjectHolder implements Comparable<ProjectHolder> {
-    final Project.NameKey name;
-    private final long size;
+  private static class ProjectSlice {
+    private final Project.NameKey name;
+    private final int slice;
+    private final int slices;
 
-    ProjectHolder(Project.NameKey name, long size) {
+    ProjectSlice(Project.NameKey name, int slice, int slices) {
       this.name = name;
-      this.size = size;
+      this.slice = slice;
+      this.slices = slices;
     }
 
-    @Override
-    public int compareTo(ProjectHolder other) {
-      // Sort projects based on size first to maximize utilization of threads early on.
-      return ComparisonChain.start()
-          .compare(other.size, size)
-          .compare(other.name.get(), name.get())
-          .result();
+    public Project.NameKey getName() {
+      return name;
+    }
+
+    public int getSlice() {
+      return slice;
+    }
+
+    public int getSlices() {
+      return slices;
     }
   }
 
@@ -108,19 +113,39 @@
   public Result indexAll(ChangeIndex index) {
     ProgressMonitor pm = new TextProgressMonitor();
     pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN);
-    SortedSet<ProjectHolder> projects = new TreeSet<>();
+    List<ProjectSlice> projectSlices = new ArrayList<>();
     int changeCount = 0;
     Stopwatch sw = Stopwatch.createStarted();
     int projectsFailed = 0;
     for (Project.NameKey name : projectCache.all()) {
       try (Repository repo = repoManager.openRepository(name)) {
-        long size = estimateSize(repo);
+        // The simplest approach to distribute indexing would be to let each thread grab a project
+        // and index it fully. But if a site has one big project and 100s of small projects, then
+        // in the beginning all CPUs would be busy reindexing projects. But soon enough all small
+        // projects have been reindexed, and only the thread that reindexes the big project is
+        // still working. The other threads would idle. Reindexing the big project on a single
+        // thread becomes the critical path. Bringing in more CPUs would not speed up things.
+        //
+        // To avoid such situations, we split big repos into smaller parts and let
+        // the thread pool index these smaller parts. This splitting introduces an overhead in the
+        // workload setup and there might be additional slow-downs from multiple threads
+        // concurrently working on different parts of the same project. But for Wikimedia's Gerrit,
+        // which had 2 big projects, many medium-sized ones, and lots of smaller ones, the
+        // splitting of repos into smaller parts reduced indexing time from 1.5 hours to 55 minutes
+        // in 2020.
+        int size = estimateSize(repo);
         changeCount += size;
-        projects.add(new ProjectHolder(name, size));
+        int slices = 1 + size / PROJECT_SLICE_MAX_REFS;
+        if (slices > 1) {
+          verboseWriter.println("Submitting " + name + " for indexing in " + slices + " slices");
+        }
+        for (int slice = 0; slice < slices; slice++) {
+          projectSlices.add(new ProjectSlice(name, slice, slices));
+        }
       } catch (IOException e) {
         logger.atSevere().withCause(e).log("Error collecting project %s", name);
         projectsFailed++;
-        if (projectsFailed > projects.size() / 2) {
+        if (projectsFailed > projectCache.all().size() / 2) {
           logger.atSevere().log("Over 50%% of the projects could not be collected: aborted");
           return new Result(sw, false, 0, 0);
         }
@@ -129,24 +154,34 @@
     }
     pm.endTask();
     setTotalWork(changeCount);
-    return indexAll(index, projects);
+
+    // projectSlices are currently grouped by projects. First all slices for project1, followed
+    // by all slices for project2, and so on. As workers pick tasks sequentially, multiple threads
+    // would typically work concurrently on different slices of the same project. While this is not
+    // a big issue, shuffling the list beforehand helps with ungrouping the project slices, so
+    // slices of the same project are less likely to be worked on concurrently.
+    // This shuffling gave a 6% runtime reduction for Wikimedia's Gerrit in 2020.
+    Collections.shuffle(projectSlices);
+    return indexAll(index, projectSlices);
   }
 
-  private long estimateSize(Repository repo) throws IOException {
+  private int estimateSize(Repository repo) throws IOException {
     // Estimate size based on IDs that show up in ref names. This is not perfect, since patch set
     // refs may exist for changes whose metadata was never successfully stored. But that's ok, as
     // the estimate is just used as a heuristic for sorting projects.
-    return repo.getRefDatabase().getRefsByPrefix(RefNames.REFS_CHANGES).stream()
-        .map(r -> Change.Id.fromRef(r.getName()))
-        .filter(Objects::nonNull)
-        .distinct()
-        .count();
+    long size =
+        repo.getRefDatabase().getRefsByPrefix(RefNames.REFS_CHANGES).stream()
+            .map(r -> Change.Id.fromRef(r.getName()))
+            .filter(Objects::nonNull)
+            .distinct()
+            .count();
+    return Ints.saturatedCast(size);
   }
 
-  private SiteIndexer.Result indexAll(ChangeIndex index, SortedSet<ProjectHolder> projects) {
+  private SiteIndexer.Result indexAll(ChangeIndex index, List<ProjectSlice> projectSlices) {
     Stopwatch sw = Stopwatch.createStarted();
     MultiProgressMonitor mpm = new MultiProgressMonitor(progressOut, "Reindexing changes");
-    Task projTask = mpm.beginSubTask("projects", projects.size());
+    Task projTask = mpm.beginSubTask("project-slices", projectSlices.size());
     checkState(totalWork >= 0);
     Task doneTask = mpm.beginSubTask(null, totalWork);
     Task failedTask = mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
@@ -154,12 +189,21 @@
     List<ListenableFuture<?>> futures = new ArrayList<>();
     AtomicBoolean ok = new AtomicBoolean(true);
 
-    for (ProjectHolder project : projects) {
+    for (ProjectSlice projectSlice : projectSlices) {
+      Project.NameKey name = projectSlice.getName();
+      int slice = projectSlice.getSlice();
+      int slices = projectSlice.getSlices();
       ListenableFuture<?> future =
           executor.submit(
               reindexProject(
-                  indexerFactory.create(executor, index), project.name, doneTask, failedTask));
-      addErrorListener(future, "project " + project.name, projTask, ok);
+                  indexerFactory.create(executor, index),
+                  name,
+                  slice,
+                  slices,
+                  doneTask,
+                  failedTask));
+      String description = "project " + name + " (" + slice + "/" + slices + ")";
+      addErrorListener(future, description, projTask, ok);
       futures.add(future);
     }
 
@@ -194,22 +238,38 @@
 
   public Callable<Void> reindexProject(
       ChangeIndexer indexer, Project.NameKey project, Task done, Task failed) {
-    return new ProjectIndexer(indexer, project, done, failed);
+    return reindexProject(indexer, project, 0, 1, done, failed);
+  }
+
+  public Callable<Void> reindexProject(
+      ChangeIndexer indexer,
+      Project.NameKey project,
+      int slice,
+      int slices,
+      Task done,
+      Task failed) {
+    return new ProjectIndexer(indexer, project, slice, slices, done, failed);
   }
 
   private class ProjectIndexer implements Callable<Void> {
     private final ChangeIndexer indexer;
     private final Project.NameKey project;
+    private final int slice;
+    private final int slices;
     private final ProgressMonitor done;
     private final ProgressMonitor failed;
 
     private ProjectIndexer(
         ChangeIndexer indexer,
         Project.NameKey project,
+        int slice,
+        int slices,
         ProgressMonitor done,
         ProgressMonitor failed) {
       this.indexer = indexer;
       this.project = project;
+      this.slice = slice;
+      this.slices = slices;
       this.done = done;
       this.failed = failed;
     }
@@ -225,7 +285,9 @@
         // It does mean that reindexing after invalidating the DiffSummary cache will be expensive,
         // but the goal is to invalidate that cache as infrequently as we possibly can. And besides,
         // we don't have concrete proof that improving packfile locality would help.
-        notesFactory.scan(repo, db, project).forEach(r -> index(db, r));
+        notesFactory
+            .scan(repo, db, project, id -> (id.get() % slices) == slice)
+            .forEach(r -> index(db, r));
       } catch (RepositoryNotFoundException rnfe) {
         logger.atSevere().log(rnfe.getMessage());
       } finally {
diff --git a/java/com/google/gerrit/server/notedb/ChangeNotes.java b/java/com/google/gerrit/server/notedb/ChangeNotes.java
index 086b2e2..d548bf3 100644
--- a/java/com/google/gerrit/server/notedb/ChangeNotes.java
+++ b/java/com/google/gerrit/server/notedb/ChangeNotes.java
@@ -305,19 +305,35 @@
 
     public Stream<ChangeNotesResult> scan(Repository repo, ReviewDb db, Project.NameKey project)
         throws IOException {
-      return args.migration.readChanges() ? scanNoteDb(repo, db, project) : scanReviewDb(repo, db);
+      return scan(repo, db, project, null);
     }
 
-    private Stream<ChangeNotesResult> scanReviewDb(Repository repo, ReviewDb db)
+    public Stream<ChangeNotesResult> scan(
+        Repository repo,
+        ReviewDb db,
+        Project.NameKey project,
+        Predicate<Change.Id> changeIdPredicate)
         throws IOException {
+      return args.migration.readChanges()
+          ? scanNoteDb(repo, db, project, changeIdPredicate)
+          : scanReviewDb(repo, db, changeIdPredicate);
+    }
+
+    private Stream<ChangeNotesResult> scanReviewDb(
+        Repository repo, ReviewDb db, Predicate<Change.Id> changeIdPredicate) throws IOException {
       // Scan IDs that might exist in ReviewDb, assuming that each change has at least one patch set
       // ref. Not all changes might exist: some patch set refs might have been written where the
       // corresponding ReviewDb write failed. These will be silently filtered out by the batch get
       // call below, which is intended.
       Set<Change.Id> ids = scanChangeIds(repo).fromPatchSetRefs();
 
+      Stream<Change.Id> idStream = ids.stream();
+      if (changeIdPredicate != null) {
+        idStream = idStream.filter(changeIdPredicate);
+      }
+
       // A batch size of N may overload get(Iterable), so use something smaller, but still >1.
-      return Streams.stream(Iterators.partition(ids.iterator(), 30))
+      return Streams.stream(Iterators.partition(idStream.iterator(), 30))
           .flatMap(
               batch -> {
                 try {
@@ -333,10 +349,23 @@
 
     private Stream<ChangeNotesResult> scanNoteDb(
         Repository repo, ReviewDb db, Project.NameKey project) throws IOException {
+      return scanNoteDb(repo, db, project, null);
+    }
+
+    private Stream<ChangeNotesResult> scanNoteDb(
+        Repository repo,
+        ReviewDb db,
+        Project.NameKey project,
+        Predicate<Change.Id> changeIdPredicate)
+        throws IOException {
       ScanResult sr = scanChangeIds(repo);
       PrimaryStorage defaultStorage = args.migration.changePrimaryStorage();
 
-      return sr.all().stream()
+      Stream<Change.Id> idStream = sr.all().stream();
+      if (changeIdPredicate != null) {
+        idStream = idStream.filter(changeIdPredicate);
+      }
+      return idStream
           .map(id -> scanOneNoteDbChange(db, project, sr, defaultStorage, id))
           .filter(Objects::nonNull);
     }
diff --git a/plugins/replication b/plugins/replication
index 58778e1..9f679b0 160000
--- a/plugins/replication
+++ b/plugins/replication
@@ -1 +1 @@
-Subproject commit 58778e145d83dfa05035626aca62d968f11b7e3d
+Subproject commit 9f679b030aedcb5cdae6810798f2602b62f58fa0