Separate change indexer threads into interactive/batch pools
This mirrors the behavior for mergeability checks, and allows
isolation of online reindexing threads from interactive threads used
by the current index version.
Change-Id: Id699fdadb325ac7ee7ad12cbf3dffc1a96afc97e
diff --git a/Documentation/config-gerrit.txt b/Documentation/config-gerrit.txt
index 0c13b23..02ca6e6 100644
--- a/Documentation/config-gerrit.txt
+++ b/Documentation/config-gerrit.txt
@@ -2036,10 +2036,18 @@
[[index.threads]]index.threads::
+
-Determines the number of threads to use for indexing.
+Number of threads to use for indexing in normal interactive operations.
+
Defaults to 1 if not set, or set to a negative value.
+[[index.batchThreads]]index.batchThreads::
++
+Number of threads to use for indexing in background operations, such as
+online schema upgrades.
++
+If not set or set to a negative value, defaults to using the same
+thread pool as interactive operations.
+
==== Lucene configuration
Open and closed changes are indexed in separate indexes named
diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java
index cc910ad..676bc71 100644
--- a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java
+++ b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java
@@ -15,6 +15,7 @@
package com.google.gerrit.lucene;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.gerrit.server.git.QueueProvider.QueueType.INTERACTIVE;
import static com.google.gerrit.server.index.IndexRewriteImpl.CLOSED_STATUSES;
import static com.google.gerrit.server.index.IndexRewriteImpl.OPEN_STATUSES;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -229,7 +230,7 @@
LuceneChangeIndex(
@GerritServerConfig Config cfg,
SitePaths sitePaths,
- @IndexExecutor ListeningExecutorService executor,
+ @IndexExecutor(INTERACTIVE) ListeningExecutorService executor,
Provider<ReviewDb> db,
ChangeData.Factory changeDataFactory,
FillArgs fillArgs,
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeBatchIndexer.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeBatchIndexer.java
index 028d8fb..7cc8a98 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeBatchIndexer.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeBatchIndexer.java
@@ -15,6 +15,7 @@
package com.google.gerrit.server.index;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH;
import static org.eclipse.jgit.lib.RefDatabase.ALL;
import com.google.common.base.Stopwatch;
@@ -127,7 +128,7 @@
ChangeBatchIndexer(SchemaFactory<ReviewDb> schemaFactory,
ChangeData.Factory changeDataFactory,
GitRepositoryManager repoManager,
- @IndexExecutor ListeningExecutorService executor,
+ @IndexExecutor(BATCH) ListeningExecutorService executor,
ChangeIndexer.Factory indexerFactory,
@GerritServerConfig Config config) {
this.schemaFactory = schemaFactory;
@@ -180,7 +181,7 @@
ok.set(false);
}
final ListenableFuture<?> future = executor.submit(reindexProject(
- indexerFactory.create(index), project, doneTask, failedTask,
+ indexerFactory.create(executor, index), project, doneTask, failedTask,
verboseWriter));
futures.add(future);
future.addListener(new Runnable() {
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndexer.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndexer.java
index 437f559..e235379 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndexer.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndexer.java
@@ -55,8 +55,9 @@
LoggerFactory.getLogger(ChangeIndexer.class);
public interface Factory {
- ChangeIndexer create(ChangeIndex index);
- ChangeIndexer create(IndexCollection indexes);
+ ChangeIndexer create(ListeningExecutorService executor, ChangeIndex index);
+ ChangeIndexer create(ListeningExecutorService executor,
+ IndexCollection indexes);
}
private static final Function<Exception, IOException> MAPPER =
@@ -82,10 +83,10 @@
private final ListeningExecutorService executor;
@AssistedInject
- ChangeIndexer(@IndexExecutor ListeningExecutorService executor,
- SchemaFactory<ReviewDb> schemaFactory,
+ ChangeIndexer(SchemaFactory<ReviewDb> schemaFactory,
ChangeData.Factory changeDataFactory,
ThreadLocalRequestContext context,
+ @Assisted ListeningExecutorService executor,
@Assisted ChangeIndex index) {
this.executor = executor;
this.schemaFactory = schemaFactory;
@@ -96,10 +97,10 @@
}
@AssistedInject
- ChangeIndexer(@IndexExecutor ListeningExecutorService executor,
- SchemaFactory<ReviewDb> schemaFactory,
+ ChangeIndexer(SchemaFactory<ReviewDb> schemaFactory,
ChangeData.Factory changeDataFactory,
ThreadLocalRequestContext context,
+ @Assisted ListeningExecutorService executor,
@Assisted IndexCollection indexes) {
this.executor = executor;
this.schemaFactory = schemaFactory;
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexExecutor.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexExecutor.java
index 0a96d1d..eb97fdc 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexExecutor.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexExecutor.java
@@ -17,6 +17,7 @@
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.gerrit.server.git.QueueProvider.QueueType;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.Retention;
@@ -28,4 +29,5 @@
@Retention(RUNTIME)
@BindingAnnotation
public @interface IndexExecutor {
+ QueueType value();
}
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexModule.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexModule.java
index 3aeeef2..87e4832 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexModule.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexModule.java
@@ -14,6 +14,9 @@
package com.google.gerrit.server.index;
+import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH;
+import static com.google.gerrit.server.git.QueueProvider.QueueType.INTERACTIVE;
+
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.lifecycle.LifecycleModule;
@@ -21,7 +24,6 @@
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.query.change.BasicChangeRewrites;
import com.google.gerrit.server.query.change.ChangeQueryRewriter;
-import com.google.inject.AbstractModule;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Provides;
@@ -48,16 +50,20 @@
}
private final int threads;
- private final ListeningExecutorService indexExecutor;
+ private final ListeningExecutorService interactiveExecutor;
+ private final ListeningExecutorService batchExecutor;
public IndexModule(int threads) {
this.threads = threads;
- this.indexExecutor = null;
+ this.interactiveExecutor = null;
+ this.batchExecutor = null;
}
- public IndexModule(ListeningExecutorService indexExecutor) {
+ public IndexModule(ListeningExecutorService interactiveExecutor,
+ ListeningExecutorService batchExecutor) {
this.threads = -1;
- this.indexExecutor = indexExecutor;
+ this.interactiveExecutor = interactiveExecutor;
+ this.batchExecutor = batchExecutor;
}
@Override
@@ -67,49 +73,54 @@
bind(IndexCollection.class);
listener().to(IndexCollection.class);
factory(ChangeIndexer.Factory.class);
-
- if (indexExecutor != null) {
- bind(ListeningExecutorService.class)
- .annotatedWith(IndexExecutor.class)
- .toInstance(indexExecutor);
- } else {
- install(new IndexExecutorModule(threads));
- }
}
@Provides
+ @Singleton
ChangeIndexer getChangeIndexer(
+ @IndexExecutor(INTERACTIVE) ListeningExecutorService executor,
ChangeIndexer.Factory factory,
IndexCollection indexes) {
- return factory.create(indexes);
+ // Bind default indexer to interactive executor; callers who need a
+ // different executor can use the factory directly.
+ return factory.create(executor, indexes);
}
- private static class IndexExecutorModule extends AbstractModule {
- private final int threads;
-
- private IndexExecutorModule(int threads) {
- this.threads = threads;
+ @Provides
+ @Singleton
+ @IndexExecutor(INTERACTIVE)
+ ListeningExecutorService getInteractiveIndexExecutor(
+ @GerritServerConfig Config config,
+ WorkQueue workQueue) {
+ if (interactiveExecutor != null) {
+ return interactiveExecutor;
}
-
- @Override
- public void configure() {
+ int threads = this.threads;
+ if (threads <= 0) {
+ threads = config.getInt("index", null, "threads", 0);
}
-
- @Provides
- @Singleton
- @IndexExecutor
- ListeningExecutorService getIndexExecutor(
- @GerritServerConfig Config config,
- WorkQueue workQueue) {
- int threads = this.threads;
- if (threads <= 0) {
- threads = config.getInt("index", null, "threads", 0);
- }
- if (threads <= 0) {
- return MoreExecutors.newDirectExecutorService();
- }
- return MoreExecutors.listeningDecorator(
- workQueue.createQueue(threads, "index"));
+ if (threads <= 0) {
+ return MoreExecutors.newDirectExecutorService();
}
+ return MoreExecutors.listeningDecorator(
+ workQueue.createQueue(threads, "Index-Interactive"));
+ }
+
+ @Provides
+ @Singleton
+ @IndexExecutor(BATCH)
+ ListeningExecutorService getBatchIndexExecutor(
+ @IndexExecutor(INTERACTIVE) ListeningExecutorService interactive,
+ @GerritServerConfig Config config,
+ WorkQueue workQueue) {
+ if (batchExecutor != null) {
+ return batchExecutor;
+ }
+ int threads = config.getInt("index", null, "batchThreads", 0);
+ if (threads <= 0) {
+ return interactive;
+ }
+ return MoreExecutors.listeningDecorator(
+ workQueue.createQueue(threads, "Index-Batch"));
}
}