Merge branch 'stable-3.3' into stable-3.4

* stable-3.3:
  Do not use timer.getStartTime() when propating timers metrics

Change-Id: I1c74d27d7364c522233bf9fcffe10f56f52a02be
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index cdf573f..4fad8f9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -66,7 +66,7 @@
  */
 public class FetchOne implements ProjectRunnable, CanceledWhileRunning {
   private final ReplicationStateListener stateLog;
-  static final String ALL_REFS = "..all..";
+  public static final String ALL_REFS = "..all..";
   static final String ID_KEY = "fetchOneId";
 
   interface Factory {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index c17d5df..3e51e03 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -42,18 +42,21 @@
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
 import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig;
 import com.googlesource.gerrit.plugins.replication.StartReplicationCapability;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchJob;
 import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiModule;
 import com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient;
 import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient;
 import com.googlesource.gerrit.plugins.replication.pull.client.HttpClient;
 import com.googlesource.gerrit.plugins.replication.pull.client.SourceHttpClient;
 import com.googlesource.gerrit.plugins.replication.pull.event.FetchRefReplicatedEventModule;
+import com.googlesource.gerrit.plugins.replication.pull.event.StreamEventModule;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObject;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.util.FS;
 
@@ -72,7 +75,7 @@
 
     bind(RevisionReader.class).in(Scopes.SINGLETON);
     bind(ApplyObject.class);
-
+    install(new FactoryModuleBuilder().build(FetchJob.Factory.class));
     install(new PullReplicationApiModule());
 
     install(new FetchRefReplicatedEventModule());
@@ -120,7 +123,8 @@
 
     bind(ConfigParser.class).to(SourceConfigParser.class).in(Scopes.SINGLETON);
 
-    if (getReplicationConfig().getBoolean("gerrit", "autoReload", false)) {
+    Config replicationConfig = getReplicationConfig();
+    if (replicationConfig.getBoolean("gerrit", "autoReload", false)) {
       bind(ReplicationConfig.class)
           .annotatedWith(MainReplicationConfig.class)
           .to(getReplicationConfigClass());
@@ -132,6 +136,10 @@
       bind(ReplicationConfig.class).to(getReplicationConfigClass()).in(Scopes.SINGLETON);
     }
 
+    if (replicationConfig.getBoolean("replication", "consumeStreamEvents", false)) {
+      install(new StreamEventModule());
+    }
+
     DynamicSet.setOf(binder(), ReplicationStateListener.class);
     DynamicSet.bind(binder(), ReplicationStateListener.class).to(PullReplicationStateLogger.class);
     EventTypes.register(FetchRefReplicatedEvent.TYPE, FetchRefReplicatedEvent.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
index 48964db..fdb4f8f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
@@ -17,7 +17,6 @@
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.Strings;
-import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
@@ -32,6 +31,7 @@
 import com.google.gerrit.server.project.ProjectResource;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.Input;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchJob.Factory;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.RemoteConfigurationMissingException;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
@@ -42,17 +42,20 @@
   private final WorkQueue workQueue;
   private final DynamicItem<UrlFormatter> urlFormatter;
   private final FetchPreconditions preConditions;
+  private final Factory fetchJobFactory;
 
   @Inject
   public FetchAction(
       FetchCommand command,
       WorkQueue workQueue,
       DynamicItem<UrlFormatter> urlFormatter,
-      FetchPreconditions preConditions) {
+      FetchPreconditions preConditions,
+      FetchJob.Factory fetchJobFactory) {
     this.command = command;
     this.workQueue = workQueue;
     this.urlFormatter = urlFormatter;
     this.preConditions = preConditions;
+    this.fetchJobFactory = fetchJobFactory;
   }
 
   public static class Input {
@@ -104,7 +107,7 @@
             workQueue
                 .getDefaultQueue()
                 .submit(
-                    new FetchJob(command, project, input, PullReplicationApiRequestMetrics.get()));
+                    fetchJobFactory.create(project, input, PullReplicationApiRequestMetrics.get()));
     Optional<String> url =
         urlFormatter
             .get()
@@ -113,38 +116,4 @@
     checkState(url.isPresent());
     return Response.accepted(url.get());
   }
-
-  private static class FetchJob implements Runnable {
-    private static final FluentLogger log = FluentLogger.forEnclosingClass();
-
-    private FetchCommand command;
-    private Project.NameKey project;
-    private FetchAction.Input input;
-    private final PullReplicationApiRequestMetrics apiRequestMetrics;
-
-    public FetchJob(
-        FetchCommand command,
-        Project.NameKey project,
-        FetchAction.Input input,
-        PullReplicationApiRequestMetrics apiRequestMetrics) {
-      this.command = command;
-      this.project = project;
-      this.input = input;
-      this.apiRequestMetrics = apiRequestMetrics;
-    }
-
-    @Override
-    public void run() {
-      try {
-        command.fetchAsync(project, input.label, input.refName, apiRequestMetrics);
-      } catch (InterruptedException
-          | ExecutionException
-          | RemoteConfigurationMissingException
-          | TimeoutException e) {
-        log.atSevere().withCause(e).log(
-            "Exception during the async fetch call for project %s, label %s and ref name %s",
-            project.get(), input.label, input.refName);
-      }
-    }
-  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchJob.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchJob.java
new file mode 100644
index 0000000..e15dd68
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchJob.java
@@ -0,0 +1,63 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.RemoteConfigurationMissingException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+public class FetchJob implements Runnable {
+  private static final FluentLogger log = FluentLogger.forEnclosingClass();
+
+  public interface Factory {
+    FetchJob create(
+        Project.NameKey project, FetchAction.Input input, PullReplicationApiRequestMetrics metrics);
+  }
+
+  private FetchCommand command;
+  private Project.NameKey project;
+  private FetchAction.Input input;
+  private final PullReplicationApiRequestMetrics metrics;
+
+  @Inject
+  public FetchJob(
+      FetchCommand command,
+      @Assisted Project.NameKey project,
+      @Assisted FetchAction.Input input,
+      @Assisted PullReplicationApiRequestMetrics metrics) {
+    this.command = command;
+    this.project = project;
+    this.input = input;
+    this.metrics = metrics;
+  }
+
+  @Override
+  public void run() {
+    try {
+      command.fetchAsync(project, input.label, input.refName, metrics);
+    } catch (InterruptedException
+        | ExecutionException
+        | RemoteConfigurationMissingException
+        | TimeoutException e) {
+      log.atSevere().withCause(e).log(
+          "Exception during the async fetch call for project %s, label %s and ref name %s",
+          project.get(), input.label, input.refName);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java
index 3426c03..63d32c4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java
@@ -103,10 +103,12 @@
         "Cannot initialize project " + projectName);
   }
 
-  protected boolean initProject(String projectName)
-      throws AuthException, PermissionBackendException {
-    permissionBackend.user(userProvider.get()).check(GlobalPermission.CREATE_PROJECT);
-
+  public boolean initProject(String projectName) throws AuthException, PermissionBackendException {
+    // When triggered internally(for example by consuming stream events) user is not provided
+    // and internal user is returned. Project creation should be always allowed for internal user.
+    if (!userProvider.get().isInternalUser()) {
+      permissionBackend.user(userProvider.get()).check(GlobalPermission.CREATE_PROJECT);
+    }
     Optional<URIish> maybeUri = gerritConfigOps.getGitRepositoryURI(projectName);
     if (!maybeUri.isPresent()) {
       logger.atSevere().log("Cannot initialize project '%s'", projectName);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiRequestMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiRequestMetrics.java
index 597b66f..8e4e43f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiRequestMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiRequestMetrics.java
@@ -16,6 +16,7 @@
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.pull.FetchReplicationMetrics;
 import java.util.Optional;
@@ -28,7 +29,7 @@
 
   public static final String HTTP_HEADER_X_START_TIME_NANOS = "X-StartTimeNanos";
 
-  private Optional<Long> startTimeNanos;
+  private Optional<Long> startTimeNanos = Optional.empty();
   private final AtomicBoolean initialised = new AtomicBoolean();
   private final FetchReplicationMetrics metrics;
 
@@ -59,6 +60,13 @@
             .map(nanoTime -> Math.min(currentTimeNanos(), nanoTime));
   }
 
+  public void start(Event event) {
+    if (!initialised.compareAndSet(false, true)) {
+      throw new IllegalStateException("PullReplicationApiRequestMetrics already initialised");
+    }
+    startTimeNanos = Optional.of(event.eventCreatedOn * 1000 * 1000 * 1000);
+  }
+
   public Optional<Long> stop(String replicationSourceName) {
     return startTimeNanos.map(
         start -> {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
index ed919de..9b33c81 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
@@ -26,6 +26,7 @@
 import com.google.gerrit.entities.Project.NameKey;
 import com.google.gerrit.extensions.annotations.PluginName;
 import com.google.gerrit.extensions.restapi.Url;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.inject.Inject;
@@ -70,7 +71,7 @@
   private final CredentialsFactory credentials;
   private final SourceHttpClient.Factory httpClientFactory;
   private final Source source;
-  private final String instanceLabel;
+  private final String instanceId;
   private final String pluginName;
   private final SyncRefsFilter syncRefsFilter;
 
@@ -81,18 +82,22 @@
       ReplicationConfig replicationConfig,
       SyncRefsFilter syncRefsFilter,
       @PluginName String pluginName,
+      @Nullable @GerritInstanceId String instanceId,
       @Assisted Source source) {
     this.credentials = credentials;
     this.httpClientFactory = httpClientFactory;
     this.source = source;
     this.pluginName = pluginName;
     this.syncRefsFilter = syncRefsFilter;
-    this.instanceLabel =
-        Strings.nullToEmpty(
+    this.instanceId =
+        Optional.ofNullable(
                 replicationConfig.getConfig().getString("replication", null, "instanceLabel"))
+            .orElse(instanceId)
             .trim();
+
     requireNonNull(
-        Strings.emptyToNull(instanceLabel), "replication.instanceLabel cannot be null or empty");
+        Strings.emptyToNull(this.instanceId),
+        "gerrit.instanceId or replication.instanceLabel must be set");
   }
 
   /* (non-Javadoc)
@@ -112,7 +117,7 @@
         new StringEntity(
             String.format(
                 "{\"label\":\"%s\", \"ref_name\": \"%s\", \"async\":%s}",
-                instanceLabel, refName, callAsync),
+                instanceId, refName, callAsync),
             StandardCharsets.UTF_8));
     post.addHeader(new BasicHeader("Content-Type", "application/json"));
     post.addHeader(
@@ -180,7 +185,7 @@
     } else {
       requireNull(revisionData, "DELETE ref-updates cannot be associated with a RevisionData");
     }
-    RevisionInput input = new RevisionInput(instanceLabel, refName, revisionData);
+    RevisionInput input = new RevisionInput(instanceId, refName, revisionData);
 
     String url = formatUrl(project, targetUri, "apply-object");
 
@@ -199,8 +204,7 @@
     }
 
     RevisionData[] inputData = new RevisionData[revisionData.size()];
-    RevisionsInput input =
-        new RevisionsInput(instanceLabel, refName, revisionData.toArray(inputData));
+    RevisionsInput input = new RevisionsInput(instanceId, refName, revisionData.toArray(inputData));
 
     String url = formatUrl(project, targetUri, "apply-objects");
     HttpPost post = new HttpPost(url);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
new file mode 100644
index 0000000..0f092e5
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
@@ -0,0 +1,120 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.Strings;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.common.Nullable;
+import com.google.gerrit.entities.Project.NameKey;
+import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.extensions.restapi.AuthException;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventListener;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.googlesource.gerrit.plugins.replication.pull.FetchOne;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchJob;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchJob.Factory;
+import com.googlesource.gerrit.plugins.replication.pull.api.ProjectInitializationAction;
+import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiRequestMetrics;
+import org.eclipse.jgit.lib.ObjectId;
+
+public class StreamEventListener implements EventListener {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private String instanceId;
+  private WorkQueue workQueue;
+  private ProjectInitializationAction projectInitializationAction;
+
+  private Factory fetchJobFactory;
+  private final Provider<PullReplicationApiRequestMetrics> metricsProvider;
+
+  @Inject
+  public StreamEventListener(
+      @Nullable @GerritInstanceId String instanceId,
+      ProjectInitializationAction projectInitializationAction,
+      WorkQueue workQueue,
+      FetchJob.Factory fetchJobFactory,
+      Provider<PullReplicationApiRequestMetrics> metricsProvider) {
+    this.instanceId = instanceId;
+    this.projectInitializationAction = projectInitializationAction;
+    this.workQueue = workQueue;
+    this.fetchJobFactory = fetchJobFactory;
+    this.metricsProvider = metricsProvider;
+
+    requireNonNull(
+        Strings.emptyToNull(this.instanceId), "gerrit.instanceId cannot be null or empty");
+  }
+
+  @Override
+  public void onEvent(Event event) {
+    if (!instanceId.equals(event.instanceId)) {
+      PullReplicationApiRequestMetrics metrics = metricsProvider.get();
+      metrics.start(event);
+      if (event instanceof RefUpdatedEvent) {
+        RefUpdatedEvent refUpdatedEvent = (RefUpdatedEvent) event;
+        if (!isProjectDelete(refUpdatedEvent)) {
+          fetchRefsAsync(
+              refUpdatedEvent.getRefName(),
+              refUpdatedEvent.instanceId,
+              refUpdatedEvent.getProjectNameKey(),
+              metrics);
+        }
+      }
+      if (event instanceof ProjectCreatedEvent) {
+        ProjectCreatedEvent projectCreatedEvent = (ProjectCreatedEvent) event;
+        try {
+          projectInitializationAction.initProject(getProjectRepositoryName(projectCreatedEvent));
+          fetchRefsAsync(
+              FetchOne.ALL_REFS,
+              projectCreatedEvent.instanceId,
+              projectCreatedEvent.getProjectNameKey(),
+              metrics);
+        } catch (AuthException | PermissionBackendException e) {
+          logger.atSevere().withCause(e).log(
+              "Cannot initialise project:%s", projectCreatedEvent.projectName);
+        }
+      }
+    }
+  }
+
+  private boolean isProjectDelete(RefUpdatedEvent event) {
+    return RefNames.isConfigRef(event.getRefName())
+        && ObjectId.zeroId().equals(ObjectId.fromString(event.refUpdate.get().newRev));
+  }
+
+  protected void fetchRefsAsync(
+      String refName,
+      String sourceInstanceId,
+      NameKey projectNameKey,
+      PullReplicationApiRequestMetrics metrics) {
+    FetchAction.Input input = new FetchAction.Input();
+    input.refName = refName;
+    input.label = sourceInstanceId;
+    workQueue.getDefaultQueue().submit(fetchJobFactory.create(projectNameKey, input, metrics));
+  }
+
+  private String getProjectRepositoryName(ProjectCreatedEvent projectCreatedEvent) {
+    return String.format("%s.git", projectCreatedEvent.projectName);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java
new file mode 100644
index 0000000..2389678
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java
@@ -0,0 +1,27 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.EventListener;
+import com.google.inject.AbstractModule;
+
+public class StreamEventModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    DynamicSet.bind(binder(), EventListener.class).to(StreamEventListener.class);
+  }
+}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index b2470a4..f29e572 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -31,8 +31,13 @@
     threads = 3
     authGroup = Public Mirror Group
     authGroup = Second Public Mirror Group
-  [replication]
-    instanceLabel = host-one
+```
+
+And make sure that instanceId is setup in `$site_path/etc/gerrit.config`:
+
+```
+[gerrit]
+    instanceId = host-one
 ```
 
 Then reload the replication plugin to pick up the new configuration:
@@ -121,6 +126,21 @@
 	provided in the remote configuration section which name is equal
 	to instanceLabel.
 
+	Deprecated: This property is kept for backward compatibility and
+	will be removed in the future release. Use [gerrit.instanceId](https://gerrit-review.googlesource.com/Documentation/config-gerrit.html#gerrit.instanceId)
+	instead.
+
+replication.consumeStreamEvents
+:	Use stream events to trigger pull-replication actions alongside the
+	REST approach. This mechanism is useful together with event-broker
+	and multi-site to provide backfill mechanism when a node has to
+	catch up with the events after being unreachable.
+
+	NOTE: When `consumeStreamEvents` is enabled gerrit.instanceId
+	instead of [replication.instanceLabel](https://gerrit.googlesource.com/plugins/pull-replication/+/refs/heads/stable-3.4/src/main/resources/Documentation/config.md#replication.instanceLabel) must be used.
+
+	Default: false
+
 replication.maxConnectionsPerRoute
 :	Maximum number of HTTP connections per one HTTP route.
 
@@ -441,7 +461,6 @@
     autoReload = true
     replicateOnStartup = false
 [replication]
-	instanceLabel = host-one
     lockErrorMaxRetries = 5
     maxRetries = 5
 ```
@@ -477,7 +496,6 @@
     autoReload = true
     replicateOnStartup = false
 [replication]
-    instanceLabel = host-one
     lockErrorMaxRetries = 5
     maxRetries = 5
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java
index cb5af42..ee5876f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java
@@ -23,6 +23,7 @@
 import com.google.gerrit.acceptance.SkipProjectClone;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.config.GerritConfig;
 import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.api.projects.BranchInput;
@@ -95,6 +96,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateNewChangeRef() throws Exception {
     testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
 
@@ -122,6 +124,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateNewChangeRefAfterConfigReloaded() throws Exception {
     testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
 
@@ -157,6 +160,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateNewBranch() throws Exception {
     String testProjectName = project + TEST_REPLICATION_SUFFIX;
     createTestProject(testProjectName);
@@ -190,6 +194,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldAutoReloadConfiguration() throws Exception {
     SourcesCollection sources = getInstance(SourcesCollection.class);
     AutoReloadConfigDecorator autoReloadConfigDecorator =
@@ -202,6 +207,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldAutoReloadConfigurationWhenRemoteConfigAdded() throws Exception {
     FileBasedConfig newRemoteConfig =
         new FileBasedConfig(
@@ -224,6 +230,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldAutoReloadConfigurationWhenRemoteConfigDeleted() throws Exception {
     SourcesCollection sources = getInstance(SourcesCollection.class);
     AutoReloadConfigDecorator autoReloadConfigDecorator =
@@ -259,7 +266,6 @@
   }
 
   private void setReplicationSource(String remoteName) throws IOException {
-    config.setString("replication", null, "instanceLabel", remoteName);
     config.setBoolean("gerrit", null, "autoReload", true);
     config.save();
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
index ec60793..62f42c3 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
@@ -27,6 +27,7 @@
 import com.google.gerrit.acceptance.SkipProjectClone;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.config.GerritConfig;
 import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
 import com.google.gerrit.entities.Permission;
 import com.google.gerrit.entities.Project;
@@ -124,6 +125,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateNewChangeRef() throws Exception {
     testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
 
@@ -151,6 +153,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateNewBranch() throws Exception {
     String testProjectName = project + TEST_REPLICATION_SUFFIX;
     createTestProject(testProjectName);
@@ -185,6 +188,7 @@
 
   @Test
   @UseLocalDisk
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateForceUpdatedBranch() throws Exception {
     boolean forcedPush = true;
     String testProjectName = project + TEST_REPLICATION_SUFFIX;
@@ -254,6 +258,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateNewChangeRefCGitClient() throws Exception {
     AutoReloadConfigDecorator autoReloadConfigDecorator =
         getInstance(AutoReloadConfigDecorator.class);
@@ -289,6 +294,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateNewBranchCGitClient() throws Exception {
     AutoReloadConfigDecorator autoReloadConfigDecorator =
         getInstance(AutoReloadConfigDecorator.class);
@@ -330,6 +336,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateProjectDeletion() throws Exception {
     String projectToDelete = project.get();
     setReplicationSource(TEST_REPLICATION_REMOTE, "", Optional.of(projectToDelete));
@@ -358,6 +365,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateHeadUpdate() throws Exception {
     String testProjectName = project.get();
     setReplicationSource(TEST_REPLICATION_REMOTE, "", Optional.of(testProjectName));
@@ -416,7 +424,6 @@
         replicaSuffixes.stream()
             .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
             .collect(toList());
-    config.setString("replication", null, "instanceLabel", remoteName);
     config.setStringList("remote", remoteName, "url", replicaUrls);
     config.setString("remote", remoteName, "apiUrl", adminRestSession.url());
     config.setString("remote", remoteName, "fetch", "+refs/*:refs/*");
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ActionITBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ActionITBase.java
index 1019b86..7cef485 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ActionITBase.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ActionITBase.java
@@ -200,7 +200,6 @@
         replicaSuffixes.stream()
             .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
             .collect(toList());
-    config.setString("replication", null, "instanceLabel", remoteName);
     config.setStringList("remote", remoteName, "url", replicaUrls);
     config.setString("remote", remoteName, "apiUrl", adminRestSession.url());
     config.setString("remote", remoteName, "fetch", "+refs/tags/*:refs/tags/*");
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java
index fb7f3d1..ce0b9d3 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java
@@ -55,6 +55,8 @@
   int taskId = 1234;
 
   @Mock FetchCommand fetchCommand;
+  @Mock FetchJob fetchJob;
+  @Mock FetchJob.Factory fetchJobFactory;
   @Mock ProjectResource projectResource;
   @Mock WorkQueue workQueue;
   @Mock ScheduledExecutorService exceutorService;
@@ -65,6 +67,7 @@
 
   @Before
   public void setup() {
+    when(fetchJobFactory.create(any(), any(), any())).thenReturn(fetchJob);
     when(workQueue.getDefaultQueue()).thenReturn(exceutorService);
     when(urlFormatter.getRestUrl(anyString())).thenReturn(Optional.of(location));
     when(exceutorService.submit(any(Runnable.class)))
@@ -79,7 +82,9 @@
     when(task.getTaskId()).thenReturn(taskId);
     when(preConditions.canCallFetchApi()).thenReturn(true);
 
-    fetchAction = new FetchAction(fetchCommand, workQueue, urlFormatterDynamicItem, preConditions);
+    fetchAction =
+        new FetchAction(
+            fetchCommand, workQueue, urlFormatterDynamicItem, preConditions, fetchJobFactory);
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
index c45e7be..00904c7 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
@@ -80,7 +80,7 @@
 
   String api = "http://gerrit-host";
   String pluginName = "pull-replication";
-  String label = "Replication";
+  String instanceId = "Replication";
   String refName = RefNames.REFS_HEADS + "master";
 
   String expectedPayload =
@@ -172,7 +172,6 @@
     when(replicationConfig.getConfig()).thenReturn(config);
     when(config.getStringList("replication", null, "syncRefs")).thenReturn(new String[0]);
     when(source.getRemoteConfigName()).thenReturn("Replication");
-    when(config.getString("replication", null, "instanceLabel")).thenReturn(label);
 
     HttpResult httpResult = new HttpResult(SC_CREATED, Optional.of("result message"));
     when(httpClient.execute(any(HttpPost.class), any(), any())).thenReturn(httpResult);
@@ -180,7 +179,13 @@
     syncRefsFilter = new SyncRefsFilter(replicationConfig);
     objectUnderTest =
         new FetchRestApiClient(
-            credentials, httpClientFactory, replicationConfig, syncRefsFilter, pluginName, source);
+            credentials,
+            httpClientFactory,
+            replicationConfig,
+            syncRefsFilter,
+            pluginName,
+            instanceId,
+            source);
   }
 
   @Test
@@ -204,7 +209,13 @@
     syncRefsFilter = new SyncRefsFilter(replicationConfig);
     objectUnderTest =
         new FetchRestApiClient(
-            credentials, httpClientFactory, replicationConfig, syncRefsFilter, pluginName, source);
+            credentials,
+            httpClientFactory,
+            replicationConfig,
+            syncRefsFilter,
+            pluginName,
+            instanceId,
+            source);
 
     objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
 
@@ -223,7 +234,13 @@
     syncRefsFilter = new SyncRefsFilter(replicationConfig);
     objectUnderTest =
         new FetchRestApiClient(
-            credentials, httpClientFactory, replicationConfig, syncRefsFilter, pluginName, source);
+            credentials,
+            httpClientFactory,
+            replicationConfig,
+            syncRefsFilter,
+            pluginName,
+            instanceId,
+            source);
 
     objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
 
@@ -245,7 +262,13 @@
     syncRefsFilter = new SyncRefsFilter(replicationConfig);
     objectUnderTest =
         new FetchRestApiClient(
-            credentials, httpClientFactory, replicationConfig, syncRefsFilter, pluginName, source);
+            credentials,
+            httpClientFactory,
+            replicationConfig,
+            syncRefsFilter,
+            pluginName,
+            instanceId,
+            source);
 
     objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
     verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any(), any());
@@ -334,7 +357,6 @@
 
   @Test
   public void shouldThrowExceptionWhenInstanceLabelIsNull() {
-    when(config.getString("replication", null, "instanceLabel")).thenReturn(null);
     assertThrows(
         NullPointerException.class,
         () ->
@@ -344,12 +366,12 @@
                 replicationConfig,
                 syncRefsFilter,
                 pluginName,
+                null,
                 source));
   }
 
   @Test
   public void shouldTrimInstanceLabel() {
-    when(config.getString("replication", null, "instanceLabel")).thenReturn(" ");
     assertThrows(
         NullPointerException.class,
         () ->
@@ -359,12 +381,12 @@
                 replicationConfig,
                 syncRefsFilter,
                 pluginName,
+                " ",
                 source));
   }
 
   @Test
   public void shouldThrowExceptionWhenInstanceLabelIsEmpty() {
-    when(config.getString("replication", null, "instanceLabel")).thenReturn("");
     assertThrows(
         NullPointerException.class,
         () ->
@@ -374,10 +396,32 @@
                 replicationConfig,
                 syncRefsFilter,
                 pluginName,
+                "",
                 source));
   }
 
   @Test
+  public void shouldUseReplicationLabelWhenProvided()
+      throws ClientProtocolException, IOException, URISyntaxException {
+    when(config.getString("replication", null, "instanceLabel")).thenReturn(instanceId);
+    FetchRestApiClient objectUnderTest =
+        new FetchRestApiClient(
+            credentials,
+            httpClientFactory,
+            replicationConfig,
+            syncRefsFilter,
+            pluginName,
+            "",
+            source);
+    objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
+
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any(), any());
+
+    HttpPost httpPost = httpPostCaptor.getValue();
+    assertThat(readPayload(httpPost)).isEqualTo(expectedPayload);
+  }
+
+  @Test
   public void shouldCallInitProjectEndpoint() throws IOException, URISyntaxException {
 
     objectUnderTest.initProject(Project.nameKey("test_repo"), new URIish(api));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java
new file mode 100644
index 0000000..c673011
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java
@@ -0,0 +1,150 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.extensions.restapi.AuthException;
+import com.google.gerrit.server.data.RefUpdateAttribute;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.replication.pull.FetchOne;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.Input;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchJob;
+import com.googlesource.gerrit.plugins.replication.pull.api.ProjectInitializationAction;
+import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiRequestMetrics;
+import java.util.concurrent.ScheduledExecutorService;
+import org.eclipse.jgit.lib.ObjectId;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class StreamEventListenerTest {
+
+  private static final String TEST_REF_NAME = "refs/changes/01/1/1";
+  private static final String TEST_PROJECT = "test-project";
+  private static final String INSTANCE_ID = "node_instance_id";
+  private static final String REMOTE_INSTANCE_ID = "remote_node_instance_id";
+
+  @Mock private ProjectInitializationAction projectInitializationAction;
+  @Mock private WorkQueue workQueue;
+  @Mock private ScheduledExecutorService executor;
+  @Mock private FetchJob fetchJob;
+  @Mock private FetchJob.Factory fetchJobFactory;
+  @Captor ArgumentCaptor<Input> inputCaptor;
+  @Mock private PullReplicationApiRequestMetrics metrics;
+
+  private StreamEventListener objectUnderTest;
+
+  @Before
+  public void setup() {
+    when(workQueue.getDefaultQueue()).thenReturn(executor);
+    when(fetchJobFactory.create(eq(Project.nameKey(TEST_PROJECT)), any(), any()))
+        .thenReturn(fetchJob);
+    objectUnderTest =
+        new StreamEventListener(
+            INSTANCE_ID, projectInitializationAction, workQueue, fetchJobFactory, () -> metrics);
+  }
+
+  @Test
+  public void shouldSkipEventsGeneratedByTheSameInstance() {
+    Event event = new RefUpdatedEvent();
+    event.instanceId = INSTANCE_ID;
+    objectUnderTest.onEvent(event);
+
+    verify(executor, never()).submit(any(Runnable.class));
+  }
+
+  @Test
+  public void shouldSkipFetchForProjectDeleteEvent() {
+    RefUpdatedEvent event = new RefUpdatedEvent();
+    RefUpdateAttribute refUpdate = new RefUpdateAttribute();
+    refUpdate.refName = RefNames.REFS_CONFIG;
+    refUpdate.newRev = ObjectId.zeroId().getName();
+    refUpdate.project = TEST_PROJECT;
+
+    event.instanceId = REMOTE_INSTANCE_ID;
+    event.refUpdate = () -> refUpdate;
+
+    objectUnderTest.onEvent(event);
+
+    verify(executor, never()).submit(any(Runnable.class));
+  }
+
+  @Test
+  public void shouldScheduleFetchJobForRefUpdateEvent() {
+    RefUpdatedEvent event = new RefUpdatedEvent();
+    RefUpdateAttribute refUpdate = new RefUpdateAttribute();
+    refUpdate.refName = TEST_REF_NAME;
+    refUpdate.project = TEST_PROJECT;
+
+    event.instanceId = REMOTE_INSTANCE_ID;
+    event.refUpdate = () -> refUpdate;
+
+    objectUnderTest.onEvent(event);
+
+    verify(fetchJobFactory).create(eq(Project.nameKey(TEST_PROJECT)), inputCaptor.capture(), any());
+
+    Input input = inputCaptor.getValue();
+    assertThat(input.label).isEqualTo(REMOTE_INSTANCE_ID);
+    assertThat(input.refName).isEqualTo(TEST_REF_NAME);
+
+    verify(executor).submit(any(FetchJob.class));
+  }
+
+  @Test
+  public void shouldCreateProjectForProjectCreatedEvent()
+      throws AuthException, PermissionBackendException {
+    ProjectCreatedEvent event = new ProjectCreatedEvent();
+    event.instanceId = REMOTE_INSTANCE_ID;
+    event.projectName = TEST_PROJECT;
+
+    objectUnderTest.onEvent(event);
+
+    verify(projectInitializationAction).initProject(String.format("%s.git", TEST_PROJECT));
+  }
+
+  @Test
+  public void shouldScheduleAllRefsFetchForProjectCreatedEvent() {
+    ProjectCreatedEvent event = new ProjectCreatedEvent();
+    event.instanceId = REMOTE_INSTANCE_ID;
+    event.projectName = TEST_PROJECT;
+
+    objectUnderTest.onEvent(event);
+
+    verify(fetchJobFactory).create(eq(Project.nameKey(TEST_PROJECT)), inputCaptor.capture(), any());
+
+    Input input = inputCaptor.getValue();
+    assertThat(input.label).isEqualTo(REMOTE_INSTANCE_ID);
+    assertThat(input.refName).isEqualTo(FetchOne.ALL_REFS);
+
+    verify(executor).submit(any(FetchJob.class));
+  }
+}