Merge "Use separate topic for interactive indexing" into stable-2.16
diff --git a/setup_local_env/configs/multi-site.config b/setup_local_env/configs/multi-site.config
index ed9e6ad..6b631a2 100644
--- a/setup_local_env/configs/multi-site.config
+++ b/setup_local_env/configs/multi-site.config
@@ -5,6 +5,7 @@
 	bootstrapServers = localhost:$KAFKA_PORT
 	securityProtocol = PLAINTEXT
 	indexEventTopic = gerrit_index
+	batchIndexEventTopic = gerrit_batch_index
 	streamEventTopic = gerrit_stream
 	projectListEventTopic = gerrit_list_project
 	cacheEventTopic = gerrit_cache_eviction
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
new file mode 100644
index 0000000..9c430c6
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
@@ -0,0 +1,54 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import java.util.UUID;
+
+@Singleton
+public class BatchIndexEventSubscriber extends AbstractSubcriber {
+  @Inject
+  public BatchIndexEventSubscriber(
+      BrokerApiWrapper brokerApi,
+      IndexEventRouter eventRouter,
+      DynamicSet<DroppedEventListener> droppedEventListeners,
+      @BrokerGson Gson gsonProvider,
+      @InstanceId UUID instanceId,
+      MessageLogger msgLog,
+      SubscriberMetrics subscriberMetrics) {
+    super(
+        brokerApi,
+        eventRouter,
+        droppedEventListeners,
+        gsonProvider,
+        instanceId,
+        msgLog,
+        subscriberMetrics);
+  }
+
+  @Override
+  protected EventTopic getTopic() {
+    return EventTopic.BATCH_INDEX_TOPIC;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java
index 701c2fe..5c1f444 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java
@@ -19,10 +19,18 @@
 public interface IndexEventForwarder {
 
   /**
-   * Publish an indexing event to the broker.
+   * Publish an indexing event to the broker using interactive topic.
    *
    * @param event the details of the index event.
    * @return true if successful, otherwise false.
    */
   boolean index(IndexEvent event);
+
+  /**
+   * Publish an indexing event to the broker using batch topic.
+   *
+   * @param event the details of the index event.
+   * @return true if successful, otherwise false.
+   */
+  boolean batchIndex(IndexEvent event);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
index c2cc3dc..214d47d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
@@ -33,4 +33,9 @@
   public boolean index(IndexEvent event) {
     return broker.send(EventTopic.INDEX_TOPIC.topic(), event);
   }
+
+  @Override
+  public boolean batchIndex(IndexEvent event) {
+    return broker.send(EventTopic.BATCH_INDEX_TOPIC.topic(), event);
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
index 7d42acc..11e77a9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
@@ -16,6 +16,7 @@
 
 public enum EventTopic {
   INDEX_TOPIC("GERRIT.EVENT.INDEX", "indexEvent"),
+  BATCH_INDEX_TOPIC("GERRIT.EVENT.BATCH.INDEX", "batchIndexEvent"),
   CACHE_TOPIC("GERRIT.EVENT.CACHE", "cacheEvent"),
   PROJECT_LIST_TOPIC("GERRIT.EVENT.PROJECT.LIST", "projectListEvent"),
   STREAM_EVENT_TOPIC("GERRIT.EVENT.STREAM", "streamEvent");
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
index ee3ddbc..ed33efa 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
@@ -98,10 +98,17 @@
   private void executeIndexChangeTask(String projectName, int id) {
     if (!Context.isForwardedEvent()) {
       ChangeChecker checker = changeChecker.create(projectName + "~" + id);
+
       try {
         checker
             .newIndexEvent(projectName, id, false)
-            .map(event -> new IndexChangeTask(event))
+            .map(
+                event -> {
+                  if (Thread.currentThread().getName().contains("Batch")) {
+                    return new BatchIndexChangeTask(event);
+                  }
+                  return new IndexChangeTask(event);
+                })
             .ifPresent(
                 task -> {
                   if (queuedTasks.add(task)) {
@@ -164,6 +171,37 @@
     }
   }
 
+  class BatchIndexChangeTask extends IndexTask {
+    private final ChangeIndexEvent changeIndexEvent;
+
+    BatchIndexChangeTask(ChangeIndexEvent changeIndexEvent) {
+      this.changeIndexEvent = changeIndexEvent;
+    }
+
+    @Override
+    public void execute() {
+      forwarders.forEach(f -> f.batchIndex(changeIndexEvent));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      IndexChangeTask that = (IndexChangeTask) o;
+      return Objects.equal(changeIndexEvent, that.changeIndexEvent);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(changeIndexEvent);
+    }
+
+    @Override
+    public String toString() {
+      return String.format("Index change %s in target instance", changeIndexEvent.changeId);
+    }
+  }
+
   class IndexAccountTask extends IndexTask {
     private final AccountIndexEvent accountIndexEvent;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
index 5fdb07e..ad04fb8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
@@ -22,6 +22,7 @@
 import com.googlesource.gerrit.plugins.multisite.broker.kafka.BrokerPublisher;
 import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaSession;
 import com.googlesource.gerrit.plugins.multisite.consumer.AbstractSubcriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.BatchIndexEventSubscriber;
 import com.googlesource.gerrit.plugins.multisite.consumer.CacheEvictionEventSubscriber;
 import com.googlesource.gerrit.plugins.multisite.consumer.IndexEventSubscriber;
 import com.googlesource.gerrit.plugins.multisite.consumer.ProjectUpdateEventSubscriber;
@@ -58,6 +59,9 @@
       if (config.kafkaSubscriber().enabledEvent(EventTopic.INDEX_TOPIC)) {
         DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
       }
+      if (config.kafkaSubscriber().enabledEvent(EventTopic.BATCH_INDEX_TOPIC)) {
+        DynamicSet.bind(binder(), AbstractSubcriber.class).to(BatchIndexEventSubscriber.class);
+      }
       if (config.kafkaSubscriber().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
         DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
       }