Merge branch 'stable-3.0' into stable-3.1

* stable-3.0:
  Index account asynchronously and flush upon replication
  Use separate topic for interactive indexing
  Use file-based replication in local test environment

Change-Id: Idd63ec57609e39c338b943878681b3fb34d97071
diff --git a/setup_local_env/configs/gerrit.config b/setup_local_env/configs/gerrit.config
index 6092cc1..f05184e 100644
--- a/setup_local_env/configs/gerrit.config
+++ b/setup_local_env/configs/gerrit.config
@@ -41,7 +41,7 @@
 [plugin "kafka-events"]
     bootstrapServers = localhost:$KAFKA_PORT
     groupId = $KAFKA_GROUP_ID
-    numberOfSubscribers = 5
+    numberOfSubscribers = 6
     securityProtocol = PLAINTEXT
     pollingIntervalMs = 1000
     enableAutoCommit = true
diff --git a/setup_local_env/configs/multi-site.config b/setup_local_env/configs/multi-site.config
index 442c37e..9571513 100644
--- a/setup_local_env/configs/multi-site.config
+++ b/setup_local_env/configs/multi-site.config
@@ -4,6 +4,8 @@
 
 [broker]
         indexEventTopic = gerrit_index
+        batchIndexEventTopic = gerrit_batch_index
         streamEventTopic = gerrit_stream
         projectListEventTopic = gerrit_list_project
         cacheEventTopic = gerrit_cache_eviction
+
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
new file mode 100644
index 0000000..5bbaee0
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
@@ -0,0 +1,44 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import java.util.UUID;
+
+@Singleton
+public class BatchIndexEventSubscriber extends AbstractSubcriber {
+  @Inject
+  public BatchIndexEventSubscriber(
+      IndexEventRouter eventRouter,
+      DynamicSet<DroppedEventListener> droppedEventListeners,
+      @InstanceId UUID instanceId,
+      MessageLogger msgLog,
+      SubscriberMetrics subscriberMetrics,
+      Configuration cfg) {
+    super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+  }
+
+  @Override
+  protected EventTopic getTopic() {
+    return EventTopic.BATCH_INDEX_TOPIC;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
index afd4d09..09adb18 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
@@ -28,6 +28,7 @@
     DynamicSet.setOf(binder(), DroppedEventListener.class);
 
     DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
+    DynamicSet.bind(binder(), AbstractSubcriber.class).to(BatchIndexEventSubscriber.class);
     DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
     DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
     DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandler.java
index c00c571..0934fd5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandler.java
@@ -20,7 +20,11 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 /**
  * Index an account using {@link AccountIndexer}. This class is meant to be used on the receiving
@@ -31,12 +35,15 @@
 @Singleton
 public class ForwardedIndexAccountHandler
     extends ForwardedIndexingHandler<Account.Id, AccountIndexEvent> {
+
   private final AccountIndexer indexer;
+  private Map<Account.Id, Operation> accountsToIndex;
 
   @Inject
   ForwardedIndexAccountHandler(AccountIndexer indexer, Configuration config) {
     super(config.index().numStripedLocks());
     this.indexer = indexer;
+    this.accountsToIndex = new HashMap<>();
   }
 
   @Override
@@ -49,4 +56,25 @@
   protected void doDelete(Account.Id id, Optional<AccountIndexEvent> event) {
     throw new UnsupportedOperationException("Delete from account index not supported");
   }
+
+  public synchronized void indexAsync(Account.Id id, Operation operation) {
+    accountsToIndex.put(id, operation);
+  }
+
+  public synchronized void doAsyncIndex() {
+    accountsToIndex =
+        accountsToIndex.entrySet().stream()
+            .filter(e -> !checkedIndex(e))
+            .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
+  }
+
+  private boolean checkedIndex(Map.Entry<Account.Id, Operation> account) {
+    try {
+      index(account.getKey(), account.getValue(), Optional.empty());
+      return true;
+    } catch (IOException e) {
+      log.error("Account {} index failed", account.getKey(), e);
+      return false;
+    }
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java
index 701c2fe..5c1f444 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/IndexEventForwarder.java
@@ -19,10 +19,18 @@
 public interface IndexEventForwarder {
 
   /**
-   * Publish an indexing event to the broker.
+   * Publish an indexing event to the broker using interactive topic.
    *
    * @param event the details of the index event.
    * @return true if successful, otherwise false.
    */
   boolean index(IndexEvent event);
+
+  /**
+   * Publish an indexing event to the broker using batch topic.
+   *
+   * @param event the details of the index event.
+   * @return true if successful, otherwise false.
+   */
+  boolean batchIndex(IndexEvent event);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
index 0b4252d..a86c62e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
@@ -35,4 +35,9 @@
   public boolean index(IndexEvent event) {
     return broker.send(EventTopic.INDEX_TOPIC.topic(cfg), event);
   }
+
+  @Override
+  public boolean batchIndex(IndexEvent event) {
+    return broker.send(EventTopic.BATCH_INDEX_TOPIC.topic(cfg), event);
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
index eaa2df9..4e7a781 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
@@ -18,6 +18,7 @@
 
 public enum EventTopic {
   INDEX_TOPIC("GERRIT.EVENT.INDEX", "indexEvent"),
+  BATCH_INDEX_TOPIC("GERRIT.EVENT.BATCH.INDEX", "batchIndexEvent"),
   CACHE_TOPIC("GERRIT.EVENT.CACHE", "cacheEvent"),
   PROJECT_LIST_TOPIC("GERRIT.EVENT.PROJECT.LIST", "projectListEvent"),
   STREAM_EVENT_TOPIC("GERRIT.EVENT.STREAM", "streamEvent");
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
index dcaaea2..9f659ec 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
@@ -18,6 +18,7 @@
 import static com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexingHandler.Operation.INDEX;
 
 import com.google.gerrit.entities.Account;
+import com.google.gerrit.server.config.AllUsersName;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexAccountHandler;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexChangeHandler;
@@ -29,6 +30,7 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
+import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
 import java.io.IOException;
 import java.util.Optional;
 
@@ -37,17 +39,20 @@
   private final ForwardedIndexChangeHandler indexChangeHandler;
   private final ForwardedIndexGroupHandler indexGroupHandler;
   private final ForwardedIndexProjectHandler indexProjectHandler;
+  private final AllUsersName allUsersName;
 
   @Inject
   public IndexEventRouter(
       ForwardedIndexAccountHandler indexAccountHandler,
       ForwardedIndexChangeHandler indexChangeHandler,
       ForwardedIndexGroupHandler indexGroupHandler,
-      ForwardedIndexProjectHandler indexProjectHandler) {
+      ForwardedIndexProjectHandler indexProjectHandler,
+      AllUsersName allUsersName) {
     this.indexAccountHandler = indexAccountHandler;
     this.indexChangeHandler = indexChangeHandler;
     this.indexGroupHandler = indexGroupHandler;
     this.indexProjectHandler = indexProjectHandler;
+    this.allUsersName = allUsersName;
   }
 
   @Override
@@ -61,8 +66,7 @@
           Optional.of(changeIndexEvent));
     } else if (sourceEvent instanceof AccountIndexEvent) {
       AccountIndexEvent accountIndexEvent = (AccountIndexEvent) sourceEvent;
-      indexAccountHandler.index(
-          Account.id(accountIndexEvent.accountId), INDEX, Optional.of(accountIndexEvent));
+      indexAccountHandler.indexAsync(Account.id(accountIndexEvent.accountId), INDEX);
     } else if (sourceEvent instanceof GroupIndexEvent) {
       GroupIndexEvent groupIndexEvent = (GroupIndexEvent) sourceEvent;
       indexGroupHandler.index(groupIndexEvent.groupUUID, INDEX, Optional.of(groupIndexEvent));
@@ -75,4 +79,15 @@
           String.format("Cannot route event %s", sourceEvent.getType()));
     }
   }
+
+  public void onRefReplicated(RefReplicationDoneEvent replicationEvent) throws IOException {
+    if (replicationEvent.getProjectNameKey().equals(allUsersName)) {
+      Account.Id accountId = Account.Id.fromRef(replicationEvent.getRefName());
+      if (accountId != null) {
+        indexAccountHandler.index(accountId, INDEX, Optional.empty());
+      } else {
+        indexAccountHandler.doAsyncIndex();
+      }
+    }
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
index d911c00..4ef3426 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
@@ -18,17 +18,32 @@
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedEventHandler;
+import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
+import java.io.IOException;
 
 public class StreamEventRouter implements ForwardedEventRouter<Event> {
   private final ForwardedEventHandler streamEventHandler;
+  private final IndexEventRouter indexEventRouter;
 
   @Inject
-  public StreamEventRouter(ForwardedEventHandler streamEventHandler) {
+  public StreamEventRouter(
+      ForwardedEventHandler streamEventHandler, IndexEventRouter indexEventRouter) {
     this.streamEventHandler = streamEventHandler;
+    this.indexEventRouter = indexEventRouter;
   }
 
   @Override
-  public void route(Event sourceEvent) throws PermissionBackendException {
+  public void route(Event sourceEvent) throws PermissionBackendException, IOException {
+    if (RefReplicationDoneEvent.TYPE.equals(sourceEvent.getType())) {
+      /* TODO: We currently explicitly ignore the status and result of the replication
+       * event because there isn't a reliable way to understand if the current node was
+       * the replication target and was successful or not.
+       *
+       * It is better to risk to reindex once more rather than missing a reindexing event.
+       */
+      indexEventRouter.onRefReplicated((RefReplicationDoneEvent) sourceEvent);
+    }
+
     streamEventHandler.dispatch(sourceEvent);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
index ee3ddbc..ed33efa 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
@@ -98,10 +98,17 @@
   private void executeIndexChangeTask(String projectName, int id) {
     if (!Context.isForwardedEvent()) {
       ChangeChecker checker = changeChecker.create(projectName + "~" + id);
+
       try {
         checker
             .newIndexEvent(projectName, id, false)
-            .map(event -> new IndexChangeTask(event))
+            .map(
+                event -> {
+                  if (Thread.currentThread().getName().contains("Batch")) {
+                    return new BatchIndexChangeTask(event);
+                  }
+                  return new IndexChangeTask(event);
+                })
             .ifPresent(
                 task -> {
                   if (queuedTasks.add(task)) {
@@ -164,6 +171,37 @@
     }
   }
 
+  class BatchIndexChangeTask extends IndexTask {
+    private final ChangeIndexEvent changeIndexEvent;
+
+    BatchIndexChangeTask(ChangeIndexEvent changeIndexEvent) {
+      this.changeIndexEvent = changeIndexEvent;
+    }
+
+    @Override
+    public void execute() {
+      forwarders.forEach(f -> f.batchIndex(changeIndexEvent));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      IndexChangeTask that = (IndexChangeTask) o;
+      return Objects.equal(changeIndexEvent, that.changeIndexEvent);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(changeIndexEvent);
+    }
+
+    @Override
+    public String toString() {
+      return String.format("Index change %s in target instance", changeIndexEvent.changeId);
+    }
+  }
+
   class IndexAccountTask extends IndexTask {
     private final AccountIndexEvent accountIndexEvent;
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
index 923abb2..31ca13d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
@@ -19,6 +19,8 @@
 import static org.mockito.Mockito.verifyZeroInteractions;
 
 import com.google.gerrit.entities.Account;
+import com.google.gerrit.server.config.AllUsersName;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedEventHandler;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexAccountHandler;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexChangeHandler;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexGroupHandler;
@@ -30,6 +32,8 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
 import java.util.Optional;
 import org.junit.Before;
 import org.junit.Test;
@@ -45,12 +49,18 @@
   @Mock private ForwardedIndexChangeHandler indexChangeHandler;
   @Mock private ForwardedIndexGroupHandler indexGroupHandler;
   @Mock private ForwardedIndexProjectHandler indexProjectHandler;
+  @Mock private ForwardedEventHandler forwardedEventHandler;
+  private AllUsersName allUsersName = new AllUsersName("All-Users");
 
   @Before
   public void setUp() {
     router =
         new IndexEventRouter(
-            indexAccountHandler, indexChangeHandler, indexGroupHandler, indexProjectHandler);
+            indexAccountHandler,
+            indexChangeHandler,
+            indexGroupHandler,
+            indexProjectHandler,
+            allUsersName);
   }
 
   @Test
@@ -59,15 +69,30 @@
     router.route(event);
 
     verify(indexAccountHandler)
-        .index(
-            Account.id(event.accountId),
-            ForwardedIndexingHandler.Operation.INDEX,
-            Optional.of(event));
+        .indexAsync(Account.id(event.accountId), ForwardedIndexingHandler.Operation.INDEX);
 
     verifyZeroInteractions(indexChangeHandler, indexGroupHandler, indexProjectHandler);
   }
 
   @Test
+  public void streamEventRouterShouldTriggerAccountIndexFlush() throws Exception {
+
+    StreamEventRouter streamEventRouter = new StreamEventRouter(forwardedEventHandler, router);
+
+    final AccountIndexEvent event = new AccountIndexEvent(1);
+    router.route(event);
+
+    verify(indexAccountHandler)
+        .indexAsync(Account.id(event.accountId), ForwardedIndexingHandler.Operation.INDEX);
+
+    verifyZeroInteractions(indexChangeHandler, indexGroupHandler, indexProjectHandler);
+
+    streamEventRouter.route(new RefReplicationDoneEvent(allUsersName.get(), "refs/any", 1));
+
+    verify(indexAccountHandler).doAsyncIndex();
+  }
+
+  @Test
   public void routerShouldSendEventsToTheAppropriateHandler_GroupIndex() throws Exception {
     final String groupId = "12";
     final GroupIndexEvent event = new GroupIndexEvent(groupId);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java
index 605eb45..3b4bec5 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java
@@ -22,6 +22,7 @@
 import com.google.gerrit.server.events.CommentAddedEvent;
 import com.google.gerrit.server.util.time.TimeUtil;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedEventHandler;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,10 +35,11 @@
 
   private StreamEventRouter router;
   @Mock private ForwardedEventHandler streamEventHandler;
+  @Mock private IndexEventRouter indexEventRouter;
 
   @Before
   public void setUp() {
-    router = new StreamEventRouter(streamEventHandler);
+    router = new StreamEventRouter(streamEventHandler, indexEventRouter);
   }
 
   @Test