Merge "Use separate topic for interactive indexing" into stable-2.16
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandler.java
index 29bfa44..3eea29c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandler.java
@@ -16,12 +16,16 @@
 
 import com.google.gerrit.reviewdb.client.Account;
 import com.google.gerrit.server.index.account.AccountIndexer;
+import com.google.gwtorm.server.OrmException;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 /**
  * Index an account using {@link AccountIndexer}. This class is meant to be used on the receiving
@@ -32,12 +36,15 @@
 @Singleton
 public class ForwardedIndexAccountHandler
     extends ForwardedIndexingHandler<Account.Id, AccountIndexEvent> {
+
   private final AccountIndexer indexer;
+  private Map<Account.Id, Operation> accountsToIndex;
 
   @Inject
   ForwardedIndexAccountHandler(AccountIndexer indexer, Configuration config) {
     super(config.index().numStripedLocks());
     this.indexer = indexer;
+    this.accountsToIndex = new HashMap<>(); // filled by indexAsync, processed by doAsyncIndex
   }
 
   @Override
@@ -50,4 +57,25 @@
   protected void doDelete(Account.Id id, Optional<AccountIndexEvent> event) {
     throw new UnsupportedOperationException("Delete from account index not supported");
   }
+
+  public synchronized void indexAsync(Account.Id id, Operation operation) {
+    accountsToIndex.put(id, operation); // a repeated id overwrites its queued operation
+  }
+
+  /** Indexes every queued account; entries whose indexing failed stay queued for the next call. */
+  public synchronized void doAsyncIndex() {
+    // removeIf evicts an entry as soon as checkedIndex succeeds, instead of hiding the
+    // side-effecting index call in a stream filter and rebuilding the whole map.
+    accountsToIndex.entrySet().removeIf(this::checkedIndex);
+  }
+
+  private boolean checkedIndex(Map.Entry<Account.Id, Operation> account) {
+    try {
+      index(account.getKey(), account.getValue(), Optional.empty());
+    } catch (IOException | OrmException indexFailure) {
+      log.error("Account {} index failed", account.getKey(), indexFailure);
+      return false; // the failure is logged here rather than propagated to the caller
+    }
+    return true; // reached only when index(...) completed without throwing
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
index 0103c38..0c6d3fd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
@@ -18,6 +18,7 @@
 import static com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexingHandler.Operation.INDEX;
 
 import com.google.gerrit.reviewdb.client.Account;
+import com.google.gerrit.server.config.AllUsersName;
 import com.google.gwtorm.server.OrmException;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexAccountHandler;
@@ -30,6 +31,7 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
+import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
 import java.io.IOException;
 import java.util.Optional;
 
@@ -38,17 +40,20 @@
   private final ForwardedIndexChangeHandler indexChangeHandler;
   private final ForwardedIndexGroupHandler indexGroupHandler;
   private final ForwardedIndexProjectHandler indexProjectHandler;
+  private final AllUsersName allUsersName;
 
   @Inject
   public IndexEventRouter(
       ForwardedIndexAccountHandler indexAccountHandler,
       ForwardedIndexChangeHandler indexChangeHandler,
       ForwardedIndexGroupHandler indexGroupHandler,
-      ForwardedIndexProjectHandler indexProjectHandler) {
+      ForwardedIndexProjectHandler indexProjectHandler,
+      AllUsersName allUsersName) {
     this.indexAccountHandler = indexAccountHandler;
     this.indexChangeHandler = indexChangeHandler;
     this.indexGroupHandler = indexGroupHandler;
     this.indexProjectHandler = indexProjectHandler;
+    this.allUsersName = allUsersName; // matched against replicated projects in onRefReplicated
   }
 
   @Override
@@ -62,8 +67,7 @@
           Optional.of(changeIndexEvent));
     } else if (sourceEvent instanceof AccountIndexEvent) {
       AccountIndexEvent accountIndexEvent = (AccountIndexEvent) sourceEvent;
-      indexAccountHandler.index(
-          new Account.Id(accountIndexEvent.accountId), INDEX, Optional.of(accountIndexEvent));
+      indexAccountHandler.indexAsync(new Account.Id(accountIndexEvent.accountId), INDEX);
     } else if (sourceEvent instanceof GroupIndexEvent) {
       GroupIndexEvent groupIndexEvent = (GroupIndexEvent) sourceEvent;
       indexGroupHandler.index(groupIndexEvent.groupUUID, INDEX, Optional.of(groupIndexEvent));
@@ -76,4 +80,16 @@
           String.format("Cannot route event %s", sourceEvent.getType()));
     }
   }
+
+  public void onRefReplicated(RefReplicationDoneEvent replicationEvent)
+      throws IOException, OrmException {
+    if (replicationEvent.getProjectNameKey().equals(allUsersName)) { // other projects: no-op
+      Account.Id accountId = Account.Id.fromRef(replicationEvent.getRefName());
+      if (accountId != null) {
+        indexAccountHandler.index(accountId, INDEX, Optional.empty()); // index it right away
+      } else {
+        indexAccountHandler.doAsyncIndex(); // ref yields no account id: process queued accounts
+      }
+    }
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
index 71edfb0..1ff6992 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
@@ -19,17 +19,33 @@
 import com.google.gwtorm.server.OrmException;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedEventHandler;
+import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
+import java.io.IOException;
 
 public class StreamEventRouter implements ForwardedEventRouter<Event> {
   private final ForwardedEventHandler streamEventHandler;
+  private final IndexEventRouter indexEventRouter;
 
   @Inject
-  public StreamEventRouter(ForwardedEventHandler streamEventHandler) {
+  public StreamEventRouter(
+      ForwardedEventHandler streamEventHandler, IndexEventRouter indexEventRouter) {
     this.streamEventHandler = streamEventHandler;
+    this.indexEventRouter = indexEventRouter;
   }
 
   @Override
-  public void route(Event sourceEvent) throws OrmException, PermissionBackendException {
+  public void route(Event sourceEvent)
+      throws OrmException, PermissionBackendException, IOException {
+    if (RefReplicationDoneEvent.TYPE.equals(sourceEvent.getType())) {
+      /* TODO: The status and result of the replication event are deliberately ignored,
+       * because there is currently no reliable way to tell whether this node was the
+       * replication target and whether the replication succeeded.
+       *
+       * Risking one extra reindex is better than missing a reindexing event.
+       */
+      indexEventRouter.onRefReplicated((RefReplicationDoneEvent) sourceEvent);
+    }
+
     streamEventHandler.dispatch(sourceEvent);
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java
index df7c2fc..81e832a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java
@@ -18,6 +18,8 @@
 import static org.mockito.Mockito.verifyZeroInteractions;
 
 import com.google.gerrit.reviewdb.client.Account;
+import com.google.gerrit.server.config.AllUsersName;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedEventHandler;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexAccountHandler;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexChangeHandler;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexGroupHandler;
@@ -29,6 +31,8 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
 import java.util.Optional;
 import org.junit.Assert;
 import org.junit.Before;
@@ -45,12 +49,18 @@
   @Mock private ForwardedIndexChangeHandler indexChangeHandler;
   @Mock private ForwardedIndexGroupHandler indexGroupHandler;
   @Mock private ForwardedIndexProjectHandler indexProjectHandler;
+  @Mock private ForwardedEventHandler forwardedEventHandler;
+  private AllUsersName allUsersName = new AllUsersName("All-Users");
 
   @Before
   public void setUp() {
     router =
         new IndexEventRouter(
-            indexAccountHandler, indexChangeHandler, indexGroupHandler, indexProjectHandler);
+            indexAccountHandler,
+            indexChangeHandler,
+            indexGroupHandler,
+            indexProjectHandler,
+            allUsersName); // a real AllUsersName("All-Users"), unlike the mocked handlers
   }
 
   @Test
@@ -59,15 +69,30 @@
     router.route(event);
 
     verify(indexAccountHandler)
-        .index(
-            new Account.Id(event.accountId),
-            ForwardedIndexingHandler.Operation.INDEX,
-            Optional.of(event));
+        .indexAsync(new Account.Id(event.accountId), ForwardedIndexingHandler.Operation.INDEX);
 
     verifyZeroInteractions(indexChangeHandler, indexGroupHandler, indexProjectHandler);
   }
 
   @Test
+  public void streamEventRouterShouldTriggerAccountIndexFlush() throws Exception {
+    // Routing an account index event must call indexAsync and leave the other handlers alone.
+    AccountIndexEvent accountEvent = new AccountIndexEvent(1);
+    Account.Id expectedId = new Account.Id(accountEvent.accountId);
+    router.route(accountEvent);
+    verify(indexAccountHandler).indexAsync(expectedId, ForwardedIndexingHandler.Operation.INDEX);
+    verifyZeroInteractions(indexChangeHandler, indexGroupHandler, indexProjectHandler);
+
+    // A replication-done stream event for All-Users must then reach doAsyncIndex.
+    StreamEventRouter streamRouter = new StreamEventRouter(forwardedEventHandler, router);
+    RefReplicationDoneEvent replicationDone =
+        new RefReplicationDoneEvent(allUsersName.get(), "refs/any", 1);
+    streamRouter.route(replicationDone);
+
+    verify(indexAccountHandler).doAsyncIndex();
+  }
+
+  @Test
   public void routerShouldSendEventsToTheAppropriateHandler_GroupIndex() throws Exception {
     final String groupId = "12";
     final GroupIndexEvent event = new GroupIndexEvent(groupId);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java
index 147f275..ee79e34 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java
@@ -22,6 +22,7 @@
 import com.google.gerrit.server.events.CommentAddedEvent;
 import com.google.gerrit.server.util.time.TimeUtil;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedEventHandler;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,10 +35,11 @@
 
   private StreamEventRouter router;
   @Mock private ForwardedEventHandler streamEventHandler;
+  @Mock private IndexEventRouter indexEventRouter;
 
   @Before
   public void setUp() {
-    router = new StreamEventRouter(streamEventHandler);
+    router = new StreamEventRouter(streamEventHandler, indexEventRouter); // both are mocks
   }
 
   @Test