Merge branch 'stable-2.16' into stable-3.0

* stable-2.16:
  ReplicationIT: fix flakiness of shouldReplicateNewBranch
  ReplicationIT: remove dependency of ReviewDb/NoteDb
  ReplicationIT: Add a test that HEAD update is replicated
  ReplicationIT#shouldReplicateNewBranch: Create branch on local project
  ReplicationIT: Don't swallow exceptions on failed repository operations
  ReplicationIT: Fix typo in method name
  AdminApi: Reintroduce return value of methods
  Store replication tasks instead of ref-update events
  ReplicationIT: Don't swallow exceptions on failed repository operations
  ReplicationIT: Add explicit test for replication of new branch
  ReplicationIT: Change test method name to reflect actual operation
  Allow AdminApiFactory to be replaced dynamically

Adapt project creation to use ProjectOperations.

Change-Id: I3d9cd916be200f1935be2a60d9a4e715e4b859a3
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
index 79362fd..acbf763 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
@@ -19,7 +19,7 @@
 public interface AdminApi {
   public boolean createProject(Project.NameKey project, String head);
 
-  public void deleteProject(Project.NameKey project);
+  public boolean deleteProject(Project.NameKey project);
 
-  public void updateHead(Project.NameKey project, String newHead);
+  public boolean updateHead(Project.NameKey project, String newHead);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 0a6d9d2..2a329e2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -60,6 +60,7 @@
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.servlet.RequestScoped;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
@@ -105,6 +106,7 @@
   private final PerThreadRequestScope.Scoper threadScoper;
   private final DestinationConfiguration config;
   private final DynamicItem<EventDispatcher> eventDispatcher;
+  private final Provider<ReplicationTasksStorage> replicationTasksStorage;
 
   protected enum RetryReason {
     TRANSPORT_ERROR,
@@ -134,6 +136,7 @@
       ReplicationStateListeners stateLog,
       GroupIncludeCache groupIncludeCache,
       DynamicItem<EventDispatcher> eventDispatcher,
+      Provider<ReplicationTasksStorage> rts,
       @Assisted DestinationConfiguration cfg) {
     this.eventDispatcher = eventDispatcher;
     gitManager = gitRepositoryManager;
@@ -141,6 +144,7 @@
     this.userProvider = userProvider;
     this.projectCache = projectCache;
     this.stateLog = stateLog;
+    this.replicationTasksStorage = rts;
     config = cfg;
     CurrentUser remoteUser;
     if (!cfg.getAuthGroupNames().isEmpty()) {
@@ -398,21 +402,21 @@
     }
 
     synchronized (stateLock) {
-      PushOne e = getPendingPush(uri);
-      if (e == null) {
-        e = opFactory.create(project, uri);
-        addRef(e, ref);
-        e.addState(ref, state);
+      PushOne task = getPendingPush(uri);
+      if (task == null) {
+        task = opFactory.create(project, uri);
+        addRef(task, ref);
+        task.addState(ref, state);
         @SuppressWarnings("unused")
         ScheduledFuture<?> ignored =
-            pool.schedule(e, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
-        pending.put(uri, e);
-      } else if (!e.getRefs().contains(ref)) {
-        addRef(e, ref);
-        e.addState(ref, state);
+            pool.schedule(task, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
+        pending.put(uri, task);
+      } else if (!task.getRefs().contains(ref)) {
+        addRef(task, ref);
+        task.addState(ref, state);
       }
       state.increasePushTaskCount(project.get(), ref);
-      repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, e, config.getDelay());
+      repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, task, config.getDelay());
     }
   }
 
@@ -563,12 +567,31 @@
     return RunwayStatus.allowed();
   }
 
-  void notifyFinished(PushOne op) {
+  void notifyFinished(PushOne task) {
     synchronized (stateLock) {
-      inFlight.remove(op.getURI());
+      inFlight.remove(task.getURI());
+      if (!task.wasCanceled()) {
+        for (String ref : task.getRefs()) {
+          if (!refHasPendingPush(task.getURI(), ref)) {
+            replicationTasksStorage
+                .get()
+                .delete(
+                    new ReplicateRefUpdate(
+                        task.getProjectNameKey().get(), ref, task.getURI(), getRemoteConfigName()));
+          }
+        }
+      }
     }
   }
 
+  private boolean refHasPendingPush(URIish opUri, String ref) {
+    return pushContainsRef(pending.get(opUri), ref) || pushContainsRef(inFlight.get(opUri), ref);
+  }
+
+  private boolean pushContainsRef(PushOne op, String ref) {
+    return op != null && op.getRefs().contains(ref);
+  }
+
   boolean wouldPushProject(Project.NameKey project) {
     if (!shouldReplicate(project)) {
       return false;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
index 55fbed1..aaf2b15 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
@@ -75,18 +75,20 @@
   }
 
   @Override
-  public void deleteProject(Project.NameKey project) {
+  public boolean deleteProject(Project.NameKey project) {
     repLog.info("Deleting project {} on {}", project, uri);
     String url = String.format("%s/a/projects/%s", toHttpUri(uri), Url.encode(project.get()));
     try {
       httpClient.execute(new HttpDelete(url), new HttpResponseHandler(), getContext());
+      return true;
     } catch (IOException e) {
       repLog.error("Couldn't perform project deletion on {}", uri, e);
     }
+    return false;
   }
 
   @Override
-  public void updateHead(Project.NameKey project, String newHead) {
+  public boolean updateHead(Project.NameKey project, String newHead) {
     repLog.info("Updating head of {} on {}", project, uri);
     String url = String.format("%s/a/projects/%s/HEAD", toHttpUri(uri), Url.encode(project.get()));
     try {
@@ -95,9 +97,11 @@
           new StringEntity(String.format("{\"ref\": \"%s\"}", newHead), Charsets.UTF_8.name()));
       req.addHeader(new BasicHeader("Content-Type", "application/json"));
       httpClient.execute(req, new HttpResponseHandler(), getContext());
+      return true;
     } catch (IOException e) {
       repLog.error("Couldn't perform update head on {}", uri, e);
     }
+    return false;
   }
 
   private HttpClientContext getContext() {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
index 9a92d4e..56cff5a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
@@ -54,7 +54,7 @@
   }
 
   @Override
-  public void deleteProject(Project.NameKey projectName) {
+  public boolean deleteProject(Project.NameKey projectName) {
     if (!withoutDeleteProjectPlugin.contains(uri)) {
       OutputStream errStream = sshHelper.newErrorBufferStream();
       String cmd = "deleteproject delete --yes-really-delete --force " + projectName.get();
@@ -63,6 +63,7 @@
         exitCode = execute(uri, cmd, errStream);
       } catch (IOException e) {
         logError("deleting", uri, errStream, cmd, e);
+        return false;
       }
       if (exitCode == 1) {
         logger.atInfo().log(
@@ -71,17 +72,20 @@
         withoutDeleteProjectPlugin.add(uri);
       }
     }
+    return true;
   }
 
   @Override
-  public void updateHead(Project.NameKey projectName, String newHead) {
+  public boolean updateHead(Project.NameKey projectName, String newHead) {
     OutputStream errStream = sshHelper.newErrorBufferStream();
     String cmd = "gerrit set-head " + projectName.get() + " --new-head " + newHead;
     try {
       execute(uri, cmd, errStream);
     } catch (IOException e) {
       logError("updating HEAD of", uri, errStream, cmd, e);
+      return false;
     }
+    return true;
   }
 
   private URIish toSshUri(URIish uri) throws URISyntaxException {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
index 908e7e0..aa6e16c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
@@ -52,17 +52,19 @@
   }
 
   @Override
-  public void deleteProject(Project.NameKey project) {
+  public boolean deleteProject(Project.NameKey project) {
     try {
       recursivelyDelete(new File(uri.getPath()));
       repLog.info("Deleted local repository: {}", uri);
     } catch (IOException e) {
       repLog.error("Error deleting local repository {}:\n", uri.getPath(), e);
+      return false;
     }
+    return true;
   }
 
   @Override
-  public void updateHead(Project.NameKey project, String newHead) {
+  public boolean updateHead(Project.NameKey project, String newHead) {
     try (Repository repo = new FileRepository(uri.getPath())) {
       if (newHead != null) {
         RefUpdate u = repo.updateRef(Constants.HEAD);
@@ -70,7 +72,9 @@
       }
     } catch (IOException e) {
       repLog.error("Failed to update HEAD of repository {} to {}", uri.getPath(), newHead, e);
+      return false;
     }
+    return true;
   }
 
   private static void recursivelyDelete(File dir) throws IOException {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
index ac0262d..8b0aa3d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
@@ -21,7 +21,6 @@
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
-import com.googlesource.gerrit.plugins.replication.ReplicationState.Factory;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -32,20 +31,17 @@
   private final PushAll.Factory pushAll;
   private final ReplicationConfig config;
   private final DynamicItem<EventDispatcher> eventDispatcher;
-  private final Factory replicationStateFactory;
 
   @Inject
   protected OnStartStop(
       ServerInformation srvInfo,
       PushAll.Factory pushAll,
       ReplicationConfig config,
-      DynamicItem<EventDispatcher> eventDispatcher,
-      ReplicationState.Factory replicationStateFactory) {
+      DynamicItem<EventDispatcher> eventDispatcher) {
     this.srvInfo = srvInfo;
     this.pushAll = pushAll;
     this.config = config;
     this.eventDispatcher = eventDispatcher;
-    this.replicationStateFactory = replicationStateFactory;
     this.pushAllFuture = Atomics.newReference();
   }
 
@@ -53,8 +49,7 @@
   public void start() {
     if (srvInfo.getState() == ServerInformation.State.STARTUP
         && config.isReplicateAllOnPluginStart()) {
-      ReplicationState state =
-          replicationStateFactory.create(new GitUpdateProcessing(eventDispatcher.get()));
+      ReplicationState state = new ReplicationState(new GitUpdateProcessing(eventDispatcher.get()));
       pushAllFuture.set(
           pushAll
               .create(null, ReplicationFilter.all(), state, false)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
index 6dfa117..ee9d4c0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
@@ -60,7 +60,7 @@
   }
 
   @Override
-  public void deleteProject(Project.NameKey project) {
+  public boolean deleteProject(Project.NameKey project) {
     String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
     String cmd = "rm -rf " + quotedPath;
     OutputStream errStream = sshHelper.newErrorBufferStream();
@@ -78,11 +78,13 @@
           cmd,
           errStream,
           e);
+      return false;
     }
+    return true;
   }
 
   @Override
-  public void updateHead(Project.NameKey project, String newHead) {
+  public boolean updateHead(Project.NameKey project, String newHead) {
     String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
     String cmd =
         "cd " + quotedPath + " && git symbolic-ref HEAD " + QuotedString.BOURNE.quote(newHead);
@@ -101,6 +103,8 @@
           cmd,
           errStream,
           e);
+      return false;
     }
+    return true;
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
index f9a2b2c..835d068 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -56,7 +56,6 @@
         .to(StartReplicationCapability.class);
 
     install(new FactoryModuleBuilder().build(PushAll.Factory.class));
-    install(new FactoryModuleBuilder().build(ReplicationState.Factory.class));
 
     bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class);
     DynamicSet.setOf(binder(), ReplicationStateListener.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 6229686..63ae355 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -25,7 +25,10 @@
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.util.HashSet;
 import java.util.Optional;
+import java.util.Set;
 import org.eclipse.jgit.transport.URIish;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,8 +47,7 @@
   private final WorkQueue workQueue;
   private final DynamicItem<EventDispatcher> dispatcher;
   private final ReplicationConfig config;
-  private final ReplicationState.Factory replicationStateFactory;
-  private final EventsStorage eventsStorage;
+  private final ReplicationTasksStorage replicationTasksStorage;
   private volatile boolean running;
   private volatile boolean replaying;
 
@@ -55,14 +57,12 @@
       ReplicationConfig rc,
       DynamicItem<EventDispatcher> dis,
       ReplicationStateListeners sl,
-      ReplicationState.Factory rsf,
-      EventsStorage es) {
+      ReplicationTasksStorage rts) {
     workQueue = wq;
     dispatcher = dis;
     config = rc;
     stateLog = sl;
-    replicationStateFactory = rsf;
-    eventsStorage = es;
+    replicationTasksStorage = rts;
   }
 
   @Override
@@ -117,8 +117,7 @@
   }
 
   private void onGitReferenceUpdated(String projectName, String refName) {
-    ReplicationState state =
-        replicationStateFactory.create(new GitUpdateProcessing(dispatcher.get()));
+    ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
     if (!running) {
       stateLog.warn("Replication plugin did not finish startup before event", state);
       return;
@@ -127,9 +126,9 @@
     Project.NameKey project = new Project.NameKey(projectName);
     for (Destination cfg : config.getDestinations(FilterType.ALL)) {
       if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
-        String eventKey = eventsStorage.persist(projectName, refName);
-        state.setEventKey(eventKey);
         for (URIish uri : cfg.getURIs(project, null)) {
+          replicationTasksStorage.persist(
+              new ReplicateRefUpdate(projectName, refName, uri, cfg.getRemoteConfigName()));
           cfg.schedule(project, refName, uri, state);
         }
       }
@@ -140,9 +139,15 @@
   private void firePendingEvents() {
     replaying = true;
     try {
-      for (EventsStorage.ReplicateRefUpdate e : eventsStorage.list()) {
-        repLog.info("Firing pending event {}", e);
-        onGitReferenceUpdated(e.project, e.ref);
+      Set<String> eventsReplayed = new HashSet<>();
+      replaying = true;
+      for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.list()) {
+        String eventKey = String.format("%s:%s", t.project, t.ref);
+        if (!eventsReplayed.contains(eventKey)) {
+          repLog.info("Firing pending task {}", eventKey);
+          onGitReferenceUpdated(t.project, t.ref);
+          eventsReplayed.add(eventKey);
+        }
       }
     } finally {
       replaying = false;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
index 6f0803a..df8f3f4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
@@ -16,8 +16,6 @@
 
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Table;
-import com.google.inject.assistedinject.Assisted;
-import com.google.inject.assistedinject.AssistedInject;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -26,12 +24,7 @@
 
 public class ReplicationState {
 
-  public interface Factory {
-    ReplicationState create(PushResultProcessing processing);
-  }
-
   private boolean allScheduled;
-  private final EventsStorage eventsStorage;
   private final PushResultProcessing pushResultProcessing;
 
   private final Lock countingLock = new ReentrantLock();
@@ -57,11 +50,7 @@
   private int totalPushTasksCount;
   private int finishedPushTasksCount;
 
-  private String eventKey;
-
-  @AssistedInject
-  ReplicationState(EventsStorage storage, @Assisted PushResultProcessing processing) {
-    eventsStorage = storage;
+  ReplicationState(PushResultProcessing processing) {
     pushResultProcessing = processing;
     statusByProjectRef = HashBasedTable.create();
   }
@@ -86,7 +75,6 @@
       URIish uri,
       RefPushResult status,
       RemoteRefUpdate.Status refUpdateStatus) {
-    deleteEvent();
     pushResultProcessing.onRefReplicatedToOneNode(project, ref, uri, status, refUpdateStatus);
 
     RefReplicationStatus completedRefStatus = null;
@@ -116,12 +104,6 @@
     }
   }
 
-  private void deleteEvent() {
-    if (eventKey != null) {
-      eventsStorage.delete(eventKey);
-    }
-  }
-
   public void markAllPushTasksScheduled() {
     countingLock.lock();
     try {
@@ -192,8 +174,4 @@
       return name().toLowerCase().replace("_", "-");
     }
   }
-
-  public void setEventKey(String eventKey) {
-    this.eventKey = eventKey;
-  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
similarity index 69%
rename from src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java
rename to src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 0efa726..a8d075d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -29,18 +29,28 @@
 import java.util.ArrayList;
 import java.util.List;
 import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.transport.URIish;
 
 @Singleton
-public class EventsStorage {
+public class ReplicationTasksStorage {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
   public static class ReplicateRefUpdate {
-    public String project;
-    public String ref;
+    public final String project;
+    public final String ref;
+    public final String uri;
+    public final String remote;
+
+    public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
+      this.project = project;
+      this.ref = ref;
+      this.uri = uri.toASCIIString();
+      this.remote = remote;
+    }
 
     @Override
     public String toString() {
-      return "ref-update " + project + ":" + ref;
+      return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote;
     }
   }
 
@@ -49,15 +59,11 @@
   private final Path refUpdates;
 
   @Inject
-  EventsStorage(ReplicationConfig config) {
+  ReplicationTasksStorage(ReplicationConfig config) {
     refUpdates = config.getEventsDirectory().resolve("ref-updates");
   }
 
-  public String persist(String project, String ref) {
-    ReplicateRefUpdate r = new ReplicateRefUpdate();
-    r.project = project;
-    r.ref = ref;
-
+  public String persist(ReplicateRefUpdate r) {
     String json = GSON.toJson(r) + "\n";
     String eventKey = sha1(json).name();
     Path file = refUpdates().resolve(eventKey);
@@ -67,20 +73,24 @@
     }
 
     try {
+      logger.atFine().log("CREATE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
       Files.write(file, json.getBytes(UTF_8));
     } catch (IOException e) {
-      logger.atWarning().log("Couldn't persist event %s", json);
+      logger.atWarning().withCause(e).log("Couldn't persist event %s", json);
     }
     return eventKey;
   }
 
-  public void delete(String eventKey) {
-    if (eventKey != null) {
-      try {
-        Files.delete(refUpdates().resolve(eventKey));
-      } catch (IOException e) {
-        logger.atSevere().withCause(e).log("Error while deleting event %s", eventKey);
-      }
+  public void delete(ReplicateRefUpdate r) {
+    String taskJson = GSON.toJson(r) + "\n";
+    String taskKey = sha1(taskJson).name();
+    Path file = refUpdates().resolve(taskKey);
+
+    try {
+      logger.atFine().log("DELETE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
+      Files.delete(file);
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Error while deleting event %s", taskKey);
     }
   }
 
@@ -91,7 +101,6 @@
         if (Files.isRegularFile(e)) {
           String json = new String(Files.readAllBytes(e), UTF_8);
           result.add(GSON.fromJson(json, ReplicateRefUpdate.class));
-          Files.delete(e);
         }
       }
     } catch (IOException e) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
index 20bdf65..7115d5b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
@@ -51,8 +51,6 @@
 
   @Inject private PushAll.Factory pushFactory;
 
-  @Inject private ReplicationState.Factory replicationStateFactory;
-
   private final Object lock = new Object();
 
   @Override
@@ -61,7 +59,7 @@
       throw new UnloggedFailure(1, "error: cannot combine --all and PROJECT");
     }
 
-    ReplicationState state = replicationStateFactory.create(new CommandProcessing(this));
+    ReplicationState state = new ReplicationState(new CommandProcessing(this));
     Future<?> future = null;
 
     ReplicationFilter projectFilter;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index fe800b0..7517c14 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -15,22 +15,39 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import static com.google.common.truth.Truth.assertThat;
+import static java.util.stream.Collectors.toList;
 
 import com.google.common.base.Stopwatch;
+import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.PushOneCommit.Result;
 import com.google.gerrit.acceptance.TestPlugin;
 import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
+import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.gerrit.extensions.api.projects.BranchInput;
 import com.google.gerrit.extensions.common.ProjectInfo;
 import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.config.SitePaths;
+import com.google.gson.Gson;
 import com.google.inject.Inject;
+import com.google.inject.Key;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import java.util.regex.Pattern;
+import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.revwalk.RevCommit;
@@ -43,30 +60,46 @@
     name = "replication",
     sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
 public class ReplicationIT extends LightweightPluginDaemonTest {
-  private static final int TEST_REPLICATION_DELAY = 1;
+  private static final Optional<String> ALL_PROJECTS = Optional.empty();
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final int TEST_REPLICATION_DELAY = 5;
   private static final Duration TEST_TIMEMOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 10);
 
   @Inject private SitePaths sitePaths;
   @Inject private ProjectOperations projectOperations;
+  private Path pluginDataDir;
   private Path gitPath;
+  private Path storagePath;
   private FileBasedConfig config;
+  private Gson GSON = new Gson();
 
   @Override
   public void setUpTestPlugin() throws Exception {
-    config =
-        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
-    config.save();
-
     gitPath = sitePaths.site_path.resolve("git");
 
+    config =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+    setReplicationDestination(
+        "remote1",
+        "suffix1",
+        Optional.of("not-used-project")); // Simulates a full replication.config initialization
+    config.save();
+
     super.setUpTestPlugin();
+
+    pluginDataDir = plugin.getSysInjector().getInstance(Key.get(Path.class, PluginData.class));
+    storagePath = pluginDataDir.resolve("ref-updates");
   }
 
   @Test
   public void shouldReplicateNewProject() throws Exception {
-    setReplicationDestination("foo", "replica");
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+    waitForEmptyTasks();
 
     Project.NameKey sourceProject = projectOperations.newProject().name("foo").create();
+
+    assertThat(listReplicationTasks("refs/meta/config")).hasSize(1);
     waitUntil(() -> gitPath.resolve(sourceProject + "replica.git").toFile().isDirectory());
 
     ProjectInfo replicaProject = gApi.projects().name(sourceProject + "replica").get();
@@ -74,51 +107,159 @@
   }
 
   @Test
-  public void shouldReplicateNewBranch() throws Exception {
-    setReplicationDestination("foo", "replica");
-
+  public void shouldReplicateNewChangeRef() throws Exception {
     Project.NameKey targetProject =
         projectOperations.newProject().name(project + "replica").create();
 
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+    waitForEmptyTasks();
+
     Result pushResult = createChange();
     RevCommit sourceCommit = pushResult.getCommit();
     String sourceRef = pushResult.getPatchSet().getRefName();
 
-    waitUntil(() -> getRef(getRepo(targetProject), sourceRef) != null);
+    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(1);
 
-    Ref targetBranchRef = getRef(getRepo(targetProject), sourceRef);
-    assertThat(targetBranchRef).isNotNull();
-    assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
-  }
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
 
-  private Repository getRepo(Project.NameKey targetProject) {
-    try {
-      return repoManager.openRepository(targetProject);
-    } catch (Exception e) {
-      e.printStackTrace();
-      return null;
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
     }
   }
 
-  private Ref getRef(Repository repo, String branchName) {
+  @Test
+  public void shouldReplicateNewBranch() throws Exception {
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+    waitForEmptyTasks();
+
+    Project.NameKey targetProject =
+        projectOperations.newProject().name(project + "replica").create();
+    String newBranch = "refs/heads/mybranch";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(project.get()).branch(newBranch).create(input);
+
+    assertThat(listReplicationTasks("refs/heads/(mybranch|master)")).hasSize(2);
+
+    try (Repository repo = repoManager.openRepository(targetProject);
+        Repository sourceRepo = repoManager.openRepository(project)) {
+      waitUntil(() -> checkedGetRef(repo, newBranch) != null);
+
+      Ref masterRef = getRef(sourceRepo, master);
+      Ref targetBranchRef = getRef(repo, newBranch);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(masterRef.getObjectId());
+    }
+  }
+
+  @Test
+  public void shouldReplicateNewBranchToTwoRemotes() throws Exception {
+    Project.NameKey targetProject1 =
+        projectOperations.newProject().name(project + "replica1").create();
+    Project.NameKey targetProject2 =
+        projectOperations.newProject().name(project + "replica2").create();
+
+    setReplicationDestination("foo1", "replica1", ALL_PROJECTS);
+    setReplicationDestination("foo2", "replica2", ALL_PROJECTS);
+    reloadConfig();
+    waitForEmptyTasks();
+
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().getRefName();
+
+    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
+
+    try (Repository repo1 = repoManager.openRepository(targetProject1);
+        Repository repo2 = repoManager.openRepository(targetProject2)) {
+      waitUntil(
+          () ->
+              (checkedGetRef(repo1, sourceRef) != null && checkedGetRef(repo2, sourceRef) != null));
+
+      Ref targetBranchRef1 = getRef(repo1, sourceRef);
+      assertThat(targetBranchRef1).isNotNull();
+      assertThat(targetBranchRef1.getObjectId()).isEqualTo(sourceCommit.getId());
+
+      Ref targetBranchRef2 = getRef(repo2, sourceRef);
+      assertThat(targetBranchRef2).isNotNull();
+      assertThat(targetBranchRef2.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
+  @Test
+  public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
+    List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
+    projectOperations.newProject().name(project + "replica1").create();
+    projectOperations.newProject().name(project + "replica2").create();
+
+    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+    reloadConfig();
+    waitForEmptyTasks();
+
+    createChange();
+
+    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
+  }
+
+  @Test
+  public void shouldReplicateHeadUpdate() throws Exception {
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Project.NameKey targetProject =
+        projectOperations.newProject().name(project + "replica").create();
+    String newHead = "refs/heads/newhead";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(project.get()).branch(newHead).create(input);
+    gApi.projects().name(project.get()).head(newHead);
+
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      waitUntil(() -> checkedGetRef(repo, newHead) != null);
+
+      Ref targetProjectHead = getRef(repo, Constants.HEAD);
+      assertThat(targetProjectHead).isNotNull();
+      assertThat(targetProjectHead.getTarget().getName()).isEqualTo(newHead);
+    }
+  }
+
+  private Ref getRef(Repository repo, String branchName) throws IOException {
+    return repo.getRefDatabase().exactRef(branchName);
+  }
+
+  private Ref checkedGetRef(Repository repo, String branchName) {
     try {
       return repo.getRefDatabase().exactRef(branchName);
-    } catch (IOException e) {
-      e.printStackTrace();
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
       return null;
     }
   }
 
-  private void setReplicationDestination(String remoteName, String replicaSuffix)
+  private void setReplicationDestination(
+      String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
+    setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project);
+  }
+
+  private void setReplicationDestination(
+      String remoteName, List<String> replicaSuffixes, Optional<String> project)
       throws IOException {
-    config.setString(
-        "remote",
-        remoteName,
-        "url",
-        gitPath.resolve("${name}" + replicaSuffix + ".git").toString());
+
+    List<String> replicaUrls =
+        replicaSuffixes.stream()
+            .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
+            .collect(toList());
+    config.setStringList("remote", remoteName, "url", replicaUrls);
     config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+    project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
     config.save();
-    reloadConfig();
   }
 
   private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
@@ -131,4 +272,40 @@
   private void reloadConfig() {
     plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).forceReload();
   }
+
+  private void waitForEmptyTasks() throws InterruptedException {
+    waitUntil(
+        () -> {
+          try {
+            return listReplicationTasks(".*").size() == 0;
+          } catch (Exception e) {
+            logger.atSevere().withCause(e).log("Failed to list replication tasks");
+            throw new IllegalStateException(e);
+          }
+        });
+  }
+
+  private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) throws IOException {
+    Pattern refmaskPattern = Pattern.compile(refRegex);
+    List<ReplicateRefUpdate> tasks = new ArrayList<>();
+    try (DirectoryStream<Path> files = Files.newDirectoryStream(storagePath)) {
+      for (Path path : files) {
+        ReplicateRefUpdate task = readTask(path);
+        if (refmaskPattern.matcher(task.ref).matches()) {
+          tasks.add(readTask(path));
+        }
+      }
+    }
+
+    return tasks;
+  }
+
+  private ReplicateRefUpdate readTask(Path file) {
+    try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
+      return GSON.fromJson(reader, ReplicateRefUpdate.class);
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log("failed to read replication task %s", file);
+      throw new IllegalStateException(e);
+    }
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
index cf6715e..2767f53 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
@@ -32,15 +32,15 @@
 
   private ReplicationState replicationState;
   private PushResultProcessing pushResultProcessingMock;
-  private EventsStorage eventsStorage;
+  private ReplicationTasksStorage eventsStorage;
 
   @Before
   public void setUp() throws Exception {
     pushResultProcessingMock = createNiceMock(PushResultProcessing.class);
     replay(pushResultProcessingMock);
-    eventsStorage = createNiceMock(EventsStorage.class);
+    eventsStorage = createNiceMock(ReplicationTasksStorage.class);
     replay(eventsStorage);
-    replicationState = new ReplicationState(eventsStorage, pushResultProcessingMock);
+    replicationState = new ReplicationState(pushResultProcessingMock);
   }
 
   @Test