Merge "Fix inconsistent logging for scheduled pushes"
diff --git a/BUILD b/BUILD
index 2a6dfd5..4c74bf2 100644
--- a/BUILD
+++ b/BUILD
@@ -33,18 +33,16 @@
     ],
 )
 
-junit_tests(
-    name = "replication_it",
-    srcs = glob([
-        "src/test/java/**/*IT.java",
-    ]),
+[junit_tests(
+    name = f[:f.index(".")].replace("/", "_"),
+    srcs = [f],
     tags = ["replication"],
     visibility = ["//visibility:public"],
     deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
         ":replication__plugin",
         ":replication_util",
     ],
-)
+) for f in glob(["src/test/java/**/*IT.java"])]
 
 java_library(
     name = "replication_util",
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index 782ff4f..a0d9624 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -77,6 +77,11 @@
   }
 
   @Override
+  public int getMaxRefsToShow() {
+    return currentConfig.getMaxRefsToShow();
+  }
+
+  @Override
   public Path getEventsDirectory() {
     return currentConfig.getEventsDirectory();
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 1febf17..6117abf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -64,6 +64,8 @@
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.servlet.RequestScoped;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
@@ -335,6 +337,9 @@
                 if (PushOne.ALL_REFS.equals(ref)) {
                   return true;
                 }
+                if (userProvider.get().isInternalUser()) {
+                  return true;
+                }
                 try {
                   permissionBackend
                       .user(userProvider.get())
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
index 5b24bf5..b915d0d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
@@ -137,6 +137,11 @@
   }
 
   @Override
+  public int getMaxRefsToShow() {
+    return replicationConfig.getMaxRefsToShow();
+  }
+
+  @Override
   public Path getEventsDirectory() {
     return replicationConfig.getEventsDirectory();
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 565790c..ebe169c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -61,6 +61,7 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 import org.eclipse.jgit.errors.NoRemoteRepositoryException;
 import org.eclipse.jgit.errors.NotSupportedException;
 import org.eclipse.jgit.errors.RemoteRepositoryException;
@@ -212,7 +213,7 @@
 
   @Override
   public String toString() {
-    String print = "[" + HexFormat.fromInt(id) + "] push " + uri;
+    String print = "[" + HexFormat.fromInt(id) + "] push " + uri + " " + getLimitedRefs();
 
     if (retryCount > 0) {
       print = "(retry " + retryCount + ") " + print;
@@ -220,6 +221,36 @@
     return print;
   }
 
+  /**
+   * Returns a string of refs limited to the maxRefsToShow config with count of total refs hidden
+   * when there are more refs than maxRefsToShow config.
+   *
+   * <ul>
+   *   <li>Refs will not be limited when maxRefsToShow config is set to zero.
+   *   <li>By default output will be limited to two refs.
+   * </ul>
+   *
+   * The default value of two is chosen because whenever a new patchset is created there are two
+   * refs to be replicated(change ref and meta ref).
+   *
+   * @return Space separated string of refs (in square bracket) limited to the maxRefsToShow with
+   *     count of total refs hidden(in parentheses) when there are more refs than maxRefsToShow
+   *     config.
+   */
+  protected String getLimitedRefs() {
+    Set<String> refs = getRefs();
+    int maxRefsToShow = replConfig.getMaxRefsToShow();
+    if (maxRefsToShow == 0) {
+      maxRefsToShow = refs.size();
+    }
+    String refsString = refs.stream().limit(maxRefsToShow).collect(Collectors.joining(" "));
+    int hiddenRefs = refs.size() - maxRefsToShow;
+    if (hiddenRefs > 0) {
+      refsString += " (+" + hiddenRefs + ")";
+    }
+    return "[" + refsString + "]";
+  }
+
   boolean isRetrying() {
     return retrying;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
index 92ba4be..c63a3ba 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
@@ -20,6 +20,8 @@
 import com.google.gerrit.server.events.RefEvent;
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
 import java.lang.ref.WeakReference;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index 8bbb180..ec75450 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -60,6 +60,13 @@
   int getMaxRefsToLog();
 
   /**
+   * Returns the maximum number of replicating refs to show in the show-queue output
+   *
+   * @return maximum number of refs to show, 2 by default.
+   */
+  int getMaxRefsToShow();
+
+  /**
    * Configured location where the replication events are stored on the filesystem for being resumed
    * and kept across restarts.
    *
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index 2631cbe..f5c6185 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -34,6 +34,7 @@
   private boolean replicateAllOnPluginStart;
   private boolean defaultForceUpdate;
   private int maxRefsToLog;
+  private final int maxRefsToShow;
   private int sshCommandTimeout;
   private int sshConnectionTimeout = DEFAULT_SSH_CONNECTION_TIMEOUT_MS;
   private final FileBasedConfig config;
@@ -54,6 +55,7 @@
     this.replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false);
     this.defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false);
     this.maxRefsToLog = config.getInt("gerrit", "maxRefsToLog", 0);
+    this.maxRefsToShow = config.getInt("gerrit", "maxRefsToShow", 2);
     this.pluginDataDir = pluginDataDir;
   }
 
@@ -96,6 +98,11 @@
   }
 
   @Override
+  public int getMaxRefsToShow() {
+    return maxRefsToShow;
+  }
+
+  @Override
   public Path getEventsDirectory() {
     String eventsDirectory = config.getString("replication", null, "eventsDirectory");
     if (!Strings.isNullOrEmpty(eventsDirectory)) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
index c2b96a1..c3adfaf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -32,6 +32,9 @@
 import com.google.inject.Scopes;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.internal.UniqueAnnotations;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
+import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
index df8f3f4..b948be0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
@@ -17,6 +17,7 @@
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Table;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
@@ -24,7 +25,7 @@
 
 public class ReplicationState {
 
-  private boolean allScheduled;
+  private volatile boolean allScheduled;
   private final PushResultProcessing pushResultProcessing;
 
   private final Lock countingLock = new ReentrantLock();
@@ -33,8 +34,8 @@
   private static class RefReplicationStatus {
     private final String project;
     private final String ref;
-    private int nodesToReplicateCount;
-    private int replicatedNodesCount;
+    private final AtomicInteger nodesToReplicateCount = new AtomicInteger();
+    private final AtomicInteger replicatedNodesCount = new AtomicInteger();
 
     RefReplicationStatus(String project, String ref) {
       this.project = project;
@@ -42,13 +43,13 @@
     }
 
     public boolean allDone() {
-      return replicatedNodesCount == nodesToReplicateCount;
+      return replicatedNodesCount.get() == nodesToReplicateCount.get();
     }
   }
 
   private final Table<String, String, RefReplicationStatus> statusByProjectRef;
-  private int totalPushTasksCount;
-  private int finishedPushTasksCount;
+  private final AtomicInteger totalPushTasksCount = new AtomicInteger();
+  private final AtomicInteger finishedPushTasksCount = new AtomicInteger();
 
   ReplicationState(PushResultProcessing processing) {
     pushResultProcessing = processing;
@@ -58,15 +59,15 @@
   public void increasePushTaskCount(String project, String ref) {
     countingLock.lock();
     try {
-      getRefStatus(project, ref).nodesToReplicateCount++;
-      totalPushTasksCount++;
+      getRefStatus(project, ref).nodesToReplicateCount.getAndIncrement();
+      totalPushTasksCount.getAndIncrement();
     } finally {
       countingLock.unlock();
     }
   }
 
   public boolean hasPushTask() {
-    return totalPushTasksCount != 0;
+    return totalPushTasksCount.get() != 0;
   }
 
   public void notifyRefReplicated(
@@ -82,14 +83,14 @@
     countingLock.lock();
     try {
       RefReplicationStatus refStatus = getRefStatus(project, ref);
-      refStatus.replicatedNodesCount++;
-      finishedPushTasksCount++;
+      refStatus.replicatedNodesCount.getAndIncrement();
+      finishedPushTasksCount.getAndIncrement();
 
       if (allScheduled) {
         if (refStatus.allDone()) {
           completedRefStatus = statusByProjectRef.remove(project, ref);
         }
-        allPushTaksCompleted = finishedPushTasksCount == totalPushTasksCount;
+        allPushTaksCompleted = finishedPushTasksCount.get() == totalPushTasksCount.get();
       }
     } finally {
       countingLock.unlock();
@@ -108,7 +109,7 @@
     countingLock.lock();
     try {
       allScheduled = true;
-      if (finishedPushTasksCount < totalPushTasksCount) {
+      if (finishedPushTasksCount.get() < totalPushTasksCount.get()) {
         return;
       }
     } finally {
@@ -120,7 +121,7 @@
 
   private void doAllPushTasksCompleted() {
     fireRemainingOnRefReplicatedToAllNodes();
-    pushResultProcessing.onAllRefsReplicatedToAllNodes(totalPushTasksCount);
+    pushResultProcessing.onAllRefsReplicatedToAllNodes(totalPushTasksCount.get());
     allPushTasksFinished.countDown();
   }
 
@@ -135,7 +136,7 @@
 
   private void doRefPushTasksCompleted(RefReplicationStatus refStatus) {
     pushResultProcessing.onRefReplicatedToAllNodes(
-        refStatus.project, refStatus.ref, refStatus.nodesToReplicateCount);
+        refStatus.project, refStatus.ref, refStatus.nodesToReplicateCount.get());
   }
 
   private RefReplicationStatus getRefStatus(String project, String ref) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicatedEvent.java
similarity index 97%
rename from src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
rename to src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicatedEvent.java
index d1ab790..48ed8ca 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicatedEvent.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.replication;
+package com.googlesource.gerrit.plugins.replication.events;
 
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.events.RefEvent;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicationDoneEvent.java
similarity index 96%
rename from src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
rename to src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicationDoneEvent.java
index e663194..75b8026 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicationDoneEvent.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.replication;
+package com.googlesource.gerrit.plugins.replication.events;
 
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.events.RefEvent;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ReplicationScheduledEvent.java
similarity index 95%
rename from src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
rename to src/main/java/com/googlesource/gerrit/plugins/replication/events/ReplicationScheduledEvent.java
index 28f6b6b..1d050b6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ReplicationScheduledEvent.java
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.replication;
+package com.googlesource.gerrit.plugins.replication.events;
 
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.events.RefEvent;
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 6a2cf26..f4ea9d6 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -86,6 +86,15 @@
 :	Number of refs, that are pushed during replication, to be logged.
 	For printing all refs to the logs, use a value of 0. By default, 0.
 
+gerrit.maxRefsToShow
+:	Number of refs, that are pushed during replication, to be shown
+	in the show-queue output. To show all refs, use a value of 0.
+	By default, 2, because whenever a new patchset is created there
+	are two refs (change ref and meta ref) eg.
+
+	`(retry 1) push aaa.com:/git/test.git [refs/heads/b1 refs/heads/b2 (+2)]`
+
+
 gerrit.sshCommandTimeout
 :	Timeout for SSH command execution. If 0, there is no timeout and
 	the client waits indefinitely. By default, 0.
@@ -388,6 +397,12 @@
 	By default, replicates without group control, i.e. replicates
 	everything to all remotes.
 
+	*NOTE:* If an authGroup is provided, and you want a complete
+	mirror (for backup reasons or to run a Gerrit replica), at
+	least one of the provided authGroups must have "Access Database"
+	capability. Otherwise [db](../../../Documentation/note-db.html)
+	refs will not be replicated.
+
 remote.NAME.createMissingRepositories
 :	If true, a repository is automatically created on the remote site.
 	If the remote site was not available at the moment when a new
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
index 2ee9a39..8c2d88e 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
@@ -23,6 +23,8 @@
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
 import java.net.URISyntaxException;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
 import org.eclipse.jgit.transport.URIish;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java
new file mode 100644
index 0000000..62d78ee
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java
@@ -0,0 +1,122 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.Sandboxed;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.WaitUtil;
+import com.google.gerrit.entities.BranchNameKey;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.api.projects.BranchInput;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.events.RefEvent;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
+import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+import org.junit.Before;
+import org.junit.Test;
+
+@UseLocalDisk
+@Sandboxed
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationEventsIT extends ReplicationDaemon {
+  private static final Duration TEST_POST_EVENT_TIMEOUT = Duration.ofSeconds(1);
+
+  @Inject private DynamicItem<EventDispatcher> eventDispatcher;
+  private TestDispatcher testDispatcher;
+
+  @Before
+  public void setup() throws Exception {
+    initConfig();
+    setReplicationDestination(
+        "remote1",
+        "suffix1",
+        Optional.of("not-used-project")); // Simulates a full replication.config initialization
+    setUpTestPlugin();
+    testDispatcher = new TestDispatcher();
+    eventDispatcher.set(testDispatcher, eventDispatcher.getPluginName());
+  }
+
+  @Test
+  public void replicateNewChangeSendsEvents() throws Exception {
+    Project.NameKey targetProject = createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Result pushResult = createChange();
+    String sourceRef = pushResult.getPatchSet().refName();
+    String metaRef = pushResult.getChange().notes().getRefName();
+    BranchNameKey changeBranch = BranchNameKey.create(project, sourceRef);
+    BranchNameKey metaBranch = BranchNameKey.create(project, metaRef);
+
+    assertThat(testDispatcher.getEvents(changeBranch, ReplicationScheduledEvent.class)).hasSize(1);
+    assertThat(testDispatcher.getEvents(metaBranch, ReplicationScheduledEvent.class)).hasSize(1);
+
+    isPushCompleted(targetProject, sourceRef, TEST_PUSH_TIMEOUT);
+    isPushCompleted(targetProject, metaRef, TEST_PUSH_TIMEOUT);
+
+    waitForRefEvent(() -> testDispatcher.getEvents(RefReplicatedEvent.class), metaRef);
+    waitForRefEvent(() -> testDispatcher.getEvents(RefReplicatedEvent.class), sourceRef);
+    assertThat(testDispatcher.getEvents(RefReplicatedEvent.class).size()).isEqualTo(2);
+
+    waitForRefEvent(() -> testDispatcher.getEvents(RefReplicationDoneEvent.class), metaRef);
+    waitForRefEvent(() -> testDispatcher.getEvents(RefReplicationDoneEvent.class), sourceRef);
+    assertThat(testDispatcher.getEvents(RefReplicationDoneEvent.class).size()).isEqualTo(2);
+  }
+
+  @Test
+  public void replicateNewBranchSendsEvents() throws Exception {
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Project.NameKey targetProject = createTestProject(project + "replica");
+    String newBranch = "refs/heads/mybranch";
+    BranchNameKey branchName = BranchNameKey.create(project, newBranch);
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(project.get()).branch(newBranch).create(input);
+
+    assertThat(testDispatcher.getEvents(branchName, ReplicationScheduledEvent.class)).hasSize(1);
+
+    isPushCompleted(targetProject, newBranch, TEST_PUSH_TIMEOUT);
+
+    waitForRefEvent(() -> testDispatcher.getEvents(RefReplicatedEvent.class), newBranch);
+    assertThat(testDispatcher.getEvents(RefReplicatedEvent.class).size()).isEqualTo(1);
+
+    waitForRefEvent(() -> testDispatcher.getEvents(RefReplicationDoneEvent.class), newBranch);
+    assertThat(testDispatcher.getEvents(RefReplicationDoneEvent.class).size()).isEqualTo(1);
+  }
+
+  private <T extends RefEvent> void waitForRefEvent(Supplier<List<T>> events, String refName)
+      throws InterruptedException {
+    WaitUtil.waitUntil(
+        () -> events.get().stream().filter(e -> refName.equals(e.getRefName())).count() == 1,
+        TEST_POST_EVENT_TIMEOUT);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java b/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java
new file mode 100644
index 0000000..bc5c35c
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java
@@ -0,0 +1,63 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gerrit.entities.BranchNameKey;
+import com.google.gerrit.entities.Change;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.events.ChangeEvent;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.events.ProjectEvent;
+import com.google.gerrit.server.events.RefEvent;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class TestDispatcher implements EventDispatcher {
+  private final List<RefEvent> refEvents = new LinkedList<>();
+  private final List<Event> events = new LinkedList<>();
+
+  @Override
+  public void postEvent(Change change, ChangeEvent event) {} // Not used in replication
+
+  @Override
+  public void postEvent(BranchNameKey branchName, RefEvent event) {
+    refEvents.add(event);
+  }
+
+  @Override
+  public void postEvent(
+      Project.NameKey projectName, ProjectEvent event) {} // Not used in replication
+
+  @Override
+  public void postEvent(Event event) {
+    events.add(event);
+  }
+
+  public List<RefEvent> getEvents(BranchNameKey branch, Class<? extends RefEvent> clazz) {
+    return getEvents(branch).stream().filter(clazz::isInstance).collect(Collectors.toList());
+  }
+
+  public <T extends RefEvent> List<T> getEvents(Class<T> clazz) {
+    return events.stream().filter(clazz::isInstance).map(clazz::cast).collect(Collectors.toList());
+  }
+
+  private List<RefEvent> getEvents(BranchNameKey branch) {
+    return refEvents.stream()
+        .filter(e -> e.getBranchNameKey().equals(branch))
+        .collect(Collectors.toList());
+  }
+}