Store replication tasks instead of ref-update events

Fix the granularity of persistent events on disk by storing
replication tasks rather than ref-update events.

Ref-updates are triggering the replication tasks,
however it is tricky to perform reference counting on the associated
tasks reliably, due to all the different conditions of where a
replication task can be in the replication queue.

Fix a recurring issue on the persistence of ref-update events.
The persisted ref-update events were either removed prematurely on the
filesystem or left forever orphan even after the replication was completed
on all remotes and target URLs.
The problem was caused by the different granularity of the incoming
ref-update events and the corresponding replication tasks queued
and executed.

Do not persist ref-update events anymore but include remote and
target URLs at the replication tasks and persist those instead.

The stored objects are replication tasks and not anymore ref-update
events.

Bug: Issue 11172
Change-Id: I02d8cda0a124e8e3d2b9bb01b7d44f98ba717fcd
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 969f8e7..a84b54e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -60,6 +60,7 @@
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.servlet.RequestScoped;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
@@ -102,6 +103,7 @@
   private final PerThreadRequestScope.Scoper threadScoper;
   private final DestinationConfiguration config;
   private final DynamicItem<EventDispatcher> eventDispatcher;
+  private final Provider<ReplicationTasksStorage> replicationTasksStorage;
 
   protected enum RetryReason {
     TRANSPORT_ERROR,
@@ -131,6 +133,7 @@
       ReplicationStateListeners stateLog,
       GroupIncludeCache groupIncludeCache,
       DynamicItem<EventDispatcher> eventDispatcher,
+      Provider<ReplicationTasksStorage> rts,
       @Assisted DestinationConfiguration cfg) {
     this.eventDispatcher = eventDispatcher;
     gitManager = gitRepositoryManager;
@@ -138,6 +141,7 @@
     this.userProvider = userProvider;
     this.projectCache = projectCache;
     this.stateLog = stateLog;
+    this.replicationTasksStorage = rts;
     config = cfg;
     CurrentUser remoteUser;
     if (!cfg.getAuthGroupNames().isEmpty()) {
@@ -388,21 +392,21 @@
     }
 
     synchronized (stateLock) {
-      PushOne e = getPendingPush(uri);
-      if (e == null) {
-        e = opFactory.create(project, uri);
-        addRef(e, ref);
-        e.addState(ref, state);
+      PushOne task = getPendingPush(uri);
+      if (task == null) {
+        task = opFactory.create(project, uri);
+        addRef(task, ref);
+        task.addState(ref, state);
         @SuppressWarnings("unused")
         ScheduledFuture<?> ignored =
-            pool.schedule(e, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
-        pending.put(uri, e);
-      } else if (!e.getRefs().contains(ref)) {
-        addRef(e, ref);
-        e.addState(ref, state);
+            pool.schedule(task, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
+        pending.put(uri, task);
+      } else if (!task.getRefs().contains(ref)) {
+        addRef(task, ref);
+        task.addState(ref, state);
       }
       state.increasePushTaskCount(project.get(), ref);
-      repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, e, config.getDelay());
+      repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, task, config.getDelay());
     }
   }
 
@@ -541,12 +545,31 @@
     return RunwayStatus.allowed();
   }
 
-  void notifyFinished(PushOne op) {
+  void notifyFinished(PushOne task) {
     synchronized (stateLock) {
-      inFlight.remove(op.getURI());
+      inFlight.remove(task.getURI());
+      if (!task.wasCanceled()) {
+        for (String ref : task.getRefs()) {
+          if (!refHasPendingPush(task.getURI(), ref)) {
+            replicationTasksStorage
+                .get()
+                .delete(
+                    new ReplicateRefUpdate(
+                        task.getProjectNameKey().get(), ref, task.getURI(), getRemoteConfigName()));
+          }
+        }
+      }
     }
   }
 
+  private boolean refHasPendingPush(URIish opUri, String ref) {
+    return pushContainsRef(pending.get(opUri), ref) || pushContainsRef(inFlight.get(opUri), ref);
+  }
+
+  private boolean pushContainsRef(PushOne op, String ref) {
+    return op != null && op.getRefs().contains(ref);
+  }
+
   boolean wouldPushProject(Project.NameKey project) {
     if (!shouldReplicate(project)) {
       return false;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
index ac0262d..8b0aa3d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
@@ -21,7 +21,6 @@
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
-import com.googlesource.gerrit.plugins.replication.ReplicationState.Factory;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -32,20 +31,17 @@
   private final PushAll.Factory pushAll;
   private final ReplicationConfig config;
   private final DynamicItem<EventDispatcher> eventDispatcher;
-  private final Factory replicationStateFactory;
 
   @Inject
   protected OnStartStop(
       ServerInformation srvInfo,
       PushAll.Factory pushAll,
       ReplicationConfig config,
-      DynamicItem<EventDispatcher> eventDispatcher,
-      ReplicationState.Factory replicationStateFactory) {
+      DynamicItem<EventDispatcher> eventDispatcher) {
     this.srvInfo = srvInfo;
     this.pushAll = pushAll;
     this.config = config;
     this.eventDispatcher = eventDispatcher;
-    this.replicationStateFactory = replicationStateFactory;
     this.pushAllFuture = Atomics.newReference();
   }
 
@@ -53,8 +49,7 @@
   public void start() {
     if (srvInfo.getState() == ServerInformation.State.STARTUP
         && config.isReplicateAllOnPluginStart()) {
-      ReplicationState state =
-          replicationStateFactory.create(new GitUpdateProcessing(eventDispatcher.get()));
+      ReplicationState state = new ReplicationState(new GitUpdateProcessing(eventDispatcher.get()));
       pushAllFuture.set(
           pushAll
               .create(null, ReplicationFilter.all(), state, false)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
index 4162973..5fdb375 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -57,7 +57,6 @@
         .to(StartReplicationCapability.class);
 
     install(new FactoryModuleBuilder().build(PushAll.Factory.class));
-    install(new FactoryModuleBuilder().build(ReplicationState.Factory.class));
 
     bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class);
     DynamicSet.setOf(binder(), ReplicationStateListener.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 60604db..a5b267b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -30,6 +30,7 @@
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.HashSet;
@@ -66,8 +67,7 @@
   private final DynamicItem<EventDispatcher> dispatcher;
   private final ReplicationConfig config;
   private final DynamicItem<AdminApiFactory> adminApiFactory;
-  private final ReplicationState.Factory replicationStateFactory;
-  private final EventsStorage eventsStorage;
+  private final ReplicationTasksStorage replicationTasksStorage;
   private volatile boolean running;
   private volatile boolean replaying;
 
@@ -78,15 +78,13 @@
       ReplicationConfig rc,
       DynamicItem<EventDispatcher> dis,
       ReplicationStateListeners sl,
-      ReplicationState.Factory rsf,
-      EventsStorage es) {
+      ReplicationTasksStorage rts) {
     workQueue = wq;
     dispatcher = dis;
     config = rc;
     stateLog = sl;
     adminApiFactory = aaf;
-    replicationStateFactory = rsf;
-    eventsStorage = es;
+    replicationTasksStorage = rts;
   }
 
   @Override
@@ -141,8 +139,7 @@
   }
 
   private void onGitReferenceUpdated(String projectName, String refName) {
-    ReplicationState state =
-        replicationStateFactory.create(new GitUpdateProcessing(dispatcher.get()));
+    ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
     if (!running) {
       stateLog.warn("Replication plugin did not finish startup before event", state);
       return;
@@ -151,9 +148,9 @@
     Project.NameKey project = new Project.NameKey(projectName);
     for (Destination cfg : config.getDestinations(FilterType.ALL)) {
       if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
-        String eventKey = eventsStorage.persist(projectName, refName);
-        state.setEventKey(eventKey);
         for (URIish uri : cfg.getURIs(project, null)) {
+          replicationTasksStorage.persist(
+              new ReplicateRefUpdate(projectName, refName, uri, cfg.getRemoteConfigName()));
           cfg.schedule(project, refName, uri, state);
         }
       }
@@ -162,11 +159,16 @@
   }
 
   private void firePendingEvents() {
-    replaying = true;
     try {
-      for (EventsStorage.ReplicateRefUpdate e : eventsStorage.list()) {
-        repLog.info("Firing pending event {}", e);
-        onGitReferenceUpdated(e.project, e.ref);
+      Set<String> eventsReplayed = new HashSet<>();
+      replaying = true;
+      for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.list()) {
+        String eventKey = String.format("%s:%s", t.project, t.ref);
+        if (!eventsReplayed.contains(eventKey)) {
+          repLog.info("Firing pending task {}", eventKey);
+          onGitReferenceUpdated(t.project, t.ref);
+          eventsReplayed.add(eventKey);
+        }
       }
     } finally {
       replaying = false;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
index 6f0803a..df8f3f4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
@@ -16,8 +16,6 @@
 
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Table;
-import com.google.inject.assistedinject.Assisted;
-import com.google.inject.assistedinject.AssistedInject;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -26,12 +24,7 @@
 
 public class ReplicationState {
 
-  public interface Factory {
-    ReplicationState create(PushResultProcessing processing);
-  }
-
   private boolean allScheduled;
-  private final EventsStorage eventsStorage;
   private final PushResultProcessing pushResultProcessing;
 
   private final Lock countingLock = new ReentrantLock();
@@ -57,11 +50,7 @@
   private int totalPushTasksCount;
   private int finishedPushTasksCount;
 
-  private String eventKey;
-
-  @AssistedInject
-  ReplicationState(EventsStorage storage, @Assisted PushResultProcessing processing) {
-    eventsStorage = storage;
+  ReplicationState(PushResultProcessing processing) {
     pushResultProcessing = processing;
     statusByProjectRef = HashBasedTable.create();
   }
@@ -86,7 +75,6 @@
       URIish uri,
       RefPushResult status,
       RemoteRefUpdate.Status refUpdateStatus) {
-    deleteEvent();
     pushResultProcessing.onRefReplicatedToOneNode(project, ref, uri, status, refUpdateStatus);
 
     RefReplicationStatus completedRefStatus = null;
@@ -116,12 +104,6 @@
     }
   }
 
-  private void deleteEvent() {
-    if (eventKey != null) {
-      eventsStorage.delete(eventKey);
-    }
-  }
-
   public void markAllPushTasksScheduled() {
     countingLock.lock();
     try {
@@ -192,8 +174,4 @@
       return name().toLowerCase().replace("_", "-");
     }
   }
-
-  public void setEventKey(String eventKey) {
-    this.eventKey = eventKey;
-  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
similarity index 69%
rename from src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java
rename to src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 0efa726..a8d075d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -29,18 +29,28 @@
 import java.util.ArrayList;
 import java.util.List;
 import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.transport.URIish;
 
 @Singleton
-public class EventsStorage {
+public class ReplicationTasksStorage {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
   public static class ReplicateRefUpdate {
-    public String project;
-    public String ref;
+    public final String project;
+    public final String ref;
+    public final String uri;
+    public final String remote;
+
+    public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
+      this.project = project;
+      this.ref = ref;
+      this.uri = uri.toASCIIString();
+      this.remote = remote;
+    }
 
     @Override
     public String toString() {
-      return "ref-update " + project + ":" + ref;
+      return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote;
     }
   }
 
@@ -49,15 +59,11 @@
   private final Path refUpdates;
 
   @Inject
-  EventsStorage(ReplicationConfig config) {
+  ReplicationTasksStorage(ReplicationConfig config) {
     refUpdates = config.getEventsDirectory().resolve("ref-updates");
   }
 
-  public String persist(String project, String ref) {
-    ReplicateRefUpdate r = new ReplicateRefUpdate();
-    r.project = project;
-    r.ref = ref;
-
+  public String persist(ReplicateRefUpdate r) {
     String json = GSON.toJson(r) + "\n";
     String eventKey = sha1(json).name();
     Path file = refUpdates().resolve(eventKey);
@@ -67,20 +73,24 @@
     }
 
     try {
+      logger.atFine().log("CREATE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
       Files.write(file, json.getBytes(UTF_8));
     } catch (IOException e) {
-      logger.atWarning().log("Couldn't persist event %s", json);
+      logger.atWarning().withCause(e).log("Couldn't persist event %s", json);
     }
     return eventKey;
   }
 
-  public void delete(String eventKey) {
-    if (eventKey != null) {
-      try {
-        Files.delete(refUpdates().resolve(eventKey));
-      } catch (IOException e) {
-        logger.atSevere().withCause(e).log("Error while deleting event %s", eventKey);
-      }
+  public void delete(ReplicateRefUpdate r) {
+    String taskJson = GSON.toJson(r) + "\n";
+    String taskKey = sha1(taskJson).name();
+    Path file = refUpdates().resolve(taskKey);
+
+    try {
+      logger.atFine().log("DELETE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
+      Files.delete(file);
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Error while deleting event %s", taskKey);
     }
   }
 
@@ -91,7 +101,6 @@
         if (Files.isRegularFile(e)) {
           String json = new String(Files.readAllBytes(e), UTF_8);
           result.add(GSON.fromJson(json, ReplicateRefUpdate.class));
-          Files.delete(e);
         }
       }
     } catch (IOException e) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
index 20bdf65..7115d5b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
@@ -51,8 +51,6 @@
 
   @Inject private PushAll.Factory pushFactory;
 
-  @Inject private ReplicationState.Factory replicationStateFactory;
-
   private final Object lock = new Object();
 
   @Override
@@ -61,7 +59,7 @@
       throw new UnloggedFailure(1, "error: cannot combine --all and PROJECT");
     }
 
-    ReplicationState state = replicationStateFactory.create(new CommandProcessing(this));
+    ReplicationState state = new ReplicationState(new CommandProcessing(this));
     Future<?> future = null;
 
     ReplicationFilter projectFilter;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 76a8480..a4f2608 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -15,6 +15,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import static com.google.common.truth.Truth.assertThat;
+import static java.util.stream.Collectors.toList;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.flogger.FluentLogger;
@@ -22,14 +23,19 @@
 import com.google.gerrit.acceptance.PushOneCommit.Result;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.extensions.annotations.PluginData;
 import com.google.gerrit.extensions.api.projects.BranchInput;
 import com.google.gerrit.extensions.common.ProjectInfo;
 import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.Inject;
+import com.google.inject.Key;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.eclipse.jgit.lib.Ref;
@@ -44,30 +50,45 @@
     name = "replication",
     sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
 public class ReplicationIT extends LightweightPluginDaemonTest {
+  private static final Optional<String> ALL_PROJECTS = Optional.empty();
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
-  private static final int TEST_REPLICATION_DELAY = 1;
+  private static final int TEST_REPLICATION_DELAY = 5;
   private static final Duration TEST_TIMEMOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 10);
 
   @Inject private SitePaths sitePaths;
+  private Path pluginDataDir;
   private Path gitPath;
+  private Path storagePath;
   private FileBasedConfig config;
 
   @Override
   public void setUpTestPlugin() throws Exception {
-    config =
-        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
-    config.save();
-
     gitPath = sitePaths.site_path.resolve("git");
 
+    config =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+    setReplicationDestination(
+        "remote1",
+        "suffix1",
+        Optional.of("not-used-project")); // Simulates a full replication.config initialization
+    config.save();
+
     super.setUpTestPlugin();
+
+    pluginDataDir = plugin.getSysInjector().getInstance(Key.get(Path.class, PluginData.class));
+    storagePath = pluginDataDir.resolve("ref-updates");
   }
 
   @Test
   public void shouldReplicateNewProject() throws Exception {
-    setReplicationDestination("foo", "replica");
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
 
     Project.NameKey sourceProject = createProject("foo");
+
+    String[] replicationTasks = storagePath.toFile().list();
+    assertThat(replicationTasks).hasLength(2); // refs/heads/master and /refs/meta/config
+
     waitUntil(() -> gitPath.resolve(sourceProject + "replica.git").toFile().isDirectory());
 
     ProjectInfo replicaProject = gApi.projects().name(sourceProject + "replica").get();
@@ -76,26 +97,29 @@
 
   @Test
   public void shouldReplicateNewChangeRef() throws Exception {
-    setReplicationDestination("foo", "replica");
-
     Project.NameKey targetProject = createProject("projectreplica");
 
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
     Result pushResult = createChange();
     RevCommit sourceCommit = pushResult.getCommit();
     String sourceRef = pushResult.getPatchSet().getRefName();
 
-    try (Repository repo = repoManager.openRepository(targetProject)) {
-      waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+    String[] replicationTasks = storagePath.toFile().list();
+    assertThat(replicationTasks).hasLength(1);
 
-      Ref targetBranchRef = getRef(repo, sourceRef);
-      assertThat(targetBranchRef).isNotNull();
-      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
-    }
+    waitUntil(() -> getRef(getRepo(targetProject), sourceRef) != null);
+
+    Ref targetBranchRef = getRef(getRepo(targetProject), sourceRef);
+    assertThat(targetBranchRef).isNotNull();
+    assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
   }
 
   @Test
   public void shouldReplicateNewBranch() throws Exception {
-    setReplicationDestination("foo", "replica");
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
 
     Project.NameKey targetProject = createProject("projectreplica");
     String newBranch = "refs/heads/mybranch";
@@ -114,8 +138,68 @@
     }
   }
 
-  private Ref getRef(Repository repo, String branchName) throws Exception {
-    return repo.getRefDatabase().exactRef(branchName);
+  @Test
+  public void shouldReplicateNewBranchToTwoRemotes() throws Exception {
+    Project.NameKey targetProject1 = createProject("projectreplica1");
+    Project.NameKey targetProject2 = createProject("projectreplica2");
+
+    setReplicationDestination("foo1", "replica1", ALL_PROJECTS);
+    setReplicationDestination("foo2", "replica2", ALL_PROJECTS);
+    reloadConfig();
+
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().getRefName();
+
+    String[] replicationTasks = storagePath.toFile().list();
+    assertThat(replicationTasks).hasLength(2);
+
+    waitUntil(
+        () ->
+            (getRef(getRepo(targetProject1), sourceRef) != null
+                && getRef(getRepo(targetProject2), sourceRef) != null));
+
+    Ref targetBranchRef1 = getRef(getRepo(targetProject1), sourceRef);
+    assertThat(targetBranchRef1).isNotNull();
+    assertThat(targetBranchRef1.getObjectId()).isEqualTo(sourceCommit.getId());
+
+    Ref targetBranchRef2 = getRef(getRepo(targetProject2), sourceRef);
+    assertThat(targetBranchRef2).isNotNull();
+    assertThat(targetBranchRef2.getObjectId()).isEqualTo(sourceCommit.getId());
+  }
+
+  @Test
+  public void shouldCreateIndividualResplicationTasksForEveryRemoteUrlPair() throws Exception {
+    List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
+    createProject("projectreplica1");
+    createProject("projectreplica2");
+
+    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+    reloadConfig();
+
+    createChange();
+
+    assertThat(storagePath.toFile().list()).hasLength(4);
+  }
+
+  private Repository getRepo(Project.NameKey targetProject) {
+    try {
+      return repoManager.openRepository(targetProject);
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log(
+          "Unable to open repository associated with project %s", targetProject);
+      return null;
+    }
+  }
+
+  private Ref getRef(Repository repo, String branchName) {
+    try {
+      return repo.getRefDatabase().exactRef(branchName);
+    } catch (IOException e) {
+      e.printStackTrace();
+      return null;
+    }
   }
 
   private Ref checkedGetRef(Repository repo, String branchName) {
@@ -127,16 +211,23 @@
     }
   }
 
-  private void setReplicationDestination(String remoteName, String replicaSuffix)
+  private void setReplicationDestination(
+      String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
+    setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project);
+  }
+
+  private void setReplicationDestination(
+      String remoteName, List<String> replicaSuffixes, Optional<String> project)
       throws IOException {
-    config.setString(
-        "remote",
-        remoteName,
-        "url",
-        gitPath.resolve("${name}" + replicaSuffix + ".git").toString());
+
+    List<String> replicaUrls =
+        replicaSuffixes.stream()
+            .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
+            .collect(toList());
+    config.setStringList("remote", remoteName, "url", replicaUrls);
     config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+    project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
     config.save();
-    reloadConfig();
   }
 
   private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
index cf6715e..2767f53 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
@@ -32,15 +32,15 @@
 
   private ReplicationState replicationState;
   private PushResultProcessing pushResultProcessingMock;
-  private EventsStorage eventsStorage;
+  private ReplicationTasksStorage eventsStorage;
 
   @Before
   public void setUp() throws Exception {
     pushResultProcessingMock = createNiceMock(PushResultProcessing.class);
     replay(pushResultProcessingMock);
-    eventsStorage = createNiceMock(EventsStorage.class);
+    eventsStorage = createNiceMock(ReplicationTasksStorage.class);
     replay(eventsStorage);
-    replicationState = new ReplicationState(eventsStorage, pushResultProcessingMock);
+    replicationState = new ReplicationState(pushResultProcessingMock);
   }
 
   @Test