Merge branch 'stable-3.4' into stable-3.5

* stable-3.4:
  Rename metric tasks/max_retries to tasks/failed_max_retries
  Document metrics
  Fix completed tasks metrics
  Log fetch tasks when graceful shutdown fails
  Displaying refs for fetch tasks
  Leverage ShutdownState in SourcesCollection
  Fix pull replication queue metric prefix
  Set pull replication tests as large
  Introduce replication queue metrics
  Handle fetch tasks gracefully during shutdown

Change-Id: Ie3a5b275f5c8590ea283d124ba66064edb204624
diff --git a/.gitignore b/.gitignore
index d392f0e..a9fe73b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,3 @@
 *.jar
+.idea
+*.iml
diff --git a/example-setup/broker/Dockerfile b/example-setup/broker/Dockerfile
index be54f88..08eaba9 100644
--- a/example-setup/broker/Dockerfile
+++ b/example-setup/broker/Dockerfile
@@ -1,4 +1,4 @@
-FROM gerritcodereview/gerrit:3.4.8-almalinux8
+FROM gerritcodereview/gerrit:3.5.5-almalinux8
 
 USER root
 
diff --git a/example-setup/broker/entrypoint.sh b/example-setup/broker/entrypoint.sh
index 8e4e976..33df1d4 100755
--- a/example-setup/broker/entrypoint.sh
+++ b/example-setup/broker/entrypoint.sh
@@ -13,14 +13,14 @@
   cat /var/gerrit/etc/gerrit.config.template | envsubst > /var/gerrit/etc/gerrit.config
 }
 
-setup_replication_config
-setup_gerrit_config
-
 JAVA_OPTS='--add-opens java.base/java.net=ALL-UNNAMED --add-opens java.base/java.lang.invoke=ALL-UNNAMED'
 
 echo "Init phase ..."
 java $JAVA_OPTS -jar /var/gerrit/bin/gerrit.war init --batch --install-all-plugins -d /var/gerrit
 
+setup_replication_config
+setup_gerrit_config
+
 echo "Reindexing phase ..."
 java $JAVA_OPTS -jar /var/gerrit/bin/gerrit.war reindex -d /var/gerrit
 
diff --git a/example-setup/http/Dockerfile b/example-setup/http/Dockerfile
index 77cfba0..e9f8239 100644
--- a/example-setup/http/Dockerfile
+++ b/example-setup/http/Dockerfile
@@ -1,4 +1,4 @@
-FROM gerritcodereview/gerrit:3.4.8-almalinux8
+FROM gerritcodereview/gerrit:3.5.5-almalinux8
 
 USER root
 
diff --git a/example-setup/http/entrypoint.sh b/example-setup/http/entrypoint.sh
index 8e4e976..33df1d4 100755
--- a/example-setup/http/entrypoint.sh
+++ b/example-setup/http/entrypoint.sh
@@ -13,14 +13,14 @@
   cat /var/gerrit/etc/gerrit.config.template | envsubst > /var/gerrit/etc/gerrit.config
 }
 
-setup_replication_config
-setup_gerrit_config
-
 JAVA_OPTS='--add-opens java.base/java.net=ALL-UNNAMED --add-opens java.base/java.lang.invoke=ALL-UNNAMED'
 
 echo "Init phase ..."
 java $JAVA_OPTS -jar /var/gerrit/bin/gerrit.war init --batch --install-all-plugins -d /var/gerrit
 
+setup_replication_config
+setup_gerrit_config
+
 echo "Reindexing phase ..."
 java $JAVA_OPTS -jar /var/gerrit/bin/gerrit.war reindex -d /var/gerrit
 
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index f1f162e..5d2ba13 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -3,6 +3,6 @@
 def external_plugin_deps():
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.4.0.4",
-        sha1 = "8d361d863382290e33828116e65698190118d0f1",
+        artifact = "com.gerritforge:events-broker:3.5.0.1",
+        sha1 = "af192a8bceaf7ff54d19356f9bfe1f1e83634b40",
     )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java
index 89a9067..4818ec7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/DeleteProjectTask.java
@@ -69,7 +69,7 @@
     } catch (URISyntaxException | IOException e) {
       String errorMessage =
           String.format("Cannot delete project %s on remote site %s.", project, uri);
-      logger.atWarning().withCause(e).log(errorMessage);
+      logger.atWarning().withCause(e).log("%s", errorMessage);
       repLog.warn(errorMessage);
     }
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefReplicatedEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefReplicatedEvent.java
index 8bf257e..0eabf42 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefReplicatedEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefReplicatedEvent.java
@@ -15,30 +15,23 @@
 package com.googlesource.gerrit.plugins.replication.pull;
 
 import com.google.gerrit.entities.Project;
-import com.google.gerrit.server.events.RefEvent;
+import com.googlesource.gerrit.plugins.replication.events.RemoteRefReplicationEvent;
 import java.util.Objects;
 import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.transport.URIish;
 
-public class FetchRefReplicatedEvent extends RefEvent {
+public class FetchRefReplicatedEvent extends RemoteRefReplicationEvent {
   static final String TYPE = "fetch-ref-replicated";
 
-  final String project;
-  final String ref;
-  final String sourceNode;
-  final String status;
   final RefUpdate.Result refUpdateResult;
 
   public FetchRefReplicatedEvent(
       String project,
       String ref,
-      String sourceNode,
+      URIish sourceUri,
       ReplicationState.RefFetchResult status,
       RefUpdate.Result refUpdateResult) {
-    super(TYPE);
-    this.project = project;
-    this.ref = ref;
-    this.sourceNode = sourceNode;
-    this.status = status.toString();
+    super(TYPE, project, ref, sourceUri, status.toString());
     this.refUpdateResult = refUpdateResult;
   }
 
@@ -63,7 +56,7 @@
     if (!Objects.equals(event.ref, this.ref)) {
       return false;
     }
-    if (!Objects.equals(event.sourceNode, this.sourceNode)) {
+    if (!Objects.equals(event.targetUri, this.targetUri)) {
       return false;
     }
     if (!Objects.equals(event.status, this.status)) {
@@ -81,8 +74,4 @@
   public String getRefName() {
     return ref;
   }
-
-  public String getSourceNode() {
-    return sourceNode;
-  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationScheduledEvent.java
index 9a29c86..4f96a8f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationScheduledEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationScheduledEvent.java
@@ -15,20 +15,14 @@
 package com.googlesource.gerrit.plugins.replication.pull;
 
 import com.google.gerrit.entities.Project;
-import com.google.gerrit.server.events.RefEvent;
+import com.googlesource.gerrit.plugins.replication.events.RemoteRefReplicationEvent;
+import org.eclipse.jgit.transport.URIish;
 
-public class FetchReplicationScheduledEvent extends RefEvent {
+public class FetchReplicationScheduledEvent extends RemoteRefReplicationEvent {
   static final String TYPE = "fetch-ref-replication-scheduled";
 
-  final String project;
-  final String ref;
-  final String sourceNode;
-
-  public FetchReplicationScheduledEvent(String project, String ref, String sourceNode) {
-    super(TYPE);
-    this.project = project;
-    this.ref = ref;
-    this.sourceNode = sourceNode;
+  public FetchReplicationScheduledEvent(String project, String ref, URIish sourceUri) {
+    super(TYPE, project, ref, sourceUri, null);
   }
 
   @Override
@@ -40,8 +34,4 @@
   public Project.NameKey getProjectNameKey() {
     return Project.nameKey(project);
   }
-
-  public String getSourceNode() {
-    return sourceNode;
-  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java
index ab16318..eb94ecc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java
@@ -55,7 +55,7 @@
     // Default doing nothing
   }
 
-  public static String resolveNodeName(URIish uri) {
+  private static String resolveNodeName(URIish uri) {
     StringBuilder sb = new StringBuilder();
     if (uri.isRemote()) {
       sb.append(uri.getHost());
@@ -120,8 +120,7 @@
       try {
         Context.setLocalEvent(true);
         dispatcher.postEvent(
-            new FetchRefReplicatedEvent(
-                project, ref, resolveNodeName(uri), status, refUpdateResult));
+            new FetchRefReplicatedEvent(project, ref, uri, status, refUpdateResult));
       } catch (PermissionBackendException e) {
         logger.atSevere().withCause(e).log(
             "Cannot post event for ref '%s', project %s", ref, project);
@@ -189,8 +188,7 @@
         URIish uri,
         ReplicationState.RefFetchResult result,
         RefUpdate.Result refUpdateResult) {
-      postEvent(
-          new FetchRefReplicatedEvent(project, ref, resolveNodeName(uri), result, refUpdateResult));
+      postEvent(new FetchRefReplicatedEvent(project, ref, uri, result, refUpdateResult));
     }
 
     @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/GerritConfigOps.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/GerritConfigOps.java
index 2437d80..bca2219 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/GerritConfigOps.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/GerritConfigOps.java
@@ -46,7 +46,7 @@
       uri = new URIish("file://" + basePath + "/" + projectName);
       return Optional.of(uri);
     } catch (URISyntaxException e) {
-      logger.atSevere().withCause(e).log("Unsupported URI for project " + projectName);
+      logger.atSevere().withCause(e).log("Unsupported URI for project %s", projectName);
     }
 
     return Optional.empty();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 5e4314d..5ab3859 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -15,7 +15,6 @@
 package com.googlesource.gerrit.plugins.replication.pull;
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName;
-import static com.googlesource.gerrit.plugins.replication.pull.FetchResultProcessing.resolveNodeName;
 import static com.googlesource.gerrit.plugins.replication.pull.ReplicationType.SYNC;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -311,8 +310,7 @@
 
     if (!drained) {
       logger.atWarning().atMostEvery(DRAINED_LOGGING_FREQUENCY_SECS, SECONDS).log(
-          String.format(
-              "Queue not drained: %d pending|%d in-flight", numberOfPending, numberOfInFlight));
+          "Queue not drained: %d pending|%d in-flight", numberOfPending, numberOfInFlight);
     } else {
       logger.atWarning().log("Queue drained");
     }
@@ -916,10 +914,9 @@
   private void postReplicationScheduledEvent(FetchOne fetchOp, String inputRef) {
     Set<String> refs = inputRef == null ? fetchOp.getRefs() : ImmutableSet.of(inputRef);
     Project.NameKey project = fetchOp.getProjectNameKey();
-    String targetNode = resolveNodeName(fetchOp.getURI());
     for (String ref : refs) {
       FetchReplicationScheduledEvent event =
-          new FetchReplicationScheduledEvent(project.get(), ref, targetNode);
+          new FetchReplicationScheduledEvent(project.get(), ref, fetchOp.getURI());
       try {
         eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
       } catch (PermissionBackendException e) {
@@ -930,13 +927,16 @@
 
   private void postReplicationFailedEvent(FetchOne fetchOp, RefUpdate.Result result) {
     Project.NameKey project = fetchOp.getProjectNameKey();
-    String sourceNode = resolveNodeName(fetchOp.getURI());
     try {
       Context.setLocalEvent(true);
       for (String ref : fetchOp.getRefs()) {
         FetchRefReplicatedEvent event =
             new FetchRefReplicatedEvent(
-                project.get(), ref, sourceNode, ReplicationState.RefFetchResult.FAILED, result);
+                project.get(),
+                ref,
+                fetchOp.getURI(),
+                ReplicationState.RefFetchResult.FAILED,
+                result);
         try {
           eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
         } catch (PermissionBackendException e) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
index 3cbe306..0e840bd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
@@ -224,6 +224,11 @@
     return slowLatencyThreshold;
   }
 
+  @Override
+  public int getPushBatchSize() {
+    return 0;
+  }
+
   public int getShutDownDrainTimeout() {
     return shutDownDrainTimeout;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java
index 3d8af69..8bd1d95 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java
@@ -27,7 +27,11 @@
 import com.googlesource.gerrit.plugins.replication.RemoteConfiguration;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.eclipse.jgit.errors.ConfigInvalidException;
 
 @Singleton
@@ -35,8 +39,8 @@
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
   private final Source.Factory sourceFactory;
+  private volatile Map<String, Source> sources;
   private final ShutdownState shutdownState;
-  private volatile List<Source> sources;
   private final Provider<ReplicationQueue> replicationQueue;
 
   @Inject
@@ -58,22 +62,26 @@
 
   @Override
   public List<Source> getAll() {
-    return sources.stream().filter(Objects::nonNull).collect(toList());
+    return sources.values().stream().filter(Objects::nonNull).collect(toList());
   }
 
-  private List<Source> allSources(
+  public Optional<Source> getByRemoteName(String remoteName) {
+    return Optional.ofNullable(sources.get(remoteName));
+  }
+
+  private Map<String, Source> allSources(
       Source.Factory sourceFactory, List<RemoteConfiguration> sourceConfigurations) {
     return sourceConfigurations.stream()
         .filter((c) -> c instanceof SourceConfiguration)
         .map((c) -> (SourceConfiguration) c)
         .map(sourceFactory::create)
-        .collect(toList());
+        .collect(Collectors.toMap(Source::getRemoteConfigName, Function.identity()));
   }
 
   @Override
   public void startup(WorkQueue workQueue) {
     shutdownState.setIsShuttingDown(false);
-    for (Source cfg : sources) {
+    for (Source cfg : sources.values()) {
       cfg.start(workQueue);
     }
   }
@@ -96,7 +104,7 @@
     shutdownState.setIsShuttingDown(true);
 
     int discarded = 0;
-    for (Source cfg : sources) {
+    for (Source cfg : sources.values()) {
       discarded += cfg.shutdown();
     }
     return discarded;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java
index b366b90..9dfe26e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/UpdateHeadTask.java
@@ -74,7 +74,7 @@
           String.format(
               "Cannot update HEAD of project %s remote site %s",
               project.get(), apiURI.toASCIIString());
-      logger.atWarning().withCause(e).log(errorMessage);
+      logger.atWarning().withCause(e).log("%s", errorMessage);
       repLog.warn(errorMessage);
     }
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java
index 5f368c5..968a03c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java
@@ -30,6 +30,8 @@
 import com.google.inject.name.Named;
 import com.googlesource.gerrit.plugins.replication.pull.*;
 import com.googlesource.gerrit.plugins.replication.pull.ReplicationState.RefFetchResult;
+import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
@@ -58,6 +60,7 @@
   private final ApplyObject applyObject;
   private final ApplyObjectMetrics metrics;
   private final DynamicItem<EventDispatcher> eventDispatcher;
+  private final SourcesCollection sourcesCollection;
 
   @Inject
   public ApplyObjectCommand(
@@ -65,11 +68,13 @@
       ApplyObject applyObject,
       ApplyObjectMetrics metrics,
       DynamicItem<EventDispatcher> eventDispatcher,
+      SourcesCollection sourcesCollection,
       @Named(APPLY_OBJECTS_CACHE) Cache<ApplyObjectsCacheKey, Long> refUpdatesSucceededCache) {
     this.fetchStateLog = fetchStateLog;
     this.applyObject = applyObject;
     this.metrics = metrics;
     this.eventDispatcher = eventDispatcher;
+    this.sourcesCollection = sourcesCollection;
     this.refUpdatesSucceededCache = refUpdatesSucceededCache;
   }
 
@@ -124,16 +129,23 @@
 
     try {
       Context.setLocalEvent(true);
+      Source source =
+          sourcesCollection
+              .getByRemoteName(sourceLabel)
+              .orElseThrow(
+                  () ->
+                      new IllegalStateException(
+                          String.format("Could not find URI for %s remote", sourceLabel)));
       eventDispatcher
           .get()
           .postEvent(
               new FetchRefReplicatedEvent(
                   name.get(),
                   refName,
-                  sourceLabel,
+                  source.getURI(name),
                   getStatus(refUpdateState),
                   refUpdateState.getResult()));
-    } catch (PermissionBackendException e) {
+    } catch (PermissionBackendException | IllegalStateException e) {
       logger.atSevere().withCause(e).log(
           "Cannot post event for ref '%s', project %s", refName, name);
     } finally {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java
index 8147149..be71946 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java
@@ -44,6 +44,7 @@
 public class BearerAuthenticationFilter extends AllRequestFilter {
 
   private static final String BEARER_TOKEN = "BearerToken";
+  private static final String BEARER_TOKEN_PREFIX = "Bearer";
   private final DynamicItem<WebSession> session;
   private final String pluginName;
   private final PullReplicationInternalUser pluginUser;
@@ -79,13 +80,14 @@
     HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
     HttpServletResponse httpResponse = (HttpServletResponse) servletResponse;
     String requestURI = httpRequest.getRequestURI();
+    Optional<String> authorizationHeader =
+        Optional.ofNullable(httpRequest.getHeader("Authorization"));
 
     if (isBasicAuthenticationRequest(requestURI)) {
       filterChain.doFilter(servletRequest, servletResponse);
-    } else if (isPullReplicationApiRequest(requestURI) || isGitUploadPackRequest(httpRequest)) {
-      Optional<String> authorizationHeader =
-          Optional.ofNullable(httpRequest.getHeader("Authorization"));
-
+    } else if (isPullReplicationApiRequest(requestURI)
+        || (isGitUploadPackRequest(httpRequest)
+            && isAuthenticationHeaderWithBearerToken(authorizationHeader))) {
       if (isBearerTokenAuthenticated(authorizationHeader, bearerToken))
         try (ManualRequestContext ctx =
             new ManualRequestContext(pluginUser, threadLocalRequestContext.get())) {
@@ -137,4 +139,8 @@
     }
     return Optional.empty();
   }
+
+  private boolean isAuthenticationHeaderWithBearerToken(Optional<String> authorizationHeader) {
+    return authorizationHeader.map(h -> h.startsWith(BEARER_TOKEN_PREFIX)).orElse(false);
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefCommand.java
index 62c7e25..f897012 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefCommand.java
@@ -23,6 +23,7 @@
 import com.google.gerrit.extensions.restapi.RestApiException;
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.gerrit.server.permissions.PermissionBackend;
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.gerrit.server.project.ProjectCache;
 import com.google.gerrit.server.project.ProjectState;
@@ -32,6 +33,8 @@
 import com.googlesource.gerrit.plugins.replication.pull.LocalGitRepositoryManagerProvider;
 import com.googlesource.gerrit.plugins.replication.pull.PullReplicationStateLogger;
 import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
+import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObject;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.RefUpdateState;
 import java.io.IOException;
@@ -40,6 +43,7 @@
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.RefUpdate;
 import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.URIish;
 
 public class DeleteRefCommand {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@@ -48,19 +52,25 @@
   private final ApplyObject applyObject;
   private final DynamicItem<EventDispatcher> eventDispatcher;
   private final ProjectCache projectCache;
+  private final SourcesCollection sourcesCollection;
+  private final PermissionBackend permissionBackend;
   private final GitRepositoryManager gitManager;
 
   @Inject
   public DeleteRefCommand(
       PullReplicationStateLogger fetchStateLog,
       ProjectCache projectCache,
+      SourcesCollection sourcesCollection,
       ApplyObject applyObject,
+      PermissionBackend permissionBackend,
       DynamicItem<EventDispatcher> eventDispatcher,
       LocalGitRepositoryManagerProvider gitManagerProvider) {
     this.fetchStateLog = fetchStateLog;
     this.projectCache = projectCache;
     this.applyObject = applyObject;
     this.eventDispatcher = eventDispatcher;
+    this.sourcesCollection = sourcesCollection;
+    this.permissionBackend = permissionBackend;
     this.gitManager = gitManagerProvider.get();
   }
 
@@ -79,6 +89,15 @@
         return;
       }
 
+      Source source =
+          sourcesCollection
+              .getByRemoteName(sourceLabel)
+              .orElseThrow(
+                  () ->
+                      new IllegalStateException(
+                          String.format("Could not find URI for %s remote", sourceLabel)));
+      URIish sourceUri = source.getURI(name);
+
       try {
 
         Context.setLocalEvent(true);
@@ -90,7 +109,7 @@
                 new FetchRefReplicatedEvent(
                     name.get(),
                     refName,
-                    sourceLabel,
+                    sourceUri,
                     ReplicationState.RefFetchResult.SUCCEEDED,
                     RefUpdate.Result.FORCED));
       } catch (PermissionBackendException e) {
@@ -105,14 +124,14 @@
                 new FetchRefReplicatedEvent(
                     name.get(),
                     refName,
-                    sourceLabel,
+                    sourceUri,
                     ReplicationState.RefFetchResult.FAILED,
                     RefUpdate.Result.LOCK_FAILURE));
         String message =
             String.format(
                 "RefUpdate lock failure for: sourceLabel=%s, project=%s, refName=%s",
                 sourceLabel, name, refName);
-        logger.atSevere().withCause(e).log(message);
+        logger.atSevere().withCause(e).log("%s", message);
         fetchStateLog.error(message);
         throw e;
       } finally {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
index 3a502ef..dd06875 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
@@ -81,8 +81,7 @@
     ReplicationState state =
         fetchReplicationStateFactory.create(
             new FetchResultProcessing.CommandProcessing(this, eventDispatcher.get()));
-    Optional<Source> source =
-        sources.getAll().stream().filter(s -> s.getRemoteConfigName().equals(label)).findFirst();
+    Optional<Source> source = sources.getByRemoteName(label);
     if (!source.isPresent()) {
       String msg = String.format("Remote configuration section %s not found", label);
       fetchStateLog.error(msg, state);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilter.java
index 030af76..e54d408 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilter.java
@@ -58,7 +58,6 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.nio.file.InvalidPathException;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
@@ -164,7 +163,8 @@
       RestApiServlet.replyError(
           httpRequest, httpResponse, SC_BAD_REQUEST, "Project name not present in the url", e);
     } catch (Exception e) {
-      if (e instanceof InvalidPathException || e.getCause() instanceof InvalidPathException) {
+      if (e instanceof IllegalArgumentException
+          || e.getCause() instanceof IllegalArgumentException) {
         RestApiServlet.replyError(
             httpRequest, httpResponse, SC_BAD_REQUEST, "Invalid repository path in request", e);
       } else {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
index c3fec5e..b606ba8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
@@ -279,7 +279,7 @@
       try {
         req.addHeader(new BasicScheme().authenticate(creds, req, null));
       } catch (AuthenticationException e) {
-        logger.atFine().log(String.format("Anonymous Basic Authentication for uri: %s", targetUri));
+        logger.atFine().log("Anonymous Basic Authentication for uri: %s", targetUri);
       }
     }
     return req;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchGitUpdateProcessingTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchGitUpdateProcessingTest.java
index 77dc947..68044b4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchGitUpdateProcessingTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchGitUpdateProcessingTest.java
@@ -35,6 +35,7 @@
   private GitUpdateProcessing gitUpdateProcessing;
   private CommandProcessing commandProcessing;
   private Command sshCommandMock;
+  private static URIish sourceUri;
 
   @Before
   public void setUp() throws Exception {
@@ -42,6 +43,7 @@
     gitUpdateProcessing = new GitUpdateProcessing(dispatcherMock);
     sshCommandMock = mock(Command.class);
     commandProcessing = new CommandProcessing(sshCommandMock, dispatcherMock);
+    sourceUri = new URIish("git://someHost/someProject.git");
   }
 
   @Test
@@ -51,14 +53,14 @@
         new FetchRefReplicatedEvent(
             "someProject",
             "refs/heads/master",
-            "someHost",
+            sourceUri,
             RefFetchResult.SUCCEEDED,
             RefUpdate.Result.NEW);
 
     gitUpdateProcessing.onOneProjectReplicationDone(
         "someProject",
         "refs/heads/master",
-        new URIish("git://someHost/someProject.git"),
+        sourceUri,
         RefFetchResult.SUCCEEDED,
         RefUpdate.Result.NEW);
     verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent));
@@ -71,7 +73,7 @@
         new FetchRefReplicatedEvent(
             "someProject",
             "refs/heads/master",
-            "someHost",
+            sourceUri,
             RefFetchResult.SUCCEEDED,
             RefUpdate.Result.NEW);
 
@@ -90,7 +92,7 @@
         new FetchRefReplicatedEvent(
             "someProject",
             "refs/changes/01/1/1",
-            "someHost",
+            sourceUri,
             RefFetchResult.FAILED,
             RefUpdate.Result.REJECTED_OTHER_REASON);
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommandTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommandTest.java
index ee49aee..7f5a67c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommandTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommandTest.java
@@ -34,6 +34,8 @@
 import com.googlesource.gerrit.plugins.replication.pull.ApplyObjectsCacheKey;
 import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
 import com.googlesource.gerrit.plugins.replication.pull.PullReplicationStateLogger;
+import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
@@ -41,9 +43,12 @@
 import com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObject;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.RefUpdateState;
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.Collections;
+import java.util.Optional;
 import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.transport.URIish;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -58,6 +63,7 @@
   private static final String TEST_REF_NAME = "refs/changes/01/1/1";
   private static final NameKey TEST_PROJECT_NAME = Project.nameKey("test-project");
   private static final String TEST_REMOTE_NAME = "test-remote-name";
+  private static URIish TEST_REMOTE_URI;
   private static final long TEST_EVENT_TIMESTAMP = 1L;
 
   private String sampleCommitObjectId = "9f8d52853089a3cf00c02ff7bd0817bd4353a95a";
@@ -72,22 +78,28 @@
   @Mock private DynamicItem<EventDispatcher> eventDispatcherDataItem;
   @Mock private EventDispatcher eventDispatcher;
   @Mock private Timer1.Context<String> timetContext;
+  @Mock private SourcesCollection sourceCollection;
+  @Mock private Source source;
   @Captor ArgumentCaptor<Event> eventCaptor;
   private Cache<ApplyObjectsCacheKey, Long> cache;
 
   private ApplyObjectCommand objectUnderTest;
 
   @Before
-  public void setup() throws MissingParentObjectException, IOException {
+  public void setup() throws MissingParentObjectException, IOException, URISyntaxException {
     cache = CacheBuilder.newBuilder().build();
     RefUpdateState state = new RefUpdateState(TEST_REMOTE_NAME, RefUpdate.Result.NEW);
+    TEST_REMOTE_URI = new URIish("git://some.remote.uri");
     when(eventDispatcherDataItem.get()).thenReturn(eventDispatcher);
     when(metrics.start(anyString())).thenReturn(timetContext);
     when(timetContext.stop()).thenReturn(100L);
     when(applyObject.apply(any(), any(), any())).thenReturn(state);
+    when(sourceCollection.getByRemoteName(TEST_SOURCE_LABEL)).thenReturn(Optional.of(source));
+    when(source.getURI(TEST_PROJECT_NAME)).thenReturn(TEST_REMOTE_URI);
 
     objectUnderTest =
-        new ApplyObjectCommand(fetchStateLog, applyObject, metrics, eventDispatcherDataItem, cache);
+        new ApplyObjectCommand(
+            fetchStateLog, applyObject, metrics, eventDispatcherDataItem, sourceCollection, cache);
   }
 
   @Test
@@ -109,6 +121,7 @@
     FetchRefReplicatedEvent fetchEvent = (FetchRefReplicatedEvent) sentEvent;
     assertThat(fetchEvent.getProjectNameKey()).isEqualTo(TEST_PROJECT_NAME);
     assertThat(fetchEvent.getRefName()).isEqualTo(TEST_REF_NAME);
+    assertThat(fetchEvent.targetUri).isEqualTo(TEST_REMOTE_URI.toASCIIString());
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java
index 824496a..ca69f06 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java
@@ -16,7 +16,6 @@
 
 import static javax.servlet.http.HttpServletResponse.SC_UNAUTHORIZED;
 import static org.mockito.Mockito.atMost;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -108,17 +107,53 @@
   }
 
   @Test
-  public void shouldAuthenticateWhenGitUploadPacket() throws ServletException, IOException {
+  public void shouldAuthenticateWhenGitUploadPack() throws ServletException, IOException {
     authenticateAndFilter("any-prefix/git-upload-pack", NO_QUERY_PARAMETERS);
   }
 
   @Test
-  public void shouldAuthenticateWhenGitUploadPacketInQueryParameter()
+  public void shouldAuthenticateWhenGitUploadPackInQueryParameter()
       throws ServletException, IOException {
     authenticateAndFilter("any-prefix", GIT_UPLOAD_PACK_QUERY_PARAMETER);
   }
 
   @Test
+  public void shouldGoNextInChainWhenGitUploadPackWithoutAuthenticationHeader()
+      throws ServletException, IOException {
+    // Only the request URI is stubbed here; no Authorization header is set up for this request.
+    when(httpServletRequest.getRequestURI()).thenReturn("any-prefix/git-upload-pack");
+
+    new BearerAuthenticationFilter(
+            session,
+            pluginName,
+            pluginUser,
+            threadLocalRequestContextProvider,
+            "some-bearer-token")
+        .doFilter(httpServletRequest, httpServletResponse, filterChain);
+
+    verify(filterChain).doFilter(httpServletRequest, httpServletResponse);
+    verify(httpServletRequest).getHeader("Authorization");
+  }
+
+  @Test
+  public void shouldGoNextInChainWhenGitUploadPackWithAuthenticationHeaderDifferentThanBearer()
+      throws ServletException, IOException {
+    // The Authorization header is stubbed with a value other than the configured bearer token.
+    when(httpServletRequest.getHeader("Authorization")).thenReturn("some-authorization");
+    when(httpServletRequest.getRequestURI()).thenReturn("any-prefix/git-upload-pack");
+    new BearerAuthenticationFilter(
+            session,
+            pluginName,
+            pluginUser,
+            threadLocalRequestContextProvider,
+            "some-bearer-token")
+        .doFilter(httpServletRequest, httpServletResponse, filterChain);
+
+    verify(filterChain).doFilter(httpServletRequest, httpServletResponse);
+    verify(httpServletRequest).getHeader("Authorization");
+  }
+
+  @Test
   public void shouldBe401WhenBearerTokenDoesNotMatch() throws ServletException, IOException {
     when(httpServletRequest.getRequestURI()).thenReturn("any-prefix/pull-replication~fetch");
     when(httpServletRequest.getHeader("Authorization"))
@@ -171,6 +206,7 @@
     filter.doFilter(httpServletRequest, httpServletResponse, filterChain);
 
     verify(httpServletRequest).getRequestURI();
+    verify(httpServletRequest).getHeader("Authorization");
     verify(httpServletResponse).sendError(SC_UNAUTHORIZED);
   }
 
@@ -187,7 +223,7 @@
             "some-bearer-token");
     filter.doFilter(httpServletRequest, httpServletResponse, filterChain);
 
-    verify(httpServletRequest, times(2)).getRequestURI();
+    verify(httpServletRequest).getHeader("Authorization");
     verify(filterChain).doFilter(httpServletRequest, httpServletResponse);
   }
 
@@ -206,7 +242,7 @@
             "some-bearer-token");
     filter.doFilter(httpServletRequest, httpServletResponse, filterChain);
 
-    verify(httpServletRequest).getRequestURI();
+    verify(httpServletRequest).getHeader("Authorization");
     verify(filterChain).doFilter(httpServletRequest, httpServletResponse);
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefCommandTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefCommandTest.java
index 15cfa4b..f1f9e44 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefCommandTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefCommandTest.java
@@ -27,11 +27,17 @@
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.git.LocalDiskRepositoryManager;
+import com.google.gerrit.server.permissions.PermissionBackend;
+import com.google.gerrit.server.permissions.PermissionBackend.ForProject;
+import com.google.gerrit.server.permissions.PermissionBackend.ForRef;
+import com.google.gerrit.server.permissions.PermissionBackend.WithUser;
 import com.google.gerrit.server.project.ProjectCache;
 import com.google.gerrit.server.project.ProjectState;
 import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
 import com.googlesource.gerrit.plugins.replication.pull.LocalGitRepositoryManagerProvider;
 import com.googlesource.gerrit.plugins.replication.pull.PullReplicationStateLogger;
+import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObject;
 import java.util.Optional;
 import org.eclipse.jgit.lib.Ref;
@@ -39,6 +45,7 @@
 import org.eclipse.jgit.lib.RefUpdate;
 import org.eclipse.jgit.lib.RefUpdate.Result;
 import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.URIish;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -54,6 +61,7 @@
 
   private static final String NON_EXISTING_REF_NAME = "refs/changes/01/11101/1";
   private static final NameKey TEST_PROJECT_NAME = Project.nameKey("test-project");
+  private static URIish TEST_REMOTE_URI;
 
   @Mock private PullReplicationStateLogger fetchStateLog;
   @Mock private DynamicItem<EventDispatcher> eventDispatcherDataItem;
@@ -61,6 +69,12 @@
   @Mock private ProjectCache projectCache;
   @Mock private ApplyObject applyObject;
   @Mock private ProjectState projectState;
+  @Mock private SourcesCollection sourceCollection;
+  @Mock private Source source;
+  @Mock private PermissionBackend permissionBackend;
+  @Mock private WithUser currentUser;
+  @Mock private ForProject forProject;
+  @Mock private ForRef forRef;
   @Mock private LocalDiskRepositoryManager gitManager;
   @Mock private RefUpdate refUpdate;
   @Mock private Repository repository;
@@ -74,6 +88,9 @@
   public void setup() throws Exception {
     when(eventDispatcherDataItem.get()).thenReturn(eventDispatcher);
     when(projectCache.get(any())).thenReturn(Optional.of(projectState));
+    when(sourceCollection.getByRemoteName(TEST_SOURCE_LABEL)).thenReturn(Optional.of(source));
+    TEST_REMOTE_URI = new URIish("git://some.remote.uri");
+    when(source.getURI(TEST_PROJECT_NAME)).thenReturn(TEST_REMOTE_URI);
     when(gitManager.openRepository(any())).thenReturn(repository);
     when(repository.updateRef(any())).thenReturn(refUpdate);
     when(repository.getRefDatabase()).thenReturn(refDb);
@@ -84,7 +101,9 @@
         new DeleteRefCommand(
             fetchStateLog,
             projectCache,
+            sourceCollection,
             applyObject,
+            permissionBackend,
             eventDispatcherDataItem,
             new LocalGitRepositoryManagerProvider(gitManager));
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java
index 9af2d10..c093719 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java
@@ -25,7 +25,6 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import com.google.common.collect.Lists;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.EventDispatcher;
@@ -76,8 +75,7 @@
     label = "instance-1-label";
 
     when(fetchReplicationStateFactory.create(any())).thenReturn(state);
-    when(source.getRemoteConfigName()).thenReturn(label);
-    when(sources.getAll()).thenReturn(Lists.newArrayList(source));
+    when(sources.getByRemoteName(label)).thenReturn(Optional.of(source));
     when(source.schedule(eq(projectName), eq(REF_NAME_TO_FETCH), eq(state), any(), any()))
         .thenReturn(CompletableFuture.completedFuture(null));
     objectUnderTest =
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsSerializationTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsSerializationTest.java
new file mode 100644
index 0000000..efa521c
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsSerializationTest.java
@@ -0,0 +1,83 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.base.Objects;
+import com.google.gerrit.server.events.EventGsonProvider;
+import com.google.gson.Gson;
+import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.pull.FetchReplicationScheduledEvent;
+import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Round-trip tests for pull-replication events: each event is serialized to JSON with the Gson
+ * obtained from {@link EventGsonProvider} and then deserialized back for comparison.
+ */
+public class EventsSerializationTest {
+  private static final String TEST_PROJECT = "test_project";
+  private static final String TEST_REF = "refs/heads/master";
+  private static final Gson eventGson = new EventGsonProvider().get();
+  private static URIish sourceUri;
+
+  @Before
+  public void setUp() throws Exception {
+    // Source URI of the form git://aSourceNode/<project>.git, rebuilt before every test.
+    sourceUri = new URIish(String.format("git://aSourceNode/%s.git", TEST_PROJECT));
+  }
+
+  @Test
+  public void shouldSerializeFetchRefReplicatedEvent() {
+    FetchRefReplicatedEvent original =
+        new FetchRefReplicatedEvent(
+            TEST_PROJECT,
+            TEST_REF,
+            sourceUri,
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.FAST_FORWARD);
+    String json = eventGson.toJson(original);
+
+    assertThat(original).isEqualTo(eventGson.fromJson(json, FetchRefReplicatedEvent.class));
+  }
+
+  @Test
+  public void shouldSerializeFetchReplicationScheduledEvent() {
+    FetchReplicationScheduledEvent original =
+        new FetchReplicationScheduledEvent(TEST_PROJECT, TEST_REF, sourceUri);
+    String json = eventGson.toJson(original);
+
+    // Compared through the field-by-field helper below instead of isEqualTo.
+    assertTrue(
+        haveSameFields(original, eventGson.fromJson(json, FetchReplicationScheduledEvent.class)));
+  }
+
+  /** Compares project, ref, targetUri and status; false unless other is a scheduled event. */
+  private boolean haveSameFields(FetchReplicationScheduledEvent scheduledEvent, Object other) {
+    if (!(other instanceof FetchReplicationScheduledEvent)) {
+      return false;
+    }
+    FetchReplicationScheduledEvent that = (FetchReplicationScheduledEvent) other;
+    return Objects.equal(that.project, scheduledEvent.project)
+        && Objects.equal(that.ref, scheduledEvent.ref)
+        && Objects.equal(that.targetUri, scheduledEvent.targetUri)
+        && Objects.equal(that.status, scheduledEvent.status);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandlerTest.java
index e528eca..81a4fc0 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandlerTest.java
@@ -28,17 +28,20 @@
 import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
 import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
 import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.transport.URIish;
 import org.junit.Before;
 import org.junit.Test;
 
 public class FetchRefReplicatedEventHandlerTest {
   private ChangeIndexer changeIndexerMock;
   private FetchRefReplicatedEventHandler fetchRefReplicatedEventHandler;
+  private static URIish sourceUri;
 
   @Before
   public void setUp() throws Exception {
     changeIndexerMock = mock(ChangeIndexer.class);
     fetchRefReplicatedEventHandler = new FetchRefReplicatedEventHandler(changeIndexerMock);
+    sourceUri = new URIish("git://aSourceNode/testProject.git");
   }
 
   @Test
@@ -52,7 +55,7 @@
           new FetchRefReplicatedEvent(
               projectNameKey.get(),
               ref,
-              "aSourceNode",
+              sourceUri,
               ReplicationState.RefFetchResult.SUCCEEDED,
               RefUpdate.Result.FAST_FORWARD));
       verify(changeIndexerMock, times(1)).index(eq(projectNameKey), eq(changeId));
@@ -70,7 +73,7 @@
         new FetchRefReplicatedEvent(
             projectNameKey.get(),
             ref,
-            "aSourceNode",
+            sourceUri,
             ReplicationState.RefFetchResult.SUCCEEDED,
             RefUpdate.Result.FAST_FORWARD));
     verify(changeIndexerMock, never()).index(eq(projectNameKey), eq(changeId));
@@ -85,7 +88,7 @@
         new FetchRefReplicatedEvent(
             projectNameKey.get(),
             ref,
-            "aSourceNode",
+            sourceUri,
             ReplicationState.RefFetchResult.SUCCEEDED,
             RefUpdate.Result.FAST_FORWARD));
     verify(changeIndexerMock, never()).index(eq(projectNameKey), eq(changeId));
@@ -97,7 +100,7 @@
         new FetchRefReplicatedEvent(
             Project.nameKey("testProject").get(),
             "invalidRef",
-            "aSourceNode",
+            sourceUri,
             ReplicationState.RefFetchResult.SUCCEEDED,
             RefUpdate.Result.FAST_FORWARD));
     verify(changeIndexerMock, never()).index(any(), any());
@@ -111,7 +114,7 @@
         new FetchRefReplicatedEvent(
             projectNameKey.get(),
             ref,
-            "aSourceNode",
+            sourceUri,
             ReplicationState.RefFetchResult.FAILED,
             RefUpdate.Result.FAST_FORWARD));
     verify(changeIndexerMock, never()).index(any(), any());
@@ -125,7 +128,7 @@
         new FetchRefReplicatedEvent(
             projectNameKey.get(),
             ref,
-            "aSourceNode",
+            sourceUri,
             ReplicationState.RefFetchResult.NOT_ATTEMPTED,
             RefUpdate.Result.FAST_FORWARD));
     verify(changeIndexerMock, never()).index(any(), any());