Merge branch 'stable-3.3' into stable-3.4
* stable-3.3:
Remove support for vulnerable ES versions
AllChangesIndexer: Schedule slices immediately
MultiProgressMonitor: Support waiting for non-final tasks
Change-Id: I9c8883a4cc38de6692048838ff833b4e9b79f8c3
diff --git a/java/com/google/gerrit/elasticsearch/ElasticVersion.java b/java/com/google/gerrit/elasticsearch/ElasticVersion.java
index c6400df..47fa383 100644
--- a/java/com/google/gerrit/elasticsearch/ElasticVersion.java
+++ b/java/com/google/gerrit/elasticsearch/ElasticVersion.java
@@ -18,9 +18,7 @@
import java.util.regex.Pattern;
public enum ElasticVersion {
- V7_6("7.6.*"),
- V7_7("7.7.*"),
- V7_8("7.8.*");
+ V7_16("7.16.*");
private final String version;
private final Pattern pattern;
diff --git a/java/com/google/gerrit/server/git/MultiProgressMonitor.java b/java/com/google/gerrit/server/git/MultiProgressMonitor.java
index 2d854a5..7e5c99f 100644
--- a/java/com/google/gerrit/server/git/MultiProgressMonitor.java
+++ b/java/com/google/gerrit/server/git/MultiProgressMonitor.java
@@ -28,6 +28,8 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ProgressMonitor;
@@ -123,6 +125,64 @@
return count;
}
}
+
+ public int getTotal() {
+ return total;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getTotalDisplay(int total) {
+ return String.valueOf(total);
+ }
+ }
+
+ /** Handle for a sub-task whose total work can be updated while the task is in progress. */
+ public class VolatileTask extends Task {
+ protected AtomicInteger volatileTotal;
+ protected AtomicBoolean isTotalFinalized = new AtomicBoolean(false);
+
+ public VolatileTask(String subTaskName) {
+ super(subTaskName, UNKNOWN);
+ volatileTotal = new AtomicInteger(UNKNOWN);
+ }
+
+ /**
+ * Update the total work for this sub-task.
+ *
+ * <p>Intended to be called from a worker thread.
+ *
+ * @param workUnits number of work units to be added to existing total work.
+ */
+ public void updateTotal(int workUnits) {
+ if (!isTotalFinalized.get()) {
+ volatileTotal.addAndGet(workUnits);
+ } else {
+ logger.atWarning().log(
+ "Total work has been finalized on sub-task " + getName() + " and cannot be updated");
+ }
+ }
+
+ /**
+ * Mark the total on this sub-task as unmodifiable.
+ *
+ * <p>Intended to be called from a worker thread.
+ */
+ public void finalizeTotal() {
+ isTotalFinalized.set(true);
+ }
+
+ @Override
+ public int getTotal() {
+ return volatileTotal.get();
+ }
+
+ @Override
+ public String getTotalDisplay(int total) {
+ return super.getTotalDisplay(total) + (isTotalFinalized.get() ? "" : "+");
+ }
}
private final OutputStream out;
@@ -180,6 +240,7 @@
* calls {@link #end()}, the future has an additional {@code maxInterval} to finish before it is
* forcefully cancelled and {@link ExecutionException} is thrown.
*
+ * @see #waitForNonFinalTask(Future, long, TimeUnit)
* @param workerFuture a future that returns when worker threads are finished.
* @param timeoutTime overall timeout for the task; the future is forcefully cancelled if the task
* exceeds the timeout. Non-positive values indicate no timeout.
@@ -189,6 +250,45 @@
*/
public <T> T waitFor(Future<T> workerFuture, long timeoutTime, TimeUnit timeoutUnit)
throws TimeoutException {
+ T t = waitForNonFinalTask(workerFuture, timeoutTime, timeoutUnit);
+ synchronized (this) {
+ if (!done) {
+ // The worker may not have called end() explicitly, which is likely a
+ // programming error.
+ logger.atWarning().log("MultiProgressMonitor worker did not call end() before returning");
+ end();
+ }
+ }
+ sendDone();
+ return t;
+ }
+
+ /**
+ * Wait for a non-final task managed by a {@link Future}, with no timeout.
+ *
+ * @see #waitForNonFinalTask(Future, long, TimeUnit)
+ */
+ public <T> T waitForNonFinalTask(Future<T> workerFuture) {
+ try {
+ return waitForNonFinalTask(workerFuture, 0, null);
+ } catch (TimeoutException e) {
+ throw new IllegalStateException("timout exception without setting a timeout", e);
+ }
+ }
+
+ /**
+ * Wait for a task managed by a {@link Future}. This call does not expect the worker thread to
+ * call {@link #end()}. It is intended to be used to track a non-final task.
+ *
+ * @param workerFuture a future that returns when worker threads are finished.
+ * @param timeoutTime overall timeout for the task; the future is forcefully cancelled if the task
+ * exceeds the timeout. Non-positive values indicate no timeout.
+ * @param timeoutUnit unit for overall task timeout.
+ * @throws TimeoutException if this thread or a worker thread was interrupted, the worker was
+ * cancelled, or timed out waiting for a worker to call {@link #end()}.
+ */
+ public <T> T waitForNonFinalTask(Future<T> workerFuture, long timeoutTime, TimeUnit timeoutUnit)
+ throws TimeoutException {
long overallStart = System.nanoTime();
long deadline;
if (timeoutTime > 0) {
@@ -199,7 +299,7 @@
synchronized (this) {
long left = maxIntervalNanos;
- while (!done) {
+ while (!workerFuture.isDone() && !done) {
long start = System.nanoTime();
try {
NANOSECONDS.timedWait(this, left);
@@ -228,14 +328,8 @@
left = maxIntervalNanos;
}
sendUpdate();
- if (!done && workerFuture.isDone()) {
- // The worker may not have called end() explicitly, which is likely a
- // programming error.
- logger.atWarning().log("MultiProgressMonitor worker did not call end() before returning");
- end();
- }
}
- sendDone();
+ wakeUp();
}
// The loop exits as soon as the worker calls end(), but we give it another
@@ -271,6 +365,18 @@
}
/**
+ * Begin a sub-task whose total work can be updated.
+ *
+ * @param subTask sub-task name.
+ * @return sub-task handle.
+ */
+ public VolatileTask beginVolatileSubTask(String subTask) {
+ VolatileTask task = new VolatileTask(subTask);
+ tasks.add(task);
+ return task;
+ }
+
+ /**
* End the overall task.
*
* <p>Must be called from a worker thread.
@@ -313,6 +419,7 @@
boolean first = true;
for (Task t : tasks) {
int count = t.getCount();
+ int total = t.getTotal();
if (count == 0) {
continue;
}
@@ -327,10 +434,11 @@
if (!Strings.isNullOrEmpty(t.name)) {
s.append(t.name).append(": ");
}
- if (t.total == UNKNOWN) {
+ if (total == UNKNOWN) {
s.append(count);
} else {
- s.append(String.format("%d%% (%d/%d)", count * 100 / t.total, count, t.total));
+ s.append(
+ String.format("%d%% (%d/%s)", count * 100 / total, count, t.getTotalDisplay(total)));
}
}
}
diff --git a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
index 30205b6..f176c38 100644
--- a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
+++ b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
@@ -14,7 +14,6 @@
package com.google.gerrit.server.index.change;
-import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.successfulAsList;
import static com.google.common.util.concurrent.Futures.transform;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
@@ -22,9 +21,8 @@
import com.google.auto.value.AutoValue;
import com.google.common.base.Stopwatch;
-import com.google.common.collect.Sets;
+import com.google.common.collect.ImmutableSortedSet;
import com.google.common.flogger.FluentLogger;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.UncheckedExecutionException;
@@ -34,6 +32,7 @@
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.MultiProgressMonitor;
import com.google.gerrit.server.git.MultiProgressMonitor.Task;
+import com.google.gerrit.server.git.MultiProgressMonitor.VolatileTask;
import com.google.gerrit.server.index.IndexExecutor;
import com.google.gerrit.server.index.OnlineReindexMode;
import com.google.gerrit.server.notedb.ChangeNotes;
@@ -44,18 +43,13 @@
import com.google.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.lib.Repository;
-import org.eclipse.jgit.lib.TextProgressMonitor;
/**
* Implementation that can index all changes on a host or within a project. Used by Gerrit's
@@ -64,6 +58,9 @@
*/
public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, ChangeIndex> {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+ private MultiProgressMonitor mpm;
+ private VolatileTask doneTask;
+ private Task failedTask;
private static final int PROJECT_SLICE_MAX_REFS = 1000;
private static class ProjectsCollectionFailure extends Exception {
@@ -130,55 +127,18 @@
// in 2020.
Stopwatch sw = Stopwatch.createStarted();
- List<ProjectSlice> projectSlices;
+ AtomicBoolean ok = new AtomicBoolean(true);
+ mpm = new MultiProgressMonitor(progressOut, "Reindexing changes");
+ doneTask = mpm.beginVolatileSubTask("changes");
+ failedTask = mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
+ List<ListenableFuture<?>> futures;
try {
- projectSlices = new SliceCreator().create();
- } catch (ProjectsCollectionFailure | InterruptedException | ExecutionException e) {
+ futures = new SliceScheduler(index, ok).schedule();
+ } catch (ProjectsCollectionFailure e) {
logger.atSevere().log(e.getMessage());
return Result.create(sw, false, 0, 0);
}
- // Since project slices are created in parallel, they are somewhat shuffled already. However,
- // the number of threads used to create the project slices doesn't guarantee good randomization.
- // If the slices are not shuffled well, then multiple threads would typically work concurrently
- // on different slices of the same project. While this is not a big issue, shuffling the list
- // beforehand helps with ungrouping the project slices, so different slices are less likely to
- // be worked on concurrently.
- // This shuffling gave a 6% runtime reduction for Wikimedia's Gerrit in 2020.
- Collections.shuffle(projectSlices);
- return indexAll(index, projectSlices);
- }
-
- private SiteIndexer.Result indexAll(ChangeIndex index, List<ProjectSlice> projectSlices) {
- Stopwatch sw = Stopwatch.createStarted();
- MultiProgressMonitor mpm = new MultiProgressMonitor(progressOut, "Reindexing changes");
- Task projTask = mpm.beginSubTask("project-slices", projectSlices.size());
- checkState(totalWork >= 0);
- Task doneTask = mpm.beginSubTask(null, totalWork);
- Task failedTask = mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
-
- List<ListenableFuture<?>> futures = new ArrayList<>();
- AtomicBoolean ok = new AtomicBoolean(true);
-
- for (ProjectSlice projectSlice : projectSlices) {
- Project.NameKey name = projectSlice.name();
- int slice = projectSlice.slice();
- int slices = projectSlice.slices();
- ListenableFuture<?> future =
- executor.submit(
- reindexProject(
- indexerFactory.create(executor, index),
- name,
- slice,
- slices,
- projectSlice.scanResult(),
- doneTask,
- failedTask));
- String description = "project " + name + " (" + slice + "/" + slices + ")";
- addErrorListener(future, description, projTask, ok);
- futures.add(future);
- }
-
try {
mpm.waitFor(
transform(
@@ -310,30 +270,53 @@
}
}
- private class SliceCreator {
- final Set<ProjectSlice> projectSlices = Sets.newConcurrentHashSet();
+ private class SliceScheduler {
+ final ChangeIndex index;
+ final AtomicBoolean ok;
final AtomicInteger changeCount = new AtomicInteger(0);
final AtomicInteger projectsFailed = new AtomicInteger(0);
- final ProgressMonitor pm = new TextProgressMonitor();
+ final List<ListenableFuture<?>> sliceIndexerFutures = new ArrayList<>();
+ final List<ListenableFuture<?>> sliceCreationFutures = new ArrayList<>();
+ VolatileTask projTask = mpm.beginVolatileSubTask("project-slices");
+ Task slicingProjects;
- private List<ProjectSlice> create()
- throws ProjectsCollectionFailure, InterruptedException, ExecutionException {
- List<ListenableFuture<?>> futures = new ArrayList<>();
- pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN);
- for (Project.NameKey name : projectCache.all()) {
- futures.add(executor.submit(new ProjectSliceCreator(name)));
+ public SliceScheduler(ChangeIndex index, AtomicBoolean ok) {
+ this.index = index;
+ this.ok = ok;
+ }
+
+ private List<ListenableFuture<?>> schedule() throws ProjectsCollectionFailure {
+ ImmutableSortedSet<Project.NameKey> projects = projectCache.all();
+ int projectCount = projects.size();
+ slicingProjects = mpm.beginSubTask("Slicing projects", projectCount);
+ for (Project.NameKey name : projects) {
+ sliceCreationFutures.add(executor.submit(new ProjectSliceCreator(name)));
}
- Futures.allAsList(futures).get();
+ try {
+ mpm.waitForNonFinalTask(
+ transform(
+ successfulAsList(sliceCreationFutures),
+ x -> {
+ projTask.finalizeTotal();
+ doneTask.finalizeTotal();
+ return null;
+ },
+ directExecutor()));
+ } catch (UncheckedExecutionException e) {
+ logger.atSevere().withCause(e).log("Error project slice creation");
+ ok.set(false);
+ }
- if (projectsFailed.get() > projectCache.all().size() / 2) {
+ if (projectsFailed.get() > projectCount / 2) {
throw new ProjectsCollectionFailure(
"Over 50%% of the projects could not be collected: aborted");
}
- pm.endTask();
+ slicingProjects.endTask();
setTotalWork(changeCount.get());
- return projectSlices.stream().collect(Collectors.toList());
+
+ return sliceIndexerFutures;
}
private class ProjectSliceCreator implements Callable<Void> {
@@ -355,15 +338,32 @@
verboseWriter.println(
"Submitting " + name + " for indexing in " + slices + " slices");
}
+
+ doneTask.updateTotal(size);
+ projTask.updateTotal(slices);
+
for (int slice = 0; slice < slices; slice++) {
- projectSlices.add(ProjectSlice.create(name, slice, slices, sr));
+ ProjectSlice projectSlice = ProjectSlice.create(name, slice, slices, sr);
+ ListenableFuture<?> future =
+ executor.submit(
+ reindexProject(
+ indexerFactory.create(executor, index),
+ name,
+ slice,
+ slices,
+ projectSlice.scanResult(),
+ doneTask,
+ failedTask));
+ String description = "project " + name + " (" + slice + "/" + slices + ")";
+ addErrorListener(future, description, projTask, ok);
+ sliceIndexerFutures.add(future);
}
}
} catch (IOException e) {
logger.atSevere().withCause(e).log("Error collecting project %s", name);
projectsFailed.incrementAndGet();
}
- pm.update(1);
+ slicingProjects.update(1);
return null;
}
}
diff --git a/javatests/com/google/gerrit/acceptance/pgm/ElasticReindexIT.java b/javatests/com/google/gerrit/acceptance/pgm/ElasticReindexIT.java
index f23cc10..8480a6d 100644
--- a/javatests/com/google/gerrit/acceptance/pgm/ElasticReindexIT.java
+++ b/javatests/com/google/gerrit/acceptance/pgm/ElasticReindexIT.java
@@ -26,7 +26,7 @@
@ConfigSuite.Default
public static Config elasticsearchV7() {
- return getConfig(ElasticVersion.V7_8);
+ return getConfig(ElasticVersion.V7_16);
}
@Override
diff --git a/javatests/com/google/gerrit/acceptance/ssh/ElasticIndexIT.java b/javatests/com/google/gerrit/acceptance/ssh/ElasticIndexIT.java
index f35bcb7..e72d806 100644
--- a/javatests/com/google/gerrit/acceptance/ssh/ElasticIndexIT.java
+++ b/javatests/com/google/gerrit/acceptance/ssh/ElasticIndexIT.java
@@ -28,7 +28,7 @@
@ConfigSuite.Default
public static Config elasticsearchV7() {
- return getConfig(ElasticVersion.V7_8);
+ return getConfig(ElasticVersion.V7_16);
}
@Override
diff --git a/javatests/com/google/gerrit/elasticsearch/ElasticContainer.java b/javatests/com/google/gerrit/elasticsearch/ElasticContainer.java
index c330961..b4fb153 100644
--- a/javatests/com/google/gerrit/elasticsearch/ElasticContainer.java
+++ b/javatests/com/google/gerrit/elasticsearch/ElasticContainer.java
@@ -39,12 +39,8 @@
private static String getImageName(ElasticVersion version) {
switch (version) {
- case V7_6:
- return "blacktop/elasticsearch:7.6.2";
- case V7_7:
- return "blacktop/elasticsearch:7.7.1";
- case V7_8:
- return "blacktop/elasticsearch:7.8.1";
+ case V7_16:
+ return "gerritforge/elasticsearch:7.16.2";
}
throw new IllegalStateException("No tests for version: " + version.name());
}
diff --git a/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryAccountsTest.java b/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryAccountsTest.java
index 4826490..39517d5 100644
--- a/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryAccountsTest.java
+++ b/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryAccountsTest.java
@@ -36,7 +36,7 @@
public static void startIndexService() {
if (container == null) {
// Only start Elasticsearch once
- container = ElasticContainer.createAndStart(ElasticVersion.V7_8);
+ container = ElasticContainer.createAndStart(ElasticVersion.V7_16);
}
}
diff --git a/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryChangesTest.java b/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryChangesTest.java
index d9a4d2e..5d64d0a 100644
--- a/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryChangesTest.java
+++ b/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryChangesTest.java
@@ -46,7 +46,7 @@
public static void startIndexService() {
if (container == null) {
// Only start Elasticsearch once
- container = ElasticContainer.createAndStart(ElasticVersion.V7_8);
+ container = ElasticContainer.createAndStart(ElasticVersion.V7_16);
client = HttpAsyncClients.createDefault();
client.start();
}
diff --git a/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryGroupsTest.java b/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryGroupsTest.java
index 0fc96f8..645f889 100644
--- a/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryGroupsTest.java
+++ b/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryGroupsTest.java
@@ -36,7 +36,7 @@
public static void startIndexService() {
if (container == null) {
// Only start Elasticsearch once
- container = ElasticContainer.createAndStart(ElasticVersion.V7_8);
+ container = ElasticContainer.createAndStart(ElasticVersion.V7_16);
}
}
diff --git a/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryProjectsTest.java b/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryProjectsTest.java
index 1e56af9..8d7f5f8 100644
--- a/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryProjectsTest.java
+++ b/javatests/com/google/gerrit/elasticsearch/ElasticV7QueryProjectsTest.java
@@ -36,7 +36,7 @@
public static void startIndexService() {
if (container == null) {
// Only start Elasticsearch once
- container = ElasticContainer.createAndStart(ElasticVersion.V7_8);
+ container = ElasticContainer.createAndStart(ElasticVersion.V7_16);
}
}
diff --git a/javatests/com/google/gerrit/elasticsearch/ElasticVersionTest.java b/javatests/com/google/gerrit/elasticsearch/ElasticVersionTest.java
index 2ce3a2c..bfb332e 100644
--- a/javatests/com/google/gerrit/elasticsearch/ElasticVersionTest.java
+++ b/javatests/com/google/gerrit/elasticsearch/ElasticVersionTest.java
@@ -22,14 +22,7 @@
public class ElasticVersionTest {
@Test
public void supportedVersion() throws Exception {
- assertThat(ElasticVersion.forVersion("7.6.0")).isEqualTo(ElasticVersion.V7_6);
- assertThat(ElasticVersion.forVersion("7.6.1")).isEqualTo(ElasticVersion.V7_6);
-
- assertThat(ElasticVersion.forVersion("7.7.0")).isEqualTo(ElasticVersion.V7_7);
- assertThat(ElasticVersion.forVersion("7.7.1")).isEqualTo(ElasticVersion.V7_7);
-
- assertThat(ElasticVersion.forVersion("7.8.0")).isEqualTo(ElasticVersion.V7_8);
- assertThat(ElasticVersion.forVersion("7.8.1")).isEqualTo(ElasticVersion.V7_8);
+ assertThat(ElasticVersion.forVersion("7.16.2")).isEqualTo(ElasticVersion.V7_16);
}
@Test