Merge changes from topic "asynchronous_fetch" into stable-3.1

* changes:
  Add replication delay for asynchronous fetch calls
  Allow asynchronous fetch calls
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Context.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Context.java
new file mode 100644
index 0000000..34bf3d2
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Context.java
@@ -0,0 +1,39 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+/**
+ * Allows tagging an event as local to avoid consuming remote events.
+ *
+ * <p>TODO: Gerrit v3.1 has no concept of an instanceId, so a ThreadLocal must be used. From
+ * Gerrit v3.2 onwards, replace the ThreadLocal with the instanceId.
+ */
+public class Context {
+  private static final ThreadLocal<Boolean> localEvent = ThreadLocal.withInitial(() -> false);
+
+  private Context() {}
+
+  public static Boolean isLocalEvent() {
+    return localEvent.get();
+  }
+
+  public static void setLocalEvent(Boolean b) {
+    localEvent.set(b);
+  }
+
+  public static void unsetLocalEvent() {
+    localEvent.remove();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java
index 2d51239..ab16318 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java
@@ -118,12 +118,15 @@
       }
       writeStdOut(sb.toString());
       try {
+        Context.setLocalEvent(true);
         dispatcher.postEvent(
             new FetchRefReplicatedEvent(
                 project, ref, resolveNodeName(uri), status, refUpdateResult));
       } catch (PermissionBackendException e) {
         logger.atSevere().withCause(e).log(
             "Cannot post event for ref '%s', project %s", ref, project);
+      } finally {
+        Context.unsetLocalEvent();
       }
     }
 
@@ -200,9 +203,12 @@
 
     private void postEvent(RefEvent event) {
       try {
+        Context.setLocalEvent(true);
         dispatcher.postEvent(event);
       } catch (PermissionBackendException e) {
         logger.atSevere().withCause(e).log("Cannot post event");
+      } finally {
+        Context.unsetLocalEvent();
       }
     }
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 3847426..ea2ce32 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -765,15 +765,20 @@
   private void postReplicationFailedEvent(FetchOne fetchOp, RefUpdate.Result result) {
     Project.NameKey project = fetchOp.getProjectNameKey();
     String sourceNode = resolveNodeName(fetchOp.getURI());
-    for (String ref : fetchOp.getRefs()) {
-      FetchRefReplicatedEvent event =
-          new FetchRefReplicatedEvent(
-              project.get(), ref, sourceNode, ReplicationState.RefFetchResult.FAILED, result);
-      try {
-        eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
-      } catch (PermissionBackendException e) {
-        repLog.error("error posting event", e);
+    try {
+      Context.setLocalEvent(true);
+      for (String ref : fetchOp.getRefs()) {
+        FetchRefReplicatedEvent event =
+            new FetchRefReplicatedEvent(
+                project.get(), ref, sourceNode, ReplicationState.RefFetchResult.FAILED, result);
+        try {
+          eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
+        } catch (PermissionBackendException e) {
+          repLog.error("error posting event", e);
+        }
       }
+    } finally {
+      Context.unsetLocalEvent();
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java
index ac7e79b..276aa16 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java
@@ -26,6 +26,7 @@
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.pull.ApplyObjectMetrics;
+import com.googlesource.gerrit.plugins.replication.pull.Context;
 import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
 import com.googlesource.gerrit.plugins.replication.pull.PullReplicationStateLogger;
 import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
@@ -78,6 +79,7 @@
     long elapsed = NANOSECONDS.toMillis(context.stop());
 
     try {
+      Context.setLocalEvent(true);
       eventDispatcher
           .get()
           .postEvent(
@@ -90,6 +92,8 @@
     } catch (PermissionBackendException e) {
       logger.atSevere().withCause(e).log(
           "Cannot post event for ref '%s', project %s", refName, name);
+    } finally {
+      Context.unsetLocalEvent();
     }
 
     if (!isSuccessful(refUpdateState.getResult())) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandler.java
index fc96394..a618e16 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandler.java
@@ -17,10 +17,12 @@
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.entities.Change;
 import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventListener;
 import com.google.gerrit.server.index.change.ChangeIndexer;
 import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.pull.Context;
 import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
 import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
 
@@ -35,18 +37,21 @@
 
   @Override
   public void onEvent(Event event) {
-    if (event instanceof FetchRefReplicatedEvent) {
+    if (event instanceof FetchRefReplicatedEvent && isLocalEvent()) {
       FetchRefReplicatedEvent fetchRefReplicatedEvent = (FetchRefReplicatedEvent) event;
+      if (!RefNames.isNoteDbMetaRef(fetchRefReplicatedEvent.getRefName())
+          || !fetchRefReplicatedEvent
+              .getStatus()
+              .equals(ReplicationState.RefFetchResult.SUCCEEDED.toString())) {
+        return;
+      }
+
       Project.NameKey projectNameKey = fetchRefReplicatedEvent.getProjectNameKey();
       logger.atFine().log(
           "Indexing ref '%s' for project %s",
           fetchRefReplicatedEvent.getRefName(), projectNameKey.get());
-
       Change.Id changeId = Change.Id.fromRef(fetchRefReplicatedEvent.getRefName());
-      if (changeId != null
-          && fetchRefReplicatedEvent
-              .getStatus()
-              .equals(ReplicationState.RefFetchResult.SUCCEEDED.toString())) {
+      if (changeId != null) {
         changeIndexer.index(projectNameKey, changeId);
       } else {
         logger.atWarning().log(
@@ -55,4 +60,8 @@
       }
     }
   }
+
+  private boolean isLocalEvent() {
+    return Context.isLocalEvent();
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandlerTest.java
index 1d87195..e528eca 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandlerTest.java
@@ -24,6 +24,7 @@
 import com.google.gerrit.entities.Change;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.index.change.ChangeIndexer;
+import com.googlesource.gerrit.plugins.replication.pull.Context;
 import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
 import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
 import org.eclipse.jgit.lib.RefUpdate;
@@ -45,6 +46,26 @@
     Project.NameKey projectNameKey = Project.nameKey("testProject");
     String ref = "refs/changes/41/41/meta";
     Change.Id changeId = Change.Id.fromRef(ref);
+    try {
+      Context.setLocalEvent(true);
+      fetchRefReplicatedEventHandler.onEvent(
+          new FetchRefReplicatedEvent(
+              projectNameKey.get(),
+              ref,
+              "aSourceNode",
+              ReplicationState.RefFetchResult.SUCCEEDED,
+              RefUpdate.Result.FAST_FORWARD));
+      verify(changeIndexerMock, times(1)).index(eq(projectNameKey), eq(changeId));
+    } finally {
+      Context.unsetLocalEvent();
+    }
+  }
+
+  @Test
+  public void onEventShouldNotIndexIfNotLocalEvent() {
+    Project.NameKey projectNameKey = Project.nameKey("testProject");
+    String ref = "refs/changes/41/41/meta";
+    Change.Id changeId = Change.Id.fromRef(ref);
     fetchRefReplicatedEventHandler.onEvent(
         new FetchRefReplicatedEvent(
             projectNameKey.get(),
@@ -52,7 +73,22 @@
             "aSourceNode",
             ReplicationState.RefFetchResult.SUCCEEDED,
             RefUpdate.Result.FAST_FORWARD));
-    verify(changeIndexerMock, times(1)).index(eq(projectNameKey), eq(changeId));
+    verify(changeIndexerMock, never()).index(eq(projectNameKey), eq(changeId));
+  }
+
+  @Test
+  public void onEventShouldIndexOnlyMetaRef() {
+    Project.NameKey projectNameKey = Project.nameKey("testProject");
+    String ref = "refs/changes/41/41/1";
+    Change.Id changeId = Change.Id.fromRef(ref);
+    fetchRefReplicatedEventHandler.onEvent(
+        new FetchRefReplicatedEvent(
+            projectNameKey.get(),
+            ref,
+            "aSourceNode",
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.FAST_FORWARD));
+    verify(changeIndexerMock, never()).index(eq(projectNameKey), eq(changeId));
   }
 
   @Test