Use stream events to fetch refs and create repositories

Add stream events listener to trigger git fetch references and to create
repositories. This functionality in combination with events-broker and
multi-site provides a backfill mechanism for REST API calls missed when
node was unreachable.

NOTE: This change do not include removing refs, project deletion and
head update.

Bug: Issue 15636
Change-Id: I573d4b403878ad781eeabf2b88416f3b29c25787
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index cee52b5..9b2f905 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -65,7 +65,7 @@
  */
 public class FetchOne implements ProjectRunnable, CanceledWhileRunning {
   private final ReplicationStateListener stateLog;
-  static final String ALL_REFS = "..all..";
+  public static final String ALL_REFS = "..all..";
   static final String ID_KEY = "fetchOneId";
 
   interface Factory {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index c17d5df..ebc36b3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -48,12 +48,14 @@
 import com.googlesource.gerrit.plugins.replication.pull.client.HttpClient;
 import com.googlesource.gerrit.plugins.replication.pull.client.SourceHttpClient;
 import com.googlesource.gerrit.plugins.replication.pull.event.FetchRefReplicatedEventModule;
+import com.googlesource.gerrit.plugins.replication.pull.event.StreamEventModule;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObject;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.util.FS;
 
@@ -120,7 +122,8 @@
 
     bind(ConfigParser.class).to(SourceConfigParser.class).in(Scopes.SINGLETON);
 
-    if (getReplicationConfig().getBoolean("gerrit", "autoReload", false)) {
+    Config replicationConfig = getReplicationConfig();
+    if (replicationConfig.getBoolean("gerrit", "autoReload", false)) {
       bind(ReplicationConfig.class)
           .annotatedWith(MainReplicationConfig.class)
           .to(getReplicationConfigClass());
@@ -132,6 +135,10 @@
       bind(ReplicationConfig.class).to(getReplicationConfigClass()).in(Scopes.SINGLETON);
     }
 
+    if (replicationConfig.getBoolean("replication", "consumeStreamEvents", false)) {
+      install(new StreamEventModule());
+    }
+
     DynamicSet.setOf(binder(), ReplicationStateListener.class);
     DynamicSet.bind(binder(), ReplicationStateListener.class).to(PullReplicationStateLogger.class);
     EventTypes.register(FetchRefReplicatedEvent.TYPE, FetchRefReplicatedEvent.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
index 2767f1d..23b7018 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
@@ -111,7 +111,7 @@
     return Response.accepted(url.get());
   }
 
-  private static class FetchJob implements Runnable {
+  public static class FetchJob implements Runnable {
     private static final FluentLogger log = FluentLogger.forEnclosingClass();
 
     private FetchCommand command;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java
index 01635e6..33c1353 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java
@@ -103,10 +103,12 @@
         "Cannot initialize project " + projectName);
   }
 
-  protected boolean initProject(String projectName)
-      throws AuthException, PermissionBackendException {
-    permissionBackend.user(userProvider.get()).check(GlobalPermission.CREATE_PROJECT);
-
+  public boolean initProject(String projectName) throws AuthException, PermissionBackendException {
+    // When triggered internally(for example by consuming stream events) user is not provided
+    // and internal user is returned. Project creation should be always allowed for internal user.
+    if (!userProvider.get().isInternalUser()) {
+      permissionBackend.user(userProvider.get()).check(GlobalPermission.CREATE_PROJECT);
+    }
     Optional<URIish> maybeUri = gerritConfigOps.getGitRepositoryURI(projectName);
     if (!maybeUri.isPresent()) {
       logger.atSevere().log("Cannot initialize project '{}'", projectName);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
new file mode 100644
index 0000000..4360fcf
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
@@ -0,0 +1,106 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.Strings;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.common.Nullable;
+import com.google.gerrit.entities.Project.NameKey;
+import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.extensions.restapi.AuthException;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventListener;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.pull.FetchOne;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.FetchJob;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchCommand;
+import com.googlesource.gerrit.plugins.replication.pull.api.ProjectInitializationAction;
+import org.eclipse.jgit.lib.ObjectId;
+
+public class StreamEventListener implements EventListener {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private String instanceId;
+  private FetchCommand fetchCommand;
+  private WorkQueue workQueue;
+  private ProjectInitializationAction projectInitializationAction;
+
+  @Inject
+  public StreamEventListener(
+      @Nullable @GerritInstanceId String instanceId,
+      FetchCommand command,
+      ProjectInitializationAction projectInitializationAction,
+      WorkQueue workQueue) {
+    this.instanceId = instanceId;
+    this.fetchCommand = command;
+    this.projectInitializationAction = projectInitializationAction;
+    this.workQueue = workQueue;
+
+    requireNonNull(
+        Strings.emptyToNull(this.instanceId), "gerrit.instanceId cannot be null or empty");
+  }
+
+  @Override
+  public void onEvent(Event event) {
+    if (!instanceId.equals(event.instanceId)) {
+      if (event instanceof RefUpdatedEvent) {
+        RefUpdatedEvent refUpdatedEvent = (RefUpdatedEvent) event;
+        if (!isProjectDelete(refUpdatedEvent)) {
+          fetchRefsAsync(
+              refUpdatedEvent.getRefName(),
+              refUpdatedEvent.instanceId,
+              refUpdatedEvent.getProjectNameKey());
+        }
+      }
+      if (event instanceof ProjectCreatedEvent) {
+        ProjectCreatedEvent projectCreatedEvent = (ProjectCreatedEvent) event;
+        try {
+          projectInitializationAction.initProject(getProjectRepositoryName(projectCreatedEvent));
+          fetchRefsAsync(
+              FetchOne.ALL_REFS,
+              projectCreatedEvent.instanceId,
+              projectCreatedEvent.getProjectNameKey());
+        } catch (AuthException | PermissionBackendException e) {
+          logger.atSevere().withCause(e).log(
+              "Cannot initialise project:%s", projectCreatedEvent.projectName);
+        }
+      }
+    }
+  }
+
+  private boolean isProjectDelete(RefUpdatedEvent event) {
+    return RefNames.isConfigRef(event.getRefName())
+        && ObjectId.zeroId().equals(ObjectId.fromString(event.refUpdate.get().newRev));
+  }
+
+  protected void fetchRefsAsync(String refName, String sourceInstanceId, NameKey projectNameKey) {
+    FetchAction.Input input = new FetchAction.Input();
+    input.refName = refName;
+    input.label = sourceInstanceId;
+    workQueue.getDefaultQueue().submit(new FetchJob(fetchCommand, projectNameKey, input));
+  }
+
+  private String getProjectRepositoryName(ProjectCreatedEvent projectCreatedEvent) {
+    return String.format("%s.git", projectCreatedEvent.projectName);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java
new file mode 100644
index 0000000..2389678
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java
@@ -0,0 +1,27 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.EventListener;
+import com.google.inject.AbstractModule;
+
+public class StreamEventModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    DynamicSet.bind(binder(), EventListener.class).to(StreamEventListener.class);
+  }
+}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 53541ba..f29e572 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -130,6 +130,17 @@
 	will be removed in the future release. Use [gerrit.instanceId](https://gerrit-review.googlesource.com/Documentation/config-gerrit.html#gerrit.instanceId)
 	instead.
 
+replication.consumeStreamEvents
+:	Use stream events to trigger pull-replication actions alongside the
+	REST approach. This mechanism is useful together with event-broker
+	and multi-site to provide backfill mechanism when a node has to
+	catch up with the events after being unreachable.
+
+	NOTE: When `consumeStreamEvents` is enabled gerrit.instanceId
+	instead of [replication.instanceLabel](https://gerrit.googlesource.com/plugins/pull-replication/+/refs/heads/stable-3.4/src/main/resources/Documentation/config.md#replication.instanceLabel) must be used.
+
+	Default: false
+
 replication.maxConnectionsPerRoute
 :	Maximum number of HTTP connections per one HTTP route.
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java
new file mode 100644
index 0000000..8f80883
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java
@@ -0,0 +1,126 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.extensions.restapi.AuthException;
+import com.google.gerrit.server.data.RefUpdateAttribute;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.FetchJob;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchCommand;
+import com.googlesource.gerrit.plugins.replication.pull.api.ProjectInitializationAction;
+import java.util.concurrent.ScheduledExecutorService;
+import org.eclipse.jgit.lib.ObjectId;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class StreamEventListenerTest {
+
+  private static final String TEST_PROJECT = "test-project";
+  private static final String INSTANCE_ID = "node_instance_id";
+  private static final String REMOTE_INSTANCE_ID = "remote_node_instance_id";
+
+  @Mock private FetchCommand fetchCommand;
+  @Mock private ProjectInitializationAction projectInitializationAction;
+  @Mock private WorkQueue workQueue;
+  @Mock private ScheduledExecutorService executor;
+
+  private StreamEventListener objectUnderTest;
+
+  @Before
+  public void setup() {
+    when(workQueue.getDefaultQueue()).thenReturn(executor);
+    objectUnderTest =
+        new StreamEventListener(INSTANCE_ID, fetchCommand, projectInitializationAction, workQueue);
+  }
+
+  @Test
+  public void shouldSkipEventsGeneratedByTheSameInstance() {
+    Event event = new RefUpdatedEvent();
+    event.instanceId = INSTANCE_ID;
+    objectUnderTest.onEvent(event);
+
+    verify(executor, never()).submit(any(Runnable.class));
+  }
+
+  @Test
+  public void shouldSkipFetchForProjectDeleteEvent() {
+    RefUpdatedEvent event = new RefUpdatedEvent();
+    RefUpdateAttribute refUpdate = new RefUpdateAttribute();
+    refUpdate.refName = RefNames.REFS_CONFIG;
+    refUpdate.newRev = ObjectId.zeroId().getName();
+    refUpdate.project = TEST_PROJECT;
+
+    event.instanceId = REMOTE_INSTANCE_ID;
+    event.refUpdate = () -> refUpdate;
+
+    objectUnderTest.onEvent(event);
+
+    verify(executor, never()).submit(any(Runnable.class));
+  }
+
+  @Test
+  public void shouldScheduleFetchJobForRefUpdateEvent() {
+    RefUpdatedEvent event = new RefUpdatedEvent();
+    RefUpdateAttribute refUpdate = new RefUpdateAttribute();
+    refUpdate.refName = "refs/changes/01/1/1";
+    refUpdate.project = TEST_PROJECT;
+
+    event.instanceId = REMOTE_INSTANCE_ID;
+    event.refUpdate = () -> refUpdate;
+
+    objectUnderTest.onEvent(event);
+
+    verify(executor).submit(any(FetchJob.class));
+  }
+
+  @Test
+  public void shouldCreateProjectForProjectCreatedEvent()
+      throws AuthException, PermissionBackendException {
+    ProjectCreatedEvent event = new ProjectCreatedEvent();
+    event.instanceId = REMOTE_INSTANCE_ID;
+    event.projectName = TEST_PROJECT;
+
+    objectUnderTest.onEvent(event);
+
+    verify(projectInitializationAction, times(1))
+        .initProject(String.format("%s.git", TEST_PROJECT));
+  }
+
+  @Test
+  public void shouldScheduleAllRefsFetchForProjectCreatedEvent() {
+    ProjectCreatedEvent event = new ProjectCreatedEvent();
+    event.instanceId = REMOTE_INSTANCE_ID;
+    event.projectName = TEST_PROJECT;
+
+    objectUnderTest.onEvent(event);
+
+    verify(executor).submit(any(FetchJob.class));
+  }
+}