Manage FetchRefReplicatedEventHandler inside multi-site

The FetchRefReplicatedEventHandler logic is moved from pull-replication
into multi-site in Icd41a6f07, so that the local reindexing of changes
following a pull-replication ref-replicated stream event would not cause
a loop of indexing forwarding events.

This move also makes the overall architecture more consistent, where
the responsibility of propagating the change reindexing is kept inside
multi-site.

This is the proper solution to Issue 16934 which was temporary
worked around in I0a28e302e.

Bug: Issue 16934
Change-Id: I6f9c8627b5b07acbf82f85f702240592b8ee263a
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/FetchRefReplicatedEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/FetchRefReplicatedEventHandler.java
new file mode 100644
index 0000000..d45f526
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/FetchRefReplicatedEventHandler.java
@@ -0,0 +1,77 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.index;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Change;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventListener;
+import com.google.gerrit.server.index.change.ChangeIndexer;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
+import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
+
+public class FetchRefReplicatedEventHandler implements EventListener {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private ChangeIndexer changeIndexer;
+  private final String instanceId;
+
+  @Inject
+  FetchRefReplicatedEventHandler(ChangeIndexer changeIndexer, @GerritInstanceId String instanceId) {
+    this.changeIndexer = changeIndexer;
+    this.instanceId = instanceId;
+  }
+
+  @Override
+  public void onEvent(Event event) {
+    boolean isForwarded = Context.isForwardedEvent();
+    try {
+      Context.setForwardedEvent(true);
+
+      if (event instanceof FetchRefReplicatedEvent && isLocalEvent(event)) {
+        FetchRefReplicatedEvent fetchRefReplicatedEvent = (FetchRefReplicatedEvent) event;
+        if (!RefNames.isNoteDbMetaRef(fetchRefReplicatedEvent.getRefName())
+            || !fetchRefReplicatedEvent
+                .getStatus()
+                .equals(ReplicationState.RefFetchResult.SUCCEEDED.toString())) {
+          return;
+        }
+
+        Project.NameKey projectNameKey = fetchRefReplicatedEvent.getProjectNameKey();
+        logger.atFine().log(
+            "Indexing ref '%s' for project %s",
+            fetchRefReplicatedEvent.getRefName(), projectNameKey.get());
+        Change.Id changeId = Change.Id.fromRef(fetchRefReplicatedEvent.getRefName());
+        if (changeId != null) {
+          changeIndexer.index(projectNameKey, changeId);
+        } else {
+          logger.atWarning().log(
+              "Couldn't get changeId from refName. Skipping indexing of change %s for project %s",
+              fetchRefReplicatedEvent.getRefName(), projectNameKey.get());
+        }
+      }
+    } finally {
+      Context.setForwardedEvent(isForwarded);
+    }
+  }
+
+  private boolean isLocalEvent(Event event) {
+    return instanceId.equals(event.instanceId);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexModule.java
index 075388f..5f47371 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexModule.java
@@ -20,6 +20,7 @@
 import com.google.gerrit.extensions.events.ProjectIndexedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.events.EventListener;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
@@ -37,6 +38,7 @@
     DynamicSet.bind(binder(), AccountIndexedListener.class).to(IndexEventHandler.class);
     DynamicSet.bind(binder(), GroupIndexedListener.class).to(IndexEventHandler.class);
     DynamicSet.bind(binder(), ProjectIndexedListener.class).to(IndexEventHandler.class);
+    DynamicSet.bind(binder(), EventListener.class).to(FetchRefReplicatedEventHandler.class);
 
     bind(ProjectChecker.class).to(ProjectCheckerImpl.class);
     bind(GroupChecker.class).to(GroupCheckerImpl.class);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/FetchRefReplicatedEventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/FetchRefReplicatedEventHandlerTest.java
new file mode 100644
index 0000000..47b6072
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/FetchRefReplicatedEventHandlerTest.java
@@ -0,0 +1,128 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.index;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.gerrit.entities.Change;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.index.change.ChangeIndexer;
+import com.googlesource.gerrit.plugins.replication.pull.Context;
+import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
+import com.googlesource.gerrit.plugins.replication.pull.ReplicationState.RefFetchResult;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FetchRefReplicatedEventHandlerTest {
+  private static final String LOCAL_INSTANCE_ID = "local-instance-id";
+  private static final String REMOTE_INSTANCE_ID = "remote-instance-id";
+  private ChangeIndexer changeIndexerMock;
+  private FetchRefReplicatedEventHandler fetchRefReplicatedEventHandler;
+  private static URIish sourceUri;
+
+  @Before
+  public void setUp() throws Exception {
+    changeIndexerMock = mock(ChangeIndexer.class);
+    fetchRefReplicatedEventHandler =
+        new FetchRefReplicatedEventHandler(changeIndexerMock, LOCAL_INSTANCE_ID);
+    sourceUri = new URIish("git://aSourceNode/testProject.git");
+  }
+
+  @Test
+  public void onEventShouldIndexExistingChange() {
+    Project.NameKey projectNameKey = Project.nameKey("testProject");
+    String ref = "refs/changes/41/41/meta";
+    Change.Id changeId = Change.Id.fromRef(ref);
+    try {
+      Context.setLocalEvent(true);
+      fetchRefReplicatedEventHandler.onEvent(
+          newFetchRefReplicatedEvent(
+              projectNameKey, ref, ReplicationState.RefFetchResult.SUCCEEDED, LOCAL_INSTANCE_ID));
+      verify(changeIndexerMock, times(1)).index(eq(projectNameKey), eq(changeId));
+    } finally {
+      Context.unsetLocalEvent();
+    }
+  }
+
+  @Test
+  public void onEventShouldNotIndexIfNotLocalEvent() {
+    Project.NameKey projectNameKey = Project.nameKey("testProject");
+    String ref = "refs/changes/41/41/meta";
+    Change.Id changeId = Change.Id.fromRef(ref);
+    fetchRefReplicatedEventHandler.onEvent(
+        newFetchRefReplicatedEvent(
+            projectNameKey, ref, ReplicationState.RefFetchResult.SUCCEEDED, REMOTE_INSTANCE_ID));
+    verify(changeIndexerMock, never()).index(eq(projectNameKey), eq(changeId));
+  }
+
+  @Test
+  public void onEventShouldIndexOnlyMetaRef() {
+    Project.NameKey projectNameKey = Project.nameKey("testProject");
+    String ref = "refs/changes/41/41/1";
+    Change.Id changeId = Change.Id.fromRef(ref);
+    fetchRefReplicatedEventHandler.onEvent(
+        newFetchRefReplicatedEvent(
+            projectNameKey, ref, ReplicationState.RefFetchResult.SUCCEEDED, LOCAL_INSTANCE_ID));
+    verify(changeIndexerMock, never()).index(eq(projectNameKey), eq(changeId));
+  }
+
+  @Test
+  public void onEventShouldNotIndexMissingChange() {
+    fetchRefReplicatedEventHandler.onEvent(
+        newFetchRefReplicatedEvent(
+            Project.nameKey("testProject"),
+            "invalidRef",
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            LOCAL_INSTANCE_ID));
+    verify(changeIndexerMock, never()).index(any(), any());
+  }
+
+  @Test
+  public void onEventShouldNotIndexFailingChange() {
+    Project.NameKey projectNameKey = Project.nameKey("testProject");
+    String ref = "refs/changes/41/41/meta";
+    fetchRefReplicatedEventHandler.onEvent(
+        newFetchRefReplicatedEvent(
+            projectNameKey, ref, ReplicationState.RefFetchResult.FAILED, LOCAL_INSTANCE_ID));
+    verify(changeIndexerMock, never()).index(any(), any());
+  }
+
+  @Test
+  public void onEventShouldNotIndexNotAttemptedChange() {
+    Project.NameKey projectNameKey = Project.nameKey("testProject");
+    String ref = "refs/changes/41/41/meta";
+    fetchRefReplicatedEventHandler.onEvent(
+        newFetchRefReplicatedEvent(
+            projectNameKey, ref, ReplicationState.RefFetchResult.NOT_ATTEMPTED, LOCAL_INSTANCE_ID));
+    verify(changeIndexerMock, never()).index(any(), any());
+  }
+
+  private FetchRefReplicatedEvent newFetchRefReplicatedEvent(
+      Project.NameKey projectNameKey, String ref, RefFetchResult fetchResult, String instanceId) {
+    FetchRefReplicatedEvent event =
+        new FetchRefReplicatedEvent(
+            projectNameKey.get(), ref, sourceUri, fetchResult, RefUpdate.Result.FAST_FORWARD);
+    event.instanceId = instanceId;
+    return event;
+  }
+}