Merge branch 'stable-3.3' into stable-3.4

* stable-3.3:
  Return 400 when deleting/creating an invalid project name

Change-Id: Ic51d36acf6e884acff15cd78b1efbecf61c17f83
diff --git a/BUILD b/BUILD
index 82112d1..5c5f240 100644
--- a/BUILD
+++ b/BUILD
@@ -46,6 +46,6 @@
     ),
     deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
         ":pull-replication__plugin",
-         "//plugins/replication:replication",
+        "//plugins/replication:replication",
     ],
 )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/BearerTokenProvider.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/BearerTokenProvider.java
new file mode 100644
index 0000000..be34319
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/BearerTokenProvider.java
@@ -0,0 +1,38 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.server.config.GerritServerConfig;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import java.util.Optional;
+import org.eclipse.jgit.lib.Config;
+
+@Singleton
+public class BearerTokenProvider implements Provider<Optional<String>> {
+
+  private final Optional<String> bearerToken;
+
+  @Inject
+  public BearerTokenProvider(@GerritServerConfig Config gerritConfig) {
+    this.bearerToken = Optional.ofNullable(gerritConfig.getString("auth", null, "bearerToken"));
+  }
+
+  @Override
+  public Optional<String> get() {
+    return bearerToken;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index cdf573f..4fad8f9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -66,7 +66,7 @@
  */
 public class FetchOne implements ProjectRunnable, CanceledWhileRunning {
   private final ReplicationStateListener stateLog;
-  static final String ALL_REFS = "..all..";
+  public static final String ALL_REFS = "..all..";
   static final String ID_KEY = "fetchOneId";
 
   interface Factory {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/LocalGitRepositoryManagerProvider.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/LocalGitRepositoryManagerProvider.java
new file mode 100644
index 0000000..ea8aa00
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/LocalGitRepositoryManagerProvider.java
@@ -0,0 +1,47 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.gerrit.common.Nullable;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.gerrit.server.git.LocalDiskRepositoryManager;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+
+@Singleton
+public class LocalGitRepositoryManagerProvider implements Provider<GitRepositoryManager> {
+
+  @Inject(optional = true)
+  @Named("LocalDiskRepositoryManager")
+  @Nullable
+  private GitRepositoryManager gitRepositoryManager;
+
+  private final LocalDiskRepositoryManager localDiskRepositoryManager;
+
+  @VisibleForTesting
+  @Inject
+  public LocalGitRepositoryManagerProvider(LocalDiskRepositoryManager localDiskRepositoryManager) {
+    this.localDiskRepositoryManager = localDiskRepositoryManager;
+  }
+
+  @Override
+  public GitRepositoryManager get() {
+    return MoreObjects.firstNonNull(gitRepositoryManager, localDiskRepositoryManager);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index c17d5df..1671410 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -42,18 +42,22 @@
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
 import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig;
 import com.googlesource.gerrit.plugins.replication.StartReplicationCapability;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchJob;
 import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiModule;
+import com.googlesource.gerrit.plugins.replication.pull.auth.PullReplicationGroupModule;
 import com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient;
 import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient;
 import com.googlesource.gerrit.plugins.replication.pull.client.HttpClient;
 import com.googlesource.gerrit.plugins.replication.pull.client.SourceHttpClient;
 import com.googlesource.gerrit.plugins.replication.pull.event.FetchRefReplicatedEventModule;
+import com.googlesource.gerrit.plugins.replication.pull.event.StreamEventModule;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObject;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.util.FS;
 
@@ -70,9 +74,11 @@
   @Override
   protected void configure() {
 
+    install(new PullReplicationGroupModule());
+    bind(BearerTokenProvider.class).in(Scopes.SINGLETON);
     bind(RevisionReader.class).in(Scopes.SINGLETON);
     bind(ApplyObject.class);
-
+    install(new FactoryModuleBuilder().build(FetchJob.Factory.class));
     install(new PullReplicationApiModule());
 
     install(new FetchRefReplicatedEventModule());
@@ -120,7 +126,8 @@
 
     bind(ConfigParser.class).to(SourceConfigParser.class).in(Scopes.SINGLETON);
 
-    if (getReplicationConfig().getBoolean("gerrit", "autoReload", false)) {
+    Config replicationConfig = getReplicationConfig();
+    if (replicationConfig.getBoolean("gerrit", "autoReload", false)) {
       bind(ReplicationConfig.class)
           .annotatedWith(MainReplicationConfig.class)
           .to(getReplicationConfigClass());
@@ -132,6 +139,10 @@
       bind(ReplicationConfig.class).to(getReplicationConfigClass()).in(Scopes.SINGLETON);
     }
 
+    if (replicationConfig.getBoolean("replication", "consumeStreamEvents", false)) {
+      install(new StreamEventModule());
+    }
+
     DynamicSet.setOf(binder(), ReplicationStateListener.class);
     DynamicSet.bind(binder(), ReplicationStateListener.class).to(PullReplicationStateLogger.class);
     EventTypes.register(FetchRefReplicatedEvent.TYPE, FetchRefReplicatedEvent.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java
new file mode 100644
index 0000000..8147149
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java
@@ -0,0 +1,140 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.httpd.AllRequestFilter;
+import com.google.gerrit.httpd.WebSession;
+import com.google.gerrit.server.AccessPath;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.ThreadLocalRequestContext;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.name.Named;
+import com.googlesource.gerrit.plugins.replication.pull.auth.PullReplicationInternalUser;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.servlet.FilterChain;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * Authenticates the current user by HTTP bearer token authentication.
+ *
+ * <p>* @see <a href="https://www.rfc-editor.org/rfc/rfc6750">RFC 6750</a>
+ */
+public class BearerAuthenticationFilter extends AllRequestFilter {
+
+  private static final String BEARER_TOKEN = "BearerToken";
+  private final DynamicItem<WebSession> session;
+  private final String pluginName;
+  private final PullReplicationInternalUser pluginUser;
+  private final Provider<ThreadLocalRequestContext> threadLocalRequestContext;
+  private final String bearerToken;
+  private final Pattern bearerTokenRegex = Pattern.compile("^Bearer\\s(.+)$");
+
+  @Inject
+  BearerAuthenticationFilter(
+      DynamicItem<WebSession> session,
+      @PluginName String pluginName,
+      PullReplicationInternalUser pluginUser,
+      Provider<ThreadLocalRequestContext> threadLocalRequestContext,
+      @Named(BEARER_TOKEN) String bearerToken) {
+    this.session = session;
+    this.pluginName = pluginName;
+    this.pluginUser = pluginUser;
+    this.threadLocalRequestContext = threadLocalRequestContext;
+    this.bearerToken = bearerToken;
+  }
+
+  @Override
+  public void doFilter(
+      ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
+      throws IOException, ServletException {
+
+    if (!(servletRequest instanceof HttpServletRequest)
+        || !(servletResponse instanceof HttpServletResponse)) {
+      filterChain.doFilter(servletRequest, servletResponse);
+      return;
+    }
+
+    HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
+    HttpServletResponse httpResponse = (HttpServletResponse) servletResponse;
+    String requestURI = httpRequest.getRequestURI();
+
+    if (isBasicAuthenticationRequest(requestURI)) {
+      filterChain.doFilter(servletRequest, servletResponse);
+    } else if (isPullReplicationApiRequest(requestURI) || isGitUploadPackRequest(httpRequest)) {
+      Optional<String> authorizationHeader =
+          Optional.ofNullable(httpRequest.getHeader("Authorization"));
+
+      if (isBearerTokenAuthenticated(authorizationHeader, bearerToken))
+        try (ManualRequestContext ctx =
+            new ManualRequestContext(pluginUser, threadLocalRequestContext.get())) {
+          WebSession ws = session.get();
+          ws.setAccessPathOk(AccessPath.REST_API, true);
+          filterChain.doFilter(servletRequest, servletResponse);
+        }
+      else httpResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED);
+
+    } else {
+      filterChain.doFilter(servletRequest, servletResponse);
+    }
+  }
+
+  private boolean isGitUploadPackRequest(HttpServletRequest requestURI) {
+    return requestURI.getRequestURI().contains("git-upload-pack")
+        || Optional.ofNullable(requestURI.getQueryString())
+            .map(q -> q.contains("git-upload-pack"))
+            .orElse(false);
+  }
+
+  private boolean isBearerTokenAuthenticated(
+      Optional<String> authorizationHeader, String bearerToken) {
+    return authorizationHeader
+        .flatMap(this::extractBearerToken)
+        .map(bt -> bt.equals(bearerToken))
+        .orElse(false);
+  }
+
+  private boolean isBasicAuthenticationRequest(String requestURI) {
+    return requestURI.startsWith("/a/");
+  }
+
+  private boolean isPullReplicationApiRequest(String requestURI) {
+    return (requestURI.contains(pluginName)
+            && (requestURI.endsWith(String.format("/%s~apply-object", pluginName))
+                || requestURI.endsWith(String.format("/%s~apply-objects", pluginName))
+                || requestURI.endsWith(String.format("/%s~fetch", pluginName))
+                || requestURI.endsWith(String.format("/%s~delete-project", pluginName))
+                || requestURI.contains(String.format("/%s/init-project/", pluginName))))
+        || requestURI.matches(".*/projects/[^/]+/HEAD");
+  }
+
+  private Optional<String> extractBearerToken(String authorizationHeader) {
+    Matcher projectGroupMatcher = bearerTokenRegex.matcher(authorizationHeader);
+
+    if (projectGroupMatcher.find()) {
+      return Optional.of(projectGroupMatcher.group(1));
+    }
+    return Optional.empty();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefCommand.java
index 2a3a79d..40e03f1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefCommand.java
@@ -22,7 +22,7 @@
 import com.google.gerrit.extensions.restapi.ResourceNotFoundException;
 import com.google.gerrit.extensions.restapi.RestApiException;
 import com.google.gerrit.server.events.EventDispatcher;
-import com.google.gerrit.server.git.LocalDiskRepositoryManager;
+import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.permissions.PermissionBackend;
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.gerrit.server.permissions.RefPermission;
@@ -31,6 +31,7 @@
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.pull.Context;
 import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.pull.LocalGitRepositoryManagerProvider;
 import com.googlesource.gerrit.plugins.replication.pull.PullReplicationStateLogger;
 import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObject;
@@ -49,7 +50,7 @@
   private final DynamicItem<EventDispatcher> eventDispatcher;
   private final ProjectCache projectCache;
   private final PermissionBackend permissionBackend;
-  private final LocalDiskRepositoryManager gitManager;
+  private final GitRepositoryManager gitManager;
 
   @Inject
   public DeleteRefCommand(
@@ -58,13 +59,13 @@
       ApplyObject applyObject,
       PermissionBackend permissionBackend,
       DynamicItem<EventDispatcher> eventDispatcher,
-      LocalDiskRepositoryManager gitManager) {
+      LocalGitRepositoryManagerProvider gitManagerProvider) {
     this.fetchStateLog = fetchStateLog;
     this.projectCache = projectCache;
     this.applyObject = applyObject;
     this.eventDispatcher = eventDispatcher;
     this.permissionBackend = permissionBackend;
-    this.gitManager = gitManager;
+    this.gitManager = gitManagerProvider.get();
   }
 
   public void deleteRef(Project.NameKey name, String refName, String sourceLabel)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
index 48964db..fdb4f8f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
@@ -17,7 +17,6 @@
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.Strings;
-import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
@@ -32,6 +31,7 @@
 import com.google.gerrit.server.project.ProjectResource;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.Input;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchJob.Factory;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.RemoteConfigurationMissingException;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
@@ -42,17 +42,20 @@
   private final WorkQueue workQueue;
   private final DynamicItem<UrlFormatter> urlFormatter;
   private final FetchPreconditions preConditions;
+  private final Factory fetchJobFactory;
 
   @Inject
   public FetchAction(
       FetchCommand command,
       WorkQueue workQueue,
       DynamicItem<UrlFormatter> urlFormatter,
-      FetchPreconditions preConditions) {
+      FetchPreconditions preConditions,
+      FetchJob.Factory fetchJobFactory) {
     this.command = command;
     this.workQueue = workQueue;
     this.urlFormatter = urlFormatter;
     this.preConditions = preConditions;
+    this.fetchJobFactory = fetchJobFactory;
   }
 
   public static class Input {
@@ -104,7 +107,7 @@
             workQueue
                 .getDefaultQueue()
                 .submit(
-                    new FetchJob(command, project, input, PullReplicationApiRequestMetrics.get()));
+                    fetchJobFactory.create(project, input, PullReplicationApiRequestMetrics.get()));
     Optional<String> url =
         urlFormatter
             .get()
@@ -113,38 +116,4 @@
     checkState(url.isPresent());
     return Response.accepted(url.get());
   }
-
-  private static class FetchJob implements Runnable {
-    private static final FluentLogger log = FluentLogger.forEnclosingClass();
-
-    private FetchCommand command;
-    private Project.NameKey project;
-    private FetchAction.Input input;
-    private final PullReplicationApiRequestMetrics apiRequestMetrics;
-
-    public FetchJob(
-        FetchCommand command,
-        Project.NameKey project,
-        FetchAction.Input input,
-        PullReplicationApiRequestMetrics apiRequestMetrics) {
-      this.command = command;
-      this.project = project;
-      this.input = input;
-      this.apiRequestMetrics = apiRequestMetrics;
-    }
-
-    @Override
-    public void run() {
-      try {
-        command.fetchAsync(project, input.label, input.refName, apiRequestMetrics);
-      } catch (InterruptedException
-          | ExecutionException
-          | RemoteConfigurationMissingException
-          | TimeoutException e) {
-        log.atSevere().withCause(e).log(
-            "Exception during the async fetch call for project %s, label %s and ref name %s",
-            project.get(), input.label, input.refName);
-      }
-    }
-  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchJob.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchJob.java
new file mode 100644
index 0000000..e15dd68
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchJob.java
@@ -0,0 +1,63 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.RemoteConfigurationMissingException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+public class FetchJob implements Runnable {
+  private static final FluentLogger log = FluentLogger.forEnclosingClass();
+
+  public interface Factory {
+    FetchJob create(
+        Project.NameKey project, FetchAction.Input input, PullReplicationApiRequestMetrics metrics);
+  }
+
+  private FetchCommand command;
+  private Project.NameKey project;
+  private FetchAction.Input input;
+  private final PullReplicationApiRequestMetrics metrics;
+
+  @Inject
+  public FetchJob(
+      FetchCommand command,
+      @Assisted Project.NameKey project,
+      @Assisted FetchAction.Input input,
+      @Assisted PullReplicationApiRequestMetrics metrics) {
+    this.command = command;
+    this.project = project;
+    this.input = input;
+    this.metrics = metrics;
+  }
+
+  @Override
+  public void run() {
+    try {
+      command.fetchAsync(project, input.label, input.refName, metrics);
+    } catch (InterruptedException
+        | ExecutionException
+        | RemoteConfigurationMissingException
+        | TimeoutException e) {
+      log.atSevere().withCause(e).log(
+          "Exception during the async fetch call for project %s, label %s and ref name %s",
+          project.get(), input.label, input.refName);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchPreconditions.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchPreconditions.java
index ca1557a..161bcf4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchPreconditions.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchPreconditions.java
@@ -40,8 +40,10 @@
   }
 
   public Boolean canCallFetchApi() {
-    PermissionBackend.WithUser userPermission = permissionBackend.user(userProvider.get());
-    return userPermission.testOrFalse(GlobalPermission.ADMINISTRATE_SERVER)
+    CurrentUser currentUser = userProvider.get();
+    PermissionBackend.WithUser userPermission = permissionBackend.user(currentUser);
+    return currentUser.isInternalUser()
+        || userPermission.testOrFalse(GlobalPermission.ADMINISTRATE_SERVER)
         || userPermission.testOrFalse(new PluginPermission(pluginName, CALL_FETCH_ACTION));
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/HttpModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/HttpModule.java
index b140cb4..0f3e1e8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/HttpModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/HttpModule.java
@@ -19,18 +19,36 @@
 import com.google.gerrit.server.config.GerritIsReplica;
 import com.google.inject.Inject;
 import com.google.inject.Scopes;
+import com.google.inject.name.Names;
 import com.google.inject.servlet.ServletModule;
+import com.googlesource.gerrit.plugins.replication.pull.BearerTokenProvider;
 
 public class HttpModule extends ServletModule {
   private boolean isReplica;
+  private final BearerTokenProvider bearerTokenProvider;
 
   @Inject
-  public HttpModule(@GerritIsReplica Boolean isReplica) {
+  public HttpModule(@GerritIsReplica Boolean isReplica, BearerTokenProvider bearerTokenProvider) {
     this.isReplica = isReplica;
+    this.bearerTokenProvider = bearerTokenProvider;
   }
 
   @Override
   protected void configureServlets() {
+    DynamicSet.bind(binder(), AllRequestFilter.class)
+        .to(PullReplicationApiMetricsFilter.class)
+        .in(Scopes.SINGLETON);
+
+    bearerTokenProvider
+        .get()
+        .ifPresent(
+            bt -> {
+              bind(String.class).annotatedWith(Names.named("BearerToken")).toInstance(bt);
+              DynamicSet.bind(binder(), AllRequestFilter.class)
+                  .to(BearerAuthenticationFilter.class)
+                  .in(Scopes.SINGLETON);
+            });
+
     if (isReplica) {
       DynamicSet.bind(binder(), AllRequestFilter.class)
           .to(PullReplicationFilter.class)
@@ -38,9 +56,5 @@
     } else {
       serveRegex("/init-project/.*$").with(ProjectInitializationAction.class);
     }
-
-    DynamicSet.bind(binder(), AllRequestFilter.class)
-        .to(PullReplicationApiMetricsFilter.class)
-        .in(Scopes.SINGLETON);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectDeletionAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectDeletionAction.java
index 8915e78..2e1c5d4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectDeletionAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectDeletionAction.java
@@ -22,9 +22,11 @@
 import com.google.gerrit.extensions.restapi.Response;
 import com.google.gerrit.extensions.restapi.RestModifyView;
 import com.google.gerrit.extensions.restapi.UnprocessableEntityException;
+import com.google.gerrit.server.CurrentUser;
 import com.google.gerrit.server.permissions.PermissionBackend;
 import com.google.gerrit.server.project.ProjectResource;
 import com.google.inject.Inject;
+import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.replication.LocalFS;
 import com.googlesource.gerrit.plugins.replication.pull.GerritConfigOps;
 import java.util.Optional;
@@ -37,20 +39,29 @@
 
   static class DeleteInput {}
 
+  private final Provider<CurrentUser> userProvider;
   private final GerritConfigOps gerritConfigOps;
   private final PermissionBackend permissionBackend;
 
   @Inject
-  ProjectDeletionAction(GerritConfigOps gerritConfigOps, PermissionBackend permissionBackend) {
+  ProjectDeletionAction(
+      GerritConfigOps gerritConfigOps,
+      PermissionBackend permissionBackend,
+      Provider<CurrentUser> userProvider) {
     this.gerritConfigOps = gerritConfigOps;
     this.permissionBackend = permissionBackend;
+    this.userProvider = userProvider;
   }
 
   @Override
   public Response<?> apply(ProjectResource projectResource, DeleteInput input)
       throws AuthException, BadRequestException, ResourceConflictException, Exception {
 
-    permissionBackend.user(projectResource.getUser()).check(DELETE_PROJECT);
+    // When triggered internally(for example by consuming stream events) user is not provided
+    // and internal user is returned. Project deletion should be always allowed for internal user.
+    if (!userProvider.get().isInternalUser()) {
+      permissionBackend.user(projectResource.getUser()).check(DELETE_PROJECT);
+    }
 
     Optional<URIish> maybeRepoURI =
         gerritConfigOps.getGitRepositoryURI(String.format("%s.git", projectResource.getName()));
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java
index 3426c03..2214fb3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java
@@ -14,7 +14,6 @@
 
 package com.googlesource.gerrit.plugins.replication.pull.api;
 
-import static com.googlesource.gerrit.plugins.replication.pull.api.FetchApiCapability.CALL_FETCH_ACTION;
 import static com.googlesource.gerrit.plugins.replication.pull.api.HttpServletOps.checkAcceptHeader;
 import static com.googlesource.gerrit.plugins.replication.pull.api.HttpServletOps.setResponse;
 
@@ -70,17 +69,8 @@
       return;
     }
 
-    if (!userProvider.get().isIdentifiedUser()) {
-      setResponse(
-          httpServletResponse,
-          HttpServletResponse.SC_UNAUTHORIZED,
-          "Unauthorized user. '" + CALL_FETCH_ACTION + "' capability needed.");
-      return;
-    }
-
     String path = httpServletRequest.getRequestURI();
     String projectName = Url.decode(path.substring(path.lastIndexOf('/') + 1));
-
     try {
       if (initProject(projectName)) {
         setResponse(
@@ -103,10 +93,12 @@
         "Cannot initialize project " + projectName);
   }
 
-  protected boolean initProject(String projectName)
-      throws AuthException, PermissionBackendException {
-    permissionBackend.user(userProvider.get()).check(GlobalPermission.CREATE_PROJECT);
-
+  public boolean initProject(String projectName) throws AuthException, PermissionBackendException {
+    // When triggered internally(for example by consuming stream events) user is not provided
+    // and internal user is returned. Project creation should be always allowed for internal user.
+    if (!userProvider.get().isInternalUser()) {
+      permissionBackend.user(userProvider.get()).check(GlobalPermission.CREATE_PROJECT);
+    }
     Optional<URIish> maybeUri = gerritConfigOps.getGitRepositoryURI(projectName);
     if (!maybeUri.isPresent()) {
       logger.atSevere().log("Cannot initialize project '%s'", projectName);
@@ -116,9 +108,4 @@
     Project.NameKey projectNameKey = Project.NameKey.parse(projectName);
     return localFS.createProject(projectNameKey, RefNames.HEAD);
   }
-
-  public static String getProjectInitializationUrl(String pluginName, String projectName) {
-    return String.format(
-        "a/plugins/%s/init-project/%s", pluginName, Url.encode(projectName) + ".git");
-  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiRequestMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiRequestMetrics.java
index 597b66f..8e4e43f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiRequestMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiRequestMetrics.java
@@ -16,6 +16,7 @@
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.pull.FetchReplicationMetrics;
 import java.util.Optional;
@@ -28,7 +29,7 @@
 
   public static final String HTTP_HEADER_X_START_TIME_NANOS = "X-StartTimeNanos";
 
-  private Optional<Long> startTimeNanos;
+  private Optional<Long> startTimeNanos = Optional.empty();
   private final AtomicBoolean initialised = new AtomicBoolean();
   private final FetchReplicationMetrics metrics;
 
@@ -59,6 +60,13 @@
             .map(nanoTime -> Math.min(currentTimeNanos(), nanoTime));
   }
 
+  public void start(Event event) {
+    if (!initialised.compareAndSet(false, true)) {
+      throw new IllegalStateException("PullReplicationApiRequestMetrics already initialised");
+    }
+    startTimeNanos = Optional.of(event.eventCreatedOn * 1000 * 1000 * 1000);
+  }
+
   public Optional<Long> stop(String replicationSourceName) {
     return startTimeNanos.map(
         start -> {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilter.java
index 882a74c..20122a7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilter.java
@@ -23,9 +23,7 @@
 import static javax.servlet.http.HttpServletResponse.SC_FORBIDDEN;
 import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
 import static javax.servlet.http.HttpServletResponse.SC_OK;
-import static javax.servlet.http.HttpServletResponse.SC_UNAUTHORIZED;
 
-import com.google.common.base.Splitter;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.annotations.PluginName;
 import com.google.gerrit.extensions.api.projects.HeadInput;
@@ -38,11 +36,9 @@
 import com.google.gerrit.extensions.restapi.RestApiException;
 import com.google.gerrit.extensions.restapi.TopLevelResource;
 import com.google.gerrit.extensions.restapi.UnprocessableEntityException;
-import com.google.gerrit.extensions.restapi.Url;
 import com.google.gerrit.httpd.AllRequestFilter;
 import com.google.gerrit.httpd.restapi.RestApiServlet;
 import com.google.gerrit.json.OutputFormat;
-import com.google.gerrit.server.CurrentUser;
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.gerrit.server.project.ProjectResource;
 import com.google.gerrit.server.restapi.project.ProjectsCollection;
@@ -51,7 +47,6 @@
 import com.google.gson.stream.JsonReader;
 import com.google.gson.stream.MalformedJsonException;
 import com.google.inject.Inject;
-import com.google.inject.Provider;
 import com.google.inject.TypeLiteral;
 import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.Input;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionInput;
@@ -62,9 +57,11 @@
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.nio.file.InvalidPathException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import javax.servlet.FilterChain;
 import javax.servlet.ServletException;
 import javax.servlet.ServletRequest;
@@ -75,6 +72,10 @@
 public class PullReplicationFilter extends AllRequestFilter {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
+  private static final Pattern projectNameInGerritUrl = Pattern.compile(".*/projects/([^/]+)/.*");
+  private static final Pattern projectNameInitProjectUrl =
+      Pattern.compile(".*/init-project/([^/]+.git)");
+
   private FetchAction fetchAction;
   private ApplyObjectAction applyObjectAction;
   private ApplyObjectsAction applyObjectsAction;
@@ -83,7 +84,6 @@
   private ProjectDeletionAction projectDeletionAction;
   private ProjectsCollection projectsCollection;
   private Gson gson;
-  private Provider<CurrentUser> userProvider;
   private String pluginName;
 
   @Inject
@@ -95,7 +95,6 @@
       UpdateHeadAction updateHEADAction,
       ProjectDeletionAction projectDeletionAction,
       ProjectsCollection projectsCollection,
-      Provider<CurrentUser> userProvider,
       @PluginName String pluginName) {
     this.fetchAction = fetchAction;
     this.applyObjectAction = applyObjectAction;
@@ -104,7 +103,6 @@
     this.updateHEADAction = updateHEADAction;
     this.projectDeletionAction = projectDeletionAction;
     this.projectsCollection = projectsCollection;
-    this.userProvider = userProvider;
     this.pluginName = pluginName;
     this.gson = OutputFormat.JSON.newGsonBuilder().create();
   }
@@ -121,44 +119,20 @@
     HttpServletRequest httpRequest = (HttpServletRequest) request;
     try {
       if (isFetchAction(httpRequest)) {
-        if (userProvider.get().isIdentifiedUser()) {
-          writeResponse(httpResponse, doFetch(httpRequest));
-        } else {
-          httpResponse.sendError(SC_UNAUTHORIZED);
-        }
+        writeResponse(httpResponse, doFetch(httpRequest));
       } else if (isApplyObjectAction(httpRequest)) {
-        if (userProvider.get().isIdentifiedUser()) {
-          writeResponse(httpResponse, doApplyObject(httpRequest));
-        } else {
-          httpResponse.sendError(SC_UNAUTHORIZED);
-        }
+        writeResponse(httpResponse, doApplyObject(httpRequest));
       } else if (isApplyObjectsAction(httpRequest)) {
-        if (userProvider.get().isIdentifiedUser()) {
-          writeResponse(httpResponse, doApplyObjects(httpRequest));
-        } else {
-          httpResponse.sendError(SC_UNAUTHORIZED);
-        }
+        writeResponse(httpResponse, doApplyObjects(httpRequest));
       } else if (isInitProjectAction(httpRequest)) {
-        if (userProvider.get().isIdentifiedUser()) {
-          if (!checkAcceptHeader(httpRequest, httpResponse)) {
-            return;
-          }
-          doInitProject(httpRequest, httpResponse);
-        } else {
-          httpResponse.sendError(SC_UNAUTHORIZED);
+        if (!checkAcceptHeader(httpRequest, httpResponse)) {
+          return;
         }
+        doInitProject(httpRequest, httpResponse);
       } else if (isUpdateHEADAction(httpRequest)) {
-        if (userProvider.get().isIdentifiedUser()) {
-          writeResponse(httpResponse, doUpdateHEAD(httpRequest));
-        } else {
-          httpResponse.sendError(SC_UNAUTHORIZED);
-        }
+        writeResponse(httpResponse, doUpdateHEAD(httpRequest));
       } else if (isDeleteProjectAction(httpRequest)) {
-        if (userProvider.get().isIdentifiedUser()) {
-          writeResponse(httpResponse, doDeleteProject(httpRequest));
-        } else {
-          httpResponse.sendError(SC_UNAUTHORIZED);
-        }
+        writeResponse(httpResponse, doDeleteProject(httpRequest));
       } else {
         chain.doFilter(request, response);
       }
@@ -181,6 +155,9 @@
     } catch (InitProjectException | ResourceNotFoundException e) {
       RestApiServlet.replyError(
           httpRequest, httpResponse, SC_INTERNAL_SERVER_ERROR, e.getMessage(), e.caching(), e);
+    } catch (NoSuchElementException e) {
+      RestApiServlet.replyError(
+          httpRequest, httpResponse, SC_BAD_REQUEST, "Project name not present in the url", e);
     } catch (Exception e) {
       if (e instanceof InvalidPathException || e.getCause() instanceof InvalidPathException) {
         RestApiServlet.replyError(
@@ -194,8 +171,8 @@
   private void doInitProject(HttpServletRequest httpRequest, HttpServletResponse httpResponse)
       throws RestApiException, IOException, PermissionBackendException {
 
-    String path = httpRequest.getRequestURI();
-    String projectName = Url.decode(path.substring(path.lastIndexOf('/') + 1));
+    IdString id = getInitProjectName(httpRequest).get();
+    String projectName = id.get();
     if (projectInitializationAction.initProject(projectName)) {
       setResponse(
           httpResponse, HttpServletResponse.SC_CREATED, "Project " + projectName + " initialized");
@@ -208,7 +185,7 @@
   private Response<Map<String, Object>> doApplyObject(HttpServletRequest httpRequest)
       throws RestApiException, IOException, PermissionBackendException {
     RevisionInput input = readJson(httpRequest, TypeLiteral.get(RevisionInput.class));
-    IdString id = getProjectName(httpRequest);
+    IdString id = getProjectName(httpRequest).get();
     ProjectResource projectResource = projectsCollection.parse(TopLevelResource.INSTANCE, id);
 
     return (Response<Map<String, Object>>) applyObjectAction.apply(projectResource, input);
@@ -218,7 +195,7 @@
   private Response<Map<String, Object>> doApplyObjects(HttpServletRequest httpRequest)
       throws RestApiException, IOException, PermissionBackendException {
     RevisionsInput input = readJson(httpRequest, TypeLiteral.get(RevisionsInput.class));
-    IdString id = getProjectName(httpRequest);
+    IdString id = getProjectName(httpRequest).get();
     ProjectResource projectResource = projectsCollection.parse(TopLevelResource.INSTANCE, id);
 
     return (Response<Map<String, Object>>) applyObjectsAction.apply(projectResource, input);
@@ -227,16 +204,16 @@
   @SuppressWarnings("unchecked")
   private Response<String> doUpdateHEAD(HttpServletRequest httpRequest) throws Exception {
     HeadInput input = readJson(httpRequest, TypeLiteral.get(HeadInput.class));
-    ProjectResource projectResource =
-        projectsCollection.parse(TopLevelResource.INSTANCE, getProjectName(httpRequest));
+    IdString id = getProjectName(httpRequest).get();
+    ProjectResource projectResource = projectsCollection.parse(TopLevelResource.INSTANCE, id);
 
     return (Response<String>) updateHEADAction.apply(projectResource, input);
   }
 
   @SuppressWarnings("unchecked")
   private Response<String> doDeleteProject(HttpServletRequest httpRequest) throws Exception {
-    ProjectResource projectResource =
-        projectsCollection.parse(TopLevelResource.INSTANCE, getProjectName(httpRequest));
+    IdString id = getProjectName(httpRequest).get();
+    ProjectResource projectResource = projectsCollection.parse(TopLevelResource.INSTANCE, id);
     return (Response<String>)
         projectDeletionAction.apply(projectResource, new ProjectDeletionAction.DeleteInput());
   }
@@ -245,7 +222,7 @@
   private Response<Map<String, Object>> doFetch(HttpServletRequest httpRequest)
       throws IOException, RestApiException, PermissionBackendException {
     Input input = readJson(httpRequest, TypeLiteral.get(Input.class));
-    IdString id = getProjectName(httpRequest);
+    IdString id = getProjectName(httpRequest).get();
     ProjectResource projectResource = projectsCollection.parse(TopLevelResource.INSTANCE, id);
 
     return (Response<Map<String, Object>>) fetchAction.apply(projectResource, input);
@@ -302,42 +279,48 @@
    * @param req
    * @return project name
    */
-  private IdString getProjectName(HttpServletRequest req) {
-    String path = req.getRequestURI();
+  private Optional<IdString> getInitProjectName(HttpServletRequest req) {
+    return extractProjectName(req, projectNameInitProjectUrl);
+  }
 
-    List<IdString> out = new ArrayList<>();
-    for (String p : Splitter.on('/').split(path)) {
-      out.add(IdString.fromUrl(p));
+  private Optional<IdString> getProjectName(HttpServletRequest req) {
+    return extractProjectName(req, projectNameInGerritUrl);
+  }
+
+  private Optional<IdString> extractProjectName(HttpServletRequest req, Pattern urlPattern) {
+    String path = req.getRequestURI();
+    Matcher projectGroupMatcher = urlPattern.matcher(path);
+
+    if (projectGroupMatcher.find()) {
+      return Optional.of(IdString.fromUrl(projectGroupMatcher.group(1)));
     }
-    if (!out.isEmpty() && out.get(out.size() - 1).isEmpty()) {
-      out.remove(out.size() - 1);
-    }
-    return out.get(3);
+
+    return Optional.empty();
   }
 
   private boolean isApplyObjectAction(HttpServletRequest httpRequest) {
-    return httpRequest.getRequestURI().endsWith("pull-replication~apply-object");
+    return httpRequest.getRequestURI().endsWith(String.format("/%s~apply-object", pluginName));
   }
 
   private boolean isApplyObjectsAction(HttpServletRequest httpRequest) {
-    return httpRequest.getRequestURI().endsWith("pull-replication~apply-objects");
+    return httpRequest.getRequestURI().endsWith(String.format("/%s~apply-objects", pluginName));
   }
 
   private boolean isFetchAction(HttpServletRequest httpRequest) {
-    return httpRequest.getRequestURI().endsWith("pull-replication~fetch");
+    return httpRequest.getRequestURI().endsWith(String.format("/%s~fetch", pluginName));
   }
 
   private boolean isInitProjectAction(HttpServletRequest httpRequest) {
-    return httpRequest.getRequestURI().contains("pull-replication/init-project/");
+    return httpRequest.getRequestURI().contains(String.format("/%s/init-project/", pluginName));
   }
 
   private boolean isUpdateHEADAction(HttpServletRequest httpRequest) {
-    return httpRequest.getRequestURI().matches("(/a)?/projects/[^/]+/HEAD")
+    return httpRequest.getRequestURI().matches(".*/projects/[^/]+/HEAD")
         && "PUT".equals(httpRequest.getMethod());
   }
 
   private boolean isDeleteProjectAction(HttpServletRequest httpRequest) {
-    return httpRequest.getRequestURI().endsWith(String.format("%s~delete-project", pluginName))
+    return httpRequest.getRequestURI().endsWith(String.format("/%s~delete-project", pluginName))
         && "DELETE".equals(httpRequest.getMethod());
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/auth/PullReplicationGroupBackend.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/auth/PullReplicationGroupBackend.java
new file mode 100644
index 0000000..9521004
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/auth/PullReplicationGroupBackend.java
@@ -0,0 +1,98 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.auth;
+
+import com.google.gerrit.entities.AccountGroup;
+import com.google.gerrit.entities.GroupDescription;
+import com.google.gerrit.entities.GroupReference;
+import com.google.gerrit.server.CurrentUser;
+import com.google.gerrit.server.account.AbstractGroupBackend;
+import com.google.gerrit.server.account.GroupMembership;
+import com.google.gerrit.server.account.ListGroupMembership;
+import com.google.gerrit.server.group.SystemGroupBackend;
+import com.google.gerrit.server.project.ProjectState;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.util.Arrays;
+import java.util.Collection;
+
+/** Backend to expose the pull-replication internal user group membership. */
+@Singleton
+class PullReplicationGroupBackend extends AbstractGroupBackend {
+  public static final AccountGroup.UUID INTERNAL_GROUP_UUID =
+      AccountGroup.uuid("pullreplication:internal-user");
+  public static final String INTERNAL_GROUP_NAME = "Pull-replication Internal User";
+  public static final String NAME_PREFIX = "pullreplication/";
+  public static final GroupDescription.Basic INTERNAL_GROUP_DESCRIPTION =
+      new GroupDescription.Basic() {
+
+        @Override
+        public String getUrl() {
+          return null;
+        }
+
+        @Override
+        public String getName() {
+          return INTERNAL_GROUP_NAME;
+        }
+
+        @Override
+        public AccountGroup.UUID getGroupUUID() {
+          return INTERNAL_GROUP_UUID;
+        }
+
+        @Override
+        public String getEmailAddress() {
+          return null;
+        }
+      };
+  private final PullReplicationInternalUser internalUser;
+
+  static final ListGroupMembership INTERNAL_GROUP_MEMBERSHIP =
+      new ListGroupMembership(
+          Arrays.asList(INTERNAL_GROUP_UUID, SystemGroupBackend.ANONYMOUS_USERS));
+
+  @Inject
+  public PullReplicationGroupBackend(PullReplicationInternalUser internalUser) {
+    this.internalUser = internalUser;
+  }
+
+  @Override
+  public boolean handles(AccountGroup.UUID uuid) {
+    return INTERNAL_GROUP_UUID.equals(uuid);
+  }
+
+  @Override
+  public GroupDescription.Basic get(AccountGroup.UUID uuid) {
+    return handles(uuid) ? INTERNAL_GROUP_DESCRIPTION : null;
+  }
+
+  @Override
+  public Collection<GroupReference> suggest(String name, ProjectState project) {
+    return Arrays.asList(
+        NAME_PREFIX.contains(name.toLowerCase())
+            ? GroupReference.create(INTERNAL_GROUP_UUID, INTERNAL_GROUP_NAME)
+            : GroupReference.create(name));
+  }
+
+  @Override
+  public GroupMembership membershipsOf(CurrentUser user) {
+    if (user.equals(internalUser)) {
+      return INTERNAL_GROUP_MEMBERSHIP;
+    }
+
+    return ListGroupMembership.EMPTY;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/auth/PullReplicationGroupModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/auth/PullReplicationGroupModule.java
new file mode 100644
index 0000000..583ba8e
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/auth/PullReplicationGroupModule.java
@@ -0,0 +1,26 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.auth;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.account.GroupBackend;
+import com.google.inject.AbstractModule;
+
+public class PullReplicationGroupModule extends AbstractModule {
+  @Override
+  protected void configure() {
+    DynamicSet.bind(binder(), GroupBackend.class).to(PullReplicationGroupBackend.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/auth/PullReplicationInternalUser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/auth/PullReplicationInternalUser.java
new file mode 100644
index 0000000..e8b7666
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/auth/PullReplicationInternalUser.java
@@ -0,0 +1,35 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.auth;
+
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.server.PluginUser;
+import com.google.gerrit.server.account.GroupMembership;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+public class PullReplicationInternalUser extends PluginUser {
+
+  @Inject
+  protected PullReplicationInternalUser(@PluginName String pluginName) {
+    super(pluginName);
+  }
+
+  @Override
+  public GroupMembership getEffectiveGroups() {
+    return PullReplicationGroupBackend.INTERNAL_GROUP_MEMBERSHIP;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
index ed919de..0afbecf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
@@ -15,7 +15,6 @@
 package com.googlesource.gerrit.plugins.replication.pull.client;
 
 import static com.google.gson.FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES;
-import static com.googlesource.gerrit.plugins.replication.pull.api.ProjectInitializationAction.getProjectInitializationUrl;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.Strings;
@@ -26,12 +25,14 @@
 import com.google.gerrit.entities.Project.NameKey;
 import com.google.gerrit.extensions.annotations.PluginName;
 import com.google.gerrit.extensions.restapi.Url;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
+import com.googlesource.gerrit.plugins.replication.pull.BearerTokenProvider;
 import com.googlesource.gerrit.plugins.replication.pull.Source;
 import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiRequestMetrics;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
@@ -42,19 +43,19 @@
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Optional;
+import org.apache.http.HttpHeaders;
 import org.apache.http.HttpResponse;
 import org.apache.http.ParseException;
-import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.AuthenticationException;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.ClientProtocolException;
-import org.apache.http.client.CredentialsProvider;
 import org.apache.http.client.ResponseHandler;
 import org.apache.http.client.methods.HttpDelete;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpPut;
-import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.auth.BasicScheme;
 import org.apache.http.message.BasicHeader;
 import org.apache.http.util.EntityUtils;
 import org.eclipse.jgit.transport.CredentialItem;
@@ -70,9 +71,11 @@
   private final CredentialsFactory credentials;
   private final SourceHttpClient.Factory httpClientFactory;
   private final Source source;
-  private final String instanceLabel;
+  private final String instanceId;
   private final String pluginName;
   private final SyncRefsFilter syncRefsFilter;
+  private final BearerTokenProvider bearerTokenProvider;
+  private final String urlAuthenticationPrefix;
 
   @Inject
   FetchRestApiClient(
@@ -81,18 +84,26 @@
       ReplicationConfig replicationConfig,
       SyncRefsFilter syncRefsFilter,
       @PluginName String pluginName,
+      @Nullable @GerritInstanceId String instanceId,
+      BearerTokenProvider bearerTokenProvider,
       @Assisted Source source) {
     this.credentials = credentials;
     this.httpClientFactory = httpClientFactory;
     this.source = source;
     this.pluginName = pluginName;
     this.syncRefsFilter = syncRefsFilter;
-    this.instanceLabel =
-        Strings.nullToEmpty(
+    this.instanceId =
+        Optional.ofNullable(
                 replicationConfig.getConfig().getString("replication", null, "instanceLabel"))
+            .orElse(instanceId)
             .trim();
+
     requireNonNull(
-        Strings.emptyToNull(instanceLabel), "replication.instanceLabel cannot be null or empty");
+        Strings.emptyToNull(this.instanceId),
+        "gerrit.instanceId or replication.instanceLabel must be set");
+
+    this.bearerTokenProvider = bearerTokenProvider;
+    this.urlAuthenticationPrefix = bearerTokenProvider.get().map(br -> "").orElse("a/");
   }
 
   /* (non-Javadoc)
@@ -101,24 +112,21 @@
   @Override
   public HttpResult callFetch(
       Project.NameKey project, String refName, URIish targetUri, long startTimeNanos)
-      throws ClientProtocolException, IOException {
-    String url =
-        String.format(
-            "%s/a/projects/%s/pull-replication~fetch",
-            targetUri.toString(), Url.encode(project.get()));
+      throws IOException {
+    String url = formatUrl(targetUri.toString(), project, "fetch");
     Boolean callAsync = !syncRefsFilter.match(refName);
     HttpPost post = new HttpPost(url);
     post.setEntity(
         new StringEntity(
             String.format(
                 "{\"label\":\"%s\", \"ref_name\": \"%s\", \"async\":%s}",
-                instanceLabel, refName, callAsync),
+                instanceId, refName, callAsync),
             StandardCharsets.UTF_8));
     post.addHeader(new BasicHeader("Content-Type", "application/json"));
     post.addHeader(
         PullReplicationApiRequestMetrics.HTTP_HEADER_X_START_TIME_NANOS,
         Long.toString(startTimeNanos));
-    return httpClientFactory.create(source).execute(post, this, getContext(targetUri));
+    return executeRequest(post, bearerTokenProvider.get(), targetUri);
   }
 
   /* (non-Javadoc)
@@ -126,13 +134,11 @@
    */
   @Override
   public HttpResult initProject(Project.NameKey project, URIish uri) throws IOException {
-    String url =
-        String.format(
-            "%s/%s", uri.toString(), getProjectInitializationUrl(pluginName, project.get()));
+    String url = formatInitProjectUrl(uri.toString(), project);
     HttpPut put = new HttpPut(url);
     put.addHeader(new BasicHeader("Accept", MediaType.ANY_TEXT_TYPE.toString()));
     put.addHeader(new BasicHeader("Content-Type", MediaType.PLAIN_TEXT_UTF_8.toString()));
-    return httpClientFactory.create(source).execute(put, this, getContext(uri));
+    return executeRequest(put, bearerTokenProvider.get(), uri);
   }
 
   /* (non-Javadoc)
@@ -140,10 +146,9 @@
    */
   @Override
   public HttpResult deleteProject(Project.NameKey project, URIish apiUri) throws IOException {
-    String url =
-        String.format("%s/%s", apiUri.toASCIIString(), getProjectDeletionUrl(project.get()));
+    String url = formatUrl(apiUri.toASCIIString(), project, "delete-project");
     HttpDelete delete = new HttpDelete(url);
-    return httpClientFactory.create(source).execute(delete, this, getContext(apiUri));
+    return executeRequest(delete, bearerTokenProvider.get(), apiUri);
   }
 
   /* (non-Javadoc)
@@ -153,13 +158,12 @@
   public HttpResult updateHead(Project.NameKey project, String newHead, URIish apiUri)
       throws IOException {
     logger.atFine().log("Updating head of %s on %s", project.get(), newHead);
-    String url =
-        String.format("%s/%s", apiUri.toASCIIString(), getProjectUpdateHeadUrl(project.get()));
+    String url = formatUrl(apiUri.toASCIIString(), project, "HEAD");
     HttpPut req = new HttpPut(url);
     req.setEntity(
         new StringEntity(String.format("{\"ref\": \"%s\"}", newHead), StandardCharsets.UTF_8));
     req.addHeader(new BasicHeader("Content-Type", MediaType.JSON_UTF_8.toString()));
-    return httpClientFactory.create(source).execute(req, this, getContext(apiUri));
+    return executeRequest(req, bearerTokenProvider.get(), apiUri);
   }
 
   /* (non-Javadoc)
@@ -180,14 +184,14 @@
     } else {
       requireNull(revisionData, "DELETE ref-updates cannot be associated with a RevisionData");
     }
-    RevisionInput input = new RevisionInput(instanceLabel, refName, revisionData);
+    RevisionInput input = new RevisionInput(instanceId, refName, revisionData);
 
-    String url = formatUrl(project, targetUri, "apply-object");
+    String url = formatUrl(targetUri.toString(), project, "apply-object");
 
     HttpPost post = new HttpPost(url);
     post.setEntity(new StringEntity(GSON.toJson(input)));
     post.addHeader(new BasicHeader("Content-Type", MediaType.JSON_UTF_8.toString()));
-    return httpClientFactory.create(source).execute(post, this, getContext(targetUri));
+    return executeRequest(post, bearerTokenProvider.get(), targetUri);
   }
 
   @Override
@@ -199,22 +203,25 @@
     }
 
     RevisionData[] inputData = new RevisionData[revisionData.size()];
-    RevisionsInput input =
-        new RevisionsInput(instanceLabel, refName, revisionData.toArray(inputData));
+    RevisionsInput input = new RevisionsInput(instanceId, refName, revisionData.toArray(inputData));
 
-    String url = formatUrl(project, targetUri, "apply-objects");
+    String url = formatUrl(targetUri.toString(), project, "apply-objects");
     HttpPost post = new HttpPost(url);
     post.setEntity(new StringEntity(GSON.toJson(input)));
     post.addHeader(new BasicHeader("Content-Type", MediaType.JSON_UTF_8.toString()));
-    return httpClientFactory.create(source).execute(post, this, getContext(targetUri));
+    return executeRequest(post, bearerTokenProvider.get(), targetUri);
   }
 
-  private String formatUrl(Project.NameKey project, URIish targetUri, String api) {
-    String url =
-        String.format(
-            "%s/a/projects/%s/%s~%s",
-            targetUri.toString(), Url.encode(project.get()), pluginName, api);
-    return url;
+  private String formatUrl(String targetUri, Project.NameKey project, String api) {
+    return String.format(
+        "%s/%sprojects/%s/%s~%s",
+        targetUri, urlAuthenticationPrefix, Url.encode(project.get()), pluginName, api);
+  }
+
+  private String formatInitProjectUrl(String targetUri, Project.NameKey project) {
+    return String.format(
+        "%s/%splugins/%s/init-project/%s.git",
+        targetUri, urlAuthenticationPrefix, pluginName, Url.encode(project.get()));
   }
 
   private void requireNull(Object object, String string) {
@@ -242,30 +249,37 @@
     return new HttpResult(response.getStatusLine().getStatusCode(), responseBody);
   }
 
-  private HttpClientContext getContext(URIish targetUri) {
-    HttpClientContext ctx = HttpClientContext.create();
-    ctx.setCredentialsProvider(adapt(credentials.create(source.getRemoteConfigName()), targetUri));
-    return ctx;
+  private HttpResult executeRequest(
+      HttpRequestBase httpRequest, Optional<String> bearerToken, URIish targetUri)
+      throws IOException {
+
+    HttpRequestBase reqWithAuthentication =
+        bearerToken.isPresent()
+            ? withBearerTokenAuthentication(httpRequest, bearerToken.get())
+            : withBasicAuthentication(targetUri, httpRequest);
+
+    return httpClientFactory.create(source).execute(reqWithAuthentication, this);
   }
 
-  private CredentialsProvider adapt(org.eclipse.jgit.transport.CredentialsProvider cp, URIish uri) {
+  private HttpRequestBase withBasicAuthentication(URIish targetUri, HttpRequestBase req) {
+    org.eclipse.jgit.transport.CredentialsProvider cp =
+        credentials.create(source.getRemoteConfigName());
     CredentialItem.Username user = new CredentialItem.Username();
     CredentialItem.Password pass = new CredentialItem.Password();
-    if (cp.supports(user, pass) && cp.get(uri, user, pass)) {
-      CredentialsProvider adapted = new BasicCredentialsProvider();
-      adapted.setCredentials(
-          AuthScope.ANY,
-          new UsernamePasswordCredentials(user.getValue(), new String(pass.getValue())));
-      return adapted;
+    if (cp.supports(user, pass) && cp.get(targetUri, user, pass)) {
+      UsernamePasswordCredentials creds =
+          new UsernamePasswordCredentials(user.getValue(), new String(pass.getValue()));
+      try {
+        req.addHeader(new BasicScheme().authenticate(creds, req, null));
+      } catch (AuthenticationException e) {
+        logger.atFine().log(String.format("Anonymous Basic Authentication for uri: %s", targetUri));
+      }
     }
-    return null;
+    return req;
   }
 
-  String getProjectDeletionUrl(String projectName) {
-    return String.format("a/projects/%s/%s~delete-project", Url.encode(projectName), pluginName);
-  }
-
-  String getProjectUpdateHeadUrl(String projectName) {
-    return String.format("a/projects/%s/%s~HEAD", Url.encode(projectName), pluginName);
+  private HttpRequestBase withBearerTokenAuthentication(HttpRequestBase req, String bearerToken) {
+    req.addHeader(new BasicHeader(HttpHeaders.AUTHORIZATION, "Bearer " + bearerToken));
+    return req;
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClient.java
index 7bfc7d1..6254a42 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClient.java
@@ -18,14 +18,11 @@
 import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.ResponseHandler;
 import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.protocol.HttpContext;
 
 /** HTTP client for executing URI requests to a remote site */
 public interface HttpClient {
 
   public <T> T execute(
-      final HttpUriRequest request,
-      final ResponseHandler<? extends T> responseHandler,
-      final HttpContext context)
+      final HttpUriRequest request, final ResponseHandler<? extends T> responseHandler)
       throws ClientProtocolException, IOException;
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/SourceHttpClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/SourceHttpClient.java
index ee0fe79..fa700e3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/SourceHttpClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/SourceHttpClient.java
@@ -25,7 +25,6 @@
 import org.apache.http.conn.HttpClientConnectionManager;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
-import org.apache.http.protocol.HttpContext;
 
 /** Apache HTTP client implementation based on Source-specific parameters */
 public class SourceHttpClient implements HttpClient {
@@ -41,8 +40,7 @@
   }
 
   @Override
-  public <T> T execute(
-      HttpUriRequest request, ResponseHandler<? extends T> responseHandler, HttpContext context)
+  public <T> T execute(HttpUriRequest request, ResponseHandler<? extends T> responseHandler)
       throws ClientProtocolException, IOException {
     return source
         .memoize(
@@ -51,7 +49,7 @@
                     .setConnectionManager(customConnectionManager(source))
                     .setDefaultRequestConfig(customRequestConfig(source))
                     .build())
-        .execute(request, responseHandler, context);
+        .execute(request, responseHandler);
   }
 
   private static RequestConfig customRequestConfig(Source source) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
new file mode 100644
index 0000000..0f092e5
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
@@ -0,0 +1,120 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.Strings;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.common.Nullable;
+import com.google.gerrit.entities.Project.NameKey;
+import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.extensions.restapi.AuthException;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventListener;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.googlesource.gerrit.plugins.replication.pull.FetchOne;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchJob;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchJob.Factory;
+import com.googlesource.gerrit.plugins.replication.pull.api.ProjectInitializationAction;
+import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiRequestMetrics;
+import org.eclipse.jgit.lib.ObjectId;
+
+public class StreamEventListener implements EventListener {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private String instanceId;
+  private WorkQueue workQueue;
+  private ProjectInitializationAction projectInitializationAction;
+
+  private Factory fetchJobFactory;
+  private final Provider<PullReplicationApiRequestMetrics> metricsProvider;
+
+  @Inject
+  public StreamEventListener(
+      @Nullable @GerritInstanceId String instanceId,
+      ProjectInitializationAction projectInitializationAction,
+      WorkQueue workQueue,
+      FetchJob.Factory fetchJobFactory,
+      Provider<PullReplicationApiRequestMetrics> metricsProvider) {
+    this.instanceId = instanceId;
+    this.projectInitializationAction = projectInitializationAction;
+    this.workQueue = workQueue;
+    this.fetchJobFactory = fetchJobFactory;
+    this.metricsProvider = metricsProvider;
+
+    requireNonNull(
+        Strings.emptyToNull(this.instanceId), "gerrit.instanceId cannot be null or empty");
+  }
+
+  @Override
+  public void onEvent(Event event) {
+    if (!instanceId.equals(event.instanceId)) {
+      PullReplicationApiRequestMetrics metrics = metricsProvider.get();
+      metrics.start(event);
+      if (event instanceof RefUpdatedEvent) {
+        RefUpdatedEvent refUpdatedEvent = (RefUpdatedEvent) event;
+        if (!isProjectDelete(refUpdatedEvent)) {
+          fetchRefsAsync(
+              refUpdatedEvent.getRefName(),
+              refUpdatedEvent.instanceId,
+              refUpdatedEvent.getProjectNameKey(),
+              metrics);
+        }
+      }
+      if (event instanceof ProjectCreatedEvent) {
+        ProjectCreatedEvent projectCreatedEvent = (ProjectCreatedEvent) event;
+        try {
+          projectInitializationAction.initProject(getProjectRepositoryName(projectCreatedEvent));
+          fetchRefsAsync(
+              FetchOne.ALL_REFS,
+              projectCreatedEvent.instanceId,
+              projectCreatedEvent.getProjectNameKey(),
+              metrics);
+        } catch (AuthException | PermissionBackendException e) {
+          logger.atSevere().withCause(e).log(
+              "Cannot initialise project:%s", projectCreatedEvent.projectName);
+        }
+      }
+    }
+  }
+
+  private boolean isProjectDelete(RefUpdatedEvent event) {
+    return RefNames.isConfigRef(event.getRefName())
+        && ObjectId.zeroId().equals(ObjectId.fromString(event.refUpdate.get().newRev));
+  }
+
+  protected void fetchRefsAsync(
+      String refName,
+      String sourceInstanceId,
+      NameKey projectNameKey,
+      PullReplicationApiRequestMetrics metrics) {
+    FetchAction.Input input = new FetchAction.Input();
+    input.refName = refName;
+    input.label = sourceInstanceId;
+    workQueue.getDefaultQueue().submit(fetchJobFactory.create(projectNameKey, input, metrics));
+  }
+
+  private String getProjectRepositoryName(ProjectCreatedEvent projectCreatedEvent) {
+    return String.format("%s.git", projectCreatedEvent.projectName);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java
new file mode 100644
index 0000000..2389678
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java
@@ -0,0 +1,27 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.EventListener;
+import com.google.inject.AbstractModule;
+
+public class StreamEventModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    DynamicSet.bind(binder(), EventListener.class).to(StreamEventListener.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObject.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObject.java
index 2bb1caf..36356e9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObject.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObject.java
@@ -15,8 +15,9 @@
 package com.googlesource.gerrit.plugins.replication.pull.fetch;
 
 import com.google.gerrit.entities.Project;
-import com.google.gerrit.server.git.LocalDiskRepositoryManager;
+import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.pull.LocalGitRepositoryManagerProvider;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
@@ -30,15 +31,15 @@
 
 public class ApplyObject {
 
-  private final LocalDiskRepositoryManager gitManager;
+  private final GitRepositoryManager gitManager;
 
-  // NOTE: We do need specifically the LocalDiskRepositoryManager to make sure
+  // NOTE: We do need specifically the local GitRepositoryManager to make sure
   // to be able to write onto the directly physical repository without any wrapper.
   // Using for instance the multi-site wrapper injected by Guice would result
   // in a split-brain because of the misalignment of local vs. global refs values.
   @Inject
-  public ApplyObject(LocalDiskRepositoryManager gitManager) {
-    this.gitManager = gitManager;
+  public ApplyObject(LocalGitRepositoryManagerProvider gitManagerProvider) {
+    this.gitManager = gitManagerProvider.get();
   }
 
   public RefUpdateState apply(Project.NameKey name, RefSpec refSpec, RevisionData[] revisionsData)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java
index 5de82f1..89972cf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java
@@ -18,34 +18,22 @@
 
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
-import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
-import com.googlesource.gerrit.plugins.replication.pull.SourceConfiguration;
 import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.eclipse.jgit.lib.NullProgressMonitor;
 import org.eclipse.jgit.lib.Repository;
-import org.eclipse.jgit.transport.CredentialsProvider;
-import org.eclipse.jgit.transport.FetchResult;
-import org.eclipse.jgit.transport.RefSpec;
-import org.eclipse.jgit.transport.RemoteConfig;
-import org.eclipse.jgit.transport.Transport;
-import org.eclipse.jgit.transport.URIish;
+import org.eclipse.jgit.transport.*;
 
 public class JGitFetch implements Fetch {
-  private final RemoteConfig config;
-  private final CredentialsProvider credentialsProvider;
   URIish uri;
   Repository git;
+  private final TransportProvider transportProvider;
 
   @Inject
   public JGitFetch(
-      SourceConfiguration sourceConfig,
-      CredentialsFactory cpFactory,
-      @Assisted URIish uri,
-      @Assisted Repository git) {
-    this.config = sourceConfig.getRemoteConfig();
-    this.credentialsProvider = cpFactory.create(config.getName());
+      TransportProvider transportProvider, @Assisted URIish uri, @Assisted Repository git) {
+    this.transportProvider = transportProvider;
     this.uri = uri;
     this.git = git;
   }
@@ -53,7 +41,7 @@
   @Override
   public List<RefUpdateState> fetch(List<RefSpec> refs) throws IOException {
     FetchResult res;
-    try (Transport tn = Transport.open(git, uri)) {
+    try (Transport tn = transportProvider.open(git, uri)) {
       res = fetchVia(tn, refs);
     }
     return res.getTrackingRefUpdates().stream()
@@ -62,9 +50,6 @@
   }
 
   private FetchResult fetchVia(Transport tn, List<RefSpec> fetchRefSpecs) throws IOException {
-    tn.applyConfig(config);
-    tn.setCredentialsProvider(credentialsProvider);
-
     repLog.info("Fetch references {} from {}", fetchRefSpecs, uri);
     return tn.fetch(NullProgressMonitor.INSTANCE, fetchRefSpecs);
   }
diff --git a/src/main/java/org/eclipse/jgit/transport/TransportHttpWithBearerToken.java b/src/main/java/org/eclipse/jgit/transport/TransportHttpWithBearerToken.java
new file mode 100644
index 0000000..68ff6f8
--- /dev/null
+++ b/src/main/java/org/eclipse/jgit/transport/TransportHttpWithBearerToken.java
@@ -0,0 +1,74 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.eclipse.jgit.transport;
+
+import static org.eclipse.jgit.util.HttpSupport.HDR_AUTHORIZATION;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Set;
+import org.eclipse.jgit.errors.NotSupportedException;
+import org.eclipse.jgit.lib.ProgressMonitor;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.http.HttpConnection;
+
+/**
+ * This is a hack in order to allow git over http/https with Bearer Token Authentication.
+ *
+ * <p>Currently {@link org.eclipse.jgit.transport.TransportHttp} does NOT provide Bearer Token
+ * Authentication and unfortunately it is not possible to extend the functionality because some
+ * classes, methods and instance variables are private or protected. This package in the pull
+ * replication plugin provides the visibility needed and allows to extend the functionality.
+ *
+ * <p>It is important to mention that in the case of git push operation, this class cannot be used
+ * because the push operation needs to initialise a push hook: {@link
+ * org.eclipse.jgit.transport.Transport#push(ProgressMonitor, Collection, OutputStream)} and it is
+ * defined as a private in the code.
+ */
+public class TransportHttpWithBearerToken extends TransportHttp {
+
+  private static final String SCHEME_HTTP = "http";
+  private static final String SCHEME_HTTPS = "https";
+  private static final Set<String> SCHEMES_ALLOWED = ImmutableSet.of(SCHEME_HTTP, SCHEME_HTTPS);
+  private final String bearerToken;
+
+  public TransportHttpWithBearerToken(Repository local, URIish uri, String bearerToken)
+      throws NotSupportedException {
+    super(local, uri);
+    this.bearerToken = bearerToken;
+  }
+
+  protected HttpConnection httpOpen(String method, URL u, AcceptEncoding acceptEncoding)
+      throws IOException {
+    HttpConnection conn = super.httpOpen(method, u, acceptEncoding);
+    conn.setRequestProperty(HDR_AUTHORIZATION, "Bearer " + bearerToken); // $NON-NLS-1$
+    return conn;
+  }
+
+  /**
+   * This method copies the behaviour of {@link
+   * org.eclipse.jgit.transport.TransportProtocol#canHandle(URIish, Repository, String)} in the case
+   * of {@link org.eclipse.jgit.transport.TransportHttp} where scheme, host and path are compulsory.
+   */
+  public static boolean canHandle(URIish uri) {
+    return SCHEMES_ALLOWED.contains(uri.getScheme())
+        && !Strings.isNullOrEmpty(uri.getHost())
+        && !Strings.isNullOrEmpty(uri.getPath());
+  }
+}
diff --git a/src/main/java/org/eclipse/jgit/transport/TransportProvider.java b/src/main/java/org/eclipse/jgit/transport/TransportProvider.java
new file mode 100644
index 0000000..ca15b6c
--- /dev/null
+++ b/src/main/java/org/eclipse/jgit/transport/TransportProvider.java
@@ -0,0 +1,68 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.eclipse.jgit.transport;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
+import com.googlesource.gerrit.plugins.replication.pull.BearerTokenProvider;
+import com.googlesource.gerrit.plugins.replication.pull.SourceConfiguration;
+import java.util.Optional;
+import org.eclipse.jgit.errors.NotSupportedException;
+import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.lib.Repository;
+
+/**
+ * This class is responsible for providing a Custom Git HTTP Transport with Bearer Token
+ * Authentication or a concrete implementation of {@link org.eclipse.jgit.transport.Transport}.
+ */
+@Singleton
+public class TransportProvider {
+  private final RemoteConfig remoteConfig;
+  private final CredentialsProvider credentialsProvider;
+  private final Optional<String> bearerToken;
+
+  @Inject
+  public TransportProvider(
+      SourceConfiguration sourceConfig,
+      CredentialsFactory cpFactory,
+      BearerTokenProvider bearerTokenProvider) {
+    this.remoteConfig = sourceConfig.getRemoteConfig();
+    this.credentialsProvider = cpFactory.create(remoteConfig.getName());
+    this.bearerToken = bearerTokenProvider.get();
+  }
+
+  public Transport open(Repository local, URIish uri)
+      throws NotSupportedException, TransportException {
+    return (bearerToken.isPresent() && TransportHttpWithBearerToken.canHandle(uri))
+        ? provideTransportHttpWithBearerToken(local, uri)
+        : provideNativeTransport(local, uri);
+  }
+
+  private Transport provideTransportHttpWithBearerToken(Repository local, URIish uri)
+      throws NotSupportedException {
+    Transport tn = new TransportHttpWithBearerToken(local, uri, bearerToken.get());
+    tn.applyConfig(remoteConfig);
+    return tn;
+  }
+
+  private Transport provideNativeTransport(Repository local, URIish uri)
+      throws NotSupportedException, TransportException {
+    Transport tn = Transport.open(local, uri);
+    tn.applyConfig(remoteConfig);
+    tn.setCredentialsProvider(credentialsProvider);
+    return tn;
+  }
+}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index b2470a4..d3a2859 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -31,8 +31,13 @@
     threads = 3
     authGroup = Public Mirror Group
     authGroup = Second Public Mirror Group
-  [replication]
-    instanceLabel = host-one
+```
+
+And make sure that instanceId is setup in `$site_path/etc/gerrit.config`:
+
+```
+[gerrit]
+    instanceId = host-one
 ```
 
 Then reload the replication plugin to pick up the new configuration:
@@ -121,6 +126,21 @@
 	provided in the remote configuration section which name is equal
 	to instanceLabel.
 
+	Deprecated: This property is kept for backward compatibility and
+	will be removed in the future release. Use [gerrit.instanceId](https://gerrit-review.googlesource.com/Documentation/config-gerrit.html#gerrit.instanceId)
+	instead.
+
+replication.consumeStreamEvents
+:	Use stream events to trigger pull-replication actions alongside the
+	REST approach. This mechanism is useful together with event-broker
+	and multi-site to provide backfill mechanism when a node has to
+	catch up with the events after being unreachable.
+
+	NOTE: When `consumeStreamEvents` is enabled gerrit.instanceId
+	instead of [replication.instanceLabel](https://gerrit.googlesource.com/plugins/pull-replication/+/refs/heads/stable-3.4/src/main/resources/Documentation/config.md#replication.instanceLabel) must be used.
+
+	Default: false
+
 replication.maxConnectionsPerRoute
 :	Maximum number of HTTP connections per one HTTP route.
 
@@ -441,7 +461,6 @@
     autoReload = true
     replicateOnStartup = false
 [replication]
-	instanceLabel = host-one
     lockErrorMaxRetries = 5
     maxRetries = 5
 ```
@@ -477,7 +496,6 @@
     autoReload = true
     replicateOnStartup = false
 [replication]
-    instanceLabel = host-one
     lockErrorMaxRetries = 5
     maxRetries = 5
 
@@ -500,8 +518,15 @@
 
 The optional file `$site_path/secure.config` is a Git-style config
 file that provides secure values that should not be world-readable,
-such as passwords. Passwords for HTTP remotes can be obtained from
-this file.
+such as passwords. The HTTP authentication can be configured in 2 
+different flavours:
+
+*HTTP Bearer Token Authentication*
+
+auth.bearerToken
+:	shared secret configured on all replication endpoints. 
+
+*HTTP Basic Authentication*
 
 remote.NAME.username
 :	Username to use for HTTP authentication on this remote, if not
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java
index cb5af42..ee5876f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java
@@ -23,6 +23,7 @@
 import com.google.gerrit.acceptance.SkipProjectClone;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.config.GerritConfig;
 import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.api.projects.BranchInput;
@@ -95,6 +96,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateNewChangeRef() throws Exception {
     testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
 
@@ -122,6 +124,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateNewChangeRefAfterConfigReloaded() throws Exception {
     testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
 
@@ -157,6 +160,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateNewBranch() throws Exception {
     String testProjectName = project + TEST_REPLICATION_SUFFIX;
     createTestProject(testProjectName);
@@ -190,6 +194,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldAutoReloadConfiguration() throws Exception {
     SourcesCollection sources = getInstance(SourcesCollection.class);
     AutoReloadConfigDecorator autoReloadConfigDecorator =
@@ -202,6 +207,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldAutoReloadConfigurationWhenRemoteConfigAdded() throws Exception {
     FileBasedConfig newRemoteConfig =
         new FileBasedConfig(
@@ -224,6 +230,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldAutoReloadConfigurationWhenRemoteConfigDeleted() throws Exception {
     SourcesCollection sources = getInstance(SourcesCollection.class);
     AutoReloadConfigDecorator autoReloadConfigDecorator =
@@ -259,7 +266,6 @@
   }
 
   private void setReplicationSource(String remoteName) throws IOException {
-    config.setString("replication", null, "instanceLabel", remoteName);
     config.setBoolean("gerrit", null, "autoReload", true);
     config.save();
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
index ec60793..f3c62c5 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
@@ -19,60 +19,37 @@
 import static com.google.gerrit.acceptance.GitUtil.pushOne;
 import static com.google.gerrit.acceptance.testsuite.project.TestProjectUpdate.allow;
 import static com.google.gerrit.server.group.SystemGroupBackend.REGISTERED_USERS;
-import static java.util.stream.Collectors.toList;
 
-import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.PushOneCommit.Result;
 import com.google.gerrit.acceptance.SkipProjectClone;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
-import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
+import com.google.gerrit.acceptance.config.GerritConfig;
 import com.google.gerrit.entities.Permission;
-import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.Project.NameKey;
 import com.google.gerrit.entities.RefNames;
 import com.google.gerrit.extensions.api.changes.NotifyHandling;
 import com.google.gerrit.extensions.api.projects.BranchInput;
-import com.google.gerrit.extensions.common.Input;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
 import com.google.gerrit.extensions.events.HeadUpdatedListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.extensions.restapi.AuthException;
-import com.google.gerrit.extensions.restapi.BadRequestException;
-import com.google.gerrit.extensions.restapi.ResourceConflictException;
-import com.google.gerrit.extensions.restapi.Response;
 import com.google.gerrit.extensions.restapi.RestApiException;
-import com.google.gerrit.extensions.restapi.RestModifyView;
-import com.google.gerrit.server.config.SitePaths;
-import com.google.gerrit.server.project.ProjectResource;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
-import java.io.File;
 import java.io.IOException;
-import java.nio.file.Path;
-import java.time.Duration;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
-import java.util.function.Supplier;
-import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.internal.storage.dfs.InMemoryRepository;
 import org.eclipse.jgit.junit.TestRepository;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.revwalk.RevCommit;
-import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.transport.PushResult;
 import org.eclipse.jgit.transport.ReceiveCommand;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
 import org.eclipse.jgit.transport.RemoteRefUpdate.Status;
-import org.eclipse.jgit.util.FS;
 import org.junit.Test;
 
 @SkipProjectClone
@@ -81,49 +58,31 @@
     name = "pull-replication",
     sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
     httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
-public class PullReplicationIT extends LightweightPluginDaemonTest {
-  private static final Optional<String> ALL_PROJECTS = Optional.empty();
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
-  private static final int TEST_REPLICATION_DELAY = 1;
-  private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2000);
-  private static final String TEST_REPLICATION_SUFFIX = "suffix1";
-  private static final String TEST_REPLICATION_REMOTE = "remote1";
+public class PullReplicationIT extends PullReplicationSetupBase {
 
-  @Inject private SitePaths sitePaths;
-  @Inject private ProjectOperations projectOperations;
-  @Inject private DynamicSet<ProjectDeletedListener> deletedListeners;
-  private Path gitPath;
-  private FileBasedConfig config;
-  private FileBasedConfig secureConfig;
+  @Override
+  protected void setReplicationSource(
+      String remoteName, List<String> replicaSuffixes, Optional<String> project)
+      throws IOException {
+    List<String> fetchUrls =
+        buildReplicaURLs(replicaSuffixes, s -> gitPath.resolve("${name}" + s + ".git").toString());
+    config.setStringList("remote", remoteName, "url", fetchUrls);
+    config.setString("remote", remoteName, "apiUrl", adminRestSession.url());
+    config.setString("remote", remoteName, "fetch", "+refs/*:refs/*");
+    config.setInt("remote", remoteName, "timeout", 600);
+    config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+    project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
+    config.setBoolean("gerrit", null, "autoReload", true);
+    config.save();
+  }
 
   @Override
   public void setUpTestPlugin() throws Exception {
     setUpTestPlugin(false);
   }
 
-  protected void setUpTestPlugin(boolean loadExisting) throws Exception {
-    gitPath = sitePaths.site_path.resolve("git");
-
-    File configFile = sitePaths.etc_dir.resolve("replication.config").toFile();
-    config = new FileBasedConfig(configFile, FS.DETECTED);
-    if (loadExisting && configFile.exists()) {
-      config.load();
-    }
-    setReplicationSource(
-        TEST_REPLICATION_REMOTE,
-        TEST_REPLICATION_SUFFIX,
-        ALL_PROJECTS); // Simulates a full replication.config initialization
-    config.save();
-
-    secureConfig =
-        new FileBasedConfig(sitePaths.etc_dir.resolve("secure.config").toFile(), FS.DETECTED);
-    setReplicationCredentials(TEST_REPLICATION_REMOTE, admin.username(), admin.httpPassword());
-    secureConfig.save();
-
-    super.setUpTestPlugin();
-  }
-
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateNewChangeRef() throws Exception {
     testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
 
@@ -151,6 +110,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateNewBranch() throws Exception {
     String testProjectName = project + TEST_REPLICATION_SUFFIX;
     createTestProject(testProjectName);
@@ -185,6 +145,7 @@
 
   @Test
   @UseLocalDisk
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateForceUpdatedBranch() throws Exception {
     boolean forcedPush = true;
     String testProjectName = project + TEST_REPLICATION_SUFFIX;
@@ -254,6 +215,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateNewChangeRefCGitClient() throws Exception {
     AutoReloadConfigDecorator autoReloadConfigDecorator =
         getInstance(AutoReloadConfigDecorator.class);
@@ -289,6 +251,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateNewBranchCGitClient() throws Exception {
     AutoReloadConfigDecorator autoReloadConfigDecorator =
         getInstance(AutoReloadConfigDecorator.class);
@@ -330,6 +293,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateProjectDeletion() throws Exception {
     String projectToDelete = project.get();
     setReplicationSource(TEST_REPLICATION_REMOTE, "", Optional.of(projectToDelete));
@@ -358,6 +322,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
   public void shouldReplicateHeadUpdate() throws Exception {
     String testProjectName = project.get();
     setReplicationSource(TEST_REPLICATION_REMOTE, "", Optional.of(testProjectName));
@@ -389,80 +354,32 @@
         });
   }
 
-  private Ref getRef(Repository repo, String branchName) throws IOException {
-    return repo.getRefDatabase().exactRef(branchName);
-  }
+  @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
+  @GerritConfig(name = "container.replica", value = "true")
+  public void shouldReplicateNewChangeRefToReplica() throws Exception {
+    testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
 
-  private Ref checkedGetRef(Repository repo, String branchName) {
-    try {
-      return repo.getRefDatabase().exactRef(branchName);
-    } catch (Exception e) {
-      logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
-      return null;
-    }
-  }
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().refName();
 
-  private void setReplicationSource(
-      String remoteName, String replicaSuffix, Optional<String> project)
-      throws IOException, ConfigInvalidException {
-    setReplicationSource(remoteName, Arrays.asList(replicaSuffix), project);
-  }
+    ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
+    GitReferenceUpdatedListener.Event event =
+        new FakeGitReferenceUpdatedEvent(
+            project,
+            sourceRef,
+            ObjectId.zeroId().getName(),
+            sourceCommit.getId().getName(),
+            ReceiveCommand.Type.CREATE);
+    pullReplicationQueue.onGitReferenceUpdated(event);
 
-  private void setReplicationSource(
-      String remoteName, List<String> replicaSuffixes, Optional<String> project)
-      throws IOException, ConfigInvalidException {
+    try (Repository repo = repoManager.openRepository(project)) {
+      waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
 
-    List<String> replicaUrls =
-        replicaSuffixes.stream()
-            .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
-            .collect(toList());
-    config.setString("replication", null, "instanceLabel", remoteName);
-    config.setStringList("remote", remoteName, "url", replicaUrls);
-    config.setString("remote", remoteName, "apiUrl", adminRestSession.url());
-    config.setString("remote", remoteName, "fetch", "+refs/*:refs/*");
-    config.setInt("remote", remoteName, "timeout", 600);
-    config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
-    project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
-    config.setBoolean("gerrit", null, "autoReload", true);
-    config.save();
-  }
-
-  private void setReplicationCredentials(String remoteName, String username, String password)
-      throws IOException {
-    secureConfig.setString("remote", remoteName, "username", username);
-    secureConfig.setString("remote", remoteName, "password", password);
-    secureConfig.save();
-  }
-
-  private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
-    WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
-  }
-
-  private <T> T getInstance(Class<T> classObj) {
-    return plugin.getSysInjector().getInstance(classObj);
-  }
-
-  private Project.NameKey createTestProject(String name) throws Exception {
-    return projectOperations.newProject().name(name).create();
-  }
-
-  @Singleton
-  public static class FakeDeleteProjectPlugin implements RestModifyView<ProjectResource, Input> {
-    private int deleteEndpointCalls;
-
-    FakeDeleteProjectPlugin() {
-      this.deleteEndpointCalls = 0;
-    }
-
-    @Override
-    public Response<?> apply(ProjectResource resource, Input input)
-        throws AuthException, BadRequestException, ResourceConflictException, Exception {
-      deleteEndpointCalls += 1;
-      return Response.ok();
-    }
-
-    int getDeleteEndpointCalls() {
-      return deleteEndpointCalls;
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
     }
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
new file mode 100644
index 0000000..e07d481
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationSetupBase.java
@@ -0,0 +1,124 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
+import com.google.gerrit.entities.Project.NameKey;
+import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+
+public abstract class PullReplicationSetupBase extends LightweightPluginDaemonTest {
+
+  protected static final Optional<String> ALL_PROJECTS = Optional.empty();
+  protected static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  protected static final int TEST_REPLICATION_DELAY = 1;
+  protected static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2000);
+  protected static final String TEST_REPLICATION_SUFFIX = "suffix1";
+  protected static final String TEST_REPLICATION_REMOTE = "remote1";
+  @Inject protected SitePaths sitePaths;
+  @Inject protected ProjectOperations projectOperations;
+  @Inject protected DynamicSet<ProjectDeletedListener> deletedListeners;
+  protected Path gitPath;
+  protected FileBasedConfig config;
+  protected FileBasedConfig secureConfig;
+
+  protected void setUpTestPlugin(boolean loadExisting) throws Exception {
+    gitPath = sitePaths.site_path.resolve("git");
+
+    File configFile = sitePaths.etc_dir.resolve("replication.config").toFile();
+    config = new FileBasedConfig(configFile, FS.DETECTED);
+    if (loadExisting && configFile.exists()) {
+      config.load();
+    }
+    setReplicationSource(
+        TEST_REPLICATION_REMOTE,
+        TEST_REPLICATION_SUFFIX,
+        ALL_PROJECTS); // Simulates a full replication.config initialization
+    config.save();
+
+    secureConfig =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("secure.config").toFile(), FS.DETECTED);
+    setReplicationCredentials(TEST_REPLICATION_REMOTE, admin.username(), admin.httpPassword());
+    secureConfig.save();
+
+    super.setUpTestPlugin();
+  }
+
+  protected Ref getRef(Repository repo, String branchName) throws IOException {
+    return repo.getRefDatabase().exactRef(branchName);
+  }
+
+  protected Ref checkedGetRef(Repository repo, String branchName) {
+    try {
+      return repo.getRefDatabase().exactRef(branchName);
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
+      return null;
+    }
+  }
+
+  protected void setReplicationSource(
+      String remoteName, String replicaSuffix, Optional<String> project)
+      throws IOException, ConfigInvalidException {
+    setReplicationSource(remoteName, Arrays.asList(replicaSuffix), project);
+  }
+
+  protected abstract void setReplicationSource(
+      String remoteName, List<String> replicaSuffixes, Optional<String> project) throws IOException;
+
+  protected void setReplicationCredentials(String remoteName, String username, String password)
+      throws IOException {
+    secureConfig.setString("remote", remoteName, "username", username);
+    secureConfig.setString("remote", remoteName, "password", password);
+    secureConfig.save();
+  }
+
+  protected void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
+    WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
+  }
+
+  protected <T> T getInstance(Class<T> classObj) {
+    return plugin.getSysInjector().getInstance(classObj);
+  }
+
+  protected NameKey createTestProject(String name) throws Exception {
+    return projectOperations.newProject().name(name).create();
+  }
+
+  protected List<String> buildReplicaURLs(
+      List<String> replicaSuffixes, Function<String, String> toURL) {
+    return replicaSuffixes.stream().map(suffix -> toURL.apply(suffix)).collect(toList());
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolIT.java
new file mode 100644
index 0000000..d8e5947
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationWithGitHttpTransportProtocolIT.java
@@ -0,0 +1,102 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.config.GerritConfig;
+import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.transport.ReceiveCommand;
+import org.junit.Test;
+
+@SkipProjectClone
+@UseLocalDisk
+@TestPlugin(
+    name = "pull-replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
+    httpModule = "com.googlesource.gerrit.plugins.replication.pull.api.HttpModule")
+public class PullReplicationWithGitHttpTransportProtocolIT extends PullReplicationSetupBase {
+
+  @Override
+  protected void setReplicationSource(
+      String remoteName, List<String> replicaSuffixes, Optional<String> project)
+      throws IOException {
+    List<String> fetchUrls =
+        buildReplicaURLs(replicaSuffixes, s -> adminRestSession.url() + "/${name}" + s + ".git");
+    config.setStringList("remote", remoteName, "url", fetchUrls);
+    config.setString("remote", remoteName, "apiUrl", adminRestSession.url());
+    config.setString("remote", remoteName, "fetch", "+refs/*:refs/*");
+    config.setInt("remote", remoteName, "timeout", 600);
+    config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+    project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
+    config.setBoolean("gerrit", null, "autoReload", true);
+    config.save();
+  }
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    setUpTestPlugin(false);
+  }
+
+  @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
+  public void shouldReplicateWithBasicAuthentication() throws Exception {
+    runTest();
+  }
+
+  @Test
+  @GerritConfig(name = "gerrit.instanceId", value = TEST_REPLICATION_REMOTE)
+  @GerritConfig(name = "auth.bearerToken", value = "some-bearer-token")
+  public void shouldReplicateWithBearerTokenAuthentication() throws Exception {
+    runTest();
+  }
+
+  private void runTest() throws Exception {
+    testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
+
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().refName();
+
+    ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
+    GitReferenceUpdatedListener.Event event =
+        new FakeGitReferenceUpdatedEvent(
+            project,
+            sourceRef,
+            ObjectId.zeroId().getName(),
+            sourceCommit.getId().getName(),
+            ReceiveCommand.Type.CREATE);
+    pullReplicationQueue.onGitReferenceUpdated(event);
+
+    try (Repository repo = repoManager.openRepository(project)) {
+      waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ActionITBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ActionITBase.java
index 1019b86..20c1fea 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ActionITBase.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ActionITBase.java
@@ -42,18 +42,18 @@
 import java.util.Base64;
 import java.util.List;
 import java.util.Optional;
+import org.apache.http.HttpHeaders;
 import org.apache.http.HttpResponse;
-import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.AuthenticationException;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.ClientProtocolException;
-import org.apache.http.client.CredentialsProvider;
 import org.apache.http.client.ResponseHandler;
 import org.apache.http.client.methods.HttpDelete;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpPut;
-import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.auth.BasicScheme;
 import org.apache.http.message.BasicHeader;
 import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
@@ -83,7 +83,11 @@
   SourceHttpClient.Factory httpClientFactory;
   String url;
 
-  protected abstract String getURL(String projectName);
+  protected abstract String getURLWithAuthenticationPrefix(String projectName);
+
+  protected String getURLWithoutAuthenticationPrefix(String projectName) {
+    return getURLWithAuthenticationPrefix(projectName).replace("a/", "");
+  }
 
   @Override
   public void setUpTestPlugin() throws Exception {
@@ -109,7 +113,7 @@
     revisionReader = plugin.getSysInjector().getInstance(RevisionReader.class);
     source = plugin.getSysInjector().getInstance(SourcesCollection.class).getAll().get(0);
 
-    url = getURL(project.get());
+    url = getURLWithAuthenticationPrefix(project.get());
   }
 
   protected HttpPost createRequest(String sendObjectPayload) {
@@ -170,17 +174,28 @@
     };
   }
 
-  protected HttpClientContext getContext() {
-    return getContextForAccount(admin);
+  protected HttpRequestBase withBearerTokenAuthentication(
+      HttpRequestBase httpRequest, String bearerToken) {
+    httpRequest.addHeader(new BasicHeader(HttpHeaders.AUTHORIZATION, "Bearer " + bearerToken));
+    return httpRequest;
   }
 
-  protected HttpClientContext getUserContext() {
-    return getContextForAccount(user);
+  protected HttpRequestBase withBasicAuthenticationAsAdmin(HttpRequestBase httpRequest)
+      throws AuthenticationException {
+    return withBasicAuthentication(httpRequest, admin);
   }
 
-  protected HttpClientContext getAnonymousContext() {
-    HttpClientContext ctx = HttpClientContext.create();
-    return ctx;
+  protected HttpRequestBase withBasicAuthenticationAsUser(HttpRequestBase httpRequest)
+      throws AuthenticationException {
+    return withBasicAuthentication(httpRequest, user);
+  }
+
+  private HttpRequestBase withBasicAuthentication(HttpRequestBase httpRequest, TestAccount account)
+      throws AuthenticationException {
+    UsernamePasswordCredentials creds =
+        new UsernamePasswordCredentials(account.username(), account.httpPassword());
+    httpRequest.addHeader(new BasicScheme().authenticate(creds, httpRequest, null));
+    return httpRequest;
   }
 
   private Project.NameKey createTestProject(String name) throws Exception {
@@ -200,7 +215,6 @@
         replicaSuffixes.stream()
             .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
             .collect(toList());
-    config.setString("replication", null, "instanceLabel", remoteName);
     config.setStringList("remote", remoteName, "url", replicaUrls);
     config.setString("remote", remoteName, "apiUrl", adminRestSession.url());
     config.setString("remote", remoteName, "fetch", "+refs/tags/*:refs/tags/*");
@@ -217,13 +231,4 @@
     secureConfig.setString("remote", remoteName, "password", password);
     secureConfig.save();
   }
-
-  private HttpClientContext getContextForAccount(TestAccount account) {
-    HttpClientContext ctx = HttpClientContext.create();
-    CredentialsProvider adapted = new BasicCredentialsProvider();
-    adapted.setCredentials(
-        AuthScope.ANY, new UsernamePasswordCredentials(account.username(), account.httpPassword()));
-    ctx.setCredentialsProvider(adapted);
-    return ctx;
-  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionIT.java
index aad3903..2ab5caf 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionIT.java
@@ -22,7 +22,6 @@
 import com.google.gerrit.extensions.restapi.Url;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
 import java.util.Optional;
-import org.apache.http.client.methods.HttpPost;
 import org.junit.Test;
 
 public class ApplyObjectActionIT extends ActionITBase {
@@ -41,8 +40,11 @@
     RevisionData revisionData = revisionDataOption.get();
     String sendObjectPayload = createPayload(payloadWithAsyncFieldTemplate, refName, revisionData);
 
-    HttpPost post = createRequest(sendObjectPayload);
-    httpClientFactory.create(source).execute(post, assertHttpResponseCode(201), getContext());
+    httpClientFactory
+        .create(source)
+        .execute(
+            withBasicAuthenticationAsAdmin(createRequest(sendObjectPayload)),
+            assertHttpResponseCode(201));
   }
 
   @Test
@@ -60,8 +62,11 @@
     String sendObjectPayload =
         createPayload(payloadWithoutAsyncFieldTemplate, refName, revisionData);
 
-    HttpPost post = createRequest(sendObjectPayload);
-    httpClientFactory.create(source).execute(post, assertHttpResponseCode(201), getContext());
+    httpClientFactory
+        .create(source)
+        .execute(
+            withBasicAuthenticationAsAdmin(createRequest(sendObjectPayload)),
+            assertHttpResponseCode(201));
   }
 
   @Test
@@ -80,8 +85,11 @@
     String sendObjectPayload =
         createPayload(payloadWithoutAsyncFieldTemplate, refName, revisionData);
 
-    HttpPost post = createRequest(sendObjectPayload);
-    httpClientFactory.create(source).execute(post, assertHttpResponseCode(201), getContext());
+    httpClientFactory
+        .create(source)
+        .execute(
+            withBasicAuthenticationAsAdmin(createRequest(sendObjectPayload)),
+            assertHttpResponseCode(201));
   }
 
   @Test
@@ -103,13 +111,16 @@
         String.format(
             "%s/a/projects/%s/pull-replication~apply-object",
             adminRestSession.url(), Url.encode(projectName.get()));
-    HttpPost post = createRequest(sendObjectPayload);
-    httpClientFactory.create(source).execute(post, assertHttpResponseCode(201), getContext());
+    httpClientFactory
+        .create(source)
+        .execute(
+            withBasicAuthenticationAsAdmin(createRequest(sendObjectPayload)),
+            assertHttpResponseCode(201));
   }
 
   @Test
   @GerritConfig(name = "container.replica", value = "true")
-  public void shouldReturnUnauthorizedWhenNodeIsAReplicaAndUSerIsAnonymous() throws Exception {
+  public void shouldReturnForbiddenWhenNodeIsAReplicaAndUSerIsAnonymous() throws Exception {
     String payloadWithoutAsyncFieldTemplate =
         "{\"label\":\""
             + TEST_REPLICATION_REMOTE
@@ -123,10 +134,9 @@
     String sendObjectPayload =
         createPayload(payloadWithoutAsyncFieldTemplate, refName, revisionData);
 
-    HttpPost post = createRequest(sendObjectPayload);
     httpClientFactory
         .create(source)
-        .execute(post, assertHttpResponseCode(401), getAnonymousContext());
+        .execute(createRequest(sendObjectPayload), assertHttpResponseCode(403));
   }
 
   @Test
@@ -142,8 +152,11 @@
     String sendObjectPayload =
         createPayload(payloadWithoutLabelFieldTemplate, refName, revisionData);
 
-    HttpPost post = createRequest(sendObjectPayload);
-    httpClientFactory.create(source).execute(post, assertHttpResponseCode(400), getContext());
+    httpClientFactory
+        .create(source)
+        .execute(
+            withBasicAuthenticationAsAdmin(createRequest(sendObjectPayload)),
+            assertHttpResponseCode(400));
   }
 
   @Test
@@ -160,8 +173,61 @@
     RevisionData revisionData = revisionDataOption.get();
     String sendObjectPayload = createPayload(wrongPayloadTemplate, refName, revisionData);
 
-    HttpPost post = createRequest(sendObjectPayload);
-    httpClientFactory.create(source).execute(post, assertHttpResponseCode(400), getContext());
+    httpClientFactory
+        .create(source)
+        .execute(
+            withBasicAuthenticationAsAdmin(createRequest(sendObjectPayload)),
+            assertHttpResponseCode(400));
+  }
+
+  @Test
+  @GerritConfig(name = "container.replica", value = "true")
+  @GerritConfig(name = "auth.bearerToken", value = "some-bearer-token")
+  public void shouldAcceptPayloadWhenNodeIsAReplicaWithBearerToken() throws Exception {
+    url = getURLWithoutAuthenticationPrefix(project.get());
+    String payloadWithoutAsyncFieldTemplate =
+        "{\"label\":\""
+            + TEST_REPLICATION_REMOTE
+            + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
+
+    String refName = createRef();
+    Optional<RevisionData> revisionDataOption = createRevisionData(refName);
+    assertThat(revisionDataOption.isPresent()).isTrue();
+
+    RevisionData revisionData = revisionDataOption.get();
+    String sendObjectPayload =
+        createPayload(payloadWithoutAsyncFieldTemplate, refName, revisionData);
+
+    httpClientFactory
+        .create(source)
+        .execute(
+            withBearerTokenAuthentication(createRequest(sendObjectPayload), "some-bearer-token"),
+            assertHttpResponseCode(201));
+  }
+
+  @Test
+  @GerritConfig(name = "container.replica", value = "false")
+  @GerritConfig(name = "auth.bearerToken", value = "some-bearer-token")
+  public void shouldAcceptPayloadWhenNodeIsAPrimaryWithBearerToken() throws Exception {
+    url = getURLWithoutAuthenticationPrefix(project.get());
+    String payloadWithoutAsyncFieldTemplate =
+        "{\"label\":\""
+            + TEST_REPLICATION_REMOTE
+            + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
+
+    String refName = createRef();
+    Optional<RevisionData> revisionDataOption = createRevisionData(refName);
+    assertThat(revisionDataOption.isPresent()).isTrue();
+
+    RevisionData revisionData = revisionDataOption.get();
+    String sendObjectPayload =
+        createPayload(payloadWithoutAsyncFieldTemplate, refName, revisionData);
+
+    httpClientFactory
+        .create(source)
+        .execute(
+            withBearerTokenAuthentication(createRequest(sendObjectPayload), "some-bearer-token"),
+            assertHttpResponseCode(201));
   }
 
   private String createPayload(
@@ -176,7 +242,7 @@
   }
 
   @Override
-  protected String getURL(String projectName) {
+  protected String getURLWithAuthenticationPrefix(String projectName) {
     return String.format(
         "%s/a/projects/%s/pull-replication~apply-object",
         adminRestSession.url(), Url.encode(projectName));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java
new file mode 100644
index 0000000..824496a
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java
@@ -0,0 +1,212 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import static javax.servlet.http.HttpServletResponse.SC_UNAUTHORIZED;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.httpd.WebSession;
+import com.google.gerrit.server.AccessPath;
+import com.google.gerrit.server.util.ThreadLocalRequestContext;
+import com.google.inject.Provider;
+import com.googlesource.gerrit.plugins.replication.pull.auth.PullReplicationInternalUser;
+import java.io.IOException;
+import java.util.Optional;
+import javax.servlet.FilterChain;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BearerAuthenticationFilterTest {
+
+  private final Optional<String> NO_QUERY_PARAMETERS = Optional.empty();
+  private final Optional<String> GIT_UPLOAD_PACK_QUERY_PARAMETER =
+      Optional.of("service=git-upload-pack");
+  @Mock private DynamicItem<WebSession> session;
+  @Mock private WebSession webSession;
+  @Mock private Provider<ThreadLocalRequestContext> threadLocalRequestContextProvider;
+  @Mock private PullReplicationInternalUser pluginUser;
+  @Mock private ThreadLocalRequestContext threadLocalRequestContext;
+  @Mock private HttpServletRequest httpServletRequest;
+  @Mock private HttpServletResponse httpServletResponse;
+  @Mock private FilterChain filterChain;
+  private final String pluginName = "pull-replication";
+
+  private void authenticateAndFilter(String uri, Optional<String> queryStringMaybe)
+      throws ServletException, IOException {
+    final String bearerToken = "some-bearer-token";
+    when(httpServletRequest.getRequestURI()).thenReturn(uri);
+    queryStringMaybe.ifPresent(qs -> when(httpServletRequest.getQueryString()).thenReturn(qs));
+    when(httpServletRequest.getHeader("Authorization"))
+        .thenReturn(String.format("Bearer %s", bearerToken));
+    when(threadLocalRequestContextProvider.get()).thenReturn(threadLocalRequestContext);
+    when(session.get()).thenReturn(webSession);
+    final BearerAuthenticationFilter filter =
+        new BearerAuthenticationFilter(
+            session, pluginName, pluginUser, threadLocalRequestContextProvider, bearerToken);
+    filter.doFilter(httpServletRequest, httpServletResponse, filterChain);
+
+    verify(httpServletRequest, atMost(2)).getRequestURI();
+    verify(httpServletRequest, atMost(1)).getQueryString();
+    verify(httpServletRequest).getHeader("Authorization");
+    verify(threadLocalRequestContextProvider).get();
+    verify(session).get();
+    verify(webSession).setAccessPathOk(AccessPath.REST_API, true);
+    verify(filterChain).doFilter(httpServletRequest, httpServletResponse);
+  }
+
+  @Test
+  public void shouldAuthenticateWhenFetch() throws ServletException, IOException {
+    authenticateAndFilter("any-prefix/pull-replication~fetch", NO_QUERY_PARAMETERS);
+  }
+
+  @Test
+  public void shouldAuthenticateWhenApplyObject() throws ServletException, IOException {
+    authenticateAndFilter("any-prefix/pull-replication~apply-object", NO_QUERY_PARAMETERS);
+  }
+
+  @Test
+  public void shouldAuthenticateWhenApplyObjects() throws ServletException, IOException {
+    authenticateAndFilter("any-prefix/pull-replication~apply-objects", NO_QUERY_PARAMETERS);
+  }
+
+  @Test
+  public void shouldAuthenticateWhenDeleteProject() throws ServletException, IOException {
+    authenticateAndFilter("any-prefix/pull-replication~delete-project", NO_QUERY_PARAMETERS);
+  }
+
+  @Test
+  public void shouldAuthenticateWhenUpdateHead() throws ServletException, IOException {
+    authenticateAndFilter("any-prefix/projects/my-project/HEAD", NO_QUERY_PARAMETERS);
+  }
+
+  @Test
+  public void shouldAuthenticateWhenInitProject() throws ServletException, IOException {
+    authenticateAndFilter(
+        "any-prefix/pull-replication/init-project/my-project.git", NO_QUERY_PARAMETERS);
+  }
+
+  @Test
+  public void shouldAuthenticateWhenGitUploadPacket() throws ServletException, IOException {
+    authenticateAndFilter("any-prefix/git-upload-pack", NO_QUERY_PARAMETERS);
+  }
+
+  @Test
+  public void shouldAuthenticateWhenGitUploadPacketInQueryParameter()
+      throws ServletException, IOException {
+    authenticateAndFilter("any-prefix", GIT_UPLOAD_PACK_QUERY_PARAMETER);
+  }
+
+  @Test
+  public void shouldBe401WhenBearerTokenDoesNotMatch() throws ServletException, IOException {
+    when(httpServletRequest.getRequestURI()).thenReturn("any-prefix/pull-replication~fetch");
+    when(httpServletRequest.getHeader("Authorization"))
+        .thenReturn(String.format("Bearer %s", "some-different-bearer-token"));
+
+    final BearerAuthenticationFilter filter =
+        new BearerAuthenticationFilter(
+            session,
+            pluginName,
+            pluginUser,
+            threadLocalRequestContextProvider,
+            "some-bearer-token");
+    filter.doFilter(httpServletRequest, httpServletResponse, filterChain);
+
+    verify(httpServletRequest).getRequestURI();
+    verify(httpServletRequest).getHeader("Authorization");
+    verify(httpServletResponse).sendError(SC_UNAUTHORIZED);
+  }
+
+  @Test
+  public void shouldBe401WhenBearerTokenCannotBeExtracted() throws ServletException, IOException {
+    when(httpServletRequest.getRequestURI()).thenReturn("any-prefix/pull-replication~fetch");
+    when(httpServletRequest.getHeader("Authorization")).thenReturn("bearer token");
+
+    final BearerAuthenticationFilter filter =
+        new BearerAuthenticationFilter(
+            session,
+            pluginName,
+            pluginUser,
+            threadLocalRequestContextProvider,
+            "some-bearer-token");
+    filter.doFilter(httpServletRequest, httpServletResponse, filterChain);
+
+    verify(httpServletRequest).getRequestURI();
+    verify(httpServletRequest).getHeader("Authorization");
+    verify(httpServletResponse).sendError(SC_UNAUTHORIZED);
+  }
+
+  @Test
+  public void shouldBe401WhenNoAuthorizationHeaderInRequest() throws ServletException, IOException {
+    when(httpServletRequest.getRequestURI()).thenReturn("any-prefix/pull-replication~fetch");
+
+    final BearerAuthenticationFilter filter =
+        new BearerAuthenticationFilter(
+            session,
+            pluginName,
+            pluginUser,
+            threadLocalRequestContextProvider,
+            "some-bearer-token");
+    filter.doFilter(httpServletRequest, httpServletResponse, filterChain);
+
+    verify(httpServletRequest).getRequestURI();
+    verify(httpServletResponse).sendError(SC_UNAUTHORIZED);
+  }
+
+  @Test
+  public void shouldGoNextInChainWhenUriDoesNotMatch() throws ServletException, IOException {
+    when(httpServletRequest.getRequestURI()).thenReturn("any-url");
+
+    final BearerAuthenticationFilter filter =
+        new BearerAuthenticationFilter(
+            session,
+            pluginName,
+            pluginUser,
+            threadLocalRequestContextProvider,
+            "some-bearer-token");
+    filter.doFilter(httpServletRequest, httpServletResponse, filterChain);
+
+    verify(httpServletRequest, times(2)).getRequestURI();
+    verify(filterChain).doFilter(httpServletRequest, httpServletResponse);
+  }
+
+  @Test
+  public void shouldGoNextInChainWhenBasicAuthorizationIsRequired()
+      throws ServletException, IOException {
+    when(httpServletRequest.getRequestURI())
+        .thenReturn("/a/projects/my-project/pull-replication~fetch");
+
+    final BearerAuthenticationFilter filter =
+        new BearerAuthenticationFilter(
+            session,
+            pluginName,
+            pluginUser,
+            threadLocalRequestContextProvider,
+            "some-bearer-token");
+    filter.doFilter(httpServletRequest, httpServletResponse, filterChain);
+
+    verify(httpServletRequest).getRequestURI();
+    verify(filterChain).doFilter(httpServletRequest, httpServletResponse);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefCommandTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefCommandTest.java
index 4415a4b..eb4d322 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefCommandTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefCommandTest.java
@@ -33,6 +33,7 @@
 import com.google.gerrit.server.project.ProjectCache;
 import com.google.gerrit.server.project.ProjectState;
 import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.pull.LocalGitRepositoryManagerProvider;
 import com.googlesource.gerrit.plugins.replication.pull.PullReplicationStateLogger;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObject;
 import java.util.Optional;
@@ -94,7 +95,7 @@
             applyObject,
             permissionBackend,
             eventDispatcherDataItem,
-            gitManager);
+            new LocalGitRepositoryManagerProvider(gitManager));
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionIT.java
index 4ad8ce6..cc01c2a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionIT.java
@@ -35,7 +35,9 @@
 
     httpClientFactory
         .create(source)
-        .execute(createRequest(sendObjectPayload), assertHttpResponseCode(201), getContext());
+        .execute(
+            withBasicAuthenticationAsAdmin(createRequest(sendObjectPayload)),
+            assertHttpResponseCode(201));
   }
 
   @Test
@@ -55,12 +57,14 @@
             adminRestSession.url(), Url.encode(projectName.get()));
     httpClientFactory
         .create(source)
-        .execute(createRequest(sendObjectPayload), assertHttpResponseCode(201), getContext());
+        .execute(
+            withBasicAuthenticationAsAdmin(createRequest(sendObjectPayload)),
+            assertHttpResponseCode(201));
   }
 
   @Test
   @GerritConfig(name = "container.replica", value = "true")
-  public void shouldReturnUnauthorizedWhenNodeIsAReplicaAndUSerIsAnonymous() throws Exception {
+  public void shouldReturnForbiddenWhenNodeIsAReplicaAndUSerIsAnonymous() throws Exception {
     String refName = createRef();
     String sendObjectPayload =
         "{\"label\":\""
@@ -71,12 +75,51 @@
 
     httpClientFactory
         .create(source)
+        .execute(createRequest(sendObjectPayload), assertHttpResponseCode(403));
+  }
+
+  @Test
+  @GerritConfig(name = "container.replica", value = "true")
+  @GerritConfig(name = "auth.bearerToken", value = "some-bearer-token")
+  public void shouldFetchRefWhenNodeIsAReplicaWithBearerToken() throws Exception {
+    String refName = createRef();
+    url = getURLWithoutAuthenticationPrefix(project.get());
+    String sendObjectPayload =
+        "{\"label\":\""
+            + TEST_REPLICATION_REMOTE
+            + "\", \"ref_name\": \""
+            + refName
+            + "\", \"async\":false}";
+
+    httpClientFactory
+        .create(source)
         .execute(
-            createRequest(sendObjectPayload), assertHttpResponseCode(401), getAnonymousContext());
+            withBearerTokenAuthentication(createRequest(sendObjectPayload), "some-bearer-token"),
+            assertHttpResponseCode(201));
+  }
+
+  @Test
+  @GerritConfig(name = "container.replica", value = "false")
+  @GerritConfig(name = "auth.bearerToken", value = "some-bearer-token")
+  public void shouldFetchRefWhenNodeIsAPrimaryWithBearerToken() throws Exception {
+    String refName = createRef();
+    url = getURLWithoutAuthenticationPrefix(project.get());
+    String sendObjectPayload =
+        "{\"label\":\""
+            + TEST_REPLICATION_REMOTE
+            + "\", \"ref_name\": \""
+            + refName
+            + "\", \"async\":false}";
+
+    httpClientFactory
+        .create(source)
+        .execute(
+            withBearerTokenAuthentication(createRequest(sendObjectPayload), "some-bearer-token"),
+            assertHttpResponseCode(201));
   }
 
   @Override
-  protected String getURL(String projectName) {
+  protected String getURLWithAuthenticationPrefix(String projectName) {
     return String.format(
         "%s/a/projects/%s/pull-replication~fetch", adminRestSession.url(), Url.encode(projectName));
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java
index fb7f3d1..ce0b9d3 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java
@@ -55,6 +55,8 @@
   int taskId = 1234;
 
   @Mock FetchCommand fetchCommand;
+  @Mock FetchJob fetchJob;
+  @Mock FetchJob.Factory fetchJobFactory;
   @Mock ProjectResource projectResource;
   @Mock WorkQueue workQueue;
   @Mock ScheduledExecutorService exceutorService;
@@ -65,6 +67,7 @@
 
   @Before
   public void setup() {
+    when(fetchJobFactory.create(any(), any(), any())).thenReturn(fetchJob);
     when(workQueue.getDefaultQueue()).thenReturn(exceutorService);
     when(urlFormatter.getRestUrl(anyString())).thenReturn(Optional.of(location));
     when(exceutorService.submit(any(Runnable.class)))
@@ -79,7 +82,9 @@
     when(task.getTaskId()).thenReturn(taskId);
     when(preConditions.canCallFetchApi()).thenReturn(true);
 
-    fetchAction = new FetchAction(fetchCommand, workQueue, urlFormatterDynamicItem, preConditions);
+    fetchAction =
+        new FetchAction(
+            fetchCommand, workQueue, urlFormatterDynamicItem, preConditions, fetchJobFactory);
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectDeletionActionIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectDeletionActionIT.java
index 56a397e..a46a721 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectDeletionActionIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectDeletionActionIT.java
@@ -22,6 +22,7 @@
 import com.google.gerrit.server.group.SystemGroupBackend;
 import com.google.inject.Inject;
 import javax.servlet.http.HttpServletResponse;
+import org.apache.http.client.methods.HttpRequestBase;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -36,21 +37,18 @@
     httpClientFactory
         .create(source)
         .execute(
-            createDeleteRequest(),
-            assertHttpResponseCode(HttpServletResponse.SC_UNAUTHORIZED),
-            getAnonymousContext());
+            createDeleteRequest(), assertHttpResponseCode(HttpServletResponse.SC_UNAUTHORIZED));
   }
 
   @Test
   public void shouldDeleteRepositoryWhenUserHasProjectDeletionCapabilities() throws Exception {
     String testProjectName = project.get();
-    url = getURL(testProjectName);
+    url = getURLWithAuthenticationPrefix(testProjectName);
     httpClientFactory
         .create(source)
         .execute(
-            createDeleteRequest(),
-            assertHttpResponseCode(HttpServletResponse.SC_FORBIDDEN),
-            getUserContext());
+            withBasicAuthenticationAsUser(createDeleteRequest()),
+            assertHttpResponseCode(HttpServletResponse.SC_FORBIDDEN));
 
     projectOperations
         .project(allProjects)
@@ -61,56 +59,53 @@
     httpClientFactory
         .create(source)
         .execute(
-            createDeleteRequest(),
-            assertHttpResponseCode(HttpServletResponse.SC_OK),
-            getUserContext());
+            withBasicAuthenticationAsUser(createDeleteRequest()),
+            assertHttpResponseCode(HttpServletResponse.SC_OK));
   }
 
   @Test
   public void shouldReturnOKWhenProjectIsDeleted() throws Exception {
     String testProjectName = project.get();
-    url = getURL(testProjectName);
+    url = getURLWithAuthenticationPrefix(testProjectName);
 
     httpClientFactory
         .create(source)
         .execute(
-            createDeleteRequest(), assertHttpResponseCode(HttpServletResponse.SC_OK), getContext());
+            withBasicAuthenticationAsAdmin(createDeleteRequest()),
+            assertHttpResponseCode(HttpServletResponse.SC_OK));
   }
 
   @Test
   @Ignore("Failing in RestApiServlet: to be enabled again once that is fixed in core")
   public void shouldReturnBadRequestWhenDeletingAnInvalidProjectName() throws Exception {
-    url = getURL(INVALID_TEST_PROJECT_NAME);
+    url = getURLWithAuthenticationPrefix(INVALID_TEST_PROJECT_NAME);
 
     httpClientFactory
         .create(source)
         .execute(
-            createDeleteRequest(),
-            assertHttpResponseCode(HttpServletResponse.SC_BAD_REQUEST),
-            getContext());
+            withBasicAuthenticationAsAdmin(createDeleteRequest()),
+            assertHttpResponseCode(HttpServletResponse.SC_BAD_REQUEST));
   }
 
   @Test
   @GerritConfig(name = "container.replica", value = "true")
-  public void shouldReturnUnauthorizedForUserWithoutPermissionsOnReplica() throws Exception {
+  public void shouldReturnForbiddenForUserWithoutPermissionsOnReplica() throws Exception {
     httpClientFactory
         .create(source)
-        .execute(
-            createDeleteRequest(),
-            assertHttpResponseCode(HttpServletResponse.SC_UNAUTHORIZED),
-            getAnonymousContext());
+        .execute(createDeleteRequest(), assertHttpResponseCode(HttpServletResponse.SC_FORBIDDEN));
   }
 
   @Test
   @GerritConfig(name = "container.replica", value = "true")
   public void shouldReturnOKWhenProjectIsDeletedOnReplica() throws Exception {
     String testProjectName = project.get();
-    url = getURL(testProjectName);
+    url = getURLWithAuthenticationPrefix(testProjectName);
 
     httpClientFactory
         .create(source)
         .execute(
-            createDeleteRequest(), assertHttpResponseCode(HttpServletResponse.SC_OK), getContext());
+            withBasicAuthenticationAsAdmin(createDeleteRequest()),
+            assertHttpResponseCode(HttpServletResponse.SC_OK));
   }
 
   @Test
@@ -118,13 +113,12 @@
   public void shouldDeleteRepositoryWhenUserHasProjectDeletionCapabilitiesAndNodeIsAReplica()
       throws Exception {
     String testProjectName = project.get();
-    url = getURL(testProjectName);
+    url = getURLWithAuthenticationPrefix(testProjectName);
+    HttpRequestBase deleteRequest = withBasicAuthenticationAsUser(createDeleteRequest());
+
     httpClientFactory
         .create(source)
-        .execute(
-            createDeleteRequest(),
-            assertHttpResponseCode(HttpServletResponse.SC_FORBIDDEN),
-            getUserContext());
+        .execute(deleteRequest, assertHttpResponseCode(HttpServletResponse.SC_FORBIDDEN));
 
     projectOperations
         .project(allProjects)
@@ -134,28 +128,52 @@
 
     httpClientFactory
         .create(source)
-        .execute(
-            createDeleteRequest(),
-            assertHttpResponseCode(HttpServletResponse.SC_OK),
-            getUserContext());
+        .execute(deleteRequest, assertHttpResponseCode(HttpServletResponse.SC_OK));
   }
 
   @Test
   @GerritConfig(name = "container.replica", value = "true")
   public void shouldReturnBadRequestWhenDeletingAnInvalidProjectNameWhenNodeIsAReplica()
       throws Exception {
-    url = getURL(INVALID_TEST_PROJECT_NAME);
+    url = getURLWithAuthenticationPrefix(INVALID_TEST_PROJECT_NAME);
 
     httpClientFactory
         .create(source)
         .execute(
-            createDeleteRequest(),
-            assertHttpResponseCode(HttpServletResponse.SC_BAD_REQUEST),
-            getContext());
+            withBasicAuthenticationAsAdmin(createDeleteRequest()),
+            assertHttpResponseCode(HttpServletResponse.SC_BAD_REQUEST));
+  }
+
+  @Test
+  @GerritConfig(name = "container.replica", value = "true")
+  @GerritConfig(name = "auth.bearerToken", value = "some-bearer-token")
+  public void shouldReturnOKWhenProjectIsDeletedOnReplicaWithBearerToken() throws Exception {
+    String testProjectName = project.get();
+    url = getURLWithoutAuthenticationPrefix(testProjectName);
+
+    httpClientFactory
+        .create(source)
+        .execute(
+            withBearerTokenAuthentication(createDeleteRequest(), "some-bearer-token"),
+            assertHttpResponseCode(HttpServletResponse.SC_OK));
+  }
+
+  @Test
+  @GerritConfig(name = "container.replica", value = "false")
+  @GerritConfig(name = "auth.bearerToken", value = "some-bearer-token")
+  public void shouldReturnOKWhenProjectIsDeletedOnPrimaryWithBearerToken() throws Exception {
+    String testProjectName = project.get();
+    url = getURLWithoutAuthenticationPrefix(testProjectName);
+
+    httpClientFactory
+        .create(source)
+        .execute(
+            withBearerTokenAuthentication(createDeleteRequest(), "some-bearer-token"),
+            assertHttpResponseCode(HttpServletResponse.SC_OK));
   }
 
   @Override
-  protected String getURL(String projectName) {
+  protected String getURLWithAuthenticationPrefix(String projectName) {
     return String.format(
         "%s/a/projects/%s/pull-replication~delete-project",
         adminRestSession.url(), Url.encode(projectName));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationActionIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationActionIT.java
index d8ff02a..77f05a1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationActionIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationActionIT.java
@@ -15,7 +15,6 @@
 package com.googlesource.gerrit.plugins.replication.pull.api;
 
 import static com.google.gerrit.acceptance.testsuite.project.TestProjectUpdate.allowCapability;
-import static com.googlesource.gerrit.plugins.replication.pull.api.ProjectInitializationAction.getProjectInitializationUrl;
 
 import com.google.common.net.MediaType;
 import com.google.gerrit.acceptance.config.GerritConfig;
@@ -27,6 +26,7 @@
 import javax.servlet.http.HttpServletResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.message.BasicHeader;
 import org.junit.Test;
 
@@ -40,8 +40,7 @@
         .create(source)
         .execute(
             createPutRequestWithHeaders(),
-            assertHttpResponseCode(HttpServletResponse.SC_UNAUTHORIZED),
-            getAnonymousContext());
+            assertHttpResponseCode(HttpServletResponse.SC_UNAUTHORIZED));
   }
 
   @Test
@@ -49,40 +48,37 @@
     httpClientFactory
         .create(source)
         .execute(
-            createPutRequestWithoutHeaders(),
-            assertHttpResponseCode(HttpServletResponse.SC_BAD_REQUEST),
-            getContext());
+            withBasicAuthenticationAsAdmin(createPutRequestWithoutHeaders()),
+            assertHttpResponseCode(HttpServletResponse.SC_BAD_REQUEST));
   }
 
   @Test
   public void shouldCreateRepository() throws Exception {
     String newProjectName = "new/newProjectForPrimary";
-    url = getURL(newProjectName);
+    url = getURLWithAuthenticationPrefix(newProjectName);
     httpClientFactory
         .create(source)
         .execute(
-            createPutRequestWithHeaders(),
-            assertHttpResponseCode(HttpServletResponse.SC_CREATED),
-            getContext());
+            withBasicAuthenticationAsAdmin(createPutRequestWithHeaders()),
+            assertHttpResponseCode(HttpServletResponse.SC_CREATED));
 
-    HttpGet getNewProjectRequest =
-        new HttpGet(userRestSession.url() + "/a/projects/" + Url.encode(newProjectName));
+    HttpRequestBase getNewProjectRequest =
+        withBasicAuthenticationAsAdmin(
+            new HttpGet(userRestSession.url() + "/a/projects/" + Url.encode(newProjectName)));
+
     httpClientFactory
         .create(source)
-        .execute(
-            getNewProjectRequest, assertHttpResponseCode(HttpServletResponse.SC_OK), getContext());
+        .execute(getNewProjectRequest, assertHttpResponseCode(HttpServletResponse.SC_OK));
   }
 
   @Test
   public void shouldCreateRepositoryWhenUserHasProjectCreationCapabilities() throws Exception {
     String newProjectName = "new/newProjectForUserWithCapabilities";
-    url = getURL(newProjectName);
+    url = getURLWithAuthenticationPrefix(newProjectName);
+    HttpRequestBase put = withBasicAuthenticationAsUser(createPutRequestWithHeaders());
     httpClientFactory
         .create(source)
-        .execute(
-            createPutRequestWithHeaders(),
-            assertHttpResponseCode(HttpServletResponse.SC_FORBIDDEN),
-            getUserContext());
+        .execute(put, assertHttpResponseCode(HttpServletResponse.SC_FORBIDDEN));
 
     projectOperations
         .project(allProjects)
@@ -94,10 +90,7 @@
 
     httpClientFactory
         .create(source)
-        .execute(
-            createPutRequestWithHeaders(),
-            assertHttpResponseCode(HttpServletResponse.SC_CREATED),
-            getUserContext());
+        .execute(put, assertHttpResponseCode(HttpServletResponse.SC_CREATED));
   }
 
   @Test
@@ -105,22 +98,20 @@
     httpClientFactory
         .create(source)
         .execute(
-            createPutRequestWithHeaders(),
-            assertHttpResponseCode(HttpServletResponse.SC_FORBIDDEN),
-            getUserContext());
+            withBasicAuthenticationAsUser(createPutRequestWithHeaders()),
+            assertHttpResponseCode(HttpServletResponse.SC_FORBIDDEN));
   }
 
   @Test
   @GerritConfig(name = "container.replica", value = "true")
   public void shouldCreateRepositoryWhenNodeIsAReplica() throws Exception {
     String newProjectName = "new/newProjectForReplica";
-    url = getURL(newProjectName);
+    url = getURLWithAuthenticationPrefix(newProjectName);
     httpClientFactory
         .create(source)
         .execute(
-            createPutRequestWithHeaders(),
-            assertHttpResponseCode(HttpServletResponse.SC_CREATED),
-            getContext());
+            withBasicAuthenticationAsAdmin(createPutRequestWithHeaders()),
+            assertHttpResponseCode(HttpServletResponse.SC_CREATED));
   }
 
   @Test
@@ -129,9 +120,8 @@
     httpClientFactory
         .create(source)
         .execute(
-            createPutRequestWithHeaders(),
-            assertHttpResponseCode(HttpServletResponse.SC_FORBIDDEN),
-            getUserContext());
+            withBasicAuthenticationAsUser(createPutRequestWithHeaders()),
+            assertHttpResponseCode(HttpServletResponse.SC_FORBIDDEN));
   }
 
   @Test
@@ -139,13 +129,11 @@
   public void shouldCreateRepositoryWhenUserHasProjectCreationCapabilitiesAndNodeIsAReplica()
       throws Exception {
     String newProjectName = "new/newProjectForUserWithCapabilitiesReplica";
-    url = getURL(newProjectName);
+    url = getURLWithAuthenticationPrefix(newProjectName);
+    HttpRequestBase put = withBasicAuthenticationAsUser(createPutRequestWithHeaders());
     httpClientFactory
         .create(source)
-        .execute(
-            createPutRequestWithHeaders(),
-            assertHttpResponseCode(HttpServletResponse.SC_FORBIDDEN),
-            getUserContext());
+        .execute(put, assertHttpResponseCode(HttpServletResponse.SC_FORBIDDEN));
 
     projectOperations
         .project(allProjects)
@@ -157,24 +145,19 @@
 
     httpClientFactory
         .create(source)
-        .execute(
-            createPutRequestWithHeaders(),
-            assertHttpResponseCode(HttpServletResponse.SC_CREATED),
-            getUserContext());
+        .execute(put, assertHttpResponseCode(HttpServletResponse.SC_CREATED));
   }
 
   @Test
   @GerritConfig(name = "container.replica", value = "true")
   public void shouldReturnBadRequestIfProjectNameIsInvalidAndCannotBeCreatedWhenNodeIsAReplica()
       throws Exception {
-    url = getURL(INVALID_TEST_PROJECT_NAME);
-
+    url = getURLWithAuthenticationPrefix(INVALID_TEST_PROJECT_NAME);
     httpClientFactory
         .create(source)
         .execute(
-            createPutRequestWithHeaders(),
-            assertHttpResponseCode(HttpServletResponse.SC_BAD_REQUEST),
-            getContext());
+            withBasicAuthenticationAsAdmin(createPutRequestWithHeaders()),
+            assertHttpResponseCode(HttpServletResponse.SC_BAD_REQUEST));
   }
 
   @Test
@@ -183,28 +166,50 @@
     httpClientFactory
         .create(source)
         .execute(
-            createPutRequestWithoutHeaders(),
-            assertHttpResponseCode(HttpServletResponse.SC_BAD_REQUEST),
-            getContext());
+            withBasicAuthenticationAsAdmin(createPutRequestWithoutHeaders()),
+            assertHttpResponseCode(HttpServletResponse.SC_BAD_REQUEST));
   }
 
   @Test
   @GerritConfig(name = "container.replica", value = "true")
-  public void shouldReturnUnauthorizedForUserWithoutPermissionsWhenNodeIsAReplica()
-      throws Exception {
+  public void shouldReturnForbiddenForUserWithoutPermissionsWhenNodeIsAReplica() throws Exception {
     httpClientFactory
         .create(source)
         .execute(
             createPutRequestWithHeaders(),
-            assertHttpResponseCode(HttpServletResponse.SC_UNAUTHORIZED),
-            getAnonymousContext());
+            assertHttpResponseCode(HttpServletResponse.SC_FORBIDDEN));
+  }
+
+  @Test
+  @GerritConfig(name = "container.replica", value = "true")
+  @GerritConfig(name = "auth.bearerToken", value = "some-bearer-token")
+  public void shouldCreateRepositoryWhenNodeIsAReplicaWithBearerToken() throws Exception {
+    String newProjectName = "new/newProjectForReplica";
+    url = getURLWithoutAuthenticationPrefix(newProjectName);
+    httpClientFactory
+        .create(source)
+        .execute(
+            withBearerTokenAuthentication(createPutRequestWithHeaders(), "some-bearer-token"),
+            assertHttpResponseCode(HttpServletResponse.SC_CREATED));
+  }
+
+  @Test
+  @GerritConfig(name = "container.replica", value = "false")
+  @GerritConfig(name = "auth.bearerToken", value = "some-bearer-token")
+  public void shouldCreateRepositoryWhenNodeIsAPrimaryWithBearerToken() throws Exception {
+    String newProjectName = "new/newProjectForReplica";
+    url = getURLWithoutAuthenticationPrefix(newProjectName);
+    httpClientFactory
+        .create(source)
+        .execute(
+            withBearerTokenAuthentication(createPutRequestWithHeaders(), "some-bearer-token"),
+            assertHttpResponseCode(HttpServletResponse.SC_CREATED));
   }
 
   @Override
-  protected String getURL(String projectName) {
+  protected String getURLWithAuthenticationPrefix(String projectName) {
     return userRestSession.url()
-        + "/"
-        + getProjectInitializationUrl("pull-replication", Url.encode(projectName));
+        + String.format("/a/plugins/pull-replication/init-project/%s.git", Url.encode(projectName));
   }
 
   protected HttpPut createPutRequestWithHeaders() {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilterTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilterTest.java
new file mode 100644
index 0000000..2ed1466
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilterTest.java
@@ -0,0 +1,331 @@
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import static com.google.common.net.HttpHeaders.ACCEPT;
+import static com.google.gerrit.httpd.restapi.RestApiServlet.SC_UNPROCESSABLE_ENTITY;
+import static javax.servlet.http.HttpServletResponse.SC_CONFLICT;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.internal.verification.VerificationModeFactory.atLeastOnce;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+
+import com.google.common.net.MediaType;
+import com.google.gerrit.extensions.restapi.*;
+import com.google.gerrit.server.project.ProjectResource;
+import com.google.gerrit.server.restapi.project.ProjectsCollection;
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+import javax.servlet.FilterChain;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PullReplicationFilterTest {
+
+  @Mock HttpServletRequest request;
+  @Mock HttpServletResponse response;
+  @Mock FilterChain filterChain;
+  @Mock private FetchAction fetchAction;
+  @Mock private ApplyObjectAction applyObjectAction;
+  @Mock private ApplyObjectsAction applyObjectsAction;
+  @Mock private ProjectInitializationAction projectInitializationAction;
+  @Mock private UpdateHeadAction updateHEADAction;
+  @Mock private ProjectDeletionAction projectDeletionAction;
+  @Mock private ProjectsCollection projectsCollection;
+  @Mock private ProjectResource projectResource;
+  @Mock private ServletOutputStream outputStream;
+  @Mock private PrintWriter printWriter;
+  private final String PLUGIN_NAME = "pull-replication";
+  private final String PROJECT_NAME = "some-project";
+  private final String PROJECT_NAME_GIT = "some-project.git";
+  private final String FETCH_URI =
+      String.format("any-prefix/projects/%s/%s~fetch", PROJECT_NAME, PLUGIN_NAME);
+  private final String APPLY_OBJECT_URI =
+      String.format("any-prefix/projects/%s/%s~apply-object", PROJECT_NAME, PLUGIN_NAME);
+  private final String APPLY_OBJECTS_URI =
+      String.format("any-prefix/projects/%s/%s~apply-objects", PROJECT_NAME, PLUGIN_NAME);
+  private final String HEAD_URI = String.format("any-prefix/projects/%s/HEAD", PROJECT_NAME);
+  private final String DELETE_PROJECT_URI =
+      String.format("any-prefix/projects/%s/%s~delete-project", PROJECT_NAME, PLUGIN_NAME);
+  private final String INIT_PROJECT_URI =
+      String.format("any-prefix/%s/init-project/%s", PLUGIN_NAME, PROJECT_NAME_GIT);
+
+  private final Response OK_RESPONSE = Response.ok();
+
+  private PullReplicationFilter createPullReplicationFilter() {
+    return new PullReplicationFilter(
+        fetchAction,
+        applyObjectAction,
+        applyObjectsAction,
+        projectInitializationAction,
+        updateHEADAction,
+        projectDeletionAction,
+        projectsCollection,
+        PLUGIN_NAME);
+  }
+
+  private void defineBehaviours(byte[] payload, String uri) throws Exception {
+    when(request.getRequestURI()).thenReturn(uri);
+    InputStream is = new ByteArrayInputStream(payload);
+    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is));
+    when(request.getReader()).thenReturn(bufferedReader);
+    when(projectsCollection.parse(TopLevelResource.INSTANCE, IdString.fromDecoded(PROJECT_NAME)))
+        .thenReturn(projectResource);
+    when(response.getWriter()).thenReturn(printWriter);
+  }
+
+  private void verifyBehaviours() throws Exception {
+    verify(request, atLeastOnce()).getRequestURI();
+    verify(request).getReader();
+    verify(projectsCollection).parse(TopLevelResource.INSTANCE, IdString.fromDecoded(PROJECT_NAME));
+    verify(response).getWriter();
+    verify(response).setContentType("application/json");
+    verify(response).setStatus(HttpServletResponse.SC_OK);
+  }
+
+  @Test
+  public void shouldFilterFetchAction() throws Exception {
+    byte[] payloadFetch =
+        ("{"
+                + "\"label\":\"Replication\", "
+                + "\"ref_name\": \"refs/heads/master\", "
+                + "\"async\":false"
+                + "}")
+            .getBytes(StandardCharsets.UTF_8);
+
+    defineBehaviours(payloadFetch, FETCH_URI);
+    when(fetchAction.apply(any(), any())).thenReturn(OK_RESPONSE);
+
+    PullReplicationFilter pullReplicationFilter = createPullReplicationFilter();
+    pullReplicationFilter.doFilter(request, response, filterChain);
+
+    verifyBehaviours();
+    verify(fetchAction).apply(eq(projectResource), any());
+  }
+
+  @Test
+  public void shouldFilterApplyObjectAction() throws Exception {
+
+    byte[] payloadApplyObject =
+        ("{\"label\":\"Replication\",\"ref_name\":\"refs/heads/master\","
+                + "\"revision_data\":{"
+                + "\"commit_object\":{\"type\":1,\"content\":\"some-content\"},"
+                + "\"tree_object\":{\"type\":2,\"content\":\"some-content\"},"
+                + "\"blobs\":[]}"
+                + "}")
+            .getBytes(StandardCharsets.UTF_8);
+
+    defineBehaviours(payloadApplyObject, APPLY_OBJECT_URI);
+
+    when(applyObjectAction.apply(any(), any())).thenReturn(OK_RESPONSE);
+
+    PullReplicationFilter pullReplicationFilter = createPullReplicationFilter();
+    pullReplicationFilter.doFilter(request, response, filterChain);
+
+    verifyBehaviours();
+    verify(applyObjectAction).apply(eq(projectResource), any());
+  }
+
+  @Test
+  public void shouldFilterApplyObjectsAction() throws Exception {
+
+    byte[] payloadApplyObjects =
+        ("{\"label\":\"Replication\",\"ref_name\":\"refs/heads/master\","
+                + "\"revisions_data\":[{"
+                + "\"commit_object\":{\"type\":1,\"content\":\"some-content\"},"
+                + "\"tree_object\":{\"type\":2,\"content\":\"some-content\"},"
+                + "\"blobs\":[]}]}")
+            .getBytes(StandardCharsets.UTF_8);
+
+    defineBehaviours(payloadApplyObjects, APPLY_OBJECTS_URI);
+
+    when(applyObjectsAction.apply(any(), any())).thenReturn(OK_RESPONSE);
+
+    PullReplicationFilter pullReplicationFilter = createPullReplicationFilter();
+    pullReplicationFilter.doFilter(request, response, filterChain);
+
+    verifyBehaviours();
+    verify(applyObjectsAction).apply(eq(projectResource), any());
+  }
+
+  @Test
+  public void shouldFilterProjectInitializationAction() throws Exception {
+
+    when(request.getRequestURI()).thenReturn(INIT_PROJECT_URI);
+    when(request.getHeader(ACCEPT)).thenReturn(MediaType.PLAIN_TEXT_UTF_8.toString());
+    when(projectInitializationAction.initProject(PROJECT_NAME_GIT)).thenReturn(true);
+    when(response.getWriter()).thenReturn(printWriter);
+
+    final PullReplicationFilter pullReplicationFilter = createPullReplicationFilter();
+    pullReplicationFilter.doFilter(request, response, filterChain);
+
+    verify(request, times(5)).getRequestURI();
+    verify(projectInitializationAction).initProject(eq(PROJECT_NAME_GIT));
+    verify(response).getWriter();
+  }
+
+  @Test
+  public void shouldFilterUpdateHEADAction() throws Exception {
+
+    byte[] payloadUpdateHead = "{\"ref\":\"some-ref\"}".getBytes(StandardCharsets.UTF_8);
+    defineBehaviours(payloadUpdateHead, HEAD_URI);
+    when(request.getMethod()).thenReturn("PUT");
+    when(updateHEADAction.apply(any(), any())).thenReturn(OK_RESPONSE);
+
+    final PullReplicationFilter pullReplicationFilter = createPullReplicationFilter();
+    pullReplicationFilter.doFilter(request, response, filterChain);
+
+    verifyBehaviours();
+    verify(updateHEADAction).apply(eq(projectResource), any());
+  }
+
+  @Test
+  public void shouldFilterProjectDeletionAction() throws Exception {
+    when(request.getRequestURI()).thenReturn(DELETE_PROJECT_URI);
+    when(request.getMethod()).thenReturn("DELETE");
+    when(projectsCollection.parse(TopLevelResource.INSTANCE, IdString.fromDecoded(PROJECT_NAME)))
+        .thenReturn(projectResource);
+    when(projectDeletionAction.apply(any(), any())).thenReturn(OK_RESPONSE);
+    when(response.getWriter()).thenReturn(printWriter);
+
+    final PullReplicationFilter pullReplicationFilter = createPullReplicationFilter();
+    pullReplicationFilter.doFilter(request, response, filterChain);
+
+    verify(request, times(7)).getRequestURI();
+    verify(projectsCollection).parse(TopLevelResource.INSTANCE, IdString.fromDecoded(PROJECT_NAME));
+    verify(projectDeletionAction).apply(eq(projectResource), any());
+    verify(response).getWriter();
+    verify(response).setContentType("application/json");
+    verify(response).setStatus(OK_RESPONSE.statusCode());
+  }
+
+  @Test
+  public void shouldGoNextInChainWhenUriDoesNotMatch() throws Exception {
+    when(request.getRequestURI()).thenReturn("any-url");
+    final PullReplicationFilter pullReplicationFilter = createPullReplicationFilter();
+    pullReplicationFilter.doFilter(request, response, filterChain);
+    verify(filterChain).doFilter(request, response);
+  }
+
+  @Test
+  public void shouldBe404WhenJsonIsMalformed() throws Exception {
+    byte[] payloadMalformedJson = "some-json-malformed".getBytes(StandardCharsets.UTF_8);
+    InputStream is = new ByteArrayInputStream(payloadMalformedJson);
+    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is));
+    when(request.getRequestURI()).thenReturn(FETCH_URI);
+    when(request.getReader()).thenReturn(bufferedReader);
+    when(response.getOutputStream()).thenReturn(outputStream);
+
+    PullReplicationFilter pullReplicationFilter = createPullReplicationFilter();
+    pullReplicationFilter.doFilter(request, response, filterChain);
+
+    verify(response).setStatus(HttpServletResponse.SC_BAD_REQUEST);
+  }
+
+  @Test
+  public void shouldBe500WhenProjectCannotBeInitiated() throws Exception {
+    when(request.getRequestURI()).thenReturn(INIT_PROJECT_URI);
+    when(request.getHeader(ACCEPT)).thenReturn(MediaType.PLAIN_TEXT_UTF_8.toString());
+    when(projectInitializationAction.initProject(PROJECT_NAME_GIT)).thenReturn(false);
+    when(response.getOutputStream()).thenReturn(outputStream);
+
+    final PullReplicationFilter pullReplicationFilter = createPullReplicationFilter();
+    pullReplicationFilter.doFilter(request, response, filterChain);
+
+    verify(response).setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+  }
+
+  @Test
+  public void shouldBe500WhenResourceNotFound() throws Exception {
+    when(request.getRequestURI()).thenReturn(DELETE_PROJECT_URI);
+    when(request.getMethod()).thenReturn("DELETE");
+    when(projectsCollection.parse(TopLevelResource.INSTANCE, IdString.fromDecoded(PROJECT_NAME)))
+        .thenReturn(projectResource);
+    when(projectDeletionAction.apply(any(), any()))
+        .thenThrow(new ResourceNotFoundException("resource not found"));
+    when(response.getOutputStream()).thenReturn(outputStream);
+
+    final PullReplicationFilter pullReplicationFilter = createPullReplicationFilter();
+    pullReplicationFilter.doFilter(request, response, filterChain);
+
+    verify(response).setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+  }
+
+  @Test
+  public void shouldBe403WhenUserIsNotAuthorised() throws Exception {
+    byte[] payloadFetchAction =
+        ("{"
+                + "\"label\":\"Replication\", "
+                + "\"ref_name\": \"refs/heads/master\", "
+                + "\"async\":false"
+                + "}")
+            .getBytes(StandardCharsets.UTF_8);
+
+    defineBehaviours(payloadFetchAction, FETCH_URI);
+    when(fetchAction.apply(any(), any()))
+        .thenThrow(new AuthException("The user is not authorised"));
+    when(response.getOutputStream()).thenReturn(outputStream);
+
+    PullReplicationFilter pullReplicationFilter = createPullReplicationFilter();
+    pullReplicationFilter.doFilter(request, response, filterChain);
+
+    verify(response).setStatus(HttpServletResponse.SC_FORBIDDEN);
+  }
+
+  @Test
+  public void shouldBe422WhenEntityCannotBeProcessed() throws Exception {
+    byte[] payloadFetchAction =
+        ("{"
+                + "\"label\":\"Replication\", "
+                + "\"ref_name\": \"refs/heads/master\", "
+                + "\"async\":false"
+                + "}")
+            .getBytes(StandardCharsets.UTF_8);
+
+    defineBehaviours(payloadFetchAction, FETCH_URI);
+    when(fetchAction.apply(any(), any()))
+        .thenThrow(new UnprocessableEntityException("Entity cannot be processed"));
+    when(response.getOutputStream()).thenReturn(outputStream);
+
+    PullReplicationFilter pullReplicationFilter = createPullReplicationFilter();
+    pullReplicationFilter.doFilter(request, response, filterChain);
+
+    verify(response).setStatus(SC_UNPROCESSABLE_ENTITY);
+  }
+
+  @Test
+  public void shouldBe409WhenThereIsResourceConflict() throws Exception {
+    when(request.getRequestURI()).thenReturn(DELETE_PROJECT_URI);
+    when(request.getMethod()).thenReturn("DELETE");
+    when(projectsCollection.parse(TopLevelResource.INSTANCE, IdString.fromDecoded(PROJECT_NAME)))
+        .thenReturn(projectResource);
+
+    when(projectDeletionAction.apply(any(), any()))
+        .thenThrow(new ResourceConflictException("Resource conflict"));
+    when(response.getOutputStream()).thenReturn(outputStream);
+
+    PullReplicationFilter pullReplicationFilter = createPullReplicationFilter();
+    pullReplicationFilter.doFilter(request, response, filterChain);
+
+    verify(response).setStatus(SC_CONFLICT);
+  }
+
+  @Test
+  public void shouldBe400WhenProjectNameIsNotPresentInURL() throws Exception {
+    when(request.getRequestURI())
+        .thenReturn(String.format("any-prefix/projects/%s~delete-project", PLUGIN_NAME));
+    when(request.getMethod()).thenReturn("DELETE");
+    when(response.getOutputStream()).thenReturn(outputStream);
+
+    PullReplicationFilter pullReplicationFilter = createPullReplicationFilter();
+    pullReplicationFilter.doFilter(request, response, filterChain);
+
+    verify(response).setStatus(HttpServletResponse.SC_BAD_REQUEST);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/UpdateHeadActionIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/UpdateHeadActionIT.java
index aa07a7c..7c725b3 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/UpdateHeadActionIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/UpdateHeadActionIT.java
@@ -26,6 +26,8 @@
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import javax.servlet.http.HttpServletResponse;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class UpdateHeadActionIT extends ActionITBase {
@@ -39,8 +41,7 @@
         .create(source)
         .execute(
             createPutRequest(headInput("some/branch")),
-            assertHttpResponseCode(HttpServletResponse.SC_UNAUTHORIZED),
-            getAnonymousContext());
+            assertHttpResponseCode(HttpServletResponse.SC_UNAUTHORIZED));
   }
 
   @Test
@@ -48,9 +49,8 @@
     httpClientFactory
         .create(source)
         .execute(
-            createPutRequest(headInput("")),
-            assertHttpResponseCode(HttpServletResponse.SC_BAD_REQUEST),
-            getContext());
+            withBasicAuthenticationAsAdmin(createPutRequest(headInput(""))),
+            assertHttpResponseCode(HttpServletResponse.SC_BAD_REQUEST));
   }
 
   @Test
@@ -61,13 +61,11 @@
     BranchInput input = new BranchInput();
     input.revision = master;
     gApi.projects().name(testProjectName).branch(newBranch).create(input);
-
     httpClientFactory
         .create(source)
         .execute(
-            createPutRequest(headInput(newBranch)),
-            assertHttpResponseCode(HttpServletResponse.SC_OK),
-            getContext());
+            withBasicAuthenticationAsAdmin(createPutRequest(headInput(newBranch))),
+            assertHttpResponseCode(HttpServletResponse.SC_OK));
 
     assertThat(gApi.projects().name(testProjectName).head()).isEqualTo(newBranch);
   }
@@ -78,9 +76,8 @@
     httpClientFactory
         .create(source)
         .execute(
-            createPutRequest(headInput("")),
-            assertHttpResponseCode(HttpServletResponse.SC_BAD_REQUEST),
-            getContext());
+            withBasicAuthenticationAsAdmin(createPutRequest(headInput(""))),
+            assertHttpResponseCode(HttpServletResponse.SC_BAD_REQUEST));
   }
 
   @Test
@@ -92,13 +89,11 @@
     BranchInput input = new BranchInput();
     input.revision = master;
     gApi.projects().name(testProjectName).branch(newBranch).create(input);
-
     httpClientFactory
         .create(source)
         .execute(
-            createPutRequest(headInput(newBranch)),
-            assertHttpResponseCode(HttpServletResponse.SC_OK),
-            getContext());
+            withBasicAuthenticationAsAdmin(createPutRequest(headInput(newBranch))),
+            assertHttpResponseCode(HttpServletResponse.SC_OK));
 
     assertThat(gApi.projects().name(testProjectName).head()).isEqualTo(newBranch);
   }
@@ -108,9 +103,8 @@
     httpClientFactory
         .create(source)
         .execute(
-            createPutRequest(headInput("some/new/head")),
-            assertHttpResponseCode(HttpServletResponse.SC_FORBIDDEN),
-            getUserContext());
+            withBasicAuthenticationAsUser(createPutRequest(headInput("some/new/head"))),
+            assertHttpResponseCode(HttpServletResponse.SC_FORBIDDEN));
   }
 
   @Test
@@ -121,13 +115,10 @@
     BranchInput input = new BranchInput();
     input.revision = master;
     gApi.projects().name(testProjectName).branch(newBranch).create(input);
-
+    HttpRequestBase put = withBasicAuthenticationAsUser(createPutRequest(headInput(newBranch)));
     httpClientFactory
         .create(source)
-        .execute(
-            createPutRequest(headInput(newBranch)),
-            assertHttpResponseCode(HttpServletResponse.SC_FORBIDDEN),
-            getUserContext());
+        .execute(put, assertHttpResponseCode(HttpServletResponse.SC_FORBIDDEN));
 
     projectOperations
         .project(project)
@@ -137,10 +128,7 @@
 
     httpClientFactory
         .create(source)
-        .execute(
-            createPutRequest(headInput(newBranch)),
-            assertHttpResponseCode(HttpServletResponse.SC_OK),
-            getUserContext());
+        .execute(put, assertHttpResponseCode(HttpServletResponse.SC_OK));
   }
 
   @Test
@@ -149,9 +137,52 @@
     httpClientFactory
         .create(source)
         .execute(
-            createPutRequest(headInput("some/new/head")),
-            assertHttpResponseCode(HttpServletResponse.SC_FORBIDDEN),
-            getUserContext());
+            withBasicAuthenticationAsUser(createPutRequest(headInput("some/new/head"))),
+            assertHttpResponseCode(HttpServletResponse.SC_FORBIDDEN));
+  }
+
+  @Test
+  @GerritConfig(name = "container.replica", value = "true")
+  @GerritConfig(name = "auth.bearerToken", value = "some-bearer-token")
+  @Ignore("Waiting for resolving: Issue 16332: Not able to update the HEAD from internal user")
+  public void shouldReturnOKWhenHeadIsUpdatedInReplicaWithBearerToken() throws Exception {
+    String testProjectName = project.get();
+    url = getURLWithoutAuthenticationPrefix(testProjectName);
+    String newBranch = "refs/heads/mybranch";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(testProjectName).branch(newBranch).create(input);
+    httpClientFactory
+        .create(source)
+        .execute(
+            withBearerTokenAuthentication(
+                createPutRequest(headInput(newBranch)), "some-bearer-token"),
+            assertHttpResponseCode(HttpServletResponse.SC_OK));
+
+    assertThat(gApi.projects().name(testProjectName).head()).isEqualTo(newBranch);
+  }
+
+  @Test
+  @GerritConfig(name = "container.replica", value = "false")
+  @GerritConfig(name = "auth.bearerToken", value = "some-bearer-token")
+  @Ignore("Waiting for resolving: Issue 16332: Not able to update the HEAD from internal user")
+  public void shouldReturnOKWhenHeadIsUpdatedInPrimaryWithBearerToken() throws Exception {
+    String testProjectName = project.get();
+    url = getURLWithoutAuthenticationPrefix(testProjectName);
+    String newBranch = "refs/heads/mybranch";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(testProjectName).branch(newBranch).create(input);
+    httpClientFactory
+        .create(source)
+        .execute(
+            withBearerTokenAuthentication(
+                createPutRequest(headInput(newBranch)), "some-bearer-token"),
+            assertHttpResponseCode(HttpServletResponse.SC_OK));
+
+    assertThat(gApi.projects().name(testProjectName).head()).isEqualTo(newBranch);
   }
 
   private String headInput(String ref) {
@@ -161,7 +192,7 @@
   }
 
   @Override
-  protected String getURL(String projectName) {
+  protected String getURLWithAuthenticationPrefix(String projectName) {
     return String.format("%s/a/projects/%s/HEAD", adminRestSession.url(), projectName);
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/auth/PullReplicationGroupBackendIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/auth/PullReplicationGroupBackendIT.java
new file mode 100644
index 0000000..87738e3
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/auth/PullReplicationGroupBackendIT.java
@@ -0,0 +1,80 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.auth;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.googlesource.gerrit.plugins.replication.pull.auth.PullReplicationGroupBackend.INTERNAL_GROUP_DESCRIPTION;
+import static com.googlesource.gerrit.plugins.replication.pull.auth.PullReplicationGroupBackend.INTERNAL_GROUP_NAME;
+import static com.googlesource.gerrit.plugins.replication.pull.auth.PullReplicationGroupBackend.INTERNAL_GROUP_UUID;
+import static com.googlesource.gerrit.plugins.replication.pull.auth.PullReplicationGroupBackend.NAME_PREFIX;
+
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.entities.GroupDescription;
+import com.google.gerrit.entities.GroupReference;
+import com.google.gerrit.server.CurrentUser;
+import com.google.gerrit.server.account.GroupMembership;
+import com.google.gerrit.server.group.SystemGroupBackend;
+import java.util.Collection;
+import org.junit.Test;
+
+@SkipProjectClone
+@TestPlugin(
+    name = "pull-replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.pull.auth.PullReplicationGroupModule")
+public class PullReplicationGroupBackendIT extends LightweightPluginDaemonTest {
+
+  @Test
+  public void shouldResolvePullReplicationInternalGroup() {
+    GroupDescription.Basic group = groupBackend.get(INTERNAL_GROUP_UUID);
+
+    assertThat(group).isNotNull();
+    assertThat(group).isEqualTo(INTERNAL_GROUP_DESCRIPTION);
+  }
+
+  @Test
+  public void shouldSuggestPullReplicationInternalGroup() {
+    Collection<GroupReference> groups = groupBackend.suggest(NAME_PREFIX, null);
+
+    assertThat(groups).isNotNull();
+    assertThat(groups).hasSize(1);
+
+    GroupReference groupReference = groups.iterator().next();
+    assertThat(groupReference.getName()).isEqualTo(INTERNAL_GROUP_NAME);
+    assertThat(groupReference.getUUID()).isEqualTo(INTERNAL_GROUP_UUID);
+  }
+
+  @Test
+  public void pullReplicationInternalUserShouldHaveMembershipOfInternalGroupAndAnonymousUsers() {
+    assertMemberOfInternalAndAnonymousUsers(
+        groupBackend.membershipsOf(getPullReplicationInternalUser()));
+  }
+
+  @Test
+  public void pullReplicationInternalUserShouldHaveEffectiveGroups() {
+    assertMemberOfInternalAndAnonymousUsers(getPullReplicationInternalUser().getEffectiveGroups());
+  }
+
+  private CurrentUser getPullReplicationInternalUser() {
+    CurrentUser user = plugin.getSysInjector().getInstance(PullReplicationInternalUser.class);
+    return user;
+  }
+
+  private void assertMemberOfInternalAndAnonymousUsers(GroupMembership userMembership) {
+    assertThat(userMembership.contains(INTERNAL_GROUP_UUID)).isTrue();
+    assertThat(userMembership.contains(SystemGroupBackend.ANONYMOUS_USERS)).isTrue();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientBase.java
similarity index 82%
rename from src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
rename to src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientBase.java
index c45e7be..a2389d7 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientBase.java
@@ -16,9 +16,7 @@
 
 import static com.google.common.truth.Truth.assertThat;
 import static com.google.gerrit.testing.GerritJUnit.assertThrows;
-import static javax.servlet.http.HttpServletResponse.SC_CREATED;
 import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -30,6 +28,7 @@
 import com.google.gerrit.entities.RefNames;
 import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
 import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig;
+import com.googlesource.gerrit.plugins.replication.pull.BearerTokenProvider;
 import com.googlesource.gerrit.plugins.replication.pull.Source;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
@@ -39,32 +38,25 @@
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
-import java.util.Optional;
 import org.apache.http.Header;
 import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.methods.HttpDelete;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
 import org.apache.http.message.BasicHeader;
 import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.transport.CredentialItem;
 import org.eclipse.jgit.transport.CredentialsProvider;
 import org.eclipse.jgit.transport.URIish;
 import org.eclipse.jgit.util.IO;
 import org.eclipse.jgit.util.RawParseUtils;
-import org.junit.Before;
 import org.junit.Test;
-import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
 
-@RunWith(MockitoJUnitRunner.class)
-public class FetchRestApiClientTest {
+public abstract class FetchRestApiClientBase {
   private static final boolean IS_REF_UPDATE = false;
 
   @Mock CredentialsProvider credentialProvider;
@@ -74,13 +66,13 @@
   @Mock FileBasedConfig config;
   @Mock ReplicationFileBasedConfig replicationConfig;
   @Mock Source source;
+  @Mock BearerTokenProvider bearerTokenProvider;
   @Captor ArgumentCaptor<HttpPost> httpPostCaptor;
   @Captor ArgumentCaptor<HttpPut> httpPutCaptor;
   @Captor ArgumentCaptor<HttpDelete> httpDeleteCaptor;
-
   String api = "http://gerrit-host";
   String pluginName = "pull-replication";
-  String label = "Replication";
+  String instanceId = "Replication";
   String refName = RefNames.REFS_HEADS + "master";
 
   String expectedPayload =
@@ -150,38 +142,9 @@
 
   FetchApiClient objectUnderTest;
 
-  @Before
-  public void setup() throws ClientProtocolException, IOException {
-    when(credentialProvider.supports(any()))
-        .thenAnswer(
-            new Answer<Boolean>() {
+  protected abstract String urlAuthenticationPrefix();
 
-              @Override
-              public Boolean answer(InvocationOnMock invocation) throws Throwable {
-                CredentialItem.Username user = (CredentialItem.Username) invocation.getArgument(0);
-                CredentialItem.Password password =
-                    (CredentialItem.Password) invocation.getArgument(1);
-                user.setValue("admin");
-                password.setValue("secret".toCharArray());
-                return true;
-              }
-            });
-
-    when(credentialProvider.get(any(), any(CredentialItem.class))).thenReturn(true);
-    when(credentials.create(anyString())).thenReturn(credentialProvider);
-    when(replicationConfig.getConfig()).thenReturn(config);
-    when(config.getStringList("replication", null, "syncRefs")).thenReturn(new String[0]);
-    when(source.getRemoteConfigName()).thenReturn("Replication");
-    when(config.getString("replication", null, "instanceLabel")).thenReturn(label);
-
-    HttpResult httpResult = new HttpResult(SC_CREATED, Optional.of("result message"));
-    when(httpClient.execute(any(HttpPost.class), any(), any())).thenReturn(httpResult);
-    when(httpClientFactory.create(any())).thenReturn(httpClient);
-    syncRefsFilter = new SyncRefsFilter(replicationConfig);
-    objectUnderTest =
-        new FetchRestApiClient(
-            credentials, httpClientFactory, replicationConfig, syncRefsFilter, pluginName, source);
-  }
+  protected abstract void assertAuthentication(HttpRequestBase httpRequest);
 
   @Test
   public void shouldCallFetchEndpoint()
@@ -189,26 +152,24 @@
 
     objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
 
-    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any(), any());
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
 
     HttpPost httpPost = httpPostCaptor.getValue();
     assertThat(httpPost.getURI().getHost()).isEqualTo("gerrit-host");
     assertThat(httpPost.getURI().getPath())
-        .isEqualTo("/a/projects/test_repo/pull-replication~fetch");
+        .isEqualTo(
+            String.format(
+                "%s/projects/test_repo/pull-replication~fetch", urlAuthenticationPrefix()));
+    assertAuthentication(httpPost);
   }
 
   @Test
   public void shouldByDefaultCallSyncFetchForAllRefs()
       throws ClientProtocolException, IOException, URISyntaxException {
 
-    syncRefsFilter = new SyncRefsFilter(replicationConfig);
-    objectUnderTest =
-        new FetchRestApiClient(
-            credentials, httpClientFactory, replicationConfig, syncRefsFilter, pluginName, source);
-
     objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
 
-    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any(), any());
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
 
     HttpPost httpPost = httpPostCaptor.getValue();
     assertThat(readPayload(httpPost)).isEqualTo(expectedPayload);
@@ -223,11 +184,18 @@
     syncRefsFilter = new SyncRefsFilter(replicationConfig);
     objectUnderTest =
         new FetchRestApiClient(
-            credentials, httpClientFactory, replicationConfig, syncRefsFilter, pluginName, source);
+            credentials,
+            httpClientFactory,
+            replicationConfig,
+            syncRefsFilter,
+            pluginName,
+            instanceId,
+            bearerTokenProvider,
+            source);
 
     objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
 
-    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any(), any());
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
 
     HttpPost httpPost = httpPostCaptor.getValue();
     assertThat(readPayload(httpPost)).isEqualTo(expectedAsyncPayload);
@@ -245,15 +213,22 @@
     syncRefsFilter = new SyncRefsFilter(replicationConfig);
     objectUnderTest =
         new FetchRestApiClient(
-            credentials, httpClientFactory, replicationConfig, syncRefsFilter, pluginName, source);
+            credentials,
+            httpClientFactory,
+            replicationConfig,
+            syncRefsFilter,
+            pluginName,
+            instanceId,
+            bearerTokenProvider,
+            source);
 
     objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
-    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any(), any());
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
     HttpPost httpPost = httpPostCaptor.getValue();
     assertThat(readPayload(httpPost)).isEqualTo(expectedAsyncPayload);
 
     objectUnderTest.callFetch(Project.nameKey("test_repo"), metaRefName, new URIish(api));
-    verify(httpClient, times(2)).execute(httpPostCaptor.capture(), any(), any());
+    verify(httpClient, times(2)).execute(httpPostCaptor.capture(), any());
     httpPost = httpPostCaptor.getValue();
     assertThat(readPayload(httpPost)).isEqualTo(expectedMetaRefPayload);
   }
@@ -264,7 +239,7 @@
 
     objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
 
-    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any(), any());
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
 
     HttpPost httpPost = httpPostCaptor.getValue();
     assertThat(readPayload(httpPost)).isEqualTo(expectedPayload);
@@ -276,7 +251,7 @@
 
     objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
 
-    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any(), any());
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
 
     HttpPost httpPost = httpPostCaptor.getValue();
     assertThat(httpPost.getLastHeader("Content-Type").getValue())
@@ -294,12 +269,15 @@
         createSampleRevisionData(),
         new URIish(api));
 
-    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any(), any());
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
 
     HttpPost httpPost = httpPostCaptor.getValue();
     assertThat(httpPost.getURI().getHost()).isEqualTo("gerrit-host");
     assertThat(httpPost.getURI().getPath())
-        .isEqualTo("/a/projects/test_repo/pull-replication~apply-object");
+        .isEqualTo(
+            String.format(
+                "%s/projects/test_repo/pull-replication~apply-object", urlAuthenticationPrefix()));
+    assertAuthentication(httpPost);
   }
 
   @Test
@@ -313,7 +291,7 @@
         createSampleRevisionData(),
         new URIish(api));
 
-    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any(), any());
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
 
     HttpPost httpPost = httpPostCaptor.getValue();
     assertThat(readPayload(httpPost)).isEqualTo(expectedSendObjectPayload);
@@ -325,7 +303,7 @@
 
     objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
 
-    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any(), any());
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
 
     HttpPost httpPost = httpPostCaptor.getValue();
     assertThat(httpPost.getLastHeader("Content-Type").getValue())
@@ -334,7 +312,6 @@
 
   @Test
   public void shouldThrowExceptionWhenInstanceLabelIsNull() {
-    when(config.getString("replication", null, "instanceLabel")).thenReturn(null);
     assertThrows(
         NullPointerException.class,
         () ->
@@ -344,12 +321,13 @@
                 replicationConfig,
                 syncRefsFilter,
                 pluginName,
+                null,
+                bearerTokenProvider,
                 source));
   }
 
   @Test
   public void shouldTrimInstanceLabel() {
-    when(config.getString("replication", null, "instanceLabel")).thenReturn(" ");
     assertThrows(
         NullPointerException.class,
         () ->
@@ -359,12 +337,13 @@
                 replicationConfig,
                 syncRefsFilter,
                 pluginName,
+                " ",
+                bearerTokenProvider,
                 source));
   }
 
   @Test
   public void shouldThrowExceptionWhenInstanceLabelIsEmpty() {
-    when(config.getString("replication", null, "instanceLabel")).thenReturn("");
     assertThrows(
         NullPointerException.class,
         () ->
@@ -374,20 +353,48 @@
                 replicationConfig,
                 syncRefsFilter,
                 pluginName,
+                "",
+                bearerTokenProvider,
                 source));
   }
 
   @Test
+  public void shouldUseReplicationLabelWhenProvided()
+      throws ClientProtocolException, IOException, URISyntaxException {
+    when(config.getString("replication", null, "instanceLabel")).thenReturn(instanceId);
+    FetchRestApiClient objectUnderTest =
+        new FetchRestApiClient(
+            credentials,
+            httpClientFactory,
+            replicationConfig,
+            syncRefsFilter,
+            pluginName,
+            "",
+            bearerTokenProvider,
+            source);
+    objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
+
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
+
+    HttpPost httpPost = httpPostCaptor.getValue();
+    assertThat(readPayload(httpPost)).isEqualTo(expectedPayload);
+  }
+
+  @Test
   public void shouldCallInitProjectEndpoint() throws IOException, URISyntaxException {
 
     objectUnderTest.initProject(Project.nameKey("test_repo"), new URIish(api));
 
-    verify(httpClient, times(1)).execute(httpPutCaptor.capture(), any(), any());
+    verify(httpClient, times(1)).execute(httpPutCaptor.capture(), any());
 
     HttpPut httpPut = httpPutCaptor.getValue();
     assertThat(httpPut.getURI().getHost()).isEqualTo("gerrit-host");
     assertThat(httpPut.getURI().getPath())
-        .isEqualTo("/a/plugins/pull-replication/init-project/test_repo.git");
+        .isEqualTo(
+            String.format(
+                "%s/plugins/pull-replication/init-project/test_repo.git",
+                urlAuthenticationPrefix()));
+    assertAuthentication(httpPut);
   }
 
   @Test
@@ -395,12 +402,16 @@
 
     objectUnderTest.deleteProject(Project.nameKey("test_repo"), new URIish(api));
 
-    verify(httpClient, times(1)).execute(httpDeleteCaptor.capture(), any(), any());
+    verify(httpClient, times(1)).execute(httpDeleteCaptor.capture(), any());
 
     HttpDelete httpDelete = httpDeleteCaptor.getValue();
     assertThat(httpDelete.getURI().getHost()).isEqualTo("gerrit-host");
     assertThat(httpDelete.getURI().getPath())
-        .isEqualTo("/a/projects/test_repo/pull-replication~delete-project");
+        .isEqualTo(
+            String.format(
+                "%s/projects/test_repo/pull-replication~delete-project",
+                urlAuthenticationPrefix()));
+    assertAuthentication(httpDelete);
   }
 
   @Test
@@ -409,7 +420,7 @@
     String projectName = "aProject";
     objectUnderTest.updateHead(Project.nameKey(projectName), newHead, new URIish(api));
 
-    verify(httpClient, times(1)).execute(httpPutCaptor.capture(), any(), any());
+    verify(httpClient, times(1)).execute(httpPutCaptor.capture(), any());
 
     HttpPut httpPut = httpPutCaptor.getValue();
     String payload =
@@ -418,8 +429,11 @@
 
     assertThat(httpPut.getURI().getHost()).isEqualTo("gerrit-host");
     assertThat(httpPut.getURI().getPath())
-        .isEqualTo(String.format("/a/projects/%s/pull-replication~HEAD", projectName));
+        .isEqualTo(
+            String.format(
+                "%s/projects/%s/pull-replication~HEAD", urlAuthenticationPrefix(), projectName));
     assertThat(payload).isEqualTo(String.format("{\"ref\": \"%s\"}", newHead));
+    assertAuthentication(httpPut);
   }
 
   public String readPayload(HttpPost entity) throws UnsupportedOperationException, IOException {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientWithBasicAuthenticationTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientWithBasicAuthenticationTest.java
new file mode 100644
index 0000000..644afce
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientWithBasicAuthenticationTest.java
@@ -0,0 +1,90 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.client;
+
+import static com.google.common.truth.Truth.assertThat;
+import static javax.servlet.http.HttpServletResponse.SC_CREATED;
+import static org.mockito.Mockito.*;
+
+import com.googlesource.gerrit.plugins.replication.pull.filter.SyncRefsFilter;
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.http.Header;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.eclipse.jgit.transport.CredentialItem;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class FetchRestApiClientWithBasicAuthenticationTest extends FetchRestApiClientBase {
+
+  @Before
+  public void setup() throws ClientProtocolException, IOException {
+    when(bearerTokenProvider.get()).thenReturn(Optional.empty());
+    when(credentialProvider.supports(any()))
+        .thenAnswer(
+            new Answer<Boolean>() {
+
+              @Override
+              public Boolean answer(InvocationOnMock invocation) throws Throwable {
+                CredentialItem.Username user = (CredentialItem.Username) invocation.getArgument(0);
+                CredentialItem.Password password =
+                    (CredentialItem.Password) invocation.getArgument(1);
+                user.setValue("admin");
+                password.setValue("secret".toCharArray());
+                return true;
+              }
+            });
+
+    when(credentialProvider.get(any(), any(CredentialItem.class))).thenReturn(true);
+    when(credentials.create(anyString())).thenReturn(credentialProvider);
+    when(replicationConfig.getConfig()).thenReturn(config);
+    when(config.getStringList("replication", null, "syncRefs")).thenReturn(new String[0]);
+    when(source.getRemoteConfigName()).thenReturn("Replication");
+
+    HttpResult httpResult = new HttpResult(SC_CREATED, Optional.of("result message"));
+    when(httpClient.execute(any(HttpRequestBase.class), any())).thenReturn(httpResult);
+    when(httpClientFactory.create(any())).thenReturn(httpClient);
+    syncRefsFilter = new SyncRefsFilter(replicationConfig);
+    objectUnderTest =
+        new FetchRestApiClient(
+            credentials,
+            httpClientFactory,
+            replicationConfig,
+            syncRefsFilter,
+            pluginName,
+            instanceId,
+            bearerTokenProvider,
+            source);
+    verify(bearerTokenProvider).get();
+  }
+
+  @Override
+  protected String urlAuthenticationPrefix() {
+    return "/a";
+  }
+
+  @Override
+  protected void assertAuthentication(HttpRequestBase httpRequest) {
+    Header[] authorizationHeaders = httpRequest.getHeaders(HttpHeaders.AUTHORIZATION);
+    assertThat(authorizationHeaders.length).isEqualTo(1);
+    assertThat(authorizationHeaders[0].getValue()).contains("Basic");
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientWithBearerTokenTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientWithBearerTokenTest.java
new file mode 100644
index 0000000..90d71ad
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientWithBearerTokenTest.java
@@ -0,0 +1,70 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.client;
+
+import static com.google.common.truth.Truth.assertThat;
+import static javax.servlet.http.HttpServletResponse.SC_CREATED;
+import static org.mockito.Mockito.*;
+
+import com.googlesource.gerrit.plugins.replication.pull.filter.SyncRefsFilter;
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.http.Header;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class FetchRestApiClientWithBearerTokenTest extends FetchRestApiClientBase {
+
+  @Before
+  public void setup() throws ClientProtocolException, IOException {
+    when(bearerTokenProvider.get()).thenReturn(Optional.of("some-bearer-token"));
+    when(replicationConfig.getConfig()).thenReturn(config);
+    when(config.getStringList("replication", null, "syncRefs")).thenReturn(new String[0]);
+    HttpResult httpResult = new HttpResult(SC_CREATED, Optional.of("result message"));
+    when(httpClient.execute(any(HttpRequestBase.class), any())).thenReturn(httpResult);
+    when(httpClientFactory.create(any())).thenReturn(httpClient);
+
+    syncRefsFilter = new SyncRefsFilter(replicationConfig);
+
+    objectUnderTest =
+        new FetchRestApiClient(
+            credentials,
+            httpClientFactory,
+            replicationConfig,
+            syncRefsFilter,
+            pluginName,
+            instanceId,
+            bearerTokenProvider,
+            source);
+    verify(bearerTokenProvider).get();
+  }
+
+  @Override
+  protected String urlAuthenticationPrefix() {
+    return "";
+  }
+
+  @Override
+  protected void assertAuthentication(HttpRequestBase httpRequest) {
+    Header[] authorizationHeaders = httpRequest.getHeaders(HttpHeaders.AUTHORIZATION);
+    assertThat(authorizationHeaders.length).isEqualTo(1);
+    assertThat(authorizationHeaders[0].getValue()).isEqualTo("Bearer " + "some-bearer-token");
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java
new file mode 100644
index 0000000..c673011
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java
@@ -0,0 +1,150 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.extensions.restapi.AuthException;
+import com.google.gerrit.server.data.RefUpdateAttribute;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.replication.pull.FetchOne;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.Input;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchJob;
+import com.googlesource.gerrit.plugins.replication.pull.api.ProjectInitializationAction;
+import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiRequestMetrics;
+import java.util.concurrent.ScheduledExecutorService;
+import org.eclipse.jgit.lib.ObjectId;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class StreamEventListenerTest {
+
+  private static final String TEST_REF_NAME = "refs/changes/01/1/1";
+  private static final String TEST_PROJECT = "test-project";
+  private static final String INSTANCE_ID = "node_instance_id";
+  private static final String REMOTE_INSTANCE_ID = "remote_node_instance_id";
+
+  @Mock private ProjectInitializationAction projectInitializationAction;
+  @Mock private WorkQueue workQueue;
+  @Mock private ScheduledExecutorService executor;
+  @Mock private FetchJob fetchJob;
+  @Mock private FetchJob.Factory fetchJobFactory;
+  @Captor ArgumentCaptor<Input> inputCaptor;
+  @Mock private PullReplicationApiRequestMetrics metrics;
+
+  private StreamEventListener objectUnderTest;
+
+  @Before
+  public void setup() {
+    when(workQueue.getDefaultQueue()).thenReturn(executor);
+    when(fetchJobFactory.create(eq(Project.nameKey(TEST_PROJECT)), any(), any()))
+        .thenReturn(fetchJob);
+    objectUnderTest =
+        new StreamEventListener(
+            INSTANCE_ID, projectInitializationAction, workQueue, fetchJobFactory, () -> metrics);
+  }
+
+  @Test
+  public void shouldSkipEventsGeneratedByTheSameInstance() {
+    Event event = new RefUpdatedEvent();
+    event.instanceId = INSTANCE_ID;
+    objectUnderTest.onEvent(event);
+
+    verify(executor, never()).submit(any(Runnable.class));
+  }
+
+  @Test
+  public void shouldSkipFetchForProjectDeleteEvent() {
+    RefUpdatedEvent event = new RefUpdatedEvent();
+    RefUpdateAttribute refUpdate = new RefUpdateAttribute();
+    refUpdate.refName = RefNames.REFS_CONFIG;
+    refUpdate.newRev = ObjectId.zeroId().getName();
+    refUpdate.project = TEST_PROJECT;
+
+    event.instanceId = REMOTE_INSTANCE_ID;
+    event.refUpdate = () -> refUpdate;
+
+    objectUnderTest.onEvent(event);
+
+    verify(executor, never()).submit(any(Runnable.class));
+  }
+
+  @Test
+  public void shouldScheduleFetchJobForRefUpdateEvent() {
+    RefUpdatedEvent event = new RefUpdatedEvent();
+    RefUpdateAttribute refUpdate = new RefUpdateAttribute();
+    refUpdate.refName = TEST_REF_NAME;
+    refUpdate.project = TEST_PROJECT;
+
+    event.instanceId = REMOTE_INSTANCE_ID;
+    event.refUpdate = () -> refUpdate;
+
+    objectUnderTest.onEvent(event);
+
+    verify(fetchJobFactory).create(eq(Project.nameKey(TEST_PROJECT)), inputCaptor.capture(), any());
+
+    Input input = inputCaptor.getValue();
+    assertThat(input.label).isEqualTo(REMOTE_INSTANCE_ID);
+    assertThat(input.refName).isEqualTo(TEST_REF_NAME);
+
+    verify(executor).submit(any(FetchJob.class));
+  }
+
+  @Test
+  public void shouldCreateProjectForProjectCreatedEvent()
+      throws AuthException, PermissionBackendException {
+    ProjectCreatedEvent event = new ProjectCreatedEvent();
+    event.instanceId = REMOTE_INSTANCE_ID;
+    event.projectName = TEST_PROJECT;
+
+    objectUnderTest.onEvent(event);
+
+    verify(projectInitializationAction).initProject(String.format("%s.git", TEST_PROJECT));
+  }
+
+  @Test
+  public void shouldScheduleAllRefsFetchForProjectCreatedEvent() {
+    ProjectCreatedEvent event = new ProjectCreatedEvent();
+    event.instanceId = REMOTE_INSTANCE_ID;
+    event.projectName = TEST_PROJECT;
+
+    objectUnderTest.onEvent(event);
+
+    verify(fetchJobFactory).create(eq(Project.nameKey(TEST_PROJECT)), inputCaptor.capture(), any());
+
+    Input input = inputCaptor.getValue();
+    assertThat(input.label).isEqualTo(REMOTE_INSTANCE_ID);
+    assertThat(input.refName).isEqualTo(FetchOne.ALL_REFS);
+
+    verify(executor).submit(any(FetchJob.class));
+  }
+}
diff --git a/src/test/java/org/eclipse/jgit/transport/TransportHttpWithBearerTokenTest.java b/src/test/java/org/eclipse/jgit/transport/TransportHttpWithBearerTokenTest.java
new file mode 100644
index 0000000..5bf1047
--- /dev/null
+++ b/src/test/java/org/eclipse/jgit/transport/TransportHttpWithBearerTokenTest.java
@@ -0,0 +1,52 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.eclipse.jgit.transport;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import java.net.URISyntaxException;
+import org.junit.Test;
+
+public class TransportHttpWithBearerTokenTest {
+
+  @Test
+  public void cannotHandleURIWhenSchemaIsNeitherHttpNorHttps() throws URISyntaxException {
+    URIish uriUnderTest = new URIish("some-uri").setScheme(null);
+    boolean result = TransportHttpWithBearerToken.canHandle(uriUnderTest);
+    assertThat(result).isFalse();
+  }
+
+  @Test
+  public void cannotHandleURIWhenHostIsNotPresent() throws URISyntaxException {
+    URIish uriUnderTest = new URIish("some-uri").setScheme("http").setHost(null);
+    boolean result = TransportHttpWithBearerToken.canHandle(uriUnderTest);
+    assertThat(result).isFalse();
+  }
+
+  @Test
+  public void cannotHandleURIWhenPathIsNotPresent() throws URISyntaxException {
+    URIish uriUnderTest =
+        new URIish("some-uri").setScheme("http").setHost("some-host").setPath(null);
+    boolean result = TransportHttpWithBearerToken.canHandle(uriUnderTest);
+    assertThat(result).isFalse();
+  }
+
+  @Test
+  public void canHandleURIWhenIsWellFormed() throws URISyntaxException {
+    URIish uriUnderTest = new URIish("http://some-host/some-path");
+    boolean result = TransportHttpWithBearerToken.canHandle(uriUnderTest);
+    assertThat(result).isTrue();
+  }
+}
diff --git a/src/test/java/org/eclipse/jgit/transport/TransportProviderTest.java b/src/test/java/org/eclipse/jgit/transport/TransportProviderTest.java
new file mode 100644
index 0000000..ea70c40
--- /dev/null
+++ b/src/test/java/org/eclipse/jgit/transport/TransportProviderTest.java
@@ -0,0 +1,103 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.eclipse.jgit.transport;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.eclipse.jgit.transport.HttpConfig.EXTRA_HEADER;
+import static org.eclipse.jgit.transport.HttpConfig.HTTP;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
+import com.googlesource.gerrit.plugins.replication.pull.BearerTokenProvider;
+import com.googlesource.gerrit.plugins.replication.pull.SourceConfiguration;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Optional;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.lib.StoredConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TransportProviderTest {
+  @Mock private SourceConfiguration sourceConfig;
+  @Mock private CredentialsFactory cpFactory;
+  @Mock private RemoteConfig remoteConfig;
+  @Mock private BearerTokenProvider bearerTokenProvider;
+  @Mock private Repository repository;
+  @Mock private StoredConfig storedConfig;
+  @Mock private org.eclipse.jgit.transport.TransferConfig transferConfig;
+
+  @Before
+  public void setup() {
+    when(sourceConfig.getRemoteConfig()).thenReturn(remoteConfig);
+    when(repository.getConfig()).thenReturn(storedConfig);
+    String[] emptyHeaders = {};
+    when(storedConfig.getStringList(HTTP, null, EXTRA_HEADER)).thenReturn(emptyHeaders);
+    when(storedConfig.get(TransferConfig.KEY)).thenReturn(transferConfig);
+  }
+
+  private void verifyConstructor() {
+    verify(sourceConfig).getRemoteConfig();
+    verify(remoteConfig).getName();
+    verify(bearerTokenProvider).get();
+  }
+
+  @Test
+  public void shouldProvideTransportHttpWithBearerToken() throws URISyntaxException, IOException {
+    when(bearerTokenProvider.get()).thenReturn(Optional.of("some-bearer-token"));
+
+    TransportProvider transportProvider =
+        new TransportProvider(sourceConfig, cpFactory, bearerTokenProvider);
+    verifyConstructor();
+
+    URIish urIish = new URIish("http://some-host/some-path");
+    Transport transport = transportProvider.open(repository, urIish);
+    assertThat(transport).isInstanceOf(TransportHttpWithBearerToken.class);
+  }
+
+  @Test
+  public void shouldProvideNativeTransportWhenNoBearerTokenProvided()
+      throws URISyntaxException, IOException {
+
+    when(bearerTokenProvider.get()).thenReturn(Optional.empty());
+
+    TransportProvider transportProvider =
+        new TransportProvider(sourceConfig, cpFactory, bearerTokenProvider);
+    verifyConstructor();
+
+    URIish urIish = new URIish("ssh://some-host/some-path");
+    Transport transport = transportProvider.open(repository, urIish);
+    assertThat(transport).isNotInstanceOf(TransportHttpWithBearerToken.class);
+  }
+
+  @Test
+  public void shouldProvideNativeTransportWhenNoHttpSchemeProvided()
+      throws URISyntaxException, IOException {
+    when(bearerTokenProvider.get()).thenReturn(Optional.of("some-bearer-token"));
+
+    TransportProvider transportProvider =
+        new TransportProvider(sourceConfig, cpFactory, bearerTokenProvider);
+    verifyConstructor();
+
+    URIish urIish = new URIish("ssh://some-host/some-path");
+    Transport transport = transportProvider.open(repository, urIish);
+    assertThat(transport).isNotInstanceOf(TransportHttpWithBearerToken.class);
+  }
+}