Index account asynchronously and flush upon replication

Account reindexing on remote nodes should not happen immediately
but must queue for asynchronous execution once replication is completed.

Triggering the reindex of the account *before* it arrives on the
All-Users repo would generate two issues:

1. The account would be reindexed with stale data and, when the new
   data will arrive, it won't be reindexed again: we do need to wait
   for the data to arrive through replication.

2. The account index would also populate the cache with stale data
   and even when the new data will arrive the in-memory data would need
   to wait the cache TTL before being visible to the end-user.

NOTE: there is not yet a reliable way to understand *when* the target
      received the new account data. That is the reason why it is
      necessary to wait for *ALL* the nodes to have completed
      replication and then process the account reindexing.

Bug: Issue 12278
Change-Id: Ie442b03f36c1238f6b977e7f5456beff078c9657
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandler.java
index 29bfa44..3eea29c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandler.java
@@ -16,12 +16,16 @@
 
 import com.google.gerrit.reviewdb.client.Account;
 import com.google.gerrit.server.index.account.AccountIndexer;
+import com.google.gwtorm.server.OrmException;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 /**
  * Index an account using {@link AccountIndexer}. This class is meant to be used on the receiving
@@ -32,12 +36,15 @@
 @Singleton
 public class ForwardedIndexAccountHandler
     extends ForwardedIndexingHandler<Account.Id, AccountIndexEvent> {
+
   private final AccountIndexer indexer;
+  private Map<Account.Id, Operation> accountsToIndex;
 
   @Inject
   ForwardedIndexAccountHandler(AccountIndexer indexer, Configuration config) {
     super(config.index().numStripedLocks());
     this.indexer = indexer;
+    this.accountsToIndex = new HashMap<>();
   }
 
   @Override
@@ -50,4 +57,25 @@
   protected void doDelete(Account.Id id, Optional<AccountIndexEvent> event) {
     throw new UnsupportedOperationException("Delete from account index not supported");
   }
+
+  public synchronized void indexAsync(Account.Id id, Operation operation) {
+    accountsToIndex.put(id, operation);
+  }
+
+  public synchronized void doAsyncIndex() {
+    accountsToIndex =
+        accountsToIndex.entrySet().stream()
+            .filter(e -> !checkedIndex(e))
+            .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
+  }
+
+  private boolean checkedIndex(Map.Entry<Account.Id, Operation> account) {
+    try {
+      index(account.getKey(), account.getValue(), Optional.empty());
+      return true;
+    } catch (IOException | OrmException e) {
+      log.error("Account {} index failed", account.getKey(), e);
+      return false;
+    }
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
index 0103c38..0c6d3fd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
@@ -18,6 +18,7 @@
 import static com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexingHandler.Operation.INDEX;
 
 import com.google.gerrit.reviewdb.client.Account;
+import com.google.gerrit.server.config.AllUsersName;
 import com.google.gwtorm.server.OrmException;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexAccountHandler;
@@ -30,6 +31,7 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
+import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
 import java.io.IOException;
 import java.util.Optional;
 
@@ -38,17 +40,20 @@
   private final ForwardedIndexChangeHandler indexChangeHandler;
   private final ForwardedIndexGroupHandler indexGroupHandler;
   private final ForwardedIndexProjectHandler indexProjectHandler;
+  private final AllUsersName allUsersName;
 
   @Inject
   public IndexEventRouter(
       ForwardedIndexAccountHandler indexAccountHandler,
       ForwardedIndexChangeHandler indexChangeHandler,
       ForwardedIndexGroupHandler indexGroupHandler,
-      ForwardedIndexProjectHandler indexProjectHandler) {
+      ForwardedIndexProjectHandler indexProjectHandler,
+      AllUsersName allUsersName) {
     this.indexAccountHandler = indexAccountHandler;
     this.indexChangeHandler = indexChangeHandler;
     this.indexGroupHandler = indexGroupHandler;
     this.indexProjectHandler = indexProjectHandler;
+    this.allUsersName = allUsersName;
   }
 
   @Override
@@ -62,8 +67,7 @@
           Optional.of(changeIndexEvent));
     } else if (sourceEvent instanceof AccountIndexEvent) {
       AccountIndexEvent accountIndexEvent = (AccountIndexEvent) sourceEvent;
-      indexAccountHandler.index(
-          new Account.Id(accountIndexEvent.accountId), INDEX, Optional.of(accountIndexEvent));
+      indexAccountHandler.indexAsync(new Account.Id(accountIndexEvent.accountId), INDEX);
     } else if (sourceEvent instanceof GroupIndexEvent) {
       GroupIndexEvent groupIndexEvent = (GroupIndexEvent) sourceEvent;
       indexGroupHandler.index(groupIndexEvent.groupUUID, INDEX, Optional.of(groupIndexEvent));
@@ -76,4 +80,16 @@
           String.format("Cannot route event %s", sourceEvent.getType()));
     }
   }
+
+  public void onRefReplicated(RefReplicationDoneEvent replicationEvent)
+      throws IOException, OrmException {
+    if (replicationEvent.getProjectNameKey().equals(allUsersName)) {
+      Account.Id accountId = Account.Id.fromRef(replicationEvent.getRefName());
+      if (accountId != null) {
+        indexAccountHandler.index(accountId, INDEX, Optional.empty());
+      } else {
+        indexAccountHandler.doAsyncIndex();
+      }
+    }
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
index 71edfb0..1ff6992 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
@@ -19,17 +19,33 @@
 import com.google.gwtorm.server.OrmException;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedEventHandler;
+import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
+import java.io.IOException;
 
 public class StreamEventRouter implements ForwardedEventRouter<Event> {
   private final ForwardedEventHandler streamEventHandler;
+  private final IndexEventRouter indexEventRouter;
 
   @Inject
-  public StreamEventRouter(ForwardedEventHandler streamEventHandler) {
+  public StreamEventRouter(
+      ForwardedEventHandler streamEventHandler, IndexEventRouter indexEventRouter) {
     this.streamEventHandler = streamEventHandler;
+    this.indexEventRouter = indexEventRouter;
   }
 
   @Override
-  public void route(Event sourceEvent) throws OrmException, PermissionBackendException {
+  public void route(Event sourceEvent)
+      throws OrmException, PermissionBackendException, IOException {
+    if (RefReplicationDoneEvent.TYPE.equals(sourceEvent.getType())) {
+      /* TODO: We currently explicitly ignore the status and result of the replication
+       * event because there isn't a reliable way to understand if the current node was
+       * the replication target and was successful or not.
+       *
+       * It is better to risk to reindex once more rather than missing a reindexing event.
+       */
+      indexEventRouter.onRefReplicated((RefReplicationDoneEvent) sourceEvent);
+    }
+
     streamEventHandler.dispatch(sourceEvent);
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java
index df7c2fc..81e832a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java
@@ -18,6 +18,8 @@
 import static org.mockito.Mockito.verifyZeroInteractions;
 
 import com.google.gerrit.reviewdb.client.Account;
+import com.google.gerrit.server.config.AllUsersName;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedEventHandler;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexAccountHandler;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexChangeHandler;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexGroupHandler;
@@ -29,6 +31,8 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
 import java.util.Optional;
 import org.junit.Assert;
 import org.junit.Before;
@@ -45,12 +49,18 @@
   @Mock private ForwardedIndexChangeHandler indexChangeHandler;
   @Mock private ForwardedIndexGroupHandler indexGroupHandler;
   @Mock private ForwardedIndexProjectHandler indexProjectHandler;
+  @Mock private ForwardedEventHandler forwardedEventHandler;
+  private AllUsersName allUsersName = new AllUsersName("All-Users");
 
   @Before
   public void setUp() {
     router =
         new IndexEventRouter(
-            indexAccountHandler, indexChangeHandler, indexGroupHandler, indexProjectHandler);
+            indexAccountHandler,
+            indexChangeHandler,
+            indexGroupHandler,
+            indexProjectHandler,
+            allUsersName);
   }
 
   @Test
@@ -59,15 +69,30 @@
     router.route(event);
 
     verify(indexAccountHandler)
-        .index(
-            new Account.Id(event.accountId),
-            ForwardedIndexingHandler.Operation.INDEX,
-            Optional.of(event));
+        .indexAsync(new Account.Id(event.accountId), ForwardedIndexingHandler.Operation.INDEX);
 
     verifyZeroInteractions(indexChangeHandler, indexGroupHandler, indexProjectHandler);
   }
 
   @Test
+  public void streamEventRouterShouldTriggerAccountIndexFlush() throws Exception {
+
+    StreamEventRouter streamEventRouter = new StreamEventRouter(forwardedEventHandler, router);
+
+    final AccountIndexEvent event = new AccountIndexEvent(1);
+    router.route(event);
+
+    verify(indexAccountHandler)
+        .indexAsync(new Account.Id(event.accountId), ForwardedIndexingHandler.Operation.INDEX);
+
+    verifyZeroInteractions(indexChangeHandler, indexGroupHandler, indexProjectHandler);
+
+    streamEventRouter.route(new RefReplicationDoneEvent(allUsersName.get(), "refs/any", 1));
+
+    verify(indexAccountHandler).doAsyncIndex();
+  }
+
+  @Test
   public void routerShouldSendEventsToTheAppropriateHandler_GroupIndex() throws Exception {
     final String groupId = "12";
     final GroupIndexEvent event = new GroupIndexEvent(groupId);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java
index 147f275..ee79e34 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java
@@ -22,6 +22,7 @@
 import com.google.gerrit.server.events.CommentAddedEvent;
 import com.google.gerrit.server.util.time.TimeUtil;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedEventHandler;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,10 +35,11 @@
 
   private StreamEventRouter router;
   @Mock private ForwardedEventHandler streamEventHandler;
+  @Mock private IndexEventRouter indexEventRouter;
 
   @Before
   public void setUp() {
-    router = new StreamEventRouter(streamEventHandler);
+    router = new StreamEventRouter(streamEventHandler, indexEventRouter);
   }
 
   @Test