Complete Multi-master safety for the event plugin

Use a new EventSequence to synchronize the event submissions with the
Head sequence.  This completes the last remaining piece to ensure that
the event plugin is multi-master safe.  The fsstore under
site_dir>/data/plugin/events/fsstore-2 must reside  on a cluster (likely
NFS) filesystem in order to share the FsStore among multiple nodes.

Change-Id: I92aa11811f28d5339d56484f5cdd9335795be749
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/DynamicRangeSharder.java b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/DynamicRangeSharder.java
index 2a15167..8a7fca6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/DynamicRangeSharder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/DynamicRangeSharder.java
@@ -56,6 +56,11 @@
     order = Long.toString(orders);
   }
 
+  /** Get the Path where `i` should be stored. */
+  public Path path(long i) {
+    return dir(i).resolve(Long.toString(i));
+  }
+
   /** Get the Path of the block that `i` should be stored in. */
   public Path dir(long i) {
     Path dir = base.resolve(subtreeName(i));
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequence.java b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequence.java
new file mode 100644
index 0000000..d894816
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequence.java
@@ -0,0 +1,109 @@
+// Copyright (C) 2017 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events.fsstore;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/**
+ * Use a file to store a sequence in a multi node/process (Multi-Master) safe way, and allow an
+ * event to be delivered for each update to the sequence.
+ *
+ * <p>The event submitter must perform the first phase of the UpdatableFileValue transaction by
+ * initiating the ownerless transaction with an event file in it. Any actor may perform the
+ * remaining 5 UpdatableFileValue transaction phases.
+ */
+public class EventSequence extends UpdatableFileValue<Long> {
+  public static final Path EVENT = Paths.get("event");
+
+  protected static class EventBuilder extends UpdatableFileValue.UpdateBuilder {
+    public EventBuilder(BasePaths paths, String event) throws IOException {
+      super(paths);
+      FileValue.prepare(udir.resolve(EVENT), event);
+      // build/<tmp>/<uuid>/event
+    }
+  }
+
+  protected class UniqueUpdate extends UpdatableFileValue.UniqueUpdate<Long> {
+    final Path event;
+    Path destination;
+
+    UniqueUpdate(String uuid, boolean ours, long maxTries) throws IOException {
+      super(EventSequence.this, uuid, ours, maxTries);
+      event = upaths.udir.resolve(EVENT);
+      spinFinish();
+    }
+
+    @Override
+    protected void finish() throws IOException {
+      storeEvent();
+      super.finish();
+    }
+
+    protected void storeEvent() throws IOException {
+      Path destination = getEventDestination(next);
+      if (Fs.tryAtomicMove(event, destination)) {
+        // update/<uuid>/event -> destination
+        this.destination = destination;
+      }
+    }
+  }
+
+  public long totalSpins;
+  public long totalUpdates;
+
+  public EventSequence(Path base) {
+    super(base);
+  }
+
+  public void initFs() throws IOException {
+    initFs((long) 0);
+  }
+
+  protected UniqueUpdate spinSubmit(String event, long maxTries) throws IOException {
+    try (EventBuilder b = new EventBuilder(paths, event)) {
+      for (long tries = 0; tries < maxTries; tries++) {
+        if (Fs.tryAtomicMove(b.dir, paths.update)) { // build/<tmp>/ -> update/
+          // update/<uuid>/event
+          synchronized (this) {
+            totalUpdates++;
+            totalSpins += tries - 1;
+          }
+          return createUniqueUpdate(b.uuid, true, maxTries - tries);
+        }
+
+        UniqueUpdate update = (UniqueUpdate) completeOngoing(maxTries - tries);
+        if (update != null) {
+          tries += update.tries - 1;
+        }
+      }
+      throw new IOException("Cannot submit event " + paths.base + " after " + maxTries + " tries.");
+    }
+  }
+
+  protected Long getToValue(Long currentValue) {
+    return currentValue + 1;
+  }
+
+  protected UniqueUpdate createUniqueUpdate(String uuid, boolean ours, long maxTries)
+      throws IOException {
+    return new UniqueUpdate(uuid, ours, maxTries);
+  }
+
+  protected Path getEventDestination(Long n) {
+    return paths.base.resolve(EVENT);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsStore.java b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsStore.java
index 01488f4..8109836 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsStore.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsStore.java
@@ -18,13 +18,12 @@
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.events.EventStore;
 import java.io.IOException;
-import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.util.UUID;
 import javax.inject.Singleton;
 
-/** This class is only Thread, but not process (Multi-Master) safe */
+/** Use a filesystem to store events in a multi node/process (Multi-Master) safe way. */
 @Singleton
 public class FsStore implements EventStore {
   protected static class BasePaths {
@@ -39,35 +38,46 @@
       head = base.resolve("head");
       tail = base.resolve("tail");
     }
+  }
 
-    public Path event(long event) {
-      return events.dir(event).resolve(Long.toString(event));
+  protected static class Head extends EventSequence {
+    FsStore.BasePaths paths;
+
+    Head(FsStore.BasePaths paths) {
+      super(paths.head);
+      this.paths = paths;
     }
 
-    public boolean isEventLastDirEntry(long event) {
-      return events.isLastDirEntry(event);
+    protected Path getEventDestination(Long n) {
+      Path event = paths.events.path(n);
+      try {
+        Fs.createDirectories(event.getParent());
+      } catch (IOException e) {
+      }
+      return event;
     }
   }
 
   protected static class Stores {
     final FsId uuid;
-    final FsSequence head;
+    final Head head;
     final FsSequence tail;
 
     public Stores(BasePaths bases) {
       uuid = new FsId(bases.uuid);
-      head = new FsSequence(bases.head);
+      head = new Head(bases);
       tail = new FsSequence(bases.tail);
     }
 
     public void initFs() throws IOException {
       uuid.initFs();
-      head.initFs((long) 0);
+      head.initFs();
       tail.initFs((long) 1);
     }
   }
 
   public static final long MAX_GET_SPINS = 1000;
+  public static final long MAX_SUBMIT_SPINS = 100000;
   public static final long MAX_INCREMENT_SPINS = 1000;
 
   protected final BasePaths paths;
@@ -76,7 +86,7 @@
 
   @Inject
   public FsStore(SitePaths site) throws IOException {
-    this(site.data_dir.toPath().resolve("plugin").resolve("events").resolve("fstore-v1.2"));
+    this(site.data_dir.toPath().resolve("plugin").resolve("events").resolve("fstore-v2"));
   }
 
   public FsStore(Path base) throws IOException {
@@ -91,14 +101,9 @@
     return uuid;
   }
 
-  /** Only Thread, but not process (Multi-Master) safe */
   @Override
-  public synchronized void add(String event) throws IOException {
-    long next = getHead() + 1;
-    Path epath = paths.event(next);
-    Fs.createDirectories(epath.getParent());
-    Files.write(epath, (event + "\n").getBytes());
-    stores.head.spinIncrement(MAX_INCREMENT_SPINS);
+  public void add(String event) throws IOException {
+    stores.head.spinSubmit(event + "\n", MAX_SUBMIT_SPINS);
   }
 
   @Override
@@ -110,7 +115,7 @@
   public String get(long num) throws IOException {
     if (getTail() <= num && num <= getHead()) {
       try {
-        return Fs.readFile(paths.event(num));
+        return Fs.readFile(paths.events.path(num));
       } catch (NoSuchFileException e) {
       }
     }
@@ -135,9 +140,9 @@
     if (trim > 0) {
       for (long i = getTail(); i <= trim; i++) {
         long delete = stores.tail.spinIncrement(MAX_INCREMENT_SPINS) - 1;
-        Path event = paths.event(delete);
+        Path event = paths.events.path(delete);
         Fs.tryRecursiveDelete(event);
-        if (paths.isEventLastDirEntry(delete)) {
+        if (paths.events.isLastDirEntry(delete)) {
           Fs.unsafeRecursiveRmdir(event.getParent().toFile());
         }
       }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequenceTest.java b/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequenceTest.java
new file mode 100644
index 0000000..5e0fb94
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequenceTest.java
@@ -0,0 +1,64 @@
+// Copyright (C) 2017 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events.fsstore;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import junit.framework.TestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class EventSequenceTest extends TestCase {
+  private static String dir = "events-EventSequence";
+  private static Path base;
+  private Path myBase;
+  private EventSequence seq;
+  private long count = 1000;
+  private long maxSpins = 1;
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    myBase = base;
+    if (myBase == null) {
+      myBase = Files.createTempDirectory(dir);
+    }
+    seq = new EventSequence(myBase);
+    seq.initFs((long) 0);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    Fs.tryRecursiveDelete(myBase);
+  }
+
+  @Test
+  public void testGetZero() throws IOException {
+    assertEquals((long) 0, (long) seq.get());
+  }
+
+  @Test
+  public void testSpinSubmit() throws IOException {
+    Long next = seq.get() + (long) 1;
+    String event = "Event " + next;
+
+    EventSequence.UniqueUpdate up = seq.spinSubmit(event, maxSpins);
+    assertEquals(next, seq.get());
+    assertNotNull(up.destination);
+    assertEquals(event, Fs.readFile(up.destination));
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/FsStoreTest.java b/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/FsStoreTest.java
index 5885c05..e852459 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/FsStoreTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/FsStoreTest.java
@@ -217,10 +217,12 @@
    *
    * <p>Note: if you do not specify <dir>, it will create a directory under /tmp
    *
-   * <p>Performance: NFS(Lowlatency,SSDs), 1 worker 10K, 2m33s ~153ms/event find events|wc -l .4s rm
-   * -rf 1.3s
+   * <p>Performance: NFS(Lowlatency,SSDs), 1 worker 1M, 266m ~16ms/event find events|wc -l 12s rm
+   * -rf 1m49s du -sh -> 3.9G 1m7s
    *
-   * <p>Local(spinning) 1 workers 1M 16m7s ~1ms/event find events|wc -l 1.6s rm -rf 36s
+   * <p>Local(spinning) 1 workers 1M 14.34s ~14us/event find events|wc -l 1.3s rm -rf 42s
+   *
+   * <p>Mixed workers: NFS(WAN) 1 worker (+NFS LAN continuous) count=10, 10m6s
    */
   public static void main(String[] argv) throws Exception {
     if (argv.length > 0) {