Replicate refs created in a BatchRefUpdate together

With a recent change it was introduced that internal ref-updated
events would contain all refs that were updated in a single command.
This was not yet used by the replication plugin to ensure that
refs that are dependent on each other are guaranteed to be replicated
in the same push.

Now, each replication task contains a set of one or more refs instead
of just one. This allows to define tasks that take care of multiple
refs that were updated by a single command.

Depends-On: https://gerrit-review.googlesource.com/c/gerrit/+/328228
Change-Id: Ic0534d03dee6759ba4cb6b19ae87fd75269b437b
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index e5f125b..b205073 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -68,7 +68,9 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -79,6 +81,7 @@
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.Repository;
@@ -383,17 +386,21 @@
     return false;
   }
 
-  void schedule(Project.NameKey project, String ref, URIish uri, ReplicationState state) {
-    schedule(project, ref, uri, state, false);
+  void schedule(Project.NameKey project, Set<String> refs, URIish uri, ReplicationState state) {
+    schedule(project, refs, uri, state, false);
   }
 
   void schedule(
-      Project.NameKey project, String ref, URIish uri, ReplicationState state, boolean now) {
-    if (!shouldReplicate(project, ref, state)) {
-      repLog.atFine().log("Not scheduling replication %s:%s => %s", project, ref, uri);
-      return;
+      Project.NameKey project, Set<String> refs, URIish uri, ReplicationState state, boolean now) {
+    Set<String> refsToSchedule = new HashSet<>();
+    for (String ref : refs) {
+      if (!shouldReplicate(project, ref, state)) {
+        repLog.atFine().log("Not scheduling replication %s:%s => %s", project, ref, uri);
+        continue;
+      }
+      refsToSchedule.add(ref);
     }
-    repLog.atInfo().log("scheduling replication %s:%s => %s", project, ref, uri);
+    repLog.atInfo().log("scheduling replication %s:%s => %s", project, refs, uri);
 
     if (!config.replicatePermissions()) {
       PushOne e;
@@ -424,22 +431,25 @@
       PushOne task = getPendingPush(uri);
       if (task == null) {
         task = opFactory.create(project, uri);
-        addRef(task, ref);
-        task.addState(ref, state);
+        addRefs(task, ImmutableSet.copyOf(refsToSchedule));
+        task.addState(refsToSchedule, state);
         @SuppressWarnings("unused")
         ScheduledFuture<?> ignored =
             pool.schedule(task, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
         pending.put(uri, task);
         repLog.atInfo().log(
             "scheduled %s:%s => %s to run %s",
-            project, ref, task, now ? "now" : "after " + config.getDelay() + "s");
+            project, refsToSchedule, task, now ? "now" : "after " + config.getDelay() + "s");
       } else {
-        addRef(task, ref);
-        task.addState(ref, state);
+        addRefs(task, ImmutableSet.copyOf(refsToSchedule));
+        task.addState(refsToSchedule, state);
         repLog.atInfo().log(
-            "consolidated %s:%s => %s with an existing pending push", project, ref, task);
+            "consolidated %s:%s => %s with an existing pending push",
+            project, refsToSchedule, task);
       }
-      state.increasePushTaskCount(project.get(), ref);
+      for (String ref : refsToSchedule) {
+        state.increasePushTaskCount(project.get(), ref);
+      }
     }
   }
 
@@ -473,9 +483,9 @@
         pool.schedule(updateHeadFactory.create(uri, project, newHead), 0, TimeUnit.SECONDS);
   }
 
-  private void addRef(PushOne e, String ref) {
-    e.addRef(ref);
-    postReplicationScheduledEvent(e, ref);
+  private void addRefs(PushOne e, ImmutableSet<String> refs) {
+    e.addRefBatch(refs);
+    postReplicationScheduledEvent(e, refs);
   }
 
   /**
@@ -519,7 +529,7 @@
           // second one fails, it will also be rescheduled and then,
           // here, find out replication to its URI is already pending
           // for retry (blocking).
-          pendingPushOp.addRefs(pushOp.getRefs());
+          pendingPushOp.addRefBatches(pushOp.getRefs());
           pendingPushOp.addStates(pushOp.getStates());
           pushOp.removeStates();
 
@@ -538,7 +548,7 @@
           pendingPushOp.canceledByReplication();
           pending.remove(uri);
 
-          pushOp.addRefs(pendingPushOp.getRefs());
+          pushOp.addRefBatches(pendingPushOp.getRefs());
           pushOp.addStates(pendingPushOp.getStates());
           pendingPushOp.removeStates();
         }
@@ -787,10 +797,10 @@
     postReplicationScheduledEvent(pushOp, null);
   }
 
-  private void postReplicationScheduledEvent(PushOne pushOp, String inputRef) {
-    Set<String> refs = inputRef == null ? pushOp.getRefs() : ImmutableSet.of(inputRef);
+  private void postReplicationScheduledEvent(PushOne pushOp, ImmutableSet<String> inputRefs) {
+    Set<ImmutableSet<String>> refBatches = inputRefs == null ? pushOp.getRefs() : Set.of(inputRefs);
     Project.NameKey project = pushOp.getProjectNameKey();
-    for (String ref : refs) {
+    for (String ref : flattenSetOfRefBatches(refBatches)) {
       ReplicationScheduledEvent event =
           new ReplicationScheduledEvent(project.get(), ref, pushOp.getURI());
       try {
@@ -803,7 +813,7 @@
 
   private void postReplicationFailedEvent(PushOne pushOp, RemoteRefUpdate.Status status) {
     Project.NameKey project = pushOp.getProjectNameKey();
-    for (String ref : pushOp.getRefs()) {
+    for (String ref : flattenSetOfRefBatches(pushOp.getRefs())) {
       RefReplicatedEvent event =
           new RefReplicatedEvent(project.get(), ref, pushOp.getURI(), RefPushResult.FAILED, status);
       try {
@@ -813,4 +823,8 @@
       }
     }
   }
+
+  private Set<String> flattenSetOfRefBatches(Set<ImmutableSet<String>> refBatches) {
+    return refBatches.stream().flatMap(Collection::stream).collect(Collectors.toSet());
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
index 4957a64..79a0683 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
@@ -41,6 +41,7 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Predicate;
 import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.transport.URIish;
@@ -157,11 +158,13 @@
   }
 
   @Override
-  public List<Destination> getDestinations(URIish uri, Project.NameKey project, String ref) {
+  public List<Destination> getDestinations(URIish uri, Project.NameKey project, Set<String> refs) {
     List<Destination> dests = new ArrayList<>();
     for (Destination dest : getAll(FilterType.ALL)) {
-      if (dest.wouldPush(uri, project, ref)) {
-        dests.add(dest);
+      for (String ref : refs) {
+        if (dest.wouldPush(uri, project, ref)) {
+          dests.add(dest);
+        }
       }
     }
     return dests;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 8afcf9b..da3089e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -22,15 +22,17 @@
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.RefNames;
-import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
+import com.google.gerrit.extensions.events.GitBatchRefUpdateListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
 import com.google.gerrit.extensions.restapi.ResourceConflictException;
@@ -84,7 +86,7 @@
 import org.eclipse.jgit.transport.URIish;
 
 /**
- * A push to remote operation started by {@link GitReferenceUpdatedListener}.
+ * A push to remote operation started by {@link GitBatchRefUpdateListener}.
  *
  * <p>Instance members are protected by the lock within PushQueue. Callers must take that lock to
  * ensure they are working with a current view of the object.
@@ -119,7 +121,7 @@
 
   private final Project.NameKey projectName;
   private final URIish uri;
-  private final Set<String> delta = Sets.newHashSetWithExpectedSize(4);
+  private final Set<ImmutableSet<String>> refBatchesToPush = Sets.newHashSetWithExpectedSize(4);
   private boolean pushAllRefs;
   private Repository git;
   private boolean isCollision;
@@ -240,12 +242,16 @@
    *     config.
    */
   protected String getLimitedRefs() {
-    Set<String> refs = getRefs();
+    Set<ImmutableSet<String>> refs = getRefs();
     int maxRefsToShow = replConfig.getMaxRefsToShow();
     if (maxRefsToShow == 0) {
       maxRefsToShow = refs.size();
     }
-    String refsString = refs.stream().limit(maxRefsToShow).collect(Collectors.joining(" "));
+    String refsString =
+        refs.stream()
+            .flatMap(Collection::stream)
+            .limit(maxRefsToShow)
+            .collect(Collectors.joining(" "));
     int hiddenRefs = refs.size() - maxRefsToShow;
     if (hiddenRefs > 0) {
       refsString += " (+" + hiddenRefs + ")";
@@ -281,52 +287,60 @@
   }
 
   void addRef(String ref) {
-    if (ALL_REFS.equals(ref)) {
-      delta.clear();
+    addRefBatch(ImmutableSet.of(ref));
+  }
+
+  void addRefBatch(ImmutableSet<String> refBatch) {
+    if (refBatch.size() == 1 && refBatch.contains(ALL_REFS)) {
+      refBatchesToPush.clear();
       pushAllRefs = true;
       repLog.atFinest().log("Added all refs for replication to %s", uri);
-    } else if (!pushAllRefs && delta.add(ref)) {
-      repLog.atFinest().log("Added ref %s for replication to %s", ref, uri);
+    } else if (!pushAllRefs && refBatchesToPush.add(refBatch)) {
+      repLog.atFinest().log("Added ref %s for replication to %s", refBatch, uri);
     }
   }
 
   @Override
-  public Set<String> getRefs() {
-    return pushAllRefs ? Sets.newHashSet(ALL_REFS) : delta;
+  public Set<ImmutableSet<String>> getRefs() {
+    return pushAllRefs ? Set.of(ImmutableSet.of(ALL_REFS)) : refBatchesToPush;
   }
 
-  void addRefs(Set<String> refs) {
+  void addRefBatches(Set<ImmutableSet<String>> refBatches) {
     if (!pushAllRefs) {
-      for (String ref : refs) {
-        addRef(ref);
+      for (ImmutableSet<String> refBatch : refBatches) {
+        addRefBatch(refBatch);
       }
     }
   }
 
-  Set<String> setStartedRefs(Set<String> startedRefs) {
-    Set<String> notAttemptedRefs = Sets.difference(delta, startedRefs);
+  Set<ImmutableSet<String>> setStartedRefs(Set<ImmutableSet<String>> startedRefs) {
+    Set<ImmutableSet<String>> notAttemptedRefs = Sets.difference(refBatchesToPush, startedRefs);
     pushAllRefs = false;
-    delta.clear();
-    addRefs(startedRefs);
+    refBatchesToPush.clear();
+    addRefBatches(startedRefs);
     return notAttemptedRefs;
   }
 
-  void notifyNotAttempted(Set<String> notAttemptedRefs) {
-    notAttemptedRefs.forEach(
-        ref ->
-            Arrays.asList(getStatesByRef(ref))
-                .forEach(
-                    state ->
-                        state.notifyRefReplicated(
-                            projectName.get(),
-                            ref,
-                            uri,
-                            RefPushResult.NOT_ATTEMPTED,
-                            RemoteRefUpdate.Status.UP_TO_DATE)));
+  void notifyNotAttempted(Set<ImmutableSet<String>> notAttemptedRefs) {
+    notAttemptedRefs.stream()
+        .flatMap(Collection::stream)
+        .forEach(
+            ref ->
+                Arrays.asList(getStatesByRef(ref))
+                    .forEach(
+                        state ->
+                            state.notifyRefReplicated(
+                                projectName.get(),
+                                ref,
+                                uri,
+                                RefPushResult.NOT_ATTEMPTED,
+                                RemoteRefUpdate.Status.UP_TO_DATE)));
   }
 
-  void addState(String ref, ReplicationState state) {
-    stateMap.put(ref, state);
+  void addState(Set<String> refs, ReplicationState state) {
+    for (String ref : refs) {
+      stateMap.put(ref, state);
+    }
   }
 
   ListMultimap<String, ReplicationState> getStates() {
@@ -645,7 +659,7 @@
         // to only the references we will update during this operation.
         //
         Map<String, Ref> n = new HashMap<>();
-        for (String src : delta) {
+        for (String src : flattenRefBatchesToPush()) {
           Ref r = local.get(src);
           if (r != null) {
             n.put(src, r);
@@ -707,7 +721,8 @@
   private List<RemoteRefUpdate> doPushDelta(Map<String, Ref> local) throws IOException {
     List<RemoteRefUpdate> cmds = new ArrayList<>();
     boolean noPerms = !pool.isReplicatePermissions();
-    for (String src : delta) {
+    Set<String> refs = flattenRefBatchesToPush();
+    for (String src : refs) {
       RefSpec spec = matchSrc(src);
       if (spec != null) {
         // If the ref still exists locally, send it, otherwise delete it.
@@ -758,7 +773,8 @@
     return null;
   }
 
-  private void push(List<RemoteRefUpdate> cmds, RefSpec spec, Ref src) throws IOException {
+  @VisibleForTesting
+  void push(List<RemoteRefUpdate> cmds, RefSpec spec, Ref src) throws IOException {
     String dst = spec.getDestination();
     boolean force = spec.isForceUpdate();
     cmds.add(new RemoteRefUpdate(git, src, dst, force, null, null));
@@ -856,6 +872,10 @@
     stateMap.clear();
   }
 
+  private Set<String> flattenRefBatchesToPush() {
+    return refBatchesToPush.stream().flatMap(Collection::stream).collect(Collectors.toSet());
+  }
+
   public static class UpdateRefFailureException extends TransportException {
     private static final long serialVersionUID = 1L;
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
index bac599f..6ed47d4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
@@ -47,7 +47,7 @@
       RemoteRefUpdate.Status refStatus) {}
 
   /**
-   * Invoked when a ref has been replicated to all nodes.
+   * Invoked when refs have been replicated to all nodes.
    *
    * @param project the project name
    * @param ref the ref name
@@ -108,7 +108,7 @@
       StringBuilder sb = new StringBuilder();
       sb.append("Replicate ");
       sb.append(project);
-      sb.append(" ref ");
+      sb.append(" refs ");
       sb.append(ref);
       sb.append(" to ");
       sb.append(resolveNodeName(uri));
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
index 2fa6c34..78c1a35 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
@@ -20,6 +20,7 @@
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import org.eclipse.jgit.transport.URIish;
 
 /** Git destinations currently active for replication. */
@@ -45,14 +46,14 @@
   List<Destination> getAll(FilterType filterType);
 
   /**
-   * Return the active replication destinations for a uri/project/ref triplet.
+   * Return the active replication destinations for a uri/project/refs triplet.
    *
    * @param uriish uri of the destinations
    * @param project name of the project
-   * @param ref ref name
+   * @param refs ref names
    * @return the list of active destinations
    */
-  List<Destination> getDestinations(URIish uriish, Project.NameKey project, String ref);
+  List<Destination> getDestinations(URIish uriish, Project.NameKey project, Set<String> refs);
 
   /** Returns true if there are no destinations, false otherwise. */
   boolean isEmpty();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
index 39650cb..9f331f2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -19,7 +19,7 @@
 import com.google.common.eventbus.EventBus;
 import com.google.gerrit.extensions.annotations.Exports;
 import com.google.gerrit.extensions.config.CapabilityDefinition;
-import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
+import com.google.gerrit.extensions.events.GitBatchRefUpdateListener;
 import com.google.gerrit.extensions.events.HeadUpdatedListener;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
@@ -69,7 +69,7 @@
         .annotatedWith(UniqueAnnotations.create())
         .to(ReplicationQueue.class);
 
-    DynamicSet.bind(binder(), GitReferenceUpdatedListener.class).to(ReplicationQueue.class);
+    DynamicSet.bind(binder(), GitBatchRefUpdateListener.class).to(ReplicationQueue.class);
     DynamicSet.bind(binder(), ProjectDeletedListener.class).to(ReplicationQueue.class);
     DynamicSet.bind(binder(), HeadUpdatedListener.class).to(ReplicationQueue.class);
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 791b387..47ed6b0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -17,15 +17,18 @@
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 import com.google.auto.value.AutoValue;
+import com.google.auto.value.extension.memoized.Memoized;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Queues;
 import com.google.gerrit.common.UsedAt;
 import com.google.gerrit.entities.Project;
-import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
+import com.google.gerrit.extensions.events.GitBatchRefUpdateListener;
 import com.google.gerrit.extensions.events.HeadUpdatedListener;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.gerrit.util.logging.NamedFluentLogger;
 import com.google.inject.Inject;
@@ -45,13 +48,14 @@
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 import org.eclipse.jgit.transport.URIish;
 
 /** Manages automatic replication to remote repositories. */
 public class ReplicationQueue
     implements ObservableQueue,
         LifecycleListener,
-        GitReferenceUpdatedListener,
+        GitBatchRefUpdateListener,
         ProjectDeletedListener,
         HeadUpdatedListener {
   static final String REPLICATION_LOG_NAME = "replication_log";
@@ -67,7 +71,7 @@
   private final ProjectDeletionState.Factory projectDeletionStateFactory;
   private volatile boolean running;
   private final AtomicBoolean replaying = new AtomicBoolean();
-  private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
+  private final Queue<ReferencesUpdatedEvent> beforeStartupEventsQueue;
   private Distributor distributor;
 
   protected enum Prune {
@@ -128,71 +132,89 @@
 
   public void scheduleFullSync(
       Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
-    fire(project, urlMatch, PushOne.ALL_REFS, state, now);
+    fire(
+        project,
+        urlMatch,
+        Set.of(new GitReferenceUpdated.UpdatedRef(PushOne.ALL_REFS, null, null, null)),
+        state,
+        now);
   }
 
   @Override
-  public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
-    fire(event.getProjectName(), event.getRefName());
+  public void onGitBatchRefUpdate(GitBatchRefUpdateListener.Event event) {
+    fire(event.getProjectName(), event.getUpdatedRefs());
   }
 
-  private void fire(String projectName, String refName) {
+  private void fire(String projectName, Set<UpdatedRef> updatedRefs) {
     ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
-    fire(Project.nameKey(projectName), null, refName, state, false);
+    fire(Project.nameKey(projectName), null, updatedRefs, state, false);
     state.markAllPushTasksScheduled();
   }
 
   private void fire(
       Project.NameKey project,
       String urlMatch,
-      String refName,
+      Set<UpdatedRef> updatedRefs,
       ReplicationState state,
       boolean now) {
     if (!running) {
       stateLog.warn(
           "Replication plugin did not finish startup before event, event replication is postponed",
           state);
-      beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(project.get(), refName));
+      beforeStartupEventsQueue.add(ReferencesUpdatedEvent.create(project.get(), updatedRefs));
       return;
     }
 
     for (Destination cfg : destinations.get().getAll(FilterType.ALL)) {
-      pushReference(cfg, project, urlMatch, refName, state, now);
+      pushReferences(
+          cfg,
+          project,
+          urlMatch,
+          updatedRefs.stream().map(UpdatedRef::getRefName).collect(Collectors.toSet()),
+          state,
+          now);
     }
   }
 
-  private void fire(URIish uri, Project.NameKey project, String refName) {
+  private void fire(URIish uri, Project.NameKey project, ImmutableSet<String> refNames) {
     ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
-    for (Destination dest : destinations.get().getDestinations(uri, project, refName)) {
-      dest.schedule(project, refName, uri, state);
+    for (Destination dest : destinations.get().getDestinations(uri, project, refNames)) {
+      dest.schedule(project, refNames, uri, state);
     }
     state.markAllPushTasksScheduled();
   }
 
   @UsedAt(UsedAt.Project.COLLABNET)
   public void pushReference(Destination cfg, Project.NameKey project, String refName) {
-    pushReference(cfg, project, null, refName, null, true);
+    pushReferences(cfg, project, null, Set.of(refName), null, true);
   }
 
-  private void pushReference(
+  private void pushReferences(
       Destination cfg,
       Project.NameKey project,
       String urlMatch,
-      String refName,
+      Set<String> refNames,
       ReplicationState state,
       boolean now) {
     boolean withoutState = state == null;
     if (withoutState) {
       state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
     }
-    if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
+    Set<String> refNamesToPush = new HashSet<>();
+    for (String refName : refNames) {
+      if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
+        refNamesToPush.add(refName);
+      } else {
+        repLog.atFine().log("Skipping ref %s on project %s", refName, project.get());
+      }
+    }
+    if (!refNamesToPush.isEmpty()) {
       for (URIish uri : cfg.getURIs(project, urlMatch)) {
         replicationTasksStorage.create(
-            ReplicateRefUpdate.create(project.get(), refName, uri, cfg.getRemoteConfigName()));
-        cfg.schedule(project, refName, uri, state, now);
+            ReplicateRefUpdate.create(
+                project.get(), refNamesToPush, uri, cfg.getRemoteConfigName()));
+        cfg.schedule(project, refNamesToPush, uri, state, now);
       }
-    } else {
-      repLog.atFine().log("Skipping ref %s on project %s", refName, project.get());
     }
     if (withoutState) {
       state.markAllPushTasksScheduled();
@@ -215,7 +237,7 @@
             @Override
             public void run(ReplicationTasksStorage.ReplicateRefUpdate u) {
               try {
-                fire(new URIish(u.uri()), Project.nameKey(u.project()), u.ref());
+                fire(new URIish(u.uri()), Project.nameKey(u.project()), u.refs());
                 if (Prune.TRUE.equals(prune)) {
                   taskNamesByReplicateRefUpdate.remove(u);
                 }
@@ -237,7 +259,7 @@
 
             @Override
             public String toString(ReplicationTasksStorage.ReplicateRefUpdate u) {
-              return "Scheduling push to " + String.format("%s:%s", u.project(), u.ref());
+              return "Scheduling push to " + String.format("%s:%s", u.project(), u.refs());
             }
           });
     }
@@ -282,26 +304,31 @@
 
   private void fireBeforeStartupEvents() {
     Set<String> eventsReplayed = new HashSet<>();
-    for (ReferenceUpdatedEvent event : beforeStartupEventsQueue) {
-      String eventKey = String.format("%s:%s", event.projectName(), event.refName());
+    for (ReferencesUpdatedEvent event : beforeStartupEventsQueue) {
+      String eventKey = String.format("%s:%s", event.projectName(), event.getRefNames());
       if (!eventsReplayed.contains(eventKey)) {
         repLog.atInfo().log("Firing pending task %s", event);
-        fire(event.projectName(), event.refName());
+        fire(event.projectName(), event.updatedRefs());
         eventsReplayed.add(eventKey);
       }
     }
   }
 
   @AutoValue
-  abstract static class ReferenceUpdatedEvent {
+  abstract static class ReferencesUpdatedEvent {
 
-    static ReferenceUpdatedEvent create(String projectName, String refName) {
-      return new AutoValue_ReplicationQueue_ReferenceUpdatedEvent(projectName, refName);
+    static ReferencesUpdatedEvent create(String projectName, Set<UpdatedRef> updatedRefs) {
+      return new AutoValue_ReplicationQueue_ReferencesUpdatedEvent(
+          projectName, ImmutableSet.copyOf(updatedRefs));
     }
 
     public abstract String projectName();
 
-    public abstract String refName();
+    public abstract ImmutableSet<UpdatedRef> updatedRefs();
+
+    public Set<String> getRefNames() {
+      return updatedRefs().stream().map(UpdatedRef::getRefName).collect(Collectors.toSet());
+    }
   }
 
   protected class Distributor implements WorkQueue.CancelableRunnable {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 4736402..c7765a3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -18,15 +18,22 @@
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.flogger.FluentLogger;
 import com.google.common.hash.Hashing;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.TypeAdapter;
+import com.google.gson.TypeAdapterFactory;
+import com.google.gson.reflect.TypeToken;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonToken;
+import com.google.gson.stream.JsonWriter;
 import com.google.inject.Inject;
 import com.google.inject.ProvisionException;
 import com.google.inject.Singleton;
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.NotDirectoryException;
@@ -82,27 +89,42 @@
       return gson.fromJson(json, ReplicateRefUpdate.class);
     }
 
-    public static ReplicateRefUpdate create(String project, String ref, URIish uri, String remote) {
+    public static ReplicateRefUpdate create(
+        String project, Set<String> refs, URIish uri, String remote) {
       return new AutoValue_ReplicationTasksStorage_ReplicateRefUpdate(
-          project, ref, uri.toASCIIString(), remote);
+          project, ImmutableSet.copyOf(refs), uri.toASCIIString(), remote);
+    }
+
+    public static ReplicateRefUpdate create(
+        String project, ImmutableSet<String> refs, URIish uri, String remote) {
+      return new AutoValue_ReplicationTasksStorage_ReplicateRefUpdate(
+          project, refs, uri.toASCIIString(), remote);
     }
 
     public abstract String project();
 
-    public abstract String ref();
+    public abstract ImmutableSet<String> refs();
 
     public abstract String uri();
 
     public abstract String remote();
 
     public String sha1() {
-      return ReplicationTasksStorage.sha1(project() + "\n" + ref() + "\n" + uri() + "\n" + remote())
+      return ReplicationTasksStorage.sha1(
+              project() + "\n" + refs().toString() + "\n" + uri() + "\n" + remote())
           .name();
     }
 
     @Override
     public final String toString() {
-      return "ref-update " + project() + ":" + ref() + " uri:" + uri() + " remote:" + remote();
+      return "ref-update "
+          + project()
+          + ":"
+          + refs().toString()
+          + " uri:"
+          + uri()
+          + " remote:"
+          + remote();
     }
 
     public static TypeAdapter<ReplicateRefUpdate> typeAdapter(Gson gson) {
@@ -127,19 +149,21 @@
     runningUpdates = refUpdates.resolve("running");
     waitingUpdates = refUpdates.resolve("waiting");
     gson =
-        new GsonBuilder().registerTypeAdapterFactory(AutoValueTypeAdapterFactory.create()).create();
+        new GsonBuilder()
+            .registerTypeAdapterFactory(new ReplicateRefUpdateTypeAdapterFactory())
+            .create();
   }
 
   public synchronized String create(ReplicateRefUpdate r) {
     return new Task(r).create();
   }
 
-  public synchronized Set<String> start(UriUpdates uriUpdates) {
-    Set<String> startedRefs = new HashSet<>();
+  public synchronized Set<ImmutableSet<String>> start(UriUpdates uriUpdates) {
+    Set<ImmutableSet<String>> startedRefs = new HashSet<>();
     for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
       Task t = new Task(update);
       if (t.start()) {
-        startedRefs.add(t.update.ref());
+        startedRefs.add(t.update.refs());
       }
     }
     return startedRefs;
@@ -206,6 +230,98 @@
     }
   }
 
+  public static final class ReplicateRefUpdateTypeAdapterFactory implements TypeAdapterFactory {
+    static class ReplicateRefUpdateTypeAdapter<T> extends TypeAdapter<ReplicateRefUpdate> {
+
+      @Override
+      public void write(JsonWriter out, ReplicateRefUpdate value) throws IOException {
+        if (value == null) {
+          out.nullValue();
+          return;
+        }
+        out.beginObject();
+
+        out.name("project");
+        out.value(value.project());
+
+        out.name("refs");
+        out.beginArray();
+        for (String ref : value.refs()) {
+          out.value(ref);
+        }
+        out.endArray();
+
+        out.name("uri");
+        out.value(value.uri());
+
+        out.name("remote");
+        out.value(value.remote());
+
+        out.endObject();
+      }
+
+      @Override
+      public ReplicateRefUpdate read(JsonReader in) throws IOException {
+        if (in.peek() == JsonToken.NULL) {
+          in.nextNull();
+          return null;
+        }
+        String project = null;
+        Set<String> refs = new HashSet<>();
+        URIish uri = null;
+        String remote = null;
+
+        String fieldname = null;
+        in.beginObject();
+
+        while (in.hasNext()) {
+          JsonToken token = in.peek();
+
+          if (token.equals(JsonToken.NAME)) {
+            fieldname = in.nextName();
+          }
+
+          if ("project".equals(fieldname)) {
+            project = in.nextString();
+          }
+
+          if ("refs".equals(fieldname)) {
+            in.beginArray();
+            while (in.hasNext()) {
+              refs.add(in.nextString());
+            }
+            in.endArray();
+          }
+
+          if ("uri".equals(fieldname)) {
+            try {
+              uri = new URIish(in.nextString());
+            } catch (URISyntaxException e) {
+              throw new IOException("Unable to parse remote URI", e);
+            }
+          }
+
+          if ("remote".equals(fieldname)) {
+            remote = in.nextString();
+          }
+        }
+
+        in.endObject();
+        return ReplicateRefUpdate.create(project, refs, uri, remote);
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) {
+      if (type.equals(TypeToken.get(AutoValue_ReplicationTasksStorage_ReplicateRefUpdate.class))
+          || type.equals(TypeToken.get(ReplicateRefUpdate.class))) {
+        return (TypeAdapter<T>) new ReplicateRefUpdateTypeAdapter<T>();
+      }
+      return null;
+    }
+  }
+
   @VisibleForTesting
   class Task {
     public final ReplicateRefUpdate update;
@@ -275,7 +391,7 @@
     }
 
     private String updateLog() {
-      return String.format("(%s:%s => %s)", update.project(), update.ref(), update.uri());
+      return String.format("(%s:%s => %s)", update.project(), update.refs(), update.uri());
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java b/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java
index a9985d2..77a5574 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.gerrit.entities.Project;
 import java.util.List;
 import java.util.Set;
@@ -28,14 +29,15 @@
 
   String getRemoteName();
 
-  Set<String> getRefs();
+  Set<ImmutableSet<String>> getRefs();
 
   default List<ReplicationTasksStorage.ReplicateRefUpdate> getReplicateRefUpdates() {
+    // TODO: keep batch refs together
     return getRefs().stream()
         .map(
-            (ref) ->
+            (refs) ->
                 ReplicationTasksStorage.ReplicateRefUpdate.create(
-                    getProjectNameKey().get(), ref, getURI(), getRemoteName()))
+                    getProjectNameKey().get(), refs, getURI(), getRemoteName()))
         .collect(Collectors.toList());
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
index cfe9002..dc29612 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -17,6 +17,7 @@
 import static org.eclipse.jgit.lib.Ref.Storage.NEW;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -25,6 +26,7 @@
 import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.metrics.Timer1;
@@ -69,6 +71,7 @@
 import org.eclipse.jgit.util.FS;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -228,6 +231,55 @@
   }
 
   @Test
+  public void shouldPushMetaRefTogetherWithChangeRef() throws InterruptedException, IOException {
+    PushOne pushOne = Mockito.spy(createPushOne(null));
+
+    Ref newLocalChangeRef =
+        new ObjectIdRef.Unpeeled(
+            NEW,
+            "refs/changes/11/11111/1",
+            ObjectId.fromString("0000000000000000000000000000000000000002"));
+
+    Ref newLocalChangeMetaRef =
+        new ObjectIdRef.Unpeeled(
+            NEW,
+            "refs/changes/11/11111/meta",
+            ObjectId.fromString("0000000000000000000000000000000000000003"));
+
+    localRefs.add(newLocalChangeRef);
+    localRefs.add(newLocalChangeMetaRef);
+
+    pushOne.addRefBatch(
+        ImmutableSet.of(newLocalChangeRef.getName(), newLocalChangeMetaRef.getName()));
+    pushOne.run();
+
+    isCallFinished.await(10, TimeUnit.SECONDS);
+    verify(transportMock, atLeastOnce()).push(any(), any());
+    verify(pushOne, times(2)).push(any(), any(), any());
+  }
+
+  @Test
+  public void shouldNotAttemptDuplicateRemoteRefUpdate() throws InterruptedException, IOException {
+    PushOne pushOne = Mockito.spy(createPushOne(null));
+
+    Ref newLocalChangeRef =
+        new ObjectIdRef.Unpeeled(
+            NEW,
+            "refs/changes/11/11111/1",
+            ObjectId.fromString("0000000000000000000000000000000000000002"));
+
+    localRefs.add(newLocalChangeRef);
+
+    pushOne.addRefBatch(ImmutableSet.of(newLocalChangeRef.getName()));
+    pushOne.addRefBatch(ImmutableSet.of(newLocalChangeRef.getName()));
+    pushOne.run();
+
+    isCallFinished.await(10, TimeUnit.SECONDS);
+    verify(transportMock, times(1)).push(any(), any());
+    verify(pushOne, times(1)).push(any(), any(), any());
+  }
+
+  @Test
   public void shouldPushInSingleOperationWhenPushBatchSizeIsNotConfigured()
       throws InterruptedException, IOException {
     replicateTwoRefs(createPushOne(null));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java
index 3d0dfc1..dfcf250 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java
@@ -24,6 +24,7 @@
 import com.google.gerrit.server.git.WorkQueue;
 import java.time.Duration;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.RefUpdate;
@@ -61,7 +62,7 @@
     Project.NameKey targetProject = createTestProject(project + replica);
     ReplicationTasksStorage.ReplicateRefUpdate ref =
         ReplicationTasksStorage.ReplicateRefUpdate.create(
-            project.get(), newBranch, new URIish(getProjectUri(targetProject)), remote);
+            project.get(), Set.of(newBranch), new URIish(getProjectUri(targetProject)), remote);
     createBranch(project, master, newBranch);
     setReplicationDestination(remote, replica, ALL_PROJECTS);
     reloadConfig();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
index 1c1f983..5f80e8c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
@@ -191,7 +191,7 @@
     Pattern refmaskPattern = Pattern.compile(refRegex);
     return tasksStorage
         .streamWaiting()
-        .filter(task -> refmaskPattern.matcher(task.ref()).matches())
+        .filter(task -> task.refs().stream().anyMatch(ref -> refmaskPattern.matcher(ref).matches()))
         .collect(toList());
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index eb2b999..9285c58 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -111,6 +111,7 @@
     Result pushResult = createChange();
     RevCommit sourceCommit = pushResult.getCommit();
     String sourceRef = pushResult.getPatchSet().refName();
+    String metaRef = sourceRef.substring(0, sourceRef.lastIndexOf('/') + 1).concat("meta");
 
     try (Repository repo = repoManager.openRepository(targetProject)) {
       waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
@@ -118,6 +119,9 @@
       Ref targetBranchRef = getRef(repo, sourceRef);
       assertThat(targetBranchRef).isNotNull();
       assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+
+      Ref targetBranchMetaRef = getRef(repo, metaRef);
+      assertThat(targetBranchMetaRef).isNotNull();
     }
   }
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
index e0de577..ffef1bf 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
@@ -19,7 +19,12 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdateTypeAdapterFactory;
 import java.net.URISyntaxException;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
 import org.eclipse.jgit.transport.URIish;
@@ -207,4 +212,60 @@
     assertEquals("not-attempted", RefPushResult.NOT_ATTEMPTED.toString());
     assertEquals("succeeded", RefPushResult.SUCCEEDED.toString());
   }
+
+  @Test
+  public void writeReplicateRefUpdateTypeAdapter() throws Exception {
+    Gson gson =
+        new GsonBuilder()
+            .registerTypeAdapterFactory(new ReplicateRefUpdateTypeAdapterFactory())
+            .create();
+    ReplicateRefUpdate update =
+        ReplicateRefUpdate.create(
+            "someProject",
+            ImmutableSet.of("ref1"),
+            new URIish("git://host1/someRepo.git"),
+            "someRemote");
+    assertEquals(
+        gson.toJson(update),
+        "{\"project\":\"someProject\",\"refs\":[\"ref1\"],\"uri\":\"git://host1/someRepo.git\",\"remote\":\"someRemote\"}");
+    ReplicateRefUpdate update2 =
+        ReplicateRefUpdate.create(
+            "someProject",
+            ImmutableSet.of("ref1", "ref2"),
+            new URIish("git://host1/someRepo.git"),
+            "someRemote");
+    assertEquals(
+        gson.toJson(update2),
+        "{\"project\":\"someProject\",\"refs\":[\"ref1\",\"ref2\"],\"uri\":\"git://host1/someRepo.git\",\"remote\":\"someRemote\"}");
+  }
+
+  @Test
+  public void ReadReplicateRefUpdateTypeAdapter() throws Exception {
+    Gson gson =
+        new GsonBuilder()
+            .registerTypeAdapterFactory(new ReplicateRefUpdateTypeAdapterFactory())
+            .create();
+    ReplicateRefUpdate update =
+        ReplicateRefUpdate.create(
+            "someProject",
+            ImmutableSet.of("ref1"),
+            new URIish("git://host1/someRepo.git"),
+            "someRemote");
+    assertEquals(
+        gson.fromJson(
+            "{\"project\":\"someProject\",\"refs\":[\"ref1\"],\"uri\":\"git://host1/someRepo.git\",\"remote\":\"someRemote\"}",
+            ReplicateRefUpdate.class),
+        update);
+    ReplicateRefUpdate update2 =
+        ReplicateRefUpdate.create(
+            "someProject",
+            ImmutableSet.of("ref1", "ref2"),
+            new URIish("git://host1/someRepo.git"),
+            "someRemote");
+    assertEquals(
+        gson.fromJson(
+            "{\"project\":\"someProject\",\"refs\":[\"ref1\",\"ref2\"],\"uri\":\"git://host1/someRepo.git\",\"remote\":\"someRemote\"}",
+            ReplicateRefUpdate.class),
+        update2);
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java
index f549f47..b6d14e8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java
@@ -61,7 +61,7 @@
     Pattern refmaskPattern = Pattern.compile(refRegex);
     return tasksStorage
         .streamWaiting()
-        .filter(task -> refmaskPattern.matcher(task.ref()).matches())
+        .filter(task -> refmaskPattern.matcher(task.refs().toArray()[0].toString()).matches())
         .collect(toList());
   }
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
index e2e1e21..9390798 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
@@ -34,6 +34,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.regex.Pattern;
 import java.util.stream.Stream;
 import org.eclipse.jgit.transport.URIish;
@@ -96,7 +97,7 @@
         .forEach(
             (update) -> {
               try {
-                UriUpdates uriUpdates = TestUriUpdates.create(update);
+                UriUpdates uriUpdates = new TestUriUpdates(update);
                 tasksStorage.start(uriUpdates);
                 tasksStorage.finish(uriUpdates);
               } catch (URISyntaxException e) {
@@ -125,7 +126,7 @@
         .forEach(
             (update) -> {
               try {
-                UriUpdates uriUpdates = TestUriUpdates.create(update);
+                UriUpdates uriUpdates = new TestUriUpdates(update);
                 tasksStorage.start(uriUpdates);
                 tasksStorage.finish(uriUpdates);
               } catch (URISyntaxException e) {
@@ -211,7 +212,7 @@
         .forEach(
             (task) -> {
               assertThat(task.uri()).isEqualTo(expectedURI);
-              assertThat(task.ref()).isEqualTo(PushOne.ALL_REFS);
+              assertThat(task.refs()).isEqualTo(Set.of(PushOne.ALL_REFS));
             });
   }
 
@@ -236,7 +237,7 @@
         .forEach(
             (task) -> {
               assertThat(task.uri()).isEqualTo(expectedURI);
-              assertThat(task.ref()).isEqualTo(PushOne.ALL_REFS);
+              assertThat(task.refs()).isEqualTo(Set.of(PushOne.ALL_REFS));
             });
   }
 
@@ -351,14 +352,14 @@
       String changeRef, String remote) {
     return tasksStorage
         .streamWaiting()
-        .filter(task -> changeRef.equals(task.ref()))
+        .filter(task -> task.refs().stream().anyMatch(ref -> changeRef.equals(ref)))
         .filter(task -> remote.equals(task.remote()));
   }
 
   private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
       Stream<ReplicateRefUpdate> updates, String changeRef, String remote) {
     return updates
-        .filter(task -> changeRef.equals(task.ref()))
+        .filter(task -> task.refs().stream().anyMatch(ref -> changeRef.equals(ref)))
         .filter(task -> remote.equals(task.remote()));
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
index 5cfb2d0..6de0b6b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
@@ -24,6 +24,7 @@
 import java.net.URISyntaxException;
 import java.nio.file.FileSystem;
 import java.nio.file.Path;
+import java.util.Set;
 import org.eclipse.jgit.transport.URIish;
 import org.junit.After;
 import org.junit.Before;
@@ -36,7 +37,7 @@
   protected static final URIish URISH =
       ReplicationTasksStorageTest.getUrish("http://example.com/" + PROJECT + ".git");
   protected static final ReplicateRefUpdate REF_UPDATE =
-      ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE);
+      ReplicateRefUpdate.create(PROJECT, Set.of(REF), URISH, REMOTE);
   protected static final UriUpdates URI_UPDATES = getUriUpdates(REF_UPDATE);
 
   protected ReplicationTasksStorage nodeA;
@@ -200,7 +201,7 @@
     nodeA.create(REF_UPDATE);
     nodeB.create(REF_UPDATE);
 
-    assertThat(nodeA.start(URI_UPDATES)).containsExactly(REF_UPDATE.ref());
+    assertThat(nodeA.start(URI_UPDATES)).containsExactly(REF_UPDATE.refs());
     assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
 
     assertThat(nodeB.start(URI_UPDATES)).isEmpty();
@@ -209,7 +210,7 @@
 
   public static UriUpdates getUriUpdates(ReplicationTasksStorage.ReplicateRefUpdate refUpdate) {
     try {
-      return TestUriUpdates.create(refUpdate);
+      return new TestUriUpdates(refUpdate);
     } catch (URISyntaxException e) {
       throw new RuntimeException("Cannot instantiate UriUpdates object", e);
     }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java
index 202cac9..481fa9c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java
@@ -24,6 +24,7 @@
 import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.nio.file.FileSystem;
 import java.nio.file.Path;
+import java.util.Set;
 import org.eclipse.jgit.transport.URIish;
 import org.junit.After;
 import org.junit.Before;
@@ -36,7 +37,7 @@
   protected static final URIish URISH =
       ReplicationTasksStorageTest.getUrish("http://example.com/" + PROJECT + ".git");
   protected static final ReplicateRefUpdate REF_UPDATE =
-      ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE);
+      ReplicateRefUpdate.create(PROJECT, Set.of(REF), URISH, REMOTE);
 
   protected FileSystem fileSystem;
   protected Path storageSite;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
index a2e5e4d..568eef7 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
@@ -25,6 +25,7 @@
 import java.net.URISyntaxException;
 import java.nio.file.FileSystem;
 import java.nio.file.Files;
+import java.util.Set;
 import org.eclipse.jgit.transport.URIish;
 import org.junit.After;
 import org.junit.Before;
@@ -36,7 +37,7 @@
   protected static final String REMOTE = "myDest";
   protected static final URIish URISH = getUrish("http://example.com/" + PROJECT + ".git");
   protected static final ReplicateRefUpdate REF_UPDATE =
-      ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE);
+      ReplicateRefUpdate.create(PROJECT, Set.of(REF), URISH, REMOTE);
 
   protected ReplicationTasksStorage tasksStorage;
   protected FileSystem fileSystem;
@@ -200,8 +201,10 @@
 
   @Test
   public void canHaveTwoWaitingTasksForDifferentRefs() throws Exception {
-    Task updateA = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, "refA", URISH, REMOTE));
-    Task updateB = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, "refB", URISH, REMOTE));
+    Task updateA =
+        tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, Set.of("refA"), URISH, REMOTE));
+    Task updateB =
+        tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, Set.of("refB"), URISH, REMOTE));
     updateA.create();
     updateB.create();
     assertIsWaiting(updateA);
@@ -210,8 +213,10 @@
 
   @Test
   public void canHaveTwoRunningTasksForDifferentRefs() throws Exception {
-    Task updateA = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, "refA", URISH, REMOTE));
-    Task updateB = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, "refB", URISH, REMOTE));
+    Task updateA =
+        tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, Set.of("refA"), URISH, REMOTE));
+    Task updateB =
+        tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, Set.of("refB"), URISH, REMOTE));
     updateA.create();
     updateB.create();
     updateA.start();
@@ -226,12 +231,12 @@
         tasksStorage
         .new Task(
             ReplicateRefUpdate.create(
-                "projectA", REF, getUrish("http://example.com/projectA.git"), REMOTE));
+                "projectA", Set.of(REF), getUrish("http://example.com/projectA.git"), REMOTE));
     Task updateB =
         tasksStorage
         .new Task(
             ReplicateRefUpdate.create(
-                "projectB", REF, getUrish("http://example.com/projectB.git"), REMOTE));
+                "projectB", Set.of(REF), getUrish("http://example.com/projectB.git"), REMOTE));
     updateA.create();
     updateB.create();
     assertIsWaiting(updateA);
@@ -244,12 +249,12 @@
         tasksStorage
         .new Task(
             ReplicateRefUpdate.create(
-                "projectA", REF, getUrish("http://example.com/projectA.git"), REMOTE));
+                "projectA", Set.of(REF), getUrish("http://example.com/projectA.git"), REMOTE));
     Task updateB =
         tasksStorage
         .new Task(
             ReplicateRefUpdate.create(
-                "projectB", REF, getUrish("http://example.com/projectB.git"), REMOTE));
+                "projectB", Set.of(REF), getUrish("http://example.com/projectB.git"), REMOTE));
     updateA.create();
     updateB.create();
     updateA.start();
@@ -260,8 +265,10 @@
 
   @Test
   public void canHaveTwoWaitingTasksForDifferentRemotes() throws Exception {
-    Task updateA = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, REF, URISH, "remoteA"));
-    Task updateB = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, REF, URISH, "remoteB"));
+    Task updateA =
+        tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, Set.of(REF), URISH, "remoteA"));
+    Task updateB =
+        tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, Set.of(REF), URISH, "remoteB"));
     updateA.create();
     updateB.create();
     assertIsWaiting(updateA);
@@ -270,8 +277,10 @@
 
   @Test
   public void canHaveTwoRunningTasksForDifferentRemotes() throws Exception {
-    Task updateA = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, REF, URISH, "remoteA"));
-    Task updateB = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, REF, URISH, "remoteB"));
+    Task updateA =
+        tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, Set.of(REF), URISH, "remoteA"));
+    Task updateB =
+        tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, Set.of(REF), URISH, "remoteB"));
     updateA.create();
     updateB.create();
     updateA.start();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
index 16a0363..35254de 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
@@ -27,6 +27,7 @@
 import java.net.URISyntaxException;
 import java.nio.file.FileSystem;
 import java.nio.file.Path;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.eclipse.jgit.transport.URIish;
@@ -37,10 +38,13 @@
 public class ReplicationTasksStorageTest {
   protected static final String PROJECT = "myProject";
   protected static final String REF = "myRef";
+  protected static final String REF_2 = "myRef2";
   protected static final String REMOTE = "myDest";
   protected static final URIish URISH = getUrish("http://example.com/" + PROJECT + ".git");
   protected static final ReplicateRefUpdate REF_UPDATE =
-      ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE);
+      ReplicateRefUpdate.create(PROJECT, Set.of(REF), URISH, REMOTE);
+  protected static final ReplicateRefUpdate REFS_UPDATE =
+      ReplicateRefUpdate.create(PROJECT, Set.of(REF, REF_2), URISH, REMOTE);
 
   protected ReplicationTasksStorage storage;
   protected FileSystem fileSystem;
@@ -52,7 +56,7 @@
     fileSystem = Jimfs.newFileSystem(Configuration.unix());
     storageSite = fileSystem.getPath("replication_site");
     storage = new ReplicationTasksStorage(storageSite);
-    uriUpdates = TestUriUpdates.create(REF_UPDATE);
+    uriUpdates = new TestUriUpdates(REF_UPDATE);
   }
 
   @After
@@ -84,13 +88,23 @@
   @Test
   public void canStartWaitingUpdate() throws Exception {
     storage.create(REF_UPDATE);
-    assertThat(storage.start(uriUpdates)).containsExactly(REF_UPDATE.ref());
+    assertThat(storage.start(uriUpdates)).containsExactly(REF_UPDATE.refs());
     assertThatStream(storage.streamWaiting()).isEmpty();
     assertFalse(storage.isWaiting(uriUpdates));
     assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
   }
 
   @Test
+  public void canStartWaitingUpdateWithMultipleRefs() throws Exception {
+    TestUriUpdates updates = new TestUriUpdates(REFS_UPDATE);
+    storage.create(REFS_UPDATE);
+    assertThat(storage.start(updates)).containsExactly(REFS_UPDATE.refs());
+    assertThatStream(storage.streamWaiting()).isEmpty();
+    assertFalse(storage.isWaiting(updates));
+    assertThatStream(storage.streamRunning()).containsExactly(REFS_UPDATE);
+  }
+
+  @Test
   public void canFinishRunningUpdate() throws Exception {
     storage.create(REF_UPDATE);
     storage.start(uriUpdates);
@@ -133,7 +147,7 @@
     ReplicateRefUpdate updateB =
         ReplicateRefUpdate.create(
             PROJECT,
-            REF,
+            Set.of(REF),
             getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
             REMOTE);
 
@@ -141,7 +155,7 @@
     String keyB = storage.create(updateB);
     assertThatStream(storage.streamWaiting()).hasSize(2);
     assertTrue(storage.isWaiting(uriUpdates));
-    assertTrue(storage.isWaiting(TestUriUpdates.create(updateB)));
+    assertTrue(storage.isWaiting(new TestUriUpdates(updateB)));
     assertNotEquals(keyA, keyB);
   }
 
@@ -150,10 +164,10 @@
     ReplicateRefUpdate updateB =
         ReplicateRefUpdate.create(
             PROJECT,
-            REF,
+            Set.of(REF),
             getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
             REMOTE);
-    UriUpdates uriUpdatesB = TestUriUpdates.create(updateB);
+    UriUpdates uriUpdatesB = new TestUriUpdates(updateB);
     storage.create(REF_UPDATE);
     storage.create(updateB);
 
@@ -171,10 +185,10 @@
     ReplicateRefUpdate updateB =
         ReplicateRefUpdate.create(
             PROJECT,
-            REF,
+            Set.of(REF),
             getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
             REMOTE);
-    UriUpdates uriUpdatesB = TestUriUpdates.create(updateB);
+    UriUpdates uriUpdatesB = new TestUriUpdates(updateB);
     storage.create(REF_UPDATE);
     storage.create(updateB);
     storage.start(uriUpdates);
@@ -192,7 +206,7 @@
     ReplicateRefUpdate updateB =
         ReplicateRefUpdate.create(
             PROJECT,
-            REF,
+            Set.of(REF),
             getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
             REMOTE);
 
@@ -202,28 +216,30 @@
     storage.create(updateB);
     assertThatStream(storage.streamWaiting()).hasSize(2);
     assertTrue(storage.isWaiting(uriUpdates));
-    assertTrue(storage.isWaiting(TestUriUpdates.create(updateB)));
+    assertTrue(storage.isWaiting(new TestUriUpdates(updateB)));
   }
 
   @Test
   public void canCreateMulipleRefsForSameUri() throws Exception {
-    ReplicateRefUpdate refA = ReplicateRefUpdate.create(PROJECT, "refA", URISH, REMOTE);
-    ReplicateRefUpdate refB = ReplicateRefUpdate.create(PROJECT, "refB", URISH, REMOTE);
+    ReplicateRefUpdate refA = ReplicateRefUpdate.create(PROJECT, Set.of("refA"), URISH, REMOTE);
+    ReplicateRefUpdate refB = ReplicateRefUpdate.create(PROJECT, Set.of("refB"), URISH, REMOTE);
 
     String keyA = storage.create(refA);
     String keyB = storage.create(refB);
     assertThatStream(storage.streamWaiting()).hasSize(2);
     assertNotEquals(keyA, keyB);
-    assertTrue(storage.isWaiting(TestUriUpdates.create(refA)));
-    assertTrue(storage.isWaiting(TestUriUpdates.create(refB)));
+    assertTrue(storage.isWaiting(new TestUriUpdates(refA)));
+    assertTrue(storage.isWaiting(new TestUriUpdates(refB)));
   }
 
   @Test
   public void canFinishMulipleRefsForSameUri() throws Exception {
-    ReplicateRefUpdate refUpdateA = ReplicateRefUpdate.create(PROJECT, "refA", URISH, REMOTE);
-    ReplicateRefUpdate refUpdateB = ReplicateRefUpdate.create(PROJECT, "refB", URISH, REMOTE);
-    UriUpdates uriUpdatesA = TestUriUpdates.create(refUpdateA);
-    UriUpdates uriUpdatesB = TestUriUpdates.create(refUpdateB);
+    ReplicateRefUpdate refUpdateA =
+        ReplicateRefUpdate.create(PROJECT, Set.of("refA"), URISH, REMOTE);
+    ReplicateRefUpdate refUpdateB =
+        ReplicateRefUpdate.create(PROJECT, Set.of("refB"), URISH, REMOTE);
+    UriUpdates uriUpdatesA = new TestUriUpdates(refUpdateA);
+    UriUpdates uriUpdatesB = new TestUriUpdates(refUpdateB);
     storage.create(refUpdateA);
     storage.create(refUpdateB);
     storage.start(uriUpdatesA);
@@ -298,10 +314,10 @@
     ReplicateRefUpdate updateB =
         ReplicateRefUpdate.create(
             PROJECT,
-            REF,
+            Set.of(REF),
             getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
             REMOTE);
-    UriUpdates uriUpdatesB = TestUriUpdates.create(updateB);
+    UriUpdates uriUpdatesB = new TestUriUpdates(updateB);
     storage.create(REF_UPDATE);
     storage.create(updateB);
     storage.start(uriUpdates);
@@ -316,10 +332,10 @@
     ReplicateRefUpdate updateB =
         ReplicateRefUpdate.create(
             PROJECT,
-            REF,
+            Set.of(REF),
             getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
             REMOTE);
-    UriUpdates uriUpdatesB = TestUriUpdates.create(updateB);
+    UriUpdates uriUpdatesB = new TestUriUpdates(updateB);
     storage.create(REF_UPDATE);
     storage.create(updateB);
     storage.start(uriUpdates);
@@ -358,10 +374,10 @@
     ReplicateRefUpdate updateB =
         ReplicateRefUpdate.create(
             PROJECT,
-            REF,
+            Set.of(REF),
             getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
             REMOTE);
-    UriUpdates uriUpdatesB = TestUriUpdates.create(updateB);
+    UriUpdates uriUpdatesB = new TestUriUpdates(updateB);
     storage.create(REF_UPDATE);
     storage.create(updateB);
     storage.start(uriUpdates);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
index 080f279..f6eec83 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
@@ -14,39 +14,43 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
-import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableSet;
 import com.google.gerrit.entities.Project;
 import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.net.URISyntaxException;
-import java.util.Collections;
 import java.util.Set;
 import org.eclipse.jgit.transport.URIish;
 
-@AutoValue
-public abstract class TestUriUpdates implements UriUpdates {
-  public static TestUriUpdates create(ReplicateRefUpdate update) throws URISyntaxException {
-    return create(
-        Project.nameKey(update.project()),
-        new URIish(update.uri()),
-        update.remote(),
-        Collections.singleton(update.ref()));
-  }
+public class TestUriUpdates implements UriUpdates {
+  private final Project.NameKey project;
+  private final URIish uri;
+  private final String remote;
+  private final Set<ImmutableSet<String>> refs;
 
-  public static TestUriUpdates create(
-      Project.NameKey project, URIish uri, String remote, Set<String> refs) {
-    return new AutoValue_TestUriUpdates(project, uri, remote, ImmutableSet.copyOf(refs));
+  public TestUriUpdates(ReplicateRefUpdate update) throws URISyntaxException {
+    project = Project.nameKey(update.project());
+    uri = new URIish(update.uri());
+    remote = update.remote();
+    refs = Set.of(update.refs());
   }
 
   @Override
-  public abstract Project.NameKey getProjectNameKey();
+  public Project.NameKey getProjectNameKey() {
+    return project;
+  }
 
   @Override
-  public abstract URIish getURI();
+  public URIish getURI() {
+    return uri;
+  }
 
   @Override
-  public abstract String getRemoteName();
+  public String getRemoteName() {
+    return remote;
+  }
 
   @Override
-  public abstract ImmutableSet<String> getRefs();
+  public Set<ImmutableSet<String>> getRefs() {
+    return refs;
+  }
 }