Use separate topic for interactive indexing

Gerrit has two separate executors for interactive and
batch indexing operations, for preventing massive background
reindexing to block the interactive operations that need
immediate feedback.

Feature: Issue 12228
Change-Id: I0d10cd9cde38def0c099b33bb85e71b112d85287
diff --git a/setup_local_env/configs/multi-site.config b/setup_local_env/configs/multi-site.config
index ed9e6ad..6b631a2 100644
--- a/setup_local_env/configs/multi-site.config
+++ b/setup_local_env/configs/multi-site.config
@@ -5,6 +5,7 @@
 	bootstrapServers = localhost:$KAFKA_PORT
 	securityProtocol = PLAINTEXT
 	indexEventTopic = gerrit_index
+	batchIndexEventTopic = gerrit_batch_index
 	streamEventTopic = gerrit_stream
 	projectListEventTopic = gerrit_list_project
 	cacheEventTopic = gerrit_cache_eviction
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
new file mode 100644
index 0000000..9c430c6
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
@@ -0,0 +1,54 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import java.util.UUID;
+
+@Singleton
+public class BatchIndexEventSubscriber extends AbstractSubcriber {
+  @Inject
+  public BatchIndexEventSubscriber(
+      BrokerApiWrapper brokerApi,
+      IndexEventRouter eventRouter,
+      DynamicSet<DroppedEventListener> droppedEventListeners,
+      @BrokerGson Gson gsonProvider,
+      @InstanceId UUID instanceId,
+      MessageLogger msgLog,
+      SubscriberMetrics subscriberMetrics) {
+    super(
+        brokerApi,
+        eventRouter,
+        droppedEventListeners,
+        gsonProvider,
+        instanceId,
+        msgLog,
+        subscriberMetrics);
+  }
+
+  @Override
+  protected EventTopic getTopic() {
+    return EventTopic.BATCH_INDEX_TOPIC;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java
index 701c2fe..5c1f444 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java
@@ -19,10 +19,18 @@
 public interface IndexEventForwarder {
 
   /**
-   * Publish an indexing event to the broker.
+   * Publish an indexing event to the broker using interactive topic.
    *
    * @param event the details of the index event.
    * @return true if successful, otherwise false.
    */
   boolean index(IndexEvent event);
+
+  /**
+   * Publish an indexing event to the broker using batch topic.
+   *
+   * @param event the details of the index event.
+   * @return true if successful, otherwise false.
+   */
+  boolean batchIndex(IndexEvent event);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
index c2cc3dc..214d47d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
@@ -33,4 +33,9 @@
   public boolean index(IndexEvent event) {
     return broker.send(EventTopic.INDEX_TOPIC.topic(), event);
   }
+
+  @Override
+  public boolean batchIndex(IndexEvent event) {
+    return broker.send(EventTopic.BATCH_INDEX_TOPIC.topic(), event);
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
index 7d42acc..11e77a9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
@@ -16,6 +16,7 @@
 
 public enum EventTopic {
   INDEX_TOPIC("GERRIT.EVENT.INDEX", "indexEvent"),
+  BATCH_INDEX_TOPIC("GERRIT.EVENT.BATCH.INDEX", "batchIndexEvent"),
   CACHE_TOPIC("GERRIT.EVENT.CACHE", "cacheEvent"),
   PROJECT_LIST_TOPIC("GERRIT.EVENT.PROJECT.LIST", "projectListEvent"),
   STREAM_EVENT_TOPIC("GERRIT.EVENT.STREAM", "streamEvent");
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
index ee3ddbc..ed33efa 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
@@ -98,10 +98,17 @@
   private void executeIndexChangeTask(String projectName, int id) {
     if (!Context.isForwardedEvent()) {
       ChangeChecker checker = changeChecker.create(projectName + "~" + id);
+
       try {
         checker
             .newIndexEvent(projectName, id, false)
-            .map(event -> new IndexChangeTask(event))
+            .map(
+                event -> {
+                  if (Thread.currentThread().getName().contains("Batch")) {
+                    return new BatchIndexChangeTask(event);
+                  }
+                  return new IndexChangeTask(event);
+                })
             .ifPresent(
                 task -> {
                   if (queuedTasks.add(task)) {
@@ -164,6 +171,37 @@
     }
   }
 
+  class BatchIndexChangeTask extends IndexTask {
+    private final ChangeIndexEvent changeIndexEvent;
+
+    BatchIndexChangeTask(ChangeIndexEvent changeIndexEvent) {
+      this.changeIndexEvent = changeIndexEvent;
+    }
+
+    @Override
+    public void execute() {
+      forwarders.forEach(f -> f.batchIndex(changeIndexEvent));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      IndexChangeTask that = (IndexChangeTask) o;
+      return Objects.equal(changeIndexEvent, that.changeIndexEvent);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(changeIndexEvent);
+    }
+
+    @Override
+    public String toString() {
+      return String.format("Index change %s in target instance", changeIndexEvent.changeId);
+    }
+  }
+
   class IndexAccountTask extends IndexTask {
     private final AccountIndexEvent accountIndexEvent;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
index 5fdb07e..ad04fb8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
@@ -22,6 +22,7 @@
 import com.googlesource.gerrit.plugins.multisite.broker.kafka.BrokerPublisher;
 import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaSession;
 import com.googlesource.gerrit.plugins.multisite.consumer.AbstractSubcriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.BatchIndexEventSubscriber;
 import com.googlesource.gerrit.plugins.multisite.consumer.CacheEvictionEventSubscriber;
 import com.googlesource.gerrit.plugins.multisite.consumer.IndexEventSubscriber;
 import com.googlesource.gerrit.plugins.multisite.consumer.ProjectUpdateEventSubscriber;
@@ -58,6 +59,9 @@
       if (config.kafkaSubscriber().enabledEvent(EventTopic.INDEX_TOPIC)) {
         DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
       }
+      if (config.kafkaSubscriber().enabledEvent(EventTopic.BATCH_INDEX_TOPIC)) {
+        DynamicSet.bind(binder(), AbstractSubcriber.class).to(BatchIndexEventSubscriber.class);
+      }
       if (config.kafkaSubscriber().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
         DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
       }