Improve the docs for the event plugin
Particularly improve the documentation of the various phases and their
transitions (renames). Add some more performance numbers.
Change-Id: Icfb668213e180267cb303df652c77c83dd6f2c22
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequence.java b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequence.java
index d894816..1c51219 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequence.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequence.java
@@ -22,17 +22,21 @@
* Use a file to store a sequence in a multi node/process (Multi-Master) safe way, and allow an
* event to be delivered for each update to the sequence.
*
+ * <p>Adds phase 1+, add an event file in the <uuid> dir.
+ * <p>Adds phase 2+, move the event to the event store.
+ *
* <p>The event submitter must perform the first phase of the UpdatableFileValue transaction by
* initiating the ownerless transaction with an event file in it. Any actor may perform the
- * remaining 5 UpdatableFileValue transaction phases.
+ * remaining 5 UpdatableFileValue.
*/
public class EventSequence extends UpdatableFileValue<Long> {
public static final Path EVENT = Paths.get("event");
+ /** For phase 1 - 1+ */
protected static class EventBuilder extends UpdatableFileValue.UpdateBuilder {
public EventBuilder(BasePaths paths, String event) throws IOException {
super(paths);
- FileValue.prepare(udir.resolve(EVENT), event);
+ FileValue.prepare(udir.resolve(EVENT), event); // Phase 1+
// build/<tmp>/<uuid>/event
}
}
@@ -41,6 +45,7 @@
final Path event;
Path destination;
+ /** Advance through phases 2 - 6 */
UniqueUpdate(String uuid, boolean ours, long maxTries) throws IOException {
super(EventSequence.this, uuid, ours, maxTries);
event = upaths.udir.resolve(EVENT);
@@ -53,10 +58,12 @@
super.finish();
}
+ /** Contains phase 2+ */
protected void storeEvent() throws IOException {
Path destination = getEventDestination(next);
- if (Fs.tryAtomicMove(event, destination)) {
- // update/<uuid>/event -> destination
+ // Phase 2+
+ if (Fs.tryAtomicMove(event, destination)) { // rename update/<uuid>/event -> destination
+ // now there should be: update/<uuid>/ and destination (file)
this.destination = destination;
}
}
@@ -73,16 +80,18 @@
initFs((long) 0);
}
+ // Advance through phases 1 - 6
protected UniqueUpdate spinSubmit(String event, long maxTries) throws IOException {
try (EventBuilder b = new EventBuilder(paths, event)) {
for (long tries = 0; tries < maxTries; tries++) {
- if (Fs.tryAtomicMove(b.dir, paths.update)) { // build/<tmp>/ -> update/
- // update/<uuid>/event
+ // Phase 1
+ if (Fs.tryAtomicMove(b.dir, paths.update)) { // rename build/<tmp>/ -> update/
+ // now there should be: update/<uuid>/event
synchronized (this) {
totalUpdates++;
totalSpins += tries - 1;
}
- return createUniqueUpdate(b.uuid, true, maxTries - tries);
+ return createUniqueUpdate(b.uuid, true, maxTries - tries); // Advance phases 2 - 6
}
UniqueUpdate update = (UniqueUpdate) completeOngoing(maxTries - tries);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsSequence.java b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsSequence.java
index 76d4046..7d2d98e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsSequence.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsSequence.java
@@ -25,6 +25,7 @@
* value.
*/
public class FsSequence extends UpdatableFileValue<Long> {
+ /** Advance through phases 2 - 6 */
protected class UniqueUpdate extends UpdatableFileValue.UniqueUpdate<Long> {
UniqueUpdate(String uuid, boolean ours, long maxTries) throws IOException {
super(FsSequence.this, uuid, ours, maxTries);
@@ -44,7 +45,7 @@
}
/**
- * Attempt up to maxTries to increment the sequence
+ * Attempt up to maxTries to increment the sequence (advance through all 6 phases).
*
* @param maxTries How many times to attempt to increment the sequence
* @return the new sequence value after this increment.
@@ -55,9 +56,10 @@
try (UpdateBuilder b = new UpdateBuilder(paths)) {
for (; tries < maxTries; tries++) {
UniqueUpdate update = null;
- if (Fs.tryAtomicMove(b.dir, paths.update)) { // build/<tmp>/ -> update/
- update = new UniqueUpdate(b.uuid, true, maxTries);
- // update/<uuid>/
+ // Phase 1
+ if (Fs.tryAtomicMove(b.dir, paths.update)) { // rename build/<tmp>/ -> update/
+ // now there should be: update/<uuid>/
+ update = new UniqueUpdate(b.uuid, true, maxTries); // Advances through phases 2 - 6
} else {
update = (UniqueUpdate) completeOngoing(maxTries);
}
@@ -79,12 +81,17 @@
"Cannot increment sequence file " + path + " after " + maxTries + " tries.");
}
- /** Do NOT spin on this, it creates a new transaction every time. */
+ /**
+ * Attempt once to increment the sequence (advance through all 6 phases).
+ *
+ * <p>Do NOT spin on this, it creates a new transaction every time!
+ */
protected Long increment() throws IOException {
try (UpdateBuilder b = new UpdateBuilder(paths)) {
+ // Phase 1
if (Fs.tryAtomicMove(b.dir, paths.update)) { // build/<tmp>/ -> update/
- // update/<uuid>/
- UniqueUpdate update = new UniqueUpdate(b.uuid, true, 1);
+ // now there should be: update/<uuid>/
+ UniqueUpdate update = new UniqueUpdate(b.uuid, true, 1); // Advances through phases 2 - 6
if (update.myCommit) {
return update.next;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/UpdatableFileValue.java b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/UpdatableFileValue.java
index 6f6f922..ee1e9fa 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/UpdatableFileValue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/UpdatableFileValue.java
@@ -27,11 +27,19 @@
* node/process (Multi-Master) safe. The 6 phases of the transaction are:
*
* <p>1) Intiate a unique ownerless transaction, locking and preventing the value from changing
- * while the transaction is still open. 2) Read the current locked file value 3) Create a proposed
- * update file containing the new proposed value based on the value read in phase 2. 4) Close the
- * transaction with the new proposed value, locking in the new value in the transaction. 5)
- * Atomically commit the new value to the locked file from the transaction. 6) Clean up the
- * transaction
+ * while the transaction is still open.
+ *
+ * <p>2) Read the current locked file value
+ *
+ * <p>3) Create a proposed update file containing the new proposed value based on the value read in
+ * phase 2.
+ *
+ * <p>4) Close the transaction with the new proposed value, locking in the new value in the
+ * transaction.
+ *
+ * <p>5) Atomically commit the new value to the locked file from the transaction.
+ *
+ * <p>6) Clean up the transaction
*
* <p>Any actor may perform any/all of the above phases.
*/
@@ -53,32 +61,32 @@
}
}
+ /** For Phase 1 */
protected static class UpdateBuilder extends FsTransaction.Builder {
String uuid = UUID.randomUUID().toString();
Path udir = dir.resolve(uuid);
public UpdateBuilder(BasePaths paths) throws IOException {
super(paths);
- Files.createDirectories(udir);
- // build/<tmp>/uuid/
+ Files.createDirectories(udir); // build/<tmp>/<uuid>/
}
}
+ /** For Phase 3 */
protected static class NextBuilder extends FsTransaction.Builder {
public NextBuilder(BasePaths paths, String next) throws IOException {
super(paths);
Path closed = dir.resolve(CLOSED);
Files.createDirectory(closed);
- FileValue.prepare(closed.resolve(VALUE), next);
- // build/<tmp>/closed/value(next)
+ FileValue.prepare(closed.resolve(VALUE), next); // build/<tmp>/closed/value(next)
}
}
+ /** Used to create the first value in the FS. Can handle contention, and existing values. */
protected static class InitBuilder extends FsTransaction.Builder {
public InitBuilder(BasePaths paths, String init) throws IOException {
super(paths);
- FileValue.prepare(dir.resolve(INIT), init);
- // build/<tmp>/init(init)
+ FileValue.prepare(dir.resolve(INIT), init); // build/<tmp>/init(init)
}
}
@@ -94,7 +102,7 @@
}
}
- /** This helper class may only be used by one thread. */
+ /** Phase 2 -6 helper. */
protected static class UniqueUpdate<T> {
final UpdatableFileValue<T> updatable;
final String uuid;
@@ -113,6 +121,7 @@
boolean myCommit;
+ /** Advance through phase 2 */
UniqueUpdate(UpdatableFileValue<T> updatable, String uuid, boolean ours, long maxTries)
throws IOException {
this.updatable = updatable;
@@ -126,28 +135,31 @@
next = currentValue == null ? null : updatable.getToValue(currentValue);
}
+ /** Spin attempting phases 3 - 6 */
protected void spinFinish() throws IOException {
for (; tries < maxTries && !finished; tries++) {
finish();
}
}
+ /** Attempt advance through phases 3 - 6 */
protected void finish() throws IOException {
createAndProposeNext();
commit();
clean();
}
+ /** Contains phase 2 */
protected T spinGet() throws IOException {
IOException ioe = new IOException("No chance to read " + updatable.path);
for (; tries < maxTries; tries++) {
try {
- return updatable.get();
+ return updatable.get(); // Phase 2
} catch (IOException e) {
Nfs.throwIfNotStaleFileHandle(e);
finished = !Files.exists(upaths.udir);
if (finished) {
- // stale handle must have been caused by completion by another
+ // stale handle must have been caused by another actor completing instead
return null;
}
ioe = e;
@@ -156,6 +168,7 @@
throw ioe;
}
+ /** Contains phases 3 & 4 */
protected void createAndProposeNext() throws IOException {
if (!closed && !ours) {
// In the default fast path (!closed && ours), we would not expect
@@ -166,22 +179,23 @@
}
if (!closed) {
try (NextBuilder b =
- new NextBuilder(updatable.paths, updatable.serializer.fromGeneric(next))) {
- // build/<tmp>/ -> update/<uuid>/
- Fs.tryAtomicMove(b.dir, upaths.udir);
- // update/<uuid>/closed/value(next)
+ new NextBuilder(updatable.paths, updatable.serializer.fromGeneric(next))) { // Phase 3
+ // Phase 4
+ Fs.tryAtomicMove(b.dir, upaths.udir); // rename build/<tmp>/ -> update/<uuid>/
+ // now there should be: update/<uuid>/closed/value(next)
}
// Do not use the result of the move to determine if it is closed.
- // The move result could provide false positives (a second move
- // could succeed after the transaction has been finished and the
- // first "closed" has been deleted under the "delete" dir).
- // Additionally, this check allows us to be able to detect closes
- // by other actors, not just ourselves.
+ // The move result could provide false positives due to some filesystem
+ // implementions allowing a second move to succeed after the transaction
+ // has been finished and the first "closed" has been deleted under the
+ // "delete" dir). Additionally, this check allows us to be able to detect
+ // closes by other actors, not just ourselves.
closed = Files.exists(upaths.closed);
}
}
+ /** Contains phase 5 */
protected void commit() throws IOException {
if (!committed) {
// Safe to perform this block (for performance reasons) even if we
@@ -189,23 +203,25 @@
// when closed (operations depend on "closed" in paths).
perserve();
- // mv update/<uuid>/closed/value(next) -> value
- committed = myCommit = Fs.tryAtomicMove(upaths.value, updatable.path);
+ // rename update/<uuid>/closed/value(next) -> value
+ committed = myCommit = Fs.tryAtomicMove(upaths.value, updatable.path); // Phase 5
+ // now there should be: update/<uuid>/closed/ and: value (file)
}
if (!committed && closed) {
committed = !Files.exists(upaths.value);
}
}
+ /** Contains phase 6 */
protected void clean() throws IOException {
if (committed) {
- FsTransaction.renameAndDeleteUnique(upaths.udir, updatable.paths.delete);
+ FsTransaction.renameAndDeleteUnique(upaths.udir, updatable.paths.delete); // Phase 6
updatable.cleanPreserved();
}
finished = !Files.exists(upaths.udir);
}
- /**
+ /*
* Creating an extra hard link to future "value" files keeps a filesystem reference to them
* after the "value" file is replaced with a new "value" file. Keeping the reference around
* allows readers on other nodes to still read the contents of the file without experiencing a
@@ -231,11 +247,12 @@
Files.createDirectories(paths.preserved);
while (!Files.exists(path)) {
try (InitBuilder b = new InitBuilder(paths, serializer.fromGeneric(init))) {
- Fs.tryAtomicMove(b.dir, paths.update); // mv build/<tmp>/ -> update/
- // update/init(init) using a non unique name, "init", to allow recovery
+ Fs.tryAtomicMove(b.dir, paths.update); // rename build/<tmp>/ -> update/
+ // now there should be: update/init(init_value)
if (!Files.exists(path)) {
- // mv update/init(init) -> value
- Fs.tryAtomicMove(paths.update.resolve(INIT), path);
+ // using a non unique name, "init", to allow recovery below
+ Fs.tryAtomicMove(paths.update.resolve(INIT), path); // rename update/init(init) -> value
+ // now there should be: update/ and: value (file)
}
}
}
@@ -271,11 +288,9 @@
protected abstract UniqueUpdate<T> createUniqueUpdate(String uuid, boolean ours, long maxTries)
throws IOException;
- /**
- * 1 second seems to be long enough even for slow readers (over a WAN) under high contention
- * ("value" file being updated by a fast writer), to avoid spinning on reads most of the time.
- */
protected void cleanPreserved() {
+ // 1 second seems to be long enough even for slow readers (over a WAN) under high contention
+ // ("value" file being updated by a fast writer), to avoid spinning on reads most of the time.
FileTime expiry = Fs.getFileTimeAgo(1, TimeUnit.SECONDS);
Fs.tryRecursiveDeleteEntriesOlderThan(paths.preserved, expiry, 5);
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/FsStoreTest.java b/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/FsStoreTest.java
index 7e78875..605901d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/FsStoreTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/FsStoreTest.java
@@ -222,6 +222,8 @@
*
* <p>Local(spinning) 1 workers 1M 14.34s ~14us/event find events|wc -l 1.3s rm -rf 42s
*
+ * <p>Multi workers: NFS(LowLatency,LAN,SSDs) 8 hosts count=1000 (each) avg 273s 1000/34s
+ *
* <p>Mixed workers: NFS(WAN) 1 worker (+NFS LAN continuous) count=10, 3m28s
*/
public static void main(String[] argv) throws Exception {