Merge branch 'stable-3.3' into stable-3.4

* stable-3.3:
  Remove support for vulnerable ES versions
  AllChangesIndexer: Schedule slices immediately
  MultiProgressMonitor: Support waiting for non-final tasks

Change-Id: I9c8883a4cc38de6692048838ff833b4e9b79f8c3
diff --git a/java/com/google/gerrit/elasticsearch/ElasticVersion.java b/java/com/google/gerrit/elasticsearch/ElasticVersion.java
index c6400df..47fa383 100644
--- a/java/com/google/gerrit/elasticsearch/ElasticVersion.java
+++ b/java/com/google/gerrit/elasticsearch/ElasticVersion.java
@@ -18,9 +18,7 @@
 import java.util.regex.Pattern;
 
 public enum ElasticVersion {
-  V7_6("7.6.*"),
-  V7_7("7.7.*"),
-  V7_8("7.8.*");
+  V7_16("7.16.*");
 
   private final String version;
   private final Pattern pattern;
diff --git a/java/com/google/gerrit/server/git/MultiProgressMonitor.java b/java/com/google/gerrit/server/git/MultiProgressMonitor.java
index 2d854a5..7e5c99f 100644
--- a/java/com/google/gerrit/server/git/MultiProgressMonitor.java
+++ b/java/com/google/gerrit/server/git/MultiProgressMonitor.java
@@ -28,6 +28,8 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.lib.ProgressMonitor;
 
@@ -123,6 +125,64 @@
         return count;
       }
     }
+
+    public int getTotal() {
+      return total;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public String getTotalDisplay(int total) {
+      return String.valueOf(total);
+    }
+  }
+
+  /** Handle for a sub-task whose total work can be updated while the task is in progress. */
+  public class VolatileTask extends Task {
+    protected final AtomicInteger volatileTotal;
+    protected final AtomicBoolean isTotalFinalized = new AtomicBoolean(false);
+
+    public VolatileTask(String subTaskName) {
+      super(subTaskName, UNKNOWN);
+      volatileTotal = new AtomicInteger(UNKNOWN);
+    }
+
+    /**
+     * Add work units to the total of this sub-task; ignored with a warning once finalized.
+     *
+     * <p>Intended to be called from a worker thread.
+     *
+     * @param workUnits number of work units to be added to existing total work.
+     */
+    public void updateTotal(int workUnits) {
+      if (!isTotalFinalized.get()) {
+        volatileTotal.addAndGet(workUnits);
+      } else {
+        logger.atWarning().log(
+            "Total work has been finalized on sub-task %s and cannot be updated", getName());
+      }
+    }
+
+    /**
+     * Mark the total on this sub-task as unmodifiable.
+     *
+     * <p>Intended to be called from a worker thread.
+     */
+    public void finalizeTotal() {
+      isTotalFinalized.set(true);
+    }
+
+    @Override
+    public int getTotal() {
+      return volatileTotal.get();
+    }
+
+    @Override
+    public String getTotalDisplay(int total) {
+      return super.getTotalDisplay(total) + (isTotalFinalized.get() ? "" : "+");
+    }
   }
 
   private final OutputStream out;
@@ -180,6 +240,7 @@
    * calls {@link #end()}, the future has an additional {@code maxInterval} to finish before it is
    * forcefully cancelled and {@link ExecutionException} is thrown.
    *
+   * @see #waitForNonFinalTask(Future, long, TimeUnit)
    * @param workerFuture a future that returns when worker threads are finished.
    * @param timeoutTime overall timeout for the task; the future is forcefully cancelled if the task
    *     exceeds the timeout. Non-positive values indicate no timeout.
@@ -189,6 +250,45 @@
    */
   public <T> T waitFor(Future<T> workerFuture, long timeoutTime, TimeUnit timeoutUnit)
       throws TimeoutException {
+    T t = waitForNonFinalTask(workerFuture, timeoutTime, timeoutUnit);
+    synchronized (this) {
+      if (!done) {
+        // The worker may not have called end() explicitly, which is likely a
+        // programming error.
+        logger.atWarning().log("MultiProgressMonitor worker did not call end() before returning");
+        end();
+      }
+    }
+    sendDone();
+    return t;
+  }
+
+  /**
+   * Wait for a non-final task managed by a {@link Future}, with no timeout.
+   *
+   * @see #waitForNonFinalTask(Future, long, TimeUnit)
+   */
+  public <T> T waitForNonFinalTask(Future<T> workerFuture) {
+    try {
+      return waitForNonFinalTask(workerFuture, 0, null); // 0 = no overall timeout
+    } catch (TimeoutException e) {
+      throw new IllegalStateException("timeout exception without setting a timeout", e);
+    }
+  }
+
+  /**
+   * Wait for a task managed by a {@link Future}. This call does not expect the worker thread to
+   * call {@link #end()}. It is intended to be used to track a non-final task.
+   *
+   * @param workerFuture a future that returns when worker threads are finished.
+   * @param timeoutTime overall timeout for the task; the future is forcefully cancelled if the task
+   *     exceeds the timeout. Non-positive values indicate no timeout.
+   * @param timeoutUnit unit for overall task timeout.
+   * @throws TimeoutException if this thread or a worker thread was interrupted, the worker was
+   *     cancelled, or timed out waiting for a worker to call {@link #end()}.
+   */
+  public <T> T waitForNonFinalTask(Future<T> workerFuture, long timeoutTime, TimeUnit timeoutUnit)
+      throws TimeoutException {
     long overallStart = System.nanoTime();
     long deadline;
     if (timeoutTime > 0) {
@@ -199,7 +299,7 @@
 
     synchronized (this) {
       long left = maxIntervalNanos;
-      while (!done) {
+      while (!workerFuture.isDone() && !done) {
         long start = System.nanoTime();
         try {
           NANOSECONDS.timedWait(this, left);
@@ -228,14 +328,8 @@
           left = maxIntervalNanos;
         }
         sendUpdate();
-        if (!done && workerFuture.isDone()) {
-          // The worker may not have called end() explicitly, which is likely a
-          // programming error.
-          logger.atWarning().log("MultiProgressMonitor worker did not call end() before returning");
-          end();
-        }
       }
-      sendDone();
+      wakeUp();
     }
 
     // The loop exits as soon as the worker calls end(), but we give it another
@@ -271,6 +365,18 @@
   }
 
   /**
+   * Begin a sub-task whose total work can be updated.
+   *
+   * @param subTask sub-task name.
+   * @return sub-task handle.
+   */
+  public VolatileTask beginVolatileSubTask(String subTask) {
+    VolatileTask task = new VolatileTask(subTask);
+    tasks.add(task);
+    return task;
+  }
+
+  /**
    * End the overall task.
    *
    * <p>Must be called from a worker thread.
@@ -313,6 +419,7 @@
       boolean first = true;
       for (Task t : tasks) {
         int count = t.getCount();
+        int total = t.getTotal();
         if (count == 0) {
           continue;
         }
@@ -327,10 +434,11 @@
         if (!Strings.isNullOrEmpty(t.name)) {
           s.append(t.name).append(": ");
         }
-        if (t.total == UNKNOWN) {
+        if (total == UNKNOWN) {
           s.append(count);
         } else {
-          s.append(String.format("%d%% (%d/%d)", count * 100 / t.total, count, t.total));
+          s.append(
+              String.format("%d%% (%d/%s)", count * 100 / total, count, t.getTotalDisplay(total)));
         }
       }
     }
diff --git a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
index 30205b6..f176c38 100644
--- a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
+++ b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
@@ -14,7 +14,6 @@
 
 package com.google.gerrit.server.index.change;
 
-import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.util.concurrent.Futures.successfulAsList;
 import static com.google.common.util.concurrent.Futures.transform;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
@@ -22,9 +21,8 @@
 
 import com.google.auto.value.AutoValue;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.Sets;
+import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.flogger.FluentLogger;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.UncheckedExecutionException;
@@ -34,6 +32,7 @@
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.git.MultiProgressMonitor;
 import com.google.gerrit.server.git.MultiProgressMonitor.Task;
+import com.google.gerrit.server.git.MultiProgressMonitor.VolatileTask;
 import com.google.gerrit.server.index.IndexExecutor;
 import com.google.gerrit.server.index.OnlineReindexMode;
 import com.google.gerrit.server.notedb.ChangeNotes;
@@ -44,18 +43,13 @@
 import com.google.inject.Inject;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 import org.eclipse.jgit.lib.ProgressMonitor;
 import org.eclipse.jgit.lib.Repository;
-import org.eclipse.jgit.lib.TextProgressMonitor;
 
 /**
  * Implementation that can index all changes on a host or within a project. Used by Gerrit's
@@ -64,6 +58,9 @@
  */
 public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, ChangeIndex> {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private MultiProgressMonitor mpm;
+  private VolatileTask doneTask;
+  private Task failedTask;
   private static final int PROJECT_SLICE_MAX_REFS = 1000;
 
   private static class ProjectsCollectionFailure extends Exception {
@@ -130,55 +127,18 @@
     // in 2020.
 
     Stopwatch sw = Stopwatch.createStarted();
-    List<ProjectSlice> projectSlices;
+    AtomicBoolean ok = new AtomicBoolean(true);
+    mpm = new MultiProgressMonitor(progressOut, "Reindexing changes");
+    doneTask = mpm.beginVolatileSubTask("changes");
+    failedTask = mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
+    List<ListenableFuture<?>> futures;
     try {
-      projectSlices = new SliceCreator().create();
-    } catch (ProjectsCollectionFailure | InterruptedException | ExecutionException e) {
+      futures = new SliceScheduler(index, ok).schedule();
+    } catch (ProjectsCollectionFailure e) {
       logger.atSevere().log(e.getMessage());
       return Result.create(sw, false, 0, 0);
     }
 
-    // Since project slices are created in parallel, they are somewhat shuffled already. However,
-    // the number of threads used to create the project slices doesn't guarantee good randomization.
-    // If the slices are not shuffled well, then multiple threads would typically work concurrently
-    // on different slices of the same project. While this is not a big issue, shuffling the list
-    // beforehand helps with ungrouping the project slices, so different slices are less likely to
-    // be worked on concurrently.
-    // This shuffling gave a 6% runtime reduction for Wikimedia's Gerrit in 2020.
-    Collections.shuffle(projectSlices);
-    return indexAll(index, projectSlices);
-  }
-
-  private SiteIndexer.Result indexAll(ChangeIndex index, List<ProjectSlice> projectSlices) {
-    Stopwatch sw = Stopwatch.createStarted();
-    MultiProgressMonitor mpm = new MultiProgressMonitor(progressOut, "Reindexing changes");
-    Task projTask = mpm.beginSubTask("project-slices", projectSlices.size());
-    checkState(totalWork >= 0);
-    Task doneTask = mpm.beginSubTask(null, totalWork);
-    Task failedTask = mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
-
-    List<ListenableFuture<?>> futures = new ArrayList<>();
-    AtomicBoolean ok = new AtomicBoolean(true);
-
-    for (ProjectSlice projectSlice : projectSlices) {
-      Project.NameKey name = projectSlice.name();
-      int slice = projectSlice.slice();
-      int slices = projectSlice.slices();
-      ListenableFuture<?> future =
-          executor.submit(
-              reindexProject(
-                  indexerFactory.create(executor, index),
-                  name,
-                  slice,
-                  slices,
-                  projectSlice.scanResult(),
-                  doneTask,
-                  failedTask));
-      String description = "project " + name + " (" + slice + "/" + slices + ")";
-      addErrorListener(future, description, projTask, ok);
-      futures.add(future);
-    }
-
     try {
       mpm.waitFor(
           transform(
@@ -310,30 +270,53 @@
     }
   }
 
-  private class SliceCreator {
-    final Set<ProjectSlice> projectSlices = Sets.newConcurrentHashSet();
+  private class SliceScheduler {
+    final ChangeIndex index;
+    final AtomicBoolean ok;
     final AtomicInteger changeCount = new AtomicInteger(0);
     final AtomicInteger projectsFailed = new AtomicInteger(0);
-    final ProgressMonitor pm = new TextProgressMonitor();
+    final List<ListenableFuture<?>> sliceIndexerFutures = new ArrayList<>();
+    final List<ListenableFuture<?>> sliceCreationFutures = new ArrayList<>();
+    VolatileTask projTask = mpm.beginVolatileSubTask("project-slices");
+    Task slicingProjects;
 
-    private List<ProjectSlice> create()
-        throws ProjectsCollectionFailure, InterruptedException, ExecutionException {
-      List<ListenableFuture<?>> futures = new ArrayList<>();
-      pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN);
-      for (Project.NameKey name : projectCache.all()) {
-        futures.add(executor.submit(new ProjectSliceCreator(name)));
+    public SliceScheduler(ChangeIndex index, AtomicBoolean ok) {
+      this.index = index;
+      this.ok = ok;
+    }
+
+    private List<ListenableFuture<?>> schedule() throws ProjectsCollectionFailure {
+      ImmutableSortedSet<Project.NameKey> projects = projectCache.all();
+      int projectCount = projects.size();
+      slicingProjects = mpm.beginSubTask("Slicing projects", projectCount);
+      for (Project.NameKey name : projects) {
+        sliceCreationFutures.add(executor.submit(new ProjectSliceCreator(name)));
       }
 
-      Futures.allAsList(futures).get();
+      try {
+        mpm.waitForNonFinalTask(
+            transform(
+                successfulAsList(sliceCreationFutures),
+                x -> {
+                  projTask.finalizeTotal();
+                  doneTask.finalizeTotal();
+                  return null;
+                },
+                directExecutor()));
+      } catch (UncheckedExecutionException e) {
+        logger.atSevere().withCause(e).log("Error project slice creation");
+        ok.set(false);
+      }
 
-      if (projectsFailed.get() > projectCache.all().size() / 2) {
+      if (projectsFailed.get() > projectCount / 2) {
         throw new ProjectsCollectionFailure(
             "Over 50%% of the projects could not be collected: aborted");
       }
 
-      pm.endTask();
+      slicingProjects.endTask();
       setTotalWork(changeCount.get());
-      return projectSlices.stream().collect(Collectors.toList());
+
+      return sliceIndexerFutures;
     }
 
     private class ProjectSliceCreator implements Callable<Void> {
@@ -355,15 +338,32 @@
               verboseWriter.println(
                   "Submitting " + name + " for indexing in " + slices + " slices");
             }
+
+            doneTask.updateTotal(size);
+            projTask.updateTotal(slices);
+
             for (int slice = 0; slice < slices; slice++) {
-              projectSlices.add(ProjectSlice.create(name, slice, slices, sr));
+              ProjectSlice projectSlice = ProjectSlice.create(name, slice, slices, sr);
+              ListenableFuture<?> future =
+                  executor.submit(
+                      reindexProject(
+                          indexerFactory.create(executor, index),
+                          name,
+                          slice,
+                          slices,
+                          projectSlice.scanResult(),
+                          doneTask,
+                          failedTask));
+              String description = "project " + name + " (" + slice + "/" + slices + ")";
+              addErrorListener(future, description, projTask, ok);
+              sliceIndexerFutures.add(future);
             }
           }
         } catch (IOException e) {
           logger.atSevere().withCause(e).log("Error collecting project %s", name);
           projectsFailed.incrementAndGet();
         }
-        pm.update(1);
+        slicingProjects.update(1);
         return null;
       }
     }
diff --git a/javatests/com/google/gerrit/acceptance/pgm/ElasticReindexIT.java b/javatests/com/google/gerrit/acceptance/pgm/ElasticReindexIT.java
index f23cc10..8480a6d 100644
--- a/javatests/com/google/gerrit/acceptance/pgm/ElasticReindexIT.java
+++ b/javatests/com/google/gerrit/acceptance/pgm/ElasticReindexIT.java
@@ -26,7 +26,7 @@
 
   @ConfigSuite.Default
   public static Config elasticsearchV7() {
-    return getConfig(ElasticVersion.V7_8);
+    return getConfig(ElasticVersion.V7_16);
   }
 
   @Override
diff --git a/javatests/com/google/gerrit/acceptance/ssh/ElasticIndexIT.java b/javatests/com/google/gerrit/acceptance/ssh/ElasticIndexIT.java
index f35bcb7..e72d806 100644
--- a/javatests/com/google/gerrit/acceptance/ssh/ElasticIndexIT.java
+++ b/javatests/com/google/gerrit/acceptance/ssh/ElasticIndexIT.java
@@ -28,7 +28,7 @@
 
   @ConfigSuite.Default
   public static Config elasticsearchV7() {
-    return getConfig(ElasticVersion.V7_8);
+    return getConfig(ElasticVersion.V7_16);
   }
 
   @Override
diff --git a/javatests/com/google/gerrit/elasticsearch/ElasticContainer.java b/javatests/com/google/gerrit/elasticsearch/ElasticContainer.java
index c330961..b4fb153 100644
--- a/javatests/com/google/gerrit/elasticsearch/ElasticContainer.java
+++ b/javatests/com/google/gerrit/elasticsearch/ElasticContainer.java
@@ -39,12 +39,8 @@
 
   private static String getImageName(ElasticVersion version) {
     switch (version) {
-      case V7_6:
-        return "blacktop/elasticsearch:7.6.2";
-      case V7_7:
-        return "blacktop/elasticsearch:7.7.1";
-      case V7_8:
-        return "blacktop/elasticsearch:7.8.1";
+      case V7_16:
+        return "gerritforge/elasticsearch:7.16.2";
     }
     throw new IllegalStateException("No tests for version: " + version.name());
   }
diff --git a/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryAccountsTest.java b/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryAccountsTest.java
index 4826490..39517d5 100644
--- a/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryAccountsTest.java
+++ b/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryAccountsTest.java
@@ -36,7 +36,7 @@
   public static void startIndexService() {
     if (container == null) {
       // Only start Elasticsearch once
-      container = ElasticContainer.createAndStart(ElasticVersion.V7_8);
+      container = ElasticContainer.createAndStart(ElasticVersion.V7_16);
     }
   }
 
diff --git a/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryChangesTest.java b/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryChangesTest.java
index d9a4d2e..5d64d0a 100644
--- a/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryChangesTest.java
+++ b/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryChangesTest.java
@@ -46,7 +46,7 @@
   public static void startIndexService() {
     if (container == null) {
       // Only start Elasticsearch once
-      container = ElasticContainer.createAndStart(ElasticVersion.V7_8);
+      container = ElasticContainer.createAndStart(ElasticVersion.V7_16);
       client = HttpAsyncClients.createDefault();
       client.start();
     }
diff --git a/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryGroupsTest.java b/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryGroupsTest.java
index 0fc96f8..645f889 100644
--- a/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryGroupsTest.java
+++ b/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryGroupsTest.java
@@ -36,7 +36,7 @@
   public static void startIndexService() {
     if (container == null) {
       // Only start Elasticsearch once
-      container = ElasticContainer.createAndStart(ElasticVersion.V7_8);
+      container = ElasticContainer.createAndStart(ElasticVersion.V7_16);
     }
   }
 
diff --git a/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryProjectsTest.java b/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryProjectsTest.java
index 1e56af9..8d7f5f8 100644
--- a/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryProjectsTest.java
+++ b/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryProjectsTest.java
@@ -36,7 +36,7 @@
   public static void startIndexService() {
     if (container == null) {
       // Only start Elasticsearch once
-      container = ElasticContainer.createAndStart(ElasticVersion.V7_8);
+      container = ElasticContainer.createAndStart(ElasticVersion.V7_16);
     }
   }
 
diff --git a/javatests/com/google/gerrit/elasticsearch/ElasticVersionTest.java b/javatests/com/google/gerrit/elasticsearch/ElasticVersionTest.java
index 2ce3a2c..bfb332e 100644
--- a/javatests/com/google/gerrit/elasticsearch/ElasticVersionTest.java
+++ b/javatests/com/google/gerrit/elasticsearch/ElasticVersionTest.java
@@ -22,14 +22,7 @@
 public class ElasticVersionTest {
   @Test
   public void supportedVersion() throws Exception {
-    assertThat(ElasticVersion.forVersion("7.6.0")).isEqualTo(ElasticVersion.V7_6);
-    assertThat(ElasticVersion.forVersion("7.6.1")).isEqualTo(ElasticVersion.V7_6);
-
-    assertThat(ElasticVersion.forVersion("7.7.0")).isEqualTo(ElasticVersion.V7_7);
-    assertThat(ElasticVersion.forVersion("7.7.1")).isEqualTo(ElasticVersion.V7_7);
-
-    assertThat(ElasticVersion.forVersion("7.8.0")).isEqualTo(ElasticVersion.V7_8);
-    assertThat(ElasticVersion.forVersion("7.8.1")).isEqualTo(ElasticVersion.V7_8);
+    assertThat(ElasticVersion.forVersion("7.16.2")).isEqualTo(ElasticVersion.V7_16);
   }
 
   @Test