Get a URI lock before running tasks.

This allows more than one related gerrit process (as in multi-master
situations) to access the persistent task store at the same time without
deleting each other's task files. This also prevents more than one gerrit
process from pushing to the same URI at the same time.

Change-Id: I1f3f4d60605ff21038e722553cb38494e18b9ff9
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 4ebd67e..a5c318e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -563,7 +563,9 @@
       if (inFlightOp != null) {
         return RunwayStatus.denied(inFlightOp.getId());
       }
-      replicationTasksStorage.get().start(op);
+      if (!replicationTasksStorage.get().start(op)) {
+        return RunwayStatus.deniedExternal();
+      }
       inFlight.put(op.getURI(), op);
     }
     return RunwayStatus.allowed();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index b46c278..d31ae3b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -319,6 +319,8 @@
     if (!status.isAllowed()) {
       if (status.isCanceled()) {
         repLog.info("PushOp for replication to {} was canceled and thus won't be rescheduled", uri);
+      } else if (status.isExternalInflight()) {
+        repLog.info("PushOp for replication to {} was denied externally", uri);
       } else {
         repLog.info(
             "Rescheduling replication to {} to avoid collision with the in-flight push [{}].",
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index b069508..ed8d0b7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -25,6 +25,7 @@
 import com.google.inject.Singleton;
 import java.io.IOException;
 import java.nio.file.DirectoryStream;
+import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
@@ -46,11 +47,14 @@
  * task:
  *
  * <p><code>
- *   .../building/<tmp_name>  new replication tasks under construction
- *   .../running/<sha1>       running replication tasks
- *   .../waiting/<sha1>       outstanding replication tasks
+ *   .../building/<tmp_name>             new replication tasks under construction
+ *   .../running/<uri_sha1>/             lock for URI
+ *   .../running/<uri_sha1>/<task_sha1>  running replication tasks
+ *   .../waiting/<task_sha1>             outstanding replication tasks
  * </code>
  *
+ * <p>The URI lock is acquired by creating the directory and released by removing it.
+ *
  * <p>Tasks are moved atomically via a rename between those directories to indicate the current
  * state of each task.
  */
@@ -60,26 +64,50 @@
 
   private boolean disableDeleteForTesting;
 
-  public static class ReplicateRefUpdate {
-    public final String project;
+  public static class ReplicateRefUpdate extends UriUpdate {
     public final String ref;
-    public final String uri;
-    public final String remote;
 
-    public ReplicateRefUpdate(PushOne push, String ref) {
-      this(push.getProjectNameKey().get(), ref, push.getURI(), push.getRemoteName());
+    public ReplicateRefUpdate(UriUpdate update, String ref) {
+      this(update.project, ref, update.uri, update.remote);
     }
 
     public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
-      this.project = project;
+      this(project, ref, uri.toASCIIString(), remote);
+    }
+
+    protected ReplicateRefUpdate(String project, String ref, String uri, String remote) {
+      super(project, uri, remote);
       this.ref = ref;
-      this.uri = uri.toASCIIString();
+    }
+
+    @Override
+    public String toString() {
+      return "ref-update " + ref + " (" + super.toString() + ")";
+    }
+  }
+
+  public static class UriUpdate {
+    public final String project;
+    public final String uri;
+    public final String remote;
+
+    public UriUpdate(PushOne push) {
+      this(push.getProjectNameKey().get(), push.getURI(), push.getRemoteName());
+    }
+
+    public UriUpdate(String project, URIish uri, String remote) {
+      this(project, uri.toASCIIString(), remote);
+    }
+
+    public UriUpdate(String project, String uri, String remote) {
+      this.project = project;
+      this.uri = uri;
       this.remote = remote;
     }
 
     @Override
     public String toString() {
-      return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote;
+      return "uri-update " + project + " uri:" + uri + " remote:" + remote;
     }
   }
 
@@ -106,34 +134,74 @@
     this.disableDeleteForTesting = deleteDisabled;
   }
 
-  public void start(PushOne push) {
-    for (String ref : push.getRefs()) {
-      new Task(new ReplicateRefUpdate(push, ref)).start();
+  public boolean start(PushOne push) {
+    UriLock lock = new UriLock(push);
+    if (!lock.acquire()) {
+      return false;
     }
+
+    boolean started = false;
+    for (String ref : push.getRefs()) {
+      started = new Task(lock, ref).start() || started;
+    }
+
+    if (!started) { // No tasks left, likely replicated externally
+      lock.release();
+    }
+    return started;
   }
 
   public void reset(PushOne push) {
+    UriLock lock = new UriLock(push);
     for (String ref : push.getRefs()) {
-      new Task(new ReplicateRefUpdate(push, ref)).reset();
+      new Task(lock, ref).reset();
     }
+    lock.release();
   }
 
   public void resetAll() {
-    for (ReplicateRefUpdate r : list(createDir(runningUpdates))) {
-      new Task(r).reset();
+    try (DirectoryStream<Path> dirs = Files.newDirectoryStream(createDir(runningUpdates))) {
+      for (Path dir : dirs) {
+        UriLock lock = null;
+        for (ReplicateRefUpdate u : list(dir)) {
+          if (lock == null) {
+            lock = new UriLock(u);
+          }
+          new Task(u).reset();
+        }
+        if (lock != null) {
+          lock.release();
+        }
+      }
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Error while aborting running tasks");
     }
   }
 
   public void finish(PushOne push) {
-    for (String ref : push.getRefs()) {
-      new Task(new ReplicateRefUpdate(push, ref)).finish();
+    UriLock lock = new UriLock(push);
+    for (ReplicateRefUpdate r : list(lock.runningDir)) {
+      new Task(lock, r.ref).finish();
     }
+    lock.release();
   }
 
   public List<ReplicateRefUpdate> listWaiting() {
     return list(createDir(waitingUpdates));
   }
 
+  private List<ReplicateRefUpdate> listSubs(Path parent) {
+    List<ReplicateRefUpdate> results = new ArrayList<>();
+    try (DirectoryStream<Path> dirs = Files.newDirectoryStream(parent)) {
+      for (Path dir : dirs) {
+        results.addAll(list(dir));
+      }
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Error while listing tasks");
+    }
+    return results;
+  }
+
   private List<ReplicateRefUpdate> list(Path tasks) {
     List<ReplicateRefUpdate> results = new ArrayList<>();
     try (DirectoryStream<Path> events = Files.newDirectoryStream(tasks)) {
@@ -162,6 +230,53 @@
     }
   }
 
+  private class UriLock {
+    public final UriUpdate update;
+    public final String uriKey;
+    public final Path runningDir;
+
+    public UriLock(PushOne push) {
+      this(new UriUpdate(push));
+    }
+
+    public UriLock(UriUpdate update) {
+      this.update = update;
+      uriKey = sha1(update.uri).name();
+      runningDir = createDir(runningUpdates).resolve(uriKey);
+    }
+
+    public boolean acquire() {
+      try {
+        logger.atFine().log("MKDIR %s %s", runningDir, updateLog());
+        Files.createDirectory(runningDir);
+        return true;
+      } catch (FileAlreadyExistsException e) {
+        return false; // already running, likely externally
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while starting uri %s", uriKey);
+        return true; // safer to risk a duplicate than to skip it
+      }
+    }
+
+    public void release() {
+      if (disableDeleteForTesting) {
+        logger.atFine().log("DELETE %s %s DISABLED", runningDir, updateLog());
+        return;
+      }
+
+      try {
+        logger.atFine().log("DELETE %s %s", runningDir, updateLog());
+        Files.delete(runningDir);
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while releasing uri %s", uriKey);
+      }
+    }
+
+    private String updateLog() {
+      return String.format("(%s => %s)", update.project, update.uri);
+    }
+  }
+
   private class Task {
     public final ReplicateRefUpdate update;
     public final String json;
@@ -169,12 +284,16 @@
     public final Path running;
     public final Path waiting;
 
-    public Task(ReplicateRefUpdate update) {
-      this.update = update;
+    public Task(ReplicateRefUpdate r) {
+      this(new UriLock(r), r.ref);
+    }
+
+    public Task(UriLock lock, String ref) {
+      update = new ReplicateRefUpdate(lock.update, ref);
       json = GSON.toJson(update) + "\n";
       String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote;
       taskKey = sha1(key).name();
-      running = createDir(runningUpdates).resolve(taskKey);
+      running = lock.runningDir.resolve(taskKey);
       waiting = createDir(waitingUpdates).resolve(taskKey);
     }
 
@@ -195,8 +314,9 @@
       return taskKey;
     }
 
-    public void start() {
+    public boolean start() {
       rename(waiting, running);
+      return Files.exists(running);
     }
 
     public void reset() {
@@ -213,7 +333,7 @@
         logger.atFine().log("DELETE %s %s", running, updateLog());
         Files.delete(running);
       } catch (IOException e) {
-        logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
+        logger.atSevere().withCause(e).log("Error while finishing task %s", taskKey);
       }
     }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
index bcb1e2f..f7071d8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
@@ -27,6 +27,10 @@
     return new RunwayStatus(false, inFlightPushId);
   }
 
+  public static RunwayStatus deniedExternal() {
+    return new RunwayStatus(false, -1);
+  }
+
   private final boolean allowed;
   private final int inFlightPushId;
 
@@ -43,6 +47,10 @@
     return !allowed && inFlightPushId == 0;
   }
 
+  public boolean isExternalInflight() {
+    return !allowed && inFlightPushId == -1;
+  }
+
   public int getInFlightPushId() {
     return inFlightPushId;
   }