Separate change indexer threads into interactive/batch pools

This mirrors the behavior for mergeability checks, and allows
isolation of online reindexing threads from interactive threads used
by the current index version.

Change-Id: Id699fdadb325ac7ee7ad12cbf3dffc1a96afc97e
diff --git a/Documentation/config-gerrit.txt b/Documentation/config-gerrit.txt
index 0c13b23..02ca6e6 100644
--- a/Documentation/config-gerrit.txt
+++ b/Documentation/config-gerrit.txt
@@ -2036,10 +2036,18 @@
 
 [[index.threads]]index.threads::
 +
-Determines the number of threads to use for indexing.
+Number of threads to use for indexing in normal interactive operations.
 +
 Defaults to 1 if not set, or set to a negative value.
 
+[[index.batchThreads]]index.batchThreads::
++
+Number of threads to use for indexing in background operations, such as
+online schema upgrades.
++
+If not set or set to a negative value, defaults to using the same
+thread pool as interactive operations.
+
 ==== Lucene configuration
 
 Open and closed changes are indexed in separate indexes named
diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java
index cc910ad..676bc71 100644
--- a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java
+++ b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java
@@ -15,6 +15,7 @@
 package com.google.gerrit.lucene;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.gerrit.server.git.QueueProvider.QueueType.INTERACTIVE;
 import static com.google.gerrit.server.index.IndexRewriteImpl.CLOSED_STATUSES;
 import static com.google.gerrit.server.index.IndexRewriteImpl.OPEN_STATUSES;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -229,7 +230,7 @@
   LuceneChangeIndex(
       @GerritServerConfig Config cfg,
       SitePaths sitePaths,
-      @IndexExecutor ListeningExecutorService executor,
+      @IndexExecutor(INTERACTIVE)  ListeningExecutorService executor,
       Provider<ReviewDb> db,
       ChangeData.Factory changeDataFactory,
       FillArgs fillArgs,
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeBatchIndexer.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeBatchIndexer.java
index 028d8fb..7cc8a98 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeBatchIndexer.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeBatchIndexer.java
@@ -15,6 +15,7 @@
 package com.google.gerrit.server.index;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH;
 import static org.eclipse.jgit.lib.RefDatabase.ALL;
 
 import com.google.common.base.Stopwatch;
@@ -127,7 +128,7 @@
   ChangeBatchIndexer(SchemaFactory<ReviewDb> schemaFactory,
       ChangeData.Factory changeDataFactory,
       GitRepositoryManager repoManager,
-      @IndexExecutor ListeningExecutorService executor,
+      @IndexExecutor(BATCH) ListeningExecutorService executor,
       ChangeIndexer.Factory indexerFactory,
       @GerritServerConfig Config config) {
     this.schemaFactory = schemaFactory;
@@ -180,7 +181,7 @@
         ok.set(false);
       }
       final ListenableFuture<?> future = executor.submit(reindexProject(
-          indexerFactory.create(index), project, doneTask, failedTask,
+          indexerFactory.create(executor, index), project, doneTask, failedTask,
           verboseWriter));
       futures.add(future);
       future.addListener(new Runnable() {
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndexer.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndexer.java
index 437f559..e235379 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndexer.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndexer.java
@@ -55,8 +55,9 @@
       LoggerFactory.getLogger(ChangeIndexer.class);
 
   public interface Factory {
-    ChangeIndexer create(ChangeIndex index);
-    ChangeIndexer create(IndexCollection indexes);
+    ChangeIndexer create(ListeningExecutorService executor, ChangeIndex index);
+    ChangeIndexer create(ListeningExecutorService executor,
+        IndexCollection indexes);
   }
 
   private static final Function<Exception, IOException> MAPPER =
@@ -82,10 +83,10 @@
   private final ListeningExecutorService executor;
 
   @AssistedInject
-  ChangeIndexer(@IndexExecutor ListeningExecutorService executor,
-      SchemaFactory<ReviewDb> schemaFactory,
+  ChangeIndexer(SchemaFactory<ReviewDb> schemaFactory,
       ChangeData.Factory changeDataFactory,
       ThreadLocalRequestContext context,
+      @Assisted ListeningExecutorService executor,
       @Assisted ChangeIndex index) {
     this.executor = executor;
     this.schemaFactory = schemaFactory;
@@ -96,10 +97,10 @@
   }
 
   @AssistedInject
-  ChangeIndexer(@IndexExecutor ListeningExecutorService executor,
-      SchemaFactory<ReviewDb> schemaFactory,
+  ChangeIndexer(SchemaFactory<ReviewDb> schemaFactory,
       ChangeData.Factory changeDataFactory,
       ThreadLocalRequestContext context,
+      @Assisted ListeningExecutorService executor,
       @Assisted IndexCollection indexes) {
     this.executor = executor;
     this.schemaFactory = schemaFactory;
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexExecutor.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexExecutor.java
index 0a96d1d..eb97fdc 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexExecutor.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexExecutor.java
@@ -17,6 +17,7 @@
 import static java.lang.annotation.RetentionPolicy.RUNTIME;
 
 import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.gerrit.server.git.QueueProvider.QueueType;
 import com.google.inject.BindingAnnotation;
 
 import java.lang.annotation.Retention;
@@ -28,4 +29,5 @@
 @Retention(RUNTIME)
 @BindingAnnotation
 public @interface IndexExecutor {
+  QueueType value();
 }
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexModule.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexModule.java
index 3aeeef2..87e4832 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexModule.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexModule.java
@@ -14,6 +14,9 @@
 
 package com.google.gerrit.server.index;
 
+import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH;
+import static com.google.gerrit.server.git.QueueProvider.QueueType.INTERACTIVE;
+
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.gerrit.lifecycle.LifecycleModule;
@@ -21,7 +24,6 @@
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.gerrit.server.query.change.BasicChangeRewrites;
 import com.google.gerrit.server.query.change.ChangeQueryRewriter;
-import com.google.inject.AbstractModule;
 import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.Provides;
@@ -48,16 +50,20 @@
   }
 
   private final int threads;
-  private final ListeningExecutorService indexExecutor;
+  private final ListeningExecutorService interactiveExecutor;
+  private final ListeningExecutorService batchExecutor;
 
   public IndexModule(int threads) {
     this.threads = threads;
-    this.indexExecutor = null;
+    this.interactiveExecutor = null;
+    this.batchExecutor = null;
   }
 
-  public IndexModule(ListeningExecutorService indexExecutor) {
+  public IndexModule(ListeningExecutorService interactiveExecutor,
+      ListeningExecutorService batchExecutor) {
     this.threads = -1;
-    this.indexExecutor = indexExecutor;
+    this.interactiveExecutor = interactiveExecutor;
+    this.batchExecutor = batchExecutor;
   }
 
   @Override
@@ -67,49 +73,54 @@
     bind(IndexCollection.class);
     listener().to(IndexCollection.class);
     factory(ChangeIndexer.Factory.class);
-
-    if (indexExecutor != null) {
-      bind(ListeningExecutorService.class)
-          .annotatedWith(IndexExecutor.class)
-          .toInstance(indexExecutor);
-    } else {
-      install(new IndexExecutorModule(threads));
-    }
   }
 
   @Provides
+  @Singleton
   ChangeIndexer getChangeIndexer(
+      @IndexExecutor(INTERACTIVE) ListeningExecutorService executor,
       ChangeIndexer.Factory factory,
       IndexCollection indexes) {
-    return factory.create(indexes);
+    // Bind default indexer to interactive executor; callers who need a
+    // different executor can use the factory directly.
+    return factory.create(executor, indexes);
   }
 
-  private static class IndexExecutorModule extends AbstractModule {
-    private final int threads;
-
-    private IndexExecutorModule(int threads) {
-      this.threads = threads;
+  @Provides
+  @Singleton
+  @IndexExecutor(INTERACTIVE)
+  ListeningExecutorService getInteractiveIndexExecutor(
+      @GerritServerConfig Config config,
+      WorkQueue workQueue) {
+    if (interactiveExecutor != null) {
+      return interactiveExecutor;
     }
-
-    @Override
-    public void configure() {
+    int threads = this.threads;
+    if (threads <= 0) {
+      threads = config.getInt("index", null, "threads", 0);
     }
-
-    @Provides
-    @Singleton
-    @IndexExecutor
-    ListeningExecutorService getIndexExecutor(
-        @GerritServerConfig Config config,
-        WorkQueue workQueue) {
-      int threads = this.threads;
-      if (threads <= 0) {
-        threads = config.getInt("index", null, "threads", 0);
-      }
-      if (threads <= 0) {
-        return MoreExecutors.newDirectExecutorService();
-      }
-      return MoreExecutors.listeningDecorator(
-          workQueue.createQueue(threads, "index"));
+    if (threads <= 0) {
+      return MoreExecutors.newDirectExecutorService();
     }
+    return MoreExecutors.listeningDecorator(
+        workQueue.createQueue(threads, "Index-Interactive"));
+  }
+
+  @Provides
+  @Singleton
+  @IndexExecutor(BATCH)
+  ListeningExecutorService getBatchIndexExecutor(
+      @IndexExecutor(INTERACTIVE) ListeningExecutorService interactive,
+      @GerritServerConfig Config config,
+      WorkQueue workQueue) {
+    if (batchExecutor != null) {
+      return batchExecutor;
+    }
+    int threads = config.getInt("index", null, "batchThreads", 0);
+    if (threads <= 0) {
+      return interactive;
+    }
+    return MoreExecutors.listeningDecorator(
+        workQueue.createQueue(threads, "Index-Batch"));
   }
 }