Complete Multi-master safety for the event plugin
Use a new EventSequence to synchronize the event submissions with the
Head sequence. This completes the last remaining piece to ensure that
the event plugin is multi-master safe. The fsstore under
site_dir>/data/plugin/events/fsstore-2 must reside on a cluster (likely
NFS) filesystem in order to share the FsStore among multiple nodes.
Change-Id: I92aa11811f28d5339d56484f5cdd9335795be749
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/DynamicRangeSharder.java b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/DynamicRangeSharder.java
index 2a15167..8a7fca6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/DynamicRangeSharder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/DynamicRangeSharder.java
@@ -56,6 +56,11 @@
order = Long.toString(orders);
}
+ /** Get the Path where `i` should be stored. */
+ public Path path(long i) {
+ return dir(i).resolve(Long.toString(i));
+ }
+
/** Get the Path of the block that `i` should be stored in. */
public Path dir(long i) {
Path dir = base.resolve(subtreeName(i));
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequence.java b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequence.java
new file mode 100644
index 0000000..d894816
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequence.java
@@ -0,0 +1,109 @@
+// Copyright (C) 2017 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events.fsstore;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/**
+ * Use a file to store a sequence in a multi node/process (Multi-Master) safe way, and allow an
+ * event to be delivered for each update to the sequence.
+ *
+ * <p>The event submitter must perform the first phase of the UpdatableFileValue transaction by
+ * initiating the ownerless transaction with an event file in it. Any actor may perform the
+ * remaining 5 UpdatableFileValue transaction phases.
+ */
+public class EventSequence extends UpdatableFileValue<Long> {
+ public static final Path EVENT = Paths.get("event");
+
+ protected static class EventBuilder extends UpdatableFileValue.UpdateBuilder {
+ public EventBuilder(BasePaths paths, String event) throws IOException {
+ super(paths);
+ FileValue.prepare(udir.resolve(EVENT), event);
+ // build/<tmp>/<uuid>/event
+ }
+ }
+
+ protected class UniqueUpdate extends UpdatableFileValue.UniqueUpdate<Long> {
+ final Path event;
+ Path destination;
+
+ UniqueUpdate(String uuid, boolean ours, long maxTries) throws IOException {
+ super(EventSequence.this, uuid, ours, maxTries);
+ event = upaths.udir.resolve(EVENT);
+ spinFinish();
+ }
+
+ @Override
+ protected void finish() throws IOException {
+ storeEvent();
+ super.finish();
+ }
+
+ protected void storeEvent() throws IOException {
+ Path destination = getEventDestination(next);
+ if (Fs.tryAtomicMove(event, destination)) {
+ // update/<uuid>/event -> destination
+ this.destination = destination;
+ }
+ }
+ }
+
+ public long totalSpins;
+ public long totalUpdates;
+
+ public EventSequence(Path base) {
+ super(base);
+ }
+
+ public void initFs() throws IOException {
+ initFs((long) 0);
+ }
+
+ protected UniqueUpdate spinSubmit(String event, long maxTries) throws IOException {
+ try (EventBuilder b = new EventBuilder(paths, event)) {
+ for (long tries = 0; tries < maxTries; tries++) {
+ if (Fs.tryAtomicMove(b.dir, paths.update)) { // build/<tmp>/ -> update/
+ // update/<uuid>/event
+ synchronized (this) {
+ totalUpdates++;
+ totalSpins += tries - 1;
+ }
+ return createUniqueUpdate(b.uuid, true, maxTries - tries);
+ }
+
+ UniqueUpdate update = (UniqueUpdate) completeOngoing(maxTries - tries);
+ if (update != null) {
+ tries += update.tries - 1;
+ }
+ }
+ throw new IOException("Cannot submit event " + paths.base + " after " + maxTries + " tries.");
+ }
+ }
+
+ protected Long getToValue(Long currentValue) {
+ return currentValue + 1;
+ }
+
+ protected UniqueUpdate createUniqueUpdate(String uuid, boolean ours, long maxTries)
+ throws IOException {
+ return new UniqueUpdate(uuid, ours, maxTries);
+ }
+
+ protected Path getEventDestination(Long n) {
+ return paths.base.resolve(EVENT);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsStore.java b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsStore.java
index 01488f4..8109836 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsStore.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsStore.java
@@ -18,13 +18,12 @@
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.events.EventStore;
import java.io.IOException;
-import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.UUID;
import javax.inject.Singleton;
-/** This class is only Thread, but not process (Multi-Master) safe */
+/** Use a filesystem to store events in a multi node/process (Multi-Master) safe way. */
@Singleton
public class FsStore implements EventStore {
protected static class BasePaths {
@@ -39,35 +38,46 @@
head = base.resolve("head");
tail = base.resolve("tail");
}
+ }
- public Path event(long event) {
- return events.dir(event).resolve(Long.toString(event));
+ protected static class Head extends EventSequence {
+ FsStore.BasePaths paths;
+
+ Head(FsStore.BasePaths paths) {
+ super(paths.head);
+ this.paths = paths;
}
- public boolean isEventLastDirEntry(long event) {
- return events.isLastDirEntry(event);
+ protected Path getEventDestination(Long n) {
+ Path event = paths.events.path(n);
+ try {
+ Fs.createDirectories(event.getParent());
+ } catch (IOException e) {
+ }
+ return event;
}
}
protected static class Stores {
final FsId uuid;
- final FsSequence head;
+ final Head head;
final FsSequence tail;
public Stores(BasePaths bases) {
uuid = new FsId(bases.uuid);
- head = new FsSequence(bases.head);
+ head = new Head(bases);
tail = new FsSequence(bases.tail);
}
public void initFs() throws IOException {
uuid.initFs();
- head.initFs((long) 0);
+ head.initFs();
tail.initFs((long) 1);
}
}
public static final long MAX_GET_SPINS = 1000;
+ public static final long MAX_SUBMIT_SPINS = 100000;
public static final long MAX_INCREMENT_SPINS = 1000;
protected final BasePaths paths;
@@ -76,7 +86,7 @@
@Inject
public FsStore(SitePaths site) throws IOException {
- this(site.data_dir.toPath().resolve("plugin").resolve("events").resolve("fstore-v1.2"));
+ this(site.data_dir.toPath().resolve("plugin").resolve("events").resolve("fstore-v2"));
}
public FsStore(Path base) throws IOException {
@@ -91,14 +101,9 @@
return uuid;
}
- /** Only Thread, but not process (Multi-Master) safe */
@Override
- public synchronized void add(String event) throws IOException {
- long next = getHead() + 1;
- Path epath = paths.event(next);
- Fs.createDirectories(epath.getParent());
- Files.write(epath, (event + "\n").getBytes());
- stores.head.spinIncrement(MAX_INCREMENT_SPINS);
+ public void add(String event) throws IOException {
+ stores.head.spinSubmit(event + "\n", MAX_SUBMIT_SPINS);
}
@Override
@@ -110,7 +115,7 @@
public String get(long num) throws IOException {
if (getTail() <= num && num <= getHead()) {
try {
- return Fs.readFile(paths.event(num));
+ return Fs.readFile(paths.events.path(num));
} catch (NoSuchFileException e) {
}
}
@@ -135,9 +140,9 @@
if (trim > 0) {
for (long i = getTail(); i <= trim; i++) {
long delete = stores.tail.spinIncrement(MAX_INCREMENT_SPINS) - 1;
- Path event = paths.event(delete);
+ Path event = paths.events.path(delete);
Fs.tryRecursiveDelete(event);
- if (paths.isEventLastDirEntry(delete)) {
+ if (paths.events.isLastDirEntry(delete)) {
Fs.unsafeRecursiveRmdir(event.getParent().toFile());
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequenceTest.java b/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequenceTest.java
new file mode 100644
index 0000000..5e0fb94
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequenceTest.java
@@ -0,0 +1,64 @@
+// Copyright (C) 2017 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events.fsstore;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import junit.framework.TestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class EventSequenceTest extends TestCase {
+ private static String dir = "events-EventSequence";
+ private static Path base;
+ private Path myBase;
+ private EventSequence seq;
+ private long count = 1000;
+ private long maxSpins = 1;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ myBase = base;
+ if (myBase == null) {
+ myBase = Files.createTempDirectory(dir);
+ }
+ seq = new EventSequence(myBase);
+ seq.initFs((long) 0);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ Fs.tryRecursiveDelete(myBase);
+ }
+
+ @Test
+ public void testGetZero() throws IOException {
+ assertEquals((long) 0, (long) seq.get());
+ }
+
+ @Test
+ public void testSpinSubmit() throws IOException {
+ Long next = seq.get() + (long) 1;
+ String event = "Event " + next;
+
+ EventSequence.UniqueUpdate up = seq.spinSubmit(event, maxSpins);
+ assertEquals(next, seq.get());
+ assertNotNull(up.destination);
+ assertEquals(event, Fs.readFile(up.destination));
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/FsStoreTest.java b/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/FsStoreTest.java
index 5885c05..e852459 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/FsStoreTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/FsStoreTest.java
@@ -217,10 +217,12 @@
*
* <p>Note: if you do not specify <dir>, it will create a directory under /tmp
*
- * <p>Performance: NFS(Lowlatency,SSDs), 1 worker 10K, 2m33s ~153ms/event find events|wc -l .4s rm
- * -rf 1.3s
+ * <p>Performance: NFS(Lowlatency,SSDs), 1 worker 1M, 266m ~16ms/event find events|wc -l 12s rm
+ * -rf 1m49s du -sh -> 3.9G 1m7s
*
- * <p>Local(spinning) 1 workers 1M 16m7s ~1ms/event find events|wc -l 1.6s rm -rf 36s
+ * <p>Local(spinning) 1 workers 1M 14.34s ~14us/event find events|wc -l 1.3s rm -rf 42s
+ *
+ * <p>Mixed workers: NFS(WAN) 1 worker (+NFS LAN continuous) count=10, 10m6s
*/
public static void main(String[] argv) throws Exception {
if (argv.length > 0) {