Fix potential loss of persisted replication task

There was a race window between the check to see if any new updates were
available for the completed update and the deletion of the persistent
update. If an update occurred in that window it could leave the event
missing from the persisted task store. If the server went down before
this update completed, the update would be missed entirely.

Eliminate the race by separating the running tasks from the waiting
tasks in the persistent store by placing each into their own
subdirectories. Place new waiting tasks in a "waiting" directory and
move them to the "running" directory once they are running. This allows
new updates to be persisted to the "waiting" directory while a similar
update is running without the waiting task getting deleted when the
persisted running task is deleted. On startup, reset all running tasks
by moving them back to the waiting directory to ensure that they are
retried.

Reset "running" tasks (return them to the "waiting" directory) when a
retry is rescheduled. This allows the retry to be consolidated with the
new run and helps ensure that the persistence store better reflects
what is actually happening.

Bug: Issue 11672
Change-Id: Ia31329e8d939f8e5cb1e7455de69744431f34d66
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index dbdb325..4ebd67e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -60,7 +60,6 @@
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.servlet.RequestScoped;
 import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
-import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
@@ -522,6 +521,7 @@
         pending.put(uri, pushOp);
         switch (reason) {
           case COLLISION:
+            replicationTasksStorage.get().reset(pushOp);
             @SuppressWarnings("unused")
             ScheduledFuture<?> ignored =
                 pool.schedule(pushOp, config.getRescheduleDelay(), TimeUnit.SECONDS);
@@ -536,6 +536,7 @@
             postReplicationFailedEvent(pushOp, status);
             if (pushOp.setToRetry()) {
               postReplicationScheduledEvent(pushOp);
+              replicationTasksStorage.get().reset(pushOp);
               @SuppressWarnings("unused")
               ScheduledFuture<?> ignored2 =
                   pool.schedule(pushOp, config.getRetryDelay(), TimeUnit.MINUTES);
@@ -562,25 +563,16 @@
       if (inFlightOp != null) {
         return RunwayStatus.denied(inFlightOp.getId());
       }
+      replicationTasksStorage.get().start(op);
       inFlight.put(op.getURI(), op);
     }
     return RunwayStatus.allowed();
   }
 
-  void notifyFinished(PushOne task) {
+  void notifyFinished(PushOne op) {
     synchronized (stateLock) {
-      inFlight.remove(task.getURI());
-      if (!task.wasCanceled()) {
-        for (String ref : task.getRefs()) {
-          if (!refHasPendingPush(task.getURI(), ref)) {
-            replicationTasksStorage
-                .get()
-                .delete(
-                    new ReplicateRefUpdate(
-                        task.getProjectNameKey().get(), ref, task.getURI(), getRemoteConfigName()));
-          }
-        }
-      }
+      replicationTasksStorage.get().finish(op);
+      inFlight.remove(op.getURI());
     }
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index e02fafc..5dc8f73 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -78,6 +78,7 @@
     if (!running) {
       destinations.get().startup(workQueue);
       running = true;
+      replicationTasksStorage.resetAll();
       firePendingEvents();
       fireBeforeStartupEvents();
     }
@@ -153,7 +154,7 @@
     try {
       Set<String> eventsReplayed = new HashSet<>();
       replaying = true;
-      for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.list()) {
+      for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) {
         String eventKey = String.format("%s:%s", t.project, t.ref);
         if (!eventsReplayed.contains(eventKey)) {
           repLog.info("Firing pending task {}", eventKey);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 3b174a3..950ef26 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -27,11 +27,32 @@
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.List;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.transport.URIish;
 
+/**
+ * A persistent store for replication tasks.
+ *
+ * <p>The data of this store lives under {@code <replication_data>/ref-updates}, where
+ * replication_data is determined by the replication.eventsDirectory config option and defaults to
+ * {@code <site_dir>/data/replication}. Atomic renames must be supported from anywhere within the
+ * store to anywhere within the store. This generally means that all the contents of the store need
+ * to live on the same filesystem.
+ *
+ * <p>Individual tasks are stored in files under the following directories using the sha1 of the
+ * task:
+ *
+ * <pre>
+ *   .../running/&lt;sha1&gt;    running replication tasks
+ *   .../waiting/&lt;sha1&gt;    outstanding replication tasks
+ * </pre>
+ *
+ * <p>Tasks are moved atomically via a rename between those directories to indicate the current
+ * state of each task.
+ */
 @Singleton
 public class ReplicationTasksStorage {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@@ -44,6 +65,10 @@
     public final String uri;
     public final String remote;
 
+    public ReplicateRefUpdate(PushOne push, String ref) {
+      this(push.getProjectNameKey().get(), ref, push.getURI(), push.getRemoteName());
+    }
+
     public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
       this.project = project;
       this.ref = ref;
@@ -59,11 +84,14 @@
 
   private static Gson GSON = new Gson();
 
-  private final Path refUpdates;
+  private final Path runningUpdates;
+  private final Path waitingUpdates;
 
   @Inject
   ReplicationTasksStorage(ReplicationConfig config) {
-    refUpdates = config.getEventsDirectory().resolve("ref-updates");
+    Path refUpdates = config.getEventsDirectory().resolve("ref-updates");
+    runningUpdates = refUpdates.resolve("running");
+    waitingUpdates = refUpdates.resolve("waiting");
   }
 
   public String persist(ReplicateRefUpdate r) {
@@ -75,13 +103,37 @@
     this.disableDeleteForTesting = deleteDisabled;
   }
 
-  public void delete(ReplicateRefUpdate r) {
-    new Task(r).delete();
+  public void start(PushOne push) {
+    for (String ref : push.getRefs()) {
+      new Task(new ReplicateRefUpdate(push, ref)).start();
+    }
   }
 
-  public List<ReplicateRefUpdate> list() {
+  public void reset(PushOne push) {
+    for (String ref : push.getRefs()) {
+      new Task(new ReplicateRefUpdate(push, ref)).reset();
+    }
+  }
+
+  public void resetAll() {
+    for (ReplicateRefUpdate r : list(createDir(runningUpdates))) {
+      new Task(r).reset();
+    }
+  }
+
+  public void finish(PushOne push) {
+    for (String ref : push.getRefs()) {
+      new Task(new ReplicateRefUpdate(push, ref)).finish();
+    }
+  }
+
+  public List<ReplicateRefUpdate> listWaiting() {
+    return list(createDir(waitingUpdates));
+  }
+
+  private List<ReplicateRefUpdate> list(Path tasks) {
     List<ReplicateRefUpdate> results = new ArrayList<>();
-    try (DirectoryStream<Path> events = Files.newDirectoryStream(refUpdates())) {
+    try (DirectoryStream<Path> events = Files.newDirectoryStream(tasks)) {
       for (Path e : events) {
         if (Files.isRegularFile(e)) {
           String json = new String(Files.readAllBytes(e), UTF_8);
@@ -89,7 +141,7 @@
         }
       }
     } catch (IOException e) {
-      logger.atSevere().withCause(e).log("Error when firing pending tasks");
+      logger.atSevere().withCause(e).log("Error while listing tasks");
     }
     return results;
   }
@@ -99,11 +151,11 @@
     return ObjectId.fromRaw(Hashing.sha1().hashString(s, UTF_8).asBytes());
   }
 
-  private Path refUpdates() {
+  private static Path createDir(Path dir) {
     try {
-      return Files.createDirectories(refUpdates);
+      return Files.createDirectories(dir);
     } catch (IOException e) {
-      throw new ProvisionException(String.format("Couldn't create %s", refUpdates), e);
+      throw new ProvisionException(String.format("Couldn't create %s", dir), e);
     }
   }
 
@@ -111,44 +163,63 @@
     public final ReplicateRefUpdate update;
     public final String json;
     public final String taskKey;
-    public final Path file;
+    public final Path running;
+    public final Path waiting;
 
     public Task(ReplicateRefUpdate update) {
       this.update = update;
       json = GSON.toJson(update) + "\n";
       String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote;
       taskKey = sha1(key).name();
-      file = refUpdates().resolve(taskKey);
+      running = createDir(runningUpdates).resolve(taskKey);
+      waiting = createDir(waitingUpdates).resolve(taskKey);
     }
 
     public String persist() {
-      if (Files.exists(file)) {
+      if (Files.exists(waiting)) {
         return taskKey;
       }
 
       try {
-        logger.atFine().log("CREATE %s %s", file, updateLog());
-        Files.write(file, json.getBytes(UTF_8));
+        logger.atFine().log("CREATE %s %s", waiting, updateLog());
+        Files.write(waiting, json.getBytes(UTF_8));
       } catch (IOException e) {
         logger.atWarning().withCause(e).log("Couldn't persist task %s", json);
       }
       return taskKey;
     }
 
-    public void delete() {
+    public void start() {
+      rename(waiting, running);
+    }
+
+    public void reset() {
+      rename(running, waiting);
+    }
+
+    public void finish() {
       if (disableDeleteForTesting) {
-        logger.atFine().log("DELETE %s %s DISABLED", file, updateLog());
+        logger.atFine().log("DELETE %s %s DISABLED", running, updateLog());
         return;
       }
 
       try {
-        logger.atFine().log("DELETE %s %s", file, updateLog());
-        Files.delete(file);
+        logger.atFine().log("DELETE %s %s", running, updateLog());
+        Files.delete(running);
       } catch (IOException e) {
         logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
       }
     }
 
+    private void rename(Path from, Path to) {
+      try {
+        logger.atFine().log("RENAME %s to %s %s", from, to, updateLog());
+        Files.move(from, to, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while renaming task %s", taskKey);
+      }
+    }
+
     private String updateLog() {
       return String.format("(%s:%s => %s)", update.project, update.ref, update.uri);
     }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index d9d4bae..7233061 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -142,6 +142,11 @@
 	persisted events will not be deleted. When the plugin is started again,
 	it will trigger all replications found under this directory.
 
+	For replication to work, it is important that atomic renames be possible
+	from within any subdirectory of the eventsDirectory to within any other
+	subdirectory of the eventsDirectory. This generally means that the entire
+	contents of the eventsDirectory should live on the same filesystem.
+
 	When not set, defaults to the plugin's data directory.
 
 remote.NAME.url
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 3b7a3ed..19269f0 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -399,7 +399,7 @@
 
   private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
     Pattern refmaskPattern = Pattern.compile(refRegex);
-    return tasksStorage.list().stream()
+    return tasksStorage.listWaiting().stream()
         .filter(task -> refmaskPattern.matcher(task.ref).matches())
         .collect(toList());
   }