Merge changes Ie3972a84,I4bf1d191 into stable-3.2
* changes:
AllChangesIndexer: Parallelize project slice creation
AllChangesIndexer: Avoid scanning for change refs in each slice
diff --git a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
index 309c915..494aa84 100644
--- a/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
+++ b/java/com/google/gerrit/server/index/change/AllChangesIndexer.java
@@ -20,15 +20,16 @@
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH;
+import com.google.auto.value.AutoValue;
import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
import com.google.common.flogger.FluentLogger;
-import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.gerrit.entities.Change;
import com.google.gerrit.entities.Project;
-import com.google.gerrit.entities.RefNames;
import com.google.gerrit.index.SiteIndexer;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.MultiProgressMonitor;
@@ -37,6 +38,7 @@
import com.google.gerrit.server.index.OnlineReindexMode;
import com.google.gerrit.server.notedb.ChangeNotes;
import com.google.gerrit.server.notedb.ChangeNotes.Factory.ChangeNotesResult;
+import com.google.gerrit.server.notedb.ChangeNotes.Factory.ScanResult;
import com.google.gerrit.server.project.ProjectCache;
import com.google.gerrit.server.query.change.ChangeData;
import com.google.inject.Inject;
@@ -44,11 +46,13 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.eclipse.jgit.errors.RepositoryNotFoundException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.lib.TextProgressMonitor;
@@ -62,6 +66,14 @@
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final int PROJECT_SLICE_MAX_REFS = 1000;
+ private static class ProjectsCollectionFailure extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public ProjectsCollectionFailure(String message) {
+ super(message);
+ }
+ }
+
private final ChangeData.Factory changeDataFactory;
private final GitRepositoryManager repoManager;
private final ListeningExecutorService executor;
@@ -85,103 +97,58 @@
this.projectCache = projectCache;
}
- private static class ProjectSlice {
- private final Project.NameKey name;
- private final int slice;
- private final int slices;
+ @AutoValue
+ public abstract static class ProjectSlice {
+ public abstract Project.NameKey name();
- ProjectSlice(Project.NameKey name, int slice, int slices) {
- this.name = name;
- this.slice = slice;
- this.slices = slices;
- }
+ public abstract int slice();
- public Project.NameKey getName() {
- return name;
- }
+ public abstract int slices();
- public int getSlice() {
- return slice;
- }
+ public abstract ScanResult scanResult();
- public int getSlices() {
- return slices;
+ private static ProjectSlice create(Project.NameKey name, int slice, int slices, ScanResult sr) {
+ return new AutoValue_AllChangesIndexer_ProjectSlice(name, slice, slices, sr);
}
}
@Override
public Result indexAll(ChangeIndex index) {
- ProgressMonitor pm = new TextProgressMonitor();
- pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN);
- List<ProjectSlice> projectSlices = new ArrayList<>();
- int changeCount = 0;
- Stopwatch sw = Stopwatch.createStarted();
- int projectsFailed = 0;
- for (Project.NameKey name : projectCache.all()) {
- try (Repository repo = repoManager.openRepository(name)) {
- // The simplest approach to distribute indexing would be to let each thread grab a project
- // and index it fully. But if a site has one big project and 100s of small projects, then
- // in the beginning all CPUs would be busy reindexing projects. But soon enough all small
- // projects have been reindexed, and only the thread that reindexes the big project is
- // still working. The other threads would idle. Reindexing the big project on a single
- // thread becomes the critical path. Bringing in more CPUs would not speed up things.
- //
- // To avoid such situations, we split big repos into smaller parts and let
- // the thread pool index these smaller parts. This splitting introduces an overhead in the
- // workload setup and there might be additional slow-downs from multiple threads
- // concurrently working on different parts of the same project. But for Wikimedia's Gerrit,
- // which had 2 big projects, many middle sized ones, and lots of smaller ones, the
- // splitting of repos into smaller parts reduced indexing time from 1.5 hours to 55 minutes
- // in 2020.
- int size = estimateSize(repo);
- if (size == 0) {
- pm.update(1);
- continue;
- }
- changeCount += size;
- int slices = 1 + size / PROJECT_SLICE_MAX_REFS;
- if (slices > 1) {
- verboseWriter.println("Submitting " + name + " for indexing in " + slices + " slices");
- }
- for (int slice = 0; slice < slices; slice++) {
- projectSlices.add(new ProjectSlice(name, slice, slices));
- }
- } catch (IOException e) {
- logger.atSevere().withCause(e).log("Error collecting project %s", name);
- projectsFailed++;
- if (projectsFailed > projectCache.all().size() / 2) {
- logger.atSevere().log("Over 50%% of the projects could not be collected: aborted");
- return Result.create(sw, false, 0, 0);
- }
- }
- pm.update(1);
- }
- pm.endTask();
- setTotalWork(changeCount);
+ // The simplest approach to distribute indexing would be to let each thread grab a project
+ // and index it fully. But if a site has one big project and 100s of small projects, then
+ // in the beginning all CPUs would be busy reindexing projects. But soon enough all small
+ // projects have been reindexed, and only the thread that reindexes the big project is
+ // still working. The other threads would idle. Reindexing the big project on a single
+ // thread becomes the critical path. Bringing in more CPUs would not speed up things.
+ //
+ // To avoid such situations, we split big repos into smaller parts and let
+ // the thread pool index these smaller parts. This splitting introduces an overhead in the
+ // workload setup and there might be additional slow-downs from multiple threads
+ // concurrently working on different parts of the same project. But for Wikimedia's Gerrit,
+ // which had 2 big projects, many middle sized ones, and lots of smaller ones, the
+ // splitting of repos into smaller parts reduced indexing time from 1.5 hours to 55 minutes
+ // in 2020.
- // projectSlices are currently grouped by projects. First all slices for project1, followed
- // by all slices for project2, and so on. As workers pick tasks sequentially, multiple threads
- // would typically work concurrently on different slices of the same project. While this is not
- // a big issue, shuffling the list beforehand helps with ungrouping the project slices, so
- // different slices are less likely to be worked on concurrently.
+ Stopwatch sw = Stopwatch.createStarted();
+ List<ProjectSlice> projectSlices;
+ try {
+ projectSlices = new SliceCreator().create();
+ } catch (ProjectsCollectionFailure | InterruptedException | ExecutionException e) {
+      logger.atSevere().withCause(e).log(e.getMessage());
+ return Result.create(sw, false, 0, 0);
+ }
+
+ // Since project slices are created in parallel, they are somewhat shuffled already. However,
+ // the number of threads used to create the project slices doesn't guarantee good randomization.
+ // If the slices are not shuffled well, then multiple threads would typically work concurrently
+ // on different slices of the same project. While this is not a big issue, shuffling the list
+ // beforehand helps with ungrouping the project slices, so different slices are less likely to
+ // be worked on concurrently.
// This shuffling gave a 6% runtime reduction for Wikimedia's Gerrit in 2020.
Collections.shuffle(projectSlices);
return indexAll(index, projectSlices);
}
- private int estimateSize(Repository repo) throws IOException {
- // Estimate size based on IDs that show up in ref names. This is not perfect, since patch set
- // refs may exist for changes whose metadata was never successfully stored. But that's ok, as
- // the estimate is just used as a heuristic for sorting projects.
- long size =
- repo.getRefDatabase().getRefsByPrefix(RefNames.REFS_CHANGES).stream()
- .map(r -> Change.Id.fromRef(r.getName()))
- .filter(Objects::nonNull)
- .distinct()
- .count();
- return Ints.saturatedCast(size);
- }
-
private SiteIndexer.Result indexAll(ChangeIndex index, List<ProjectSlice> projectSlices) {
Stopwatch sw = Stopwatch.createStarted();
MultiProgressMonitor mpm = new MultiProgressMonitor(progressOut, "Reindexing changes");
@@ -194,9 +161,9 @@
AtomicBoolean ok = new AtomicBoolean(true);
for (ProjectSlice projectSlice : projectSlices) {
- Project.NameKey name = projectSlice.getName();
- int slice = projectSlice.getSlice();
- int slices = projectSlice.getSlices();
+ Project.NameKey name = projectSlice.name();
+ int slice = projectSlice.slice();
+ int slices = projectSlice.slices();
ListenableFuture<?> future =
executor.submit(
reindexProject(
@@ -204,6 +171,7 @@
name,
slice,
slices,
+ projectSlice.scanResult(),
doneTask,
failedTask));
String description = "project " + name + " (" + slice + "/" + slices + ")";
@@ -242,7 +210,13 @@
public Callable<Void> reindexProject(
ChangeIndexer indexer, Project.NameKey project, Task done, Task failed) {
- return reindexProject(indexer, project, 0, 1, done, failed);
+ try (Repository repo = repoManager.openRepository(project)) {
+ return reindexProject(
+ indexer, project, 0, 1, ChangeNotes.Factory.scanChangeIds(repo), done, failed);
+ } catch (IOException e) {
+      logger.atSevere().withCause(e).log(e.getMessage());
+ return null;
+ }
}
public Callable<Void> reindexProject(
@@ -250,9 +224,10 @@
Project.NameKey project,
int slice,
int slices,
+ ScanResult scanResult,
Task done,
Task failed) {
- return new ProjectIndexer(indexer, project, slice, slices, done, failed);
+ return new ProjectIndexer(indexer, project, slice, slices, scanResult, done, failed);
}
private class ProjectIndexer implements Callable<Void> {
@@ -260,6 +235,7 @@
private final Project.NameKey project;
private final int slice;
private final int slices;
+ private final ScanResult scanResult;
private final ProgressMonitor done;
private final ProgressMonitor failed;
@@ -268,32 +244,30 @@
Project.NameKey project,
int slice,
int slices,
+ ScanResult scanResult,
ProgressMonitor done,
ProgressMonitor failed) {
this.indexer = indexer;
this.project = project;
this.slice = slice;
this.slices = slices;
+ this.scanResult = scanResult;
this.done = done;
this.failed = failed;
}
@Override
public Void call() throws Exception {
- try (Repository repo = repoManager.openRepository(project)) {
- OnlineReindexMode.begin();
-
- // Order of scanning changes is undefined. This is ok if we assume that packfile locality is
- // not important for indexing, since sites should have a fully populated DiffSummary cache.
- // It does mean that reindexing after invalidating the DiffSummary cache will be expensive,
- // but the goal is to invalidate that cache as infrequently as we possibly can. And besides,
- // we don't have concrete proof that improving packfile locality would help.
- notesFactory.scan(repo, project, id -> (id.get() % slices) == slice).forEach(r -> index(r));
- } catch (RepositoryNotFoundException rnfe) {
- logger.atSevere().log(rnfe.getMessage());
- } finally {
- OnlineReindexMode.end();
- }
+ OnlineReindexMode.begin();
+ // Order of scanning changes is undefined. This is ok if we assume that packfile locality is
+ // not important for indexing, since sites should have a fully populated DiffSummary cache.
+ // It does mean that reindexing after invalidating the DiffSummary cache will be expensive,
+ // but the goal is to invalidate that cache as infrequently as we possibly can. And besides,
+ // we don't have concrete proof that improving packfile locality would help.
+ notesFactory
+ .scan(scanResult, project, id -> (id.get() % slices) == slice)
+ .forEach(r -> index(r));
+ OnlineReindexMode.end();
return null;
}
@@ -333,4 +307,63 @@
return "Index all changes of project " + project.get();
}
}
+
+ private class SliceCreator {
+ final Set<ProjectSlice> projectSlices = Sets.newConcurrentHashSet();
+ final AtomicInteger changeCount = new AtomicInteger(0);
+ final AtomicInteger projectsFailed = new AtomicInteger(0);
+ final ProgressMonitor pm = new TextProgressMonitor();
+
+ private List<ProjectSlice> create()
+ throws ProjectsCollectionFailure, InterruptedException, ExecutionException {
+ List<ListenableFuture<?>> futures = new ArrayList<>();
+ pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN);
+ for (Project.NameKey name : projectCache.all()) {
+ futures.add(executor.submit(new ProjectSliceCreator(name)));
+ }
+
+ Futures.allAsList(futures).get();
+
+ if (projectsFailed.get() > projectCache.all().size() / 2) {
+ throw new ProjectsCollectionFailure(
+            "Over 50% of the projects could not be collected: aborted");
+ }
+
+ pm.endTask();
+ setTotalWork(changeCount.get());
+ return projectSlices.stream().collect(Collectors.toList());
+ }
+
+ private class ProjectSliceCreator implements Callable<Void> {
+ final Project.NameKey name;
+
+ public ProjectSliceCreator(Project.NameKey name) {
+ this.name = name;
+ }
+
+ @Override
+ public Void call() throws IOException {
+ try (Repository repo = repoManager.openRepository(name)) {
+ ScanResult sr = ChangeNotes.Factory.scanChangeIds(repo);
+ int size = sr.all().size();
+ if (size > 0) {
+ changeCount.addAndGet(size);
+ int slices = 1 + size / PROJECT_SLICE_MAX_REFS;
+ if (slices > 1) {
+ verboseWriter.println(
+ "Submitting " + name + " for indexing in " + slices + " slices");
+ }
+ for (int slice = 0; slice < slices; slice++) {
+ projectSlices.add(ProjectSlice.create(name, slice, slices, sr));
+ }
+ }
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log("Error collecting project %s", name);
+ projectsFailed.incrementAndGet();
+ }
+ pm.update(1);
+ return null;
+ }
+ }
+ }
}
diff --git a/java/com/google/gerrit/server/notedb/ChangeNotes.java b/java/com/google/gerrit/server/notedb/ChangeNotes.java
index 36a61cc0..51acf16e 100644
--- a/java/com/google/gerrit/server/notedb/ChangeNotes.java
+++ b/java/com/google/gerrit/server/notedb/ChangeNotes.java
@@ -106,6 +106,29 @@
this.projectCache = projectCache;
}
+ @AutoValue
+ public abstract static class ScanResult {
+ abstract ImmutableSet<Change.Id> fromPatchSetRefs();
+
+ abstract ImmutableSet<Change.Id> fromMetaRefs();
+
+ public SetView<Change.Id> all() {
+ return Sets.union(fromPatchSetRefs(), fromMetaRefs());
+ }
+ }
+
+ public static ScanResult scanChangeIds(Repository repo) throws IOException {
+ ImmutableSet.Builder<Change.Id> fromPs = ImmutableSet.builder();
+ ImmutableSet.Builder<Change.Id> fromMeta = ImmutableSet.builder();
+ for (Ref r : repo.getRefDatabase().getRefsByPrefix(RefNames.REFS_CHANGES)) {
+ Change.Id id = Change.Id.fromRef(r.getName());
+ if (id != null) {
+ (r.getName().endsWith(RefNames.META_SUFFIX) ? fromMeta : fromPs).add(id);
+ }
+ }
+ return new AutoValue_ChangeNotes_Factory_ScanResult(fromPs.build(), fromMeta.build());
+ }
+
public ChangeNotes createChecked(Change c) {
return createChecked(c.getProject(), c.getId());
}
@@ -213,8 +236,11 @@
public Stream<ChangeNotesResult> scan(
Repository repo, Project.NameKey project, Predicate<Change.Id> changeIdPredicate)
throws IOException {
- ScanResult sr = scanChangeIds(repo);
+ return scan(scanChangeIds(repo), project, changeIdPredicate);
+ }
+ public Stream<ChangeNotesResult> scan(
+ ScanResult sr, Project.NameKey project, Predicate<Change.Id> changeIdPredicate) {
Stream<Change.Id> idStream = sr.all().stream();
if (changeIdPredicate != null) {
idStream = idStream.filter(changeIdPredicate);
@@ -286,29 +312,6 @@
@Nullable
abstract ChangeNotes maybeNotes();
}
-
- @AutoValue
- abstract static class ScanResult {
- abstract ImmutableSet<Change.Id> fromPatchSetRefs();
-
- abstract ImmutableSet<Change.Id> fromMetaRefs();
-
- SetView<Change.Id> all() {
- return Sets.union(fromPatchSetRefs(), fromMetaRefs());
- }
- }
-
- private static ScanResult scanChangeIds(Repository repo) throws IOException {
- ImmutableSet.Builder<Change.Id> fromPs = ImmutableSet.builder();
- ImmutableSet.Builder<Change.Id> fromMeta = ImmutableSet.builder();
- for (Ref r : repo.getRefDatabase().getRefsByPrefix(RefNames.REFS_CHANGES)) {
- Change.Id id = Change.Id.fromRef(r.getName());
- if (id != null) {
- (r.getName().endsWith(RefNames.META_SUFFIX) ? fromMeta : fromPs).add(id);
- }
- }
- return new AutoValue_ChangeNotes_Factory_ScanResult(fromPs.build(), fromMeta.build());
- }
}
private final boolean shouldExist;