Revert "Get a URI lock before running tasks."

This reverts commit 7a130c30c42ca8bc1e8858513ee8643ca6041b3a.

Reason for revert: May still be missing some lock release calls
and caused the loss of replication events in queue when backed
by the same task storage file.

Bug: Issue 12986
Change-Id: Ib13a02f2748448ade8e56339eac764fe916e3b9c
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 936bb2c..adc3133 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -583,9 +583,7 @@
       if (inFlightOp != null) {
         return RunwayStatus.denied(inFlightOp.getId());
       }
-      if (!replicationTasksStorage.get().start(op)) {
-        return RunwayStatus.deniedExternal();
-      }
+      replicationTasksStorage.get().start(op);
       inFlight.put(op.getURI(), op);
     }
     return RunwayStatus.allowed();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 9ccb70c..d598925 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -335,8 +335,6 @@
       if (status.isCanceled()) {
         repLog.atInfo().log(
             "PushOp for replication to %s was canceled and thus won't be rescheduled", uri);
-      } else if (status.isExternalInflight()) {
-        repLog.atInfo().log("PushOp for replication to %s was denied externally", uri);
       } else {
         repLog.atInfo().log(
             "Rescheduling replication to %s to avoid collision with the in-flight push [%s].",
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 5af0983..506b175 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -26,7 +26,6 @@
 import java.io.IOException;
 import java.nio.file.DirectoryIteratorException;
 import java.nio.file.DirectoryStream;
-import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
@@ -50,13 +49,10 @@
  *
  * <p><code>
  *   .../building/<tmp_name>                       new replication tasks under construction
- *   .../running/<uri_sha1>/                       lock for URI
- *   .../running/<uri_sha1>/<task_sha1>            running replication tasks
+ *   .../running/<sha1>                            running replication tasks
  *   .../waiting/<task_sha1_NN_shard>/<task_sha1>  outstanding replication tasks
  * </code>
  *
- * <p>The URI lock is acquired by creating the directory and released by removing it.
- *
  * <p>Tasks are moved atomically via a rename between those directories to indicate the current
  * state of each task.
  *
@@ -69,50 +65,26 @@
 
   private boolean disableDeleteForTesting;
 
-  public static class ReplicateRefUpdate extends UriUpdate {
-    public final String ref;
-
-    public ReplicateRefUpdate(UriUpdate update, String ref) {
-      this(update.project, ref, update.uri, update.remote);
-    }
-
-    public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
-      this(project, ref, uri.toASCIIString(), remote);
-    }
-
-    protected ReplicateRefUpdate(String project, String ref, String uri, String remote) {
-      super(project, uri, remote);
-      this.ref = ref;
-    }
-
-    @Override
-    public String toString() {
-      return "ref-update " + ref + " (" + super.toString() + ")";
-    }
-  }
-
-  public static class UriUpdate {
+  public static class ReplicateRefUpdate {
     public final String project;
+    public final String ref;
     public final String uri;
     public final String remote;
 
-    public UriUpdate(PushOne push) {
-      this(push.getProjectNameKey().get(), push.getURI(), push.getRemoteName());
+    public ReplicateRefUpdate(PushOne push, String ref) {
+      this(push.getProjectNameKey().get(), ref, push.getURI(), push.getRemoteName());
     }
 
-    public UriUpdate(String project, URIish uri, String remote) {
-      this(project, uri.toASCIIString(), remote);
-    }
-
-    public UriUpdate(String project, String uri, String remote) {
+    public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
       this.project = project;
-      this.uri = uri;
+      this.ref = ref;
+      this.uri = uri.toASCIIString();
       this.remote = remote;
     }
 
     @Override
     public String toString() {
-      return "uri-update " + project + " uri:" + uri + " remote:" + remote;
+      return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote;
     }
   }
 
@@ -140,52 +112,21 @@
     this.disableDeleteForTesting = deleteDisabled;
   }
 
-  public synchronized boolean start(PushOne push) {
-    UriLock lock = new UriLock(push);
-    if (!lock.acquire()) {
-      return false;
-    }
-
-    boolean started = false;
+  public synchronized void start(PushOne push) {
     for (String ref : push.getRefs()) {
-      started = new Task(lock, ref).start() || started;
+      new Task(new ReplicateRefUpdate(push, ref)).start();
     }
-
-    if (!started) { // No tasks left, likely replicated externally
-      lock.release();
-    }
-    return started;
   }
 
   public synchronized void reset(PushOne push) {
-    UriLock lock = new UriLock(push);
     for (String ref : push.getRefs()) {
-      new Task(lock, ref).reset();
+      new Task(new ReplicateRefUpdate(push, ref)).reset();
     }
-    lock.release();
   }
 
   public synchronized void resetAll() {
-    try (DirectoryStream<Path> dirs = Files.newDirectoryStream(createDir(runningUpdates))) {
-      for (Path dir : dirs) {
-        UriLock lock = null;
-        try {
-          for (ReplicateRefUpdate u : list(dir)) {
-            if (lock == null) {
-              lock = new UriLock(u);
-            }
-            new Task(u).reset();
-          }
-        } catch (DirectoryIteratorException d) {
-          // iterating over the sub-directories is expected to have dirs disappear
-          Nfs.throwIfNotStaleFileHandle(d.getCause());
-        }
-        if (lock != null) {
-          lock.release();
-        }
-      }
-    } catch (IOException e) {
-      logger.atSevere().withCause(e).log("Error while aborting running tasks");
+    for (ReplicateRefUpdate r : list(createDir(runningUpdates))) {
+      new Task(r).reset();
     }
   }
 
@@ -194,11 +135,9 @@
   }
 
   public void finish(PushOne push) {
-    UriLock lock = new UriLock(push);
-    for (ReplicateRefUpdate r : list(lock.runningDir)) {
-      new Task(lock, r.ref).finish();
+    for (String ref : push.getRefs()) {
+      new Task(new ReplicateRefUpdate(push, ref)).finish();
     }
-    lock.release();
   }
 
   public synchronized List<ReplicateRefUpdate> listWaiting() {
@@ -263,73 +202,22 @@
     }
   }
 
-  private class UriLock {
-    public final UriUpdate update;
-    public final String uriKey;
-    public final Path runningDir;
-
-    public UriLock(PushOne push) {
-      this(new UriUpdate(push));
-    }
-
-    public UriLock(UriUpdate update) {
-      this.update = update;
-      uriKey = sha1(update.uri).name();
-      runningDir = createDir(runningUpdates).resolve(uriKey);
-    }
-
-    public boolean acquire() {
-      try {
-        logger.atFine().log("MKDIR %s %s", runningDir, updateLog());
-        Files.createDirectory(runningDir);
-        return true;
-      } catch (FileAlreadyExistsException e) {
-        return false; // already running, likely externally
-      } catch (IOException e) {
-        logger.atSevere().withCause(e).log("Error while starting uri %s", uriKey);
-        return true; // safer to risk a duplicate than to skip it
-      }
-    }
-
-    public void release() {
-      try {
-        if (disableDeleteForTesting && Files.list(runningDir).findFirst().isPresent()) {
-          logger.atFine().log("DELETE %s %s DISABLED", runningDir, updateLog());
-          return;
-        }
-
-        logger.atFine().log("DELETE %s %s", runningDir, updateLog());
-        Files.delete(runningDir);
-      } catch (IOException e) {
-        logger.atSevere().withCause(e).log("Error while releasing uri %s", uriKey);
-      }
-    }
-
-    private String updateLog() {
-      return String.format("(%s => %s)", update.project, update.uri);
-    }
-  }
-
   private class Task {
     public final ReplicateRefUpdate update;
     public final String taskKey;
     public final Path running;
     public final Path waiting;
 
-    public Task(ReplicateRefUpdate r) {
-      this(new UriLock(r), r.ref);
-    }
-
     public Task(PushOne push, String ref) {
-      this(new UriLock(push), ref);
+      this(new ReplicateRefUpdate(push, ref));
     }
 
-    public Task(UriLock lock, String ref) {
-      update = new ReplicateRefUpdate(lock.update, ref);
+    public Task(ReplicateRefUpdate update) {
+      this.update = update;
       String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote;
       taskKey = sha1(key).name();
-      running = lock.runningDir.resolve(taskKey);
-      waiting = createDir(waitingUpdates.resolve(taskKey.substring(0, 2))).resolve(taskKey);
+      running = createDir(runningUpdates).resolve(taskKey);
+      waiting = createDir(waitingUpdates).resolve(taskKey);
     }
 
     public String create() {
@@ -350,9 +238,8 @@
       return taskKey;
     }
 
-    public boolean start() {
+    public void start() {
       rename(waiting, running);
-      return Files.exists(running);
     }
 
     public void reset() {
@@ -373,7 +260,7 @@
         logger.atFine().log("DELETE %s %s", running, updateLog());
         Files.delete(running);
       } catch (IOException e) {
-        logger.atSevere().withCause(e).log("Error while finishing task %s", taskKey);
+        logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
       }
     }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
index f7071d8..bcb1e2f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
@@ -27,10 +27,6 @@
     return new RunwayStatus(false, inFlightPushId);
   }
 
-  public static RunwayStatus deniedExternal() {
-    return new RunwayStatus(false, -1);
-  }
-
   private final boolean allowed;
   private final int inFlightPushId;
 
@@ -47,10 +43,6 @@
     return !allowed && inFlightPushId == 0;
   }
 
-  public boolean isExternalInflight() {
-    return !allowed && inFlightPushId == -1;
-  }
-
   public int getInFlightPushId() {
     return inFlightPushId;
   }