Merge "Add Zuul configuration to run tests on the CI node"
diff --git a/.settings/org.eclipse.core.resources.prefs b/.settings/org.eclipse.core.resources.prefs
new file mode 100644
index 0000000..839d647
--- /dev/null
+++ b/.settings/org.eclipse.core.resources.prefs
@@ -0,0 +1,5 @@
+eclipse.preferences.version=1
+encoding//src/main/java=UTF-8
+encoding//src/main/resources=UTF-8
+encoding//src/test/java=UTF-8
+encoding/<project>=UTF-8
diff --git a/.settings/org.eclipse.jdt.apt.core.prefs b/.settings/org.eclipse.jdt.apt.core.prefs
new file mode 100644
index 0000000..d4313d4
--- /dev/null
+++ b/.settings/org.eclipse.jdt.apt.core.prefs
@@ -0,0 +1,2 @@
+eclipse.preferences.version=1
+org.eclipse.jdt.apt.aptEnabled=false
diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs
new file mode 100644
index 0000000..1b6e1ef
--- /dev/null
+++ b/.settings/org.eclipse.jdt.core.prefs
@@ -0,0 +1,9 @@
+eclipse.preferences.version=1
+org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
+org.eclipse.jdt.core.compiler.compliance=1.8
+org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
+org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
+org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
+org.eclipse.jdt.core.compiler.processAnnotations=disabled
+org.eclipse.jdt.core.compiler.release=disabled
+org.eclipse.jdt.core.compiler.source=1.8
diff --git a/WORKSPACE b/WORKSPACE
index a1ce3b2..c156344 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -3,7 +3,7 @@
load("//:bazlets.bzl", "load_bazlets")
load_bazlets(
- commit = "f96f4bce9ffafeaa200fc009a378921c512fcb0a",
+ commit = "7ff4605f48db148197675a0d2ea41ee07cb72fd3",
)
load(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/CoreListener.java b/src/main/java/com/googlesource/gerrit/plugins/events/CoreListener.java
index dd1463f..e23c9e2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/events/CoreListener.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/CoreListener.java
@@ -15,10 +15,9 @@
package com.googlesource.gerrit.plugins.events;
import com.google.common.base.Supplier;
-import com.google.gerrit.entities.Change;
+import com.google.gerrit.entities.EntitiesAdapterFactory;
import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.change.ChangeKeyAdapter;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventListener;
import com.google.gerrit.server.events.ProjectNameKeyAdapter;
@@ -39,7 +38,7 @@
new GsonBuilder()
.registerTypeAdapter(Supplier.class, new SupplierSerializer())
.registerTypeAdapter(Project.NameKey.class, new ProjectNameKeyAdapter())
- .registerTypeAdapter(Change.Key.class, new ChangeKeyAdapter())
+ .registerTypeAdapterFactory(EntitiesAdapterFactory.create())
.create();
protected final DynamicSet<StreamEventListener> listeners;
protected final EventStore store;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/StreamEvents.java b/src/main/java/com/googlesource/gerrit/plugins/events/StreamEvents.java
index b424923..7a8c198 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/events/StreamEvents.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/StreamEvents.java
@@ -15,10 +15,12 @@
package com.googlesource.gerrit.plugins.events;
import com.google.gerrit.common.data.GlobalCapability;
+import com.google.gerrit.extensions.annotations.CapabilityScope;
import com.google.gerrit.extensions.annotations.PluginName;
import com.google.gerrit.extensions.annotations.RequiresCapability;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.extensions.registration.RegistrationHandle;
+import com.google.gerrit.server.DynamicOptions;
import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.git.WorkQueue.CancelableRunnable;
import com.google.gerrit.sshd.BaseCommand;
@@ -38,7 +40,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@RequiresCapability(GlobalCapability.STREAM_EVENTS)
+@RequiresCapability(value = GlobalCapability.STREAM_EVENTS, scope = CapabilityScope.CORE)
@CommandMetaData(name = "stream", description = "Monitor events occurring in real time")
public final class StreamEvents extends BaseCommand {
private static final Logger log = LoggerFactory.getLogger(StreamEvents.class);
@@ -54,7 +56,7 @@
)
protected void parseId(String arg) throws IOException {
resume = 0;
- if (arg.equals("0")) {
+ if ("0".equals(arg)) {
return;
}
@@ -102,24 +104,27 @@
@Override
public void start(ChannelSession channel, Environment env) throws IOException {
- try {
- parseCommandLine();
- } catch (UnloggedFailure e) {
- String msg = e.getMessage();
- if (!msg.endsWith("\n")) {
- msg += "\n";
+ try (DynamicOptions pluginOptions =
+ new DynamicOptions(injector, dynamicBeans)) {
+ try {
+ parseCommandLine(pluginOptions);
+ } catch (UnloggedFailure e) {
+ String msg = e.getMessage();
+ if (!msg.endsWith("\n")) {
+ msg += "\n";
+ }
+ err.write(msg.getBytes("UTF-8"));
+ err.flush();
+ onExit(1);
+ return;
}
- err.write(msg.getBytes("UTF-8"));
- err.flush();
- onExit(1);
- return;
- }
- stdout = toPrintWriter(out);
+ stdout = toPrintWriter(out);
- initSent();
- flusherRunnable = createFlusherRunnable();
- subscribe();
- startFlush();
+ initSent();
+ flusherRunnable = createFlusherRunnable();
+ subscribe();
+ startFlush();
+ }
}
protected CancelableRunnable createFlusherRunnable() {
@@ -155,10 +160,8 @@
protected void startFlush() throws IOException {
synchronized (crossThreadlock) {
- if (flusherTask == null && !shuttingDown) {
- if (sent < events.getHead()) {
- flusherTask = threadPool.submit(flusherRunnable);
- }
+ if (!isFlushing() && !shuttingDown && !isUpToDate()) {
+ flusherTask = threadPool.submit(flusherRunnable);
}
}
}
@@ -178,7 +181,7 @@
synchronized (crossThreadlock) {
boolean alreadyShuttingDown = shuttingDown;
shuttingDown = true;
- if (flusherTask != null) {
+ if (isFlushing()) {
flusherTask.cancel(true);
} else if (!alreadyShuttingDown) {
onExit(0);
@@ -211,8 +214,7 @@
protected void flushBatch() throws IOException {
String uuid = events.getUuid().toString();
int processed = 0;
- long head = events.getHead();
- while (sent < head && processed < BATCH_SIZE) {
+ while (!isUpToDate() && processed < BATCH_SIZE) {
long sending = sent + 1;
String event = events.get(sending);
if (Thread.interrupted() || stdout.checkError()) {
@@ -248,4 +250,12 @@
stdout.flush();
}
}
+
+ protected boolean isUpToDate() throws IOException {
+ return sent >= events.getHead();
+ }
+
+ protected boolean isFlushing() {
+ return flusherTask != null;
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequence.java b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequence.java
index d894816..9ab49fd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequence.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequence.java
@@ -22,28 +22,34 @@
* Use a file to store a sequence in a multi node/process (Multi-Master) safe way, and allow an
* event to be delivered for each update to the sequence.
*
+ * <p>Adds phase 1+, add an event file in the <uuid> dir.
+ *
+ * <p>Adds phase 2+, move the event to the event store.
+ *
* <p>The event submitter must perform the first phase of the UpdatableFileValue transaction by
* initiating the ownerless transaction with an event file in it. Any actor may perform the
- * remaining 5 UpdatableFileValue transaction phases.
+ * remaining 5 UpdatableFileValue.
*/
public class EventSequence extends UpdatableFileValue<Long> {
public static final Path EVENT = Paths.get("event");
+ /** For phase 1 - 1+ */
protected static class EventBuilder extends UpdatableFileValue.UpdateBuilder {
public EventBuilder(BasePaths paths, String event) throws IOException {
super(paths);
- FileValue.prepare(udir.resolve(EVENT), event);
- // build/<tmp>/<uuid>/event
+ FileValue.prepare(next.resolve(EVENT), event); // Phase 1+
+ // build/<tmp>/<uuid>/next/event
}
}
protected class UniqueUpdate extends UpdatableFileValue.UniqueUpdate<Long> {
- final Path event;
- Path destination;
+ protected final Path event;
+ protected Path destination;
- UniqueUpdate(String uuid, boolean ours, long maxTries) throws IOException {
+ /** Advance through phases 2 - 6 */
+ protected UniqueUpdate(String uuid, boolean ours, long maxTries) throws IOException {
super(EventSequence.this, uuid, ours, maxTries);
- event = upaths.udir.resolve(EVENT);
+ event = upaths.next.resolve(EVENT);
spinFinish();
}
@@ -53,10 +59,12 @@
super.finish();
}
+ /** Contains phase 2+ */
protected void storeEvent() throws IOException {
Path destination = getEventDestination(next);
- if (Fs.tryAtomicMove(event, destination)) {
- // update/<uuid>/event -> destination
+ // Phase 2+
+ if (Fs.tryAtomicMove(event, destination)) { // rename update/<uuid>/next/event -> destination
+ // now there should be: update/<uuid>/next/ and destination (file)
this.destination = destination;
}
}
@@ -73,16 +81,18 @@
initFs((long) 0);
}
+ // Advance through phases 1 - 6
protected UniqueUpdate spinSubmit(String event, long maxTries) throws IOException {
try (EventBuilder b = new EventBuilder(paths, event)) {
for (long tries = 0; tries < maxTries; tries++) {
- if (Fs.tryAtomicMove(b.dir, paths.update)) { // build/<tmp>/ -> update/
- // update/<uuid>/event
+ // Phase 1 (can only succeed if update is empty or non-existant)
+ if (Fs.tryAtomicMove(b.dir, paths.update)) { // rename build/<tmp>/ -> update/
+ // now there should be: update/<uuid>/next/event
synchronized (this) {
totalUpdates++;
totalSpins += tries - 1;
}
- return createUniqueUpdate(b.uuid, true, maxTries - tries);
+ return createUniqueUpdate(b.uuid, true, maxTries - tries); // Advance phases 2 - 6
}
UniqueUpdate update = (UniqueUpdate) completeOngoing(maxTries - tries);
@@ -98,12 +108,14 @@
return currentValue + 1;
}
+ @Override
protected UniqueUpdate createUniqueUpdate(String uuid, boolean ours, long maxTries)
throws IOException {
return new UniqueUpdate(uuid, ours, maxTries);
}
+ /** Override to shard */
protected Path getEventDestination(Long n) {
- return paths.base.resolve(EVENT);
+ return paths.base.resolve(EVENT).resolve(n.toString());
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FileValue.java b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FileValue.java
index 2572c31..05c7770 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FileValue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FileValue.java
@@ -15,7 +15,6 @@
package com.googlesource.gerrit.plugins.events.fsstore;
import java.io.IOException;
-import java.nio.file.Files;
import java.nio.file.Path;
/** Helper file for serialzing and storing a single type in a file */
@@ -44,9 +43,10 @@
/**
* Auto setup the serializer based on the type used to initialize the class.
*
- * <p>Must be called with a supported type before use if a serializer has been set manualy. Safe
+ * <p>Must be called with a supported type before use if a serializer has been set manually. Safe
* to call if the Serializer was already set.
*/
+ @SuppressWarnings("unchecked") // we check the type of init, so these casts are safe
protected void initSerializer(T init) {
if (serializer == null) {
if (init instanceof String) {
@@ -77,7 +77,7 @@
/** The lowest level raw String read of the file */
protected String read() throws IOException {
- return Fs.readFile(path);
+ return Fs.readUtf8(path);
}
/** Serialize object to given tmp file in preparation to call update() */
@@ -92,6 +92,6 @@
/** Low level raw string write to given tmp file in preparation to call update(). */
protected static void prepare(Path tmp, String s) throws IOException {
- Files.write(tmp, s.getBytes());
+ Fs.writeUtf8(tmp, s);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/Fs.java b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/Fs.java
index 4b162ce..1a21941 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/Fs.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/Fs.java
@@ -17,6 +17,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.nio.file.DirectoryIteratorException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.FileVisitResult;
@@ -78,7 +79,7 @@
/**
* Try to recursively delete entries, up to max count, in a dir older than expiry. Do NOT throw
- * IOExceptions.
+ * IOExceptions or DirectoryIteratorExceptions.
*
* @return whether all entries were deleted
*/
@@ -93,12 +94,18 @@
}
}
} catch (IOException e) { // Intent of 'try' function is to ignore these.
+ } catch (DirectoryIteratorException e) {
+ // dir was deleted by another actor, thus so were all its entries
}
return true;
}
- /** Are all entries in a directory tree older than expiry? Do NOT throw IOExceptions. */
- public static boolean isAllEntriesOlderThan(Path dir, FileTime expiry) {
+ /**
+ * Are all entries in a directory tree older than expiry?
+ *
+ * @throws IOException
+ */
+ public static boolean isAllEntriesOlderThan(Path dir, FileTime expiry) throws IOException {
if (!isOlderThan(dir, expiry)) {
return false;
}
@@ -109,8 +116,8 @@
}
}
} catch (NotDirectoryException e) { // can't recurse if not a directory
- } catch (IOException e) {
- return false; // Modified after start, so not older
+ } catch (DirectoryIteratorException e) {
+ throw e.getCause(); // Throw the causal checked exception
}
return true;
}
@@ -154,12 +161,27 @@
}
/** Read the contents of a UTF_8 encoded file as a String */
- public static String readFile(Path file) throws IOException {
- StringBuffer buffer = new StringBuffer();
- for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
- buffer.append(line);
+ public static String readUtf8(Path file) throws IOException {
+ try {
+ return readUtf8Unsafe(file);
+ } catch (ArrayIndexOutOfBoundsException e) {
+ // A concurrent update can cause this, make sure Exception becomes checked.
+ throw new IOException("File modified or deleted during read.", e);
}
- return buffer.toString();
+ }
+
+ /** Read the contents of a UTF_8 encoded file as a String */
+ protected static String readUtf8Unsafe(Path file) throws IOException {
+ StringBuilder builder = new StringBuilder();
+ for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
+ builder.append(line);
+ }
+ return builder.toString();
+ }
+
+ /** Write the contents of a String as a UTF_8 encoded file */
+ public static void writeUtf8(Path file, String s) throws IOException {
+ Files.write(file, s.getBytes(StandardCharsets.UTF_8));
}
/** Get the first entry in a directory. */
@@ -167,6 +189,8 @@
try (DirectoryStream<Path> dirEntries = Files.newDirectoryStream(dir)) {
Iterator<Path> it = dirEntries.iterator();
return it.hasNext() ? it.next() : null;
+ } catch (DirectoryIteratorException e) {
+ throw e.getCause(); // Throw the causal checked exception
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsSequence.java b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsSequence.java
index 76d4046..5b7135e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsSequence.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsSequence.java
@@ -25,6 +25,7 @@
* value.
*/
public class FsSequence extends UpdatableFileValue<Long> {
+ /** Advance through phases 2 - 6 */
protected class UniqueUpdate extends UpdatableFileValue.UniqueUpdate<Long> {
UniqueUpdate(String uuid, boolean ours, long maxTries) throws IOException {
super(FsSequence.this, uuid, ours, maxTries);
@@ -44,7 +45,7 @@
}
/**
- * Attempt up to maxTries to increment the sequence
+ * Attempt up to maxTries to increment the sequence (advance through all 6 phases).
*
* @param maxTries How many times to attempt to increment the sequence
* @return the new sequence value after this increment.
@@ -55,9 +56,10 @@
try (UpdateBuilder b = new UpdateBuilder(paths)) {
for (; tries < maxTries; tries++) {
UniqueUpdate update = null;
- if (Fs.tryAtomicMove(b.dir, paths.update)) { // build/<tmp>/ -> update/
- update = new UniqueUpdate(b.uuid, true, maxTries);
- // update/<uuid>/
+ // Phase 1
+ if (Fs.tryAtomicMove(b.dir, paths.update)) { // rename build/<tmp>/ -> update/
+ // now there should be: update/<uuid>/
+ update = new UniqueUpdate(b.uuid, true, maxTries); // Advances through phases 2 - 6
} else {
update = (UniqueUpdate) completeOngoing(maxTries);
}
@@ -79,12 +81,17 @@
"Cannot increment sequence file " + path + " after " + maxTries + " tries.");
}
- /** Do NOT spin on this, it creates a new transaction every time. */
+ /**
+ * Attempt once to increment the sequence (advance through all 6 phases).
+ *
+ * <p>Do NOT spin on this, it creates a new transaction every time!
+ */
protected Long increment() throws IOException {
try (UpdateBuilder b = new UpdateBuilder(paths)) {
+ // Phase 1
if (Fs.tryAtomicMove(b.dir, paths.update)) { // build/<tmp>/ -> update/
- // update/<uuid>/
- UniqueUpdate update = new UniqueUpdate(b.uuid, true, 1);
+ // now there should be: update/<uuid>/
+ UniqueUpdate update = new UniqueUpdate(b.uuid, true, 1); // Advances through phases 2 - 6
if (update.myCommit) {
return update.next;
}
@@ -102,6 +109,7 @@
return currentValue + 1;
}
+ @Override
protected UniqueUpdate createUniqueUpdate(String uuid, boolean ours, long maxTries)
throws IOException {
return new UniqueUpdate(uuid, ours, maxTries);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsStore.java b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsStore.java
index 08b0a3b..67c9ff6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsStore.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsStore.java
@@ -48,6 +48,7 @@
this.paths = paths;
}
+ @Override
protected Path getEventDestination(Long n) {
Path event = paths.events.path(n);
try {
@@ -121,7 +122,7 @@
if (cachedTail.isLessThanOrEqualTo(num, MAX_GET_SPINS)
&& cachedHead.isGreaterThanOrEqualTo(num, MAX_GET_SPINS)) {
try {
- return Fs.readFile(paths.events.path(num));
+ return Fs.readUtf8(paths.events.path(num));
} catch (NoSuchFileException e) {
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsTransaction.java b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsTransaction.java
index 6884daa..9340bd0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsTransaction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/FsTransaction.java
@@ -15,6 +15,7 @@
package com.googlesource.gerrit.plugins.events.fsstore;
import java.io.IOException;
+import java.nio.file.DirectoryIteratorException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -68,7 +69,7 @@
public boolean clean(FileTime expiry, int max) {
try {
return Fs.tryRecursiveDeleteEntriesOlderThan(delete, expiry, max)
- || renameAndDeleteEntriesOlderThan(build, delete, expiry, max);
+ && renameAndDeleteEntriesOlderThan(build, delete, expiry, max);
} catch (IOException e) {
// If we knew if it was a repeat offender, we could consider logging it.
return true; // Don't keep retrying failures.
@@ -116,7 +117,7 @@
* Used to atomically delete a directory tree when the src directory name is guaranteed to be
* unique.
*/
- public static void renameAndDeleteUnique(Path src, Path del) throws IOException {
+ public static void renameAndDeleteUnique(Path src, Path del) {
Path reparented = Fs.reparent(src, del);
Fs.tryAtomicMove(src, reparented);
Fs.tryRecursiveDelete(reparented);
@@ -124,9 +125,10 @@
/**
* Used to atomically delete entries in a directory tree older than expiry, up to max count. Do
- * NOT throw IOExceptions.
+ * NOT throw DirectoryIteratorExceptions.
*
* @return whether all entries were deleted
+ * @throws IOException
*/
public static boolean renameAndDeleteEntriesOlderThan(
Path dir, Path del, FileTime expiry, int max) throws IOException {
@@ -139,7 +141,10 @@
renameAndDelete(path, del);
}
}
- return true;
+ } catch (DirectoryIteratorException e) {
+ // dir was deleted by another actor, thus so were all its entries
+ Nfs.throwIfNotStaleFileHandle(e.getCause());
}
+ return true;
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/Nfs.java b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/Nfs.java
index aacb3a9..1454fbf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/Nfs.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/Nfs.java
@@ -15,7 +15,6 @@
package com.googlesource.gerrit.plugins.events.fsstore;
import java.io.IOException;
-import java.nio.file.DirectoryIteratorException;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.Locale;
@@ -58,23 +57,23 @@
}
/**
- * Is any entry in a directory tree older than expiry. Do NOT throw IOExceptions, or
- * DirectoryIteratorExceptions.
+ * Is any entry in a directory tree older than expiry
+ *
+ * @throws IOException
*/
- public static boolean isAllEntriesOlderThan(Path dir, FileTime expiry) {
+ public static boolean isAllEntriesOlderThan(Path dir, FileTime expiry) throws IOException {
try {
return Fs.isAllEntriesOlderThan(dir, expiry);
- } catch (DirectoryIteratorException e) {
- return false; // Modified after start, so not older
+ } catch (IOException e) {
+ throwIfNotStaleFileHandle(e);
}
+ return false; // Modified after start, so not older
}
/** Get the first entry in a directory. */
public static Path getFirstDirEntry(Path dir) throws IOException {
try {
return Fs.getFirstDirEntry(dir);
- } catch (DirectoryIteratorException e) {
- throwIfNotStaleFileHandle(e);
} catch (IOException e) {
throwIfNotStaleFileHandle(e);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/NfsFileValue.java b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/NfsFileValue.java
new file mode 100644
index 0000000..5be6a39
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/NfsFileValue.java
@@ -0,0 +1,68 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events.fsstore;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+/**
+ * Helper file for serialzing and storing a single type in a file on NFS.
+ *
+ * <p>Sacrifice read speed and success rates to avoid triggering bad NFS behavior when updating the
+ * file which can result in the file disappearing.
+ *
+ * <p>If a process on a Linux NFS client machine wants to perform an NFS rename while a process on
+ * the same machine has a handle to the target file (even just to read it), the Linux NFS driver
+ * will performa a NFS_silly_rename (it will rename the file to hopefully unique .nfs000000000xxx
+ * looking file) on the target file before performing the actual rename requested. The silly rename
+ * is to allow the process holding the file handle to not fail. This NFS_silly_rename is risky as it
+ * can result in the target file disappearing permanently if either a) the Linux NFS client machine
+ * dies before the second rename, or b) another Linux NFS client machine tries to do the same thing
+ * right after the source file was renamed to the target file by the first Linux NFS client.
+ *
+ * <p>This class tries to avoid this bad situation by never reading the 'path' file while updating
+ * it from the same process and thus never triggering the NFS_silly_rename on the 'path' file on
+ * ourselves. As long as the value file is only accessed by one java process per physical machine,
+ * using the same object, the 'path' file should never get deleted unexpectedly.
+ */
+public class NfsFileValue<T> extends FileValue<T> {
+ protected final Object pathLock = new Object(); // Use pathLock when reading or updating.
+
+ /**
+ * Use this constructor to use a builtin serializer (String or Long), and be sure to call init(T)
+ * with a value of your type for the serializer auto identification to happen.
+ */
+ public NfsFileValue(Path path) {
+ this(path, (Serializer<T>) null);
+ }
+
+ public NfsFileValue(Path path, Serializer<T> serializer) {
+ super(path, serializer);
+ }
+
+ @Override
+ protected String read() throws IOException {
+ synchronized (pathLock) {
+ return super.read();
+ }
+ }
+
+ @Override
+ protected boolean update(Path src) throws IOException {
+ synchronized (pathLock) {
+ return Fs.tryAtomicMove(src, path);
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/UpdatableFileValue.java b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/UpdatableFileValue.java
index 6f6f922..67d51b0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/UpdatableFileValue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/fsstore/UpdatableFileValue.java
@@ -27,17 +27,26 @@
* node/process (Multi-Master) safe. The 6 phases of the transaction are:
*
* <p>1) Intiate a unique ownerless transaction, locking and preventing the value from changing
- * while the transaction is still open. 2) Read the current locked file value 3) Create a proposed
- * update file containing the new proposed value based on the value read in phase 2. 4) Close the
- * transaction with the new proposed value, locking in the new value in the transaction. 5)
- * Atomically commit the new value to the locked file from the transaction. 6) Clean up the
- * transaction
+ * while the transaction is still open.
+ *
+ * <p>2) Read the current locked file value
+ *
+ * <p>3) Create a proposed update file containing the new proposed value based on the value read in
+ * phase 2.
+ *
+ * <p>4) Close the transaction with the new proposed value, locking in the new value in the
+ * transaction.
+ *
+ * <p>5) Atomically commit the new value to the locked file from the transaction.
+ *
+ * <p>6) Clean up the transaction
*
* <p>Any actor may perform any/all of the above phases.
*/
-public abstract class UpdatableFileValue<T> extends FileValue<T> {
+public abstract class UpdatableFileValue<T> extends NfsFileValue<T> {
public static final Path CLOSED = Paths.get("closed");
public static final Path INIT = Paths.get("init");
+ public static final Path NEXT = Paths.get("next");
public static final Path PRESERVED = Paths.get("preserved");
public static final Path UPDATE = Paths.get("update");
public static final Path VALUE = Paths.get("value");
@@ -53,67 +62,73 @@
}
}
+ /** For Phase 1 */
protected static class UpdateBuilder extends FsTransaction.Builder {
- String uuid = UUID.randomUUID().toString();
- Path udir = dir.resolve(uuid);
+ public final String uuid = UUID.randomUUID().toString();
+ public final Path udir = dir.resolve(uuid);
+ public final Path next;
public UpdateBuilder(BasePaths paths) throws IOException {
super(paths);
- Files.createDirectories(udir);
- // build/<tmp>/uuid/
+ next = udir.resolve(NEXT);
+ Files.createDirectories(next); // build/<tmp>/<uuid>/next
}
}
+ /** For Phase 3 */
protected static class NextBuilder extends FsTransaction.Builder {
public NextBuilder(BasePaths paths, String next) throws IOException {
super(paths);
Path closed = dir.resolve(CLOSED);
Files.createDirectory(closed);
- FileValue.prepare(closed.resolve(VALUE), next);
- // build/<tmp>/closed/value(next)
+ FileValue.prepare(closed.resolve(VALUE), next); // build/<tmp>/closed/value(next)
}
}
+ /** Used to create the first value in the FS. Can handle contention, and existing values. */
protected static class InitBuilder extends FsTransaction.Builder {
public InitBuilder(BasePaths paths, String init) throws IOException {
super(paths);
- FileValue.prepare(dir.resolve(INIT), init);
- // build/<tmp>/init(init)
+ FileValue.prepare(dir.resolve(INIT), init); // build/<tmp>/init(init)
}
}
protected static class UpdatePaths {
public final Path udir;
+ public final Path next;
public final Path closed;
public final Path value;
- UpdatePaths(Path base, String uuid) {
+ protected UpdatePaths(Path base, String uuid) {
udir = base.resolve(uuid);
- closed = udir.resolve(CLOSED);
+ next = udir.resolve(NEXT);
+ closed = next.resolve(CLOSED);
value = closed.resolve(VALUE);
}
}
- /** This helper class may only be used by one thread. */
+ /** Phase 2 -6 helper. */
protected static class UniqueUpdate<T> {
- final UpdatableFileValue<T> updatable;
- final String uuid;
- final UpdatePaths upaths;
- final boolean ours;
- final T currentValue;
- final T next;
+ protected final UpdatableFileValue<T> updatable;
+ protected final String uuid;
+ protected final UpdatePaths upaths;
+ protected final boolean ours;
+ protected final T currentValue;
+ protected final T next;
- long maxTries;
+ protected long maxTries;
- long tries;
- boolean closed;
- boolean preserved;
- boolean committed;
- boolean finished;
+ protected long tries;
+ protected boolean closed;
+ protected boolean preserved;
+ protected boolean committed;
+ protected boolean finished;
- boolean myCommit;
+ protected boolean myCommit;
- UniqueUpdate(UpdatableFileValue<T> updatable, String uuid, boolean ours, long maxTries)
+ /** Advance through phase 2 */
+ protected UniqueUpdate(
+ UpdatableFileValue<T> updatable, String uuid, boolean ours, long maxTries)
throws IOException {
this.updatable = updatable;
this.uuid = uuid;
@@ -126,28 +141,31 @@
next = currentValue == null ? null : updatable.getToValue(currentValue);
}
+ /** Spin attempting phases 3 - 6 */
protected void spinFinish() throws IOException {
for (; tries < maxTries && !finished; tries++) {
finish();
}
}
+ /** Attempt advance through phases 3 - 6 */
protected void finish() throws IOException {
createAndProposeNext();
commit();
clean();
}
+ /** Contains phase 2 */
protected T spinGet() throws IOException {
IOException ioe = new IOException("No chance to read " + updatable.path);
for (; tries < maxTries; tries++) {
try {
- return updatable.get();
+ return updatable.get(); // Phase 2
} catch (IOException e) {
Nfs.throwIfNotStaleFileHandle(e);
finished = !Files.exists(upaths.udir);
if (finished) {
- // stale handle must have been caused by completion by another
+ // stale handle must have been caused by another actor completing instead
return null;
}
ioe = e;
@@ -156,6 +174,7 @@
throw ioe;
}
+ /** Contains phases 3 & 4 */
protected void createAndProposeNext() throws IOException {
if (!closed && !ours) {
// In the default fast path (!closed && ours), we would not expect
@@ -166,53 +185,58 @@
}
if (!closed) {
try (NextBuilder b =
- new NextBuilder(updatable.paths, updatable.serializer.fromGeneric(next))) {
- // build/<tmp>/ -> update/<uuid>/
- Fs.tryAtomicMove(b.dir, upaths.udir);
- // update/<uuid>/closed/value(next)
+ new NextBuilder(updatable.paths, updatable.serializer.fromGeneric(next))) { // Phase 3
+
+ // Phase 4. Rename can only succeed if update/<uuid>/next/ is empty (desired) or
+ // non-existent (not desired). The later case is detected after the move.
+ Fs.tryAtomicMove(b.dir, upaths.next); // rename build/<tmp>/ -> update/<uuid>/next/
+ // now there should be: update/<uuid>/next/closed/value(next)
}
// Do not use the result of the move to determine if it is closed.
- // The move result could provide false positives (a second move
- // could succeed after the transaction has been finished and the
- // first "closed" has been deleted under the "delete" dir).
- // Additionally, this check allows us to be able to detect closes
- // by other actors, not just ourselves.
+ // The move result could provide false positives due to some filesystem
+ // implementions allowing a second move to succeed after the transaction
+ // has been finished and the first "closed" has been deleted under the
+ // "delete" dir. Additionally, this check allows us to be able to detect
+ // closes by other actors, not just ourselves.
closed = Files.exists(upaths.closed);
}
}
+ /** Contains phase 5 */
protected void commit() throws IOException {
if (!committed) {
// Safe to perform this block (for performance reasons) even if we
// have not detected "closed yet", since it can only actually succeed
// when closed (operations depend on "closed" in paths).
- perserve();
+ preserve();
- // mv update/<uuid>/closed/value(next) -> value
- committed = myCommit = Fs.tryAtomicMove(upaths.value, updatable.path);
+ // rename update/<uuid>/next/closed/value(next) -> value
+ committed = myCommit = updatable.update(upaths.value); // Phase 5
+ // now there should be: update/<uuid>/next/closed/ and: value (file)
}
if (!committed && closed) {
committed = !Files.exists(upaths.value);
}
}
- protected void clean() throws IOException {
+ /** Contains phase 6 */
+ protected void clean() {
if (committed) {
- FsTransaction.renameAndDeleteUnique(upaths.udir, updatable.paths.delete);
+ FsTransaction.renameAndDeleteUnique(upaths.udir, updatable.paths.delete); // Phase 6
updatable.cleanPreserved();
}
finished = !Files.exists(upaths.udir);
}
- /**
+ /*
* Creating an extra hard link to future "value" files keeps a filesystem reference to them
* after the "value" file is replaced with a new "value" file. Keeping the reference around
* allows readers on other nodes to still read the contents of the file without experiencing a
* stale file handle exception over NFS. This can reduce the amount of spinning required for
* readers.
*/
- protected void perserve() {
+ protected void preserve() {
if (!preserved) {
preserved = Fs.tryCreateLink(updatable.paths.preserved.resolve(uuid), upaths.value);
}
@@ -231,11 +255,12 @@
Files.createDirectories(paths.preserved);
while (!Files.exists(path)) {
try (InitBuilder b = new InitBuilder(paths, serializer.fromGeneric(init))) {
- Fs.tryAtomicMove(b.dir, paths.update); // mv build/<tmp>/ -> update/
- // update/init(init) using a non unique name, "init", to allow recovery
+ Fs.tryAtomicMove(b.dir, paths.update); // rename build/<tmp>/ -> update/
+ // now there should be: update/init(init_value)
if (!Files.exists(path)) {
- // mv update/init(init) -> value
- Fs.tryAtomicMove(paths.update.resolve(INIT), path);
+ // using a non unique name, "init", to allow recovery below
+ Fs.tryAtomicMove(paths.update.resolve(INIT), path); // rename update/init(init) -> value
+ // now there should be: update/ and: value (file)
}
}
}
@@ -248,14 +273,14 @@
if (shouldCompleteOngoing()) {
Path ongoing = Nfs.getFirstDirEntry(paths.update);
if (ongoing != null) {
- // Attempt to complete previous updates;
+ // Attempt to complete previous updates
return createUniqueUpdate(Fs.basename(ongoing).toString(), false, maxTries);
}
}
return null;
}
- protected boolean shouldCompleteOngoing() {
+ protected boolean shouldCompleteOngoing() throws IOException {
// Collisions are expected, and we don't actually want to
// complete them too often since it affects fairness
// by potentially preventing slower actors from ever
@@ -265,17 +290,15 @@
// Maximum delay incurred due to a server crash.
FileTime expiry = Fs.getFileTimeAgo(10, TimeUnit.SECONDS);
- return Fs.isAllEntriesOlderThan(paths.update, expiry);
+ return Nfs.isAllEntriesOlderThan(paths.update, expiry);
}
protected abstract UniqueUpdate<T> createUniqueUpdate(String uuid, boolean ours, long maxTries)
throws IOException;
- /**
- * 1 second seems to be long enough even for slow readers (over a WAN) under high contention
- * ("value" file being updated by a fast writer), to avoid spinning on reads most of the time.
- */
protected void cleanPreserved() {
+ // 1 second seems to be long enough even for slow readers (over a WAN) under high contention
+ // ("value" file being updated by a fast writer), to avoid spinning on reads most of the time.
FileTime expiry = Fs.getFileTimeAgo(1, TimeUnit.SECONDS);
Fs.tryRecursiveDeleteEntriesOlderThan(paths.preserved, expiry, 5);
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequenceTest.java b/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequenceTest.java
index 5e0fb94..23fe774 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequenceTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/EventSequenceTest.java
@@ -37,7 +37,14 @@
if (myBase == null) {
myBase = Files.createTempDirectory(dir);
}
- seq = new EventSequence(myBase);
+ seq =
+ new EventSequence(myBase) {
+ @Override
+ protected Path getEventDestination(Long n) {
+ return paths.base.resolve(EVENT);
+ }
+ };
+
seq.initFs((long) 0);
}
@@ -59,6 +66,6 @@
EventSequence.UniqueUpdate up = seq.spinSubmit(event, maxSpins);
assertEquals(next, seq.get());
assertNotNull(up.destination);
- assertEquals(event, Fs.readFile(up.destination));
+ assertEquals(event, Fs.readUtf8(up.destination));
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/FsSequenceTest.java b/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/FsSequenceTest.java
index 4d809c1..68c89cd 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/FsSequenceTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/FsSequenceTest.java
@@ -62,7 +62,7 @@
@Test
public void testSpinIncrement() throws IOException {
long next = seq.get() + (long) 1;
- assertEquals(next, (long) seq.spinIncrement(1));
+ assertEquals(next, seq.spinIncrement(1));
}
@Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/FsStoreTest.java b/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/FsStoreTest.java
index 7e78875..80fa5d0 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/FsStoreTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/events/fsstore/FsStoreTest.java
@@ -20,6 +20,9 @@
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -29,14 +32,17 @@
import org.junit.Test;
public class FsStoreTest extends TestCase {
- private static String dir = "events-FsStore";
- private static Path base;
- private Path myBase;
- private FsStore store;
- private String submitMarker = "";
+ public static String dir = "events-FsStore";
+ public static Path base;
+ public static List<OneThread> threads = new LinkedList<>();
- private long count = 1000;
- Map<String, Long> reported = new HashMap<String, Long>();
+ public Path myBase;
+ public FsStore store;
+ public String id = UUID.randomUUID().toString();
+ public String submitMarker = "";
+ public long count = 1000;
+
+ public Map<String, Long> reported = new HashMap<String, Long>();
@Override
@Before
@@ -143,14 +149,14 @@
}
}
- public void count(String id) throws Exception {
+ public void count() throws Exception {
for (long i = 1; i <= count; i++) {
store.add(id + " " + i);
System.out.print(submitMarker);
}
}
- public boolean verify(String id, long head) throws Exception {
+ public boolean verify(long head) throws Exception {
Set<Long> found = new HashSet<Long>();
long stop = store.getHead();
long mine = 1;
@@ -205,6 +211,30 @@
return false;
}
+ public static class OneThread implements Runnable {
+ public FsStoreTest test = new FsStoreTest();
+ public Boolean result;
+
+ @Override
+ public void run() {
+ try {
+ test.setUp();
+ long head = test.store.getHead();
+ test.count();
+ result = !test.verify(head);
+ } catch (Exception e) {
+ }
+ if (threads.size() > 1) {
+ if (result != null && result) {
+ System.out.println("\nPASS " + test.id);
+ } else {
+ result = false;
+ System.out.println("\nFAIL " + test.id);
+ }
+ }
+ }
+ }
+
/**
* First, make the junit jar easily available
*
@@ -213,40 +243,102 @@
* <p>To run type:
*
* <p>java -cp target/classes:target/test-classes:target/junit-4.8.1.jar \
- * com.googlesource.gerrit.plugins.events.fsstore.FsStoreTest \ [dir [count [store-id]]]
+ * com.googlesource.gerrit.plugins.events.fsstore.FsStoreTest [--id store-id] [dir [count]]
*
* <p>Note: if you do not specify <dir>, it will create a directory under /tmp
*
- * <p>Performance: NFS(Lowlatency,SSDs), 1 worker 1M, 266m ~16ms/event find events|wc -l 12s rm
- * -rf 1m49s du -sh -> 3.9G 1m7s
- *
- * <p>Local(spinning) 1 workers 1M 14.34s ~14us/event find events|wc -l 1.3s rm -rf 42s
- *
- * <p>Mixed workers: NFS(WAN) 1 worker (+NFS LAN continuous) count=10, 3m28s
+ * <p>Local(spinning) 1 worker 1M 28m2s ~17ms/event find events|wc -l .97 rm -rf 30s
*/
public static void main(String[] argv) throws Exception {
- if (argv.length > 0) {
- base = Paths.get(argv[0]);
- }
- FsStoreTest t = new FsStoreTest();
- if (argv.length > 1) {
- t.count = Long.parseLong(argv[1]);
+ List<String> args = new LinkedList<>();
+ for (String arg : argv) {
+ args.add(arg);
}
- String id = UUID.randomUUID().toString();
- t.submitMarker = ".";
- if (argv.length > 2) {
- id = argv[2];
- t.submitMarker += id + ".";
+ setThreadCount(1);
+ int thread = 0;
+ setSubmitMarker(".");
+ for (Iterator<String> it = args.iterator(); it.hasNext(); ) {
+ String arg = it.next();
+ if (it.hasNext()) {
+ if ("--id".equals(arg)) {
+ it.remove();
+ setId(thread++, it.next());
+ it.remove();
+ } else if ("-j".equals(arg) || "--threads".equals(arg)) {
+ it.remove();
+ setThreadCount(new Integer(it.next()));
+ it.remove();
+ }
+ }
}
- t.setUp();
- long head = t.store.getHead();
- t.count(id);
- if (t.verify(id, head)) {
+ if (args.size() > 0) {
+ base = Paths.get(args.remove(0));
+ }
+
+ if (args.size() > 0) {
+ setCount(Long.parseLong(args.remove(0)));
+ }
+
+ runThreads();
+
+ if (getResult()) {
+ System.out.println("\nPASS");
+ } else {
System.out.println("\nFAIL");
- System.exit(1);
}
- System.out.println("\nPASS");
+ }
+
+ private static void runThreads()
+ throws IllegalArgumentException, InterruptedException, SecurityException {
+ List<Thread> running = new LinkedList<>();
+
+ for (OneThread t : threads) {
+ Thread thread = new Thread(t);
+ thread.start();
+ running.add(thread);
+ }
+
+ for (Thread thread : running) {
+ thread.join();
+ }
+ }
+
+ private static boolean getResult() {
+ for (OneThread t : threads) {
+ if (t.result == null || !t.result) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static void setCount(long count) {
+ for (OneThread t : threads) {
+ t.test.count = count;
+ }
+ }
+
+ private static void setSubmitMarker(String marker) {
+ for (OneThread t : threads) {
+ t.test.submitMarker = marker;
+ }
+ }
+
+ private static void setId(int n, String id) {
+ OneThread t = threads.get(n);
+ t.test.id = id;
+ t.test.submitMarker += id + ".";
+ }
+
+ private static void setThreadCount(int n) {
+ /* Use a common FsStore for all threads so that synchronized will work across them. */
+ FsStore commonStore = threads.get(0).test.store;
+ while (threads.size() < n) {
+ OneThread t = new OneThread();
+ t.test.store = commonStore;
+ threads.add(t);
+ }
}
}
diff --git a/test/docker/gerrit/Dockerfile b/test/docker/gerrit/Dockerfile
index 63ddb00..d6e7699 100755
--- a/test/docker/gerrit/Dockerfile
+++ b/test/docker/gerrit/Dockerfile
@@ -1,4 +1,4 @@
-FROM gerritcodereview/gerrit:3.2.10-ubuntu20
+FROM gerritcodereview/gerrit:3.4.0-ubuntu20
USER root
diff --git a/test/test_events_plugin.sh b/test/test_events_plugin.sh
index e76532a..f44d51e 100755
--- a/test/test_events_plugin.sh
+++ b/test/test_events_plugin.sh
@@ -244,7 +244,7 @@
result_type "$GROUP $type" "ref-updated" 2
type=change-abandoned
-capture_events 2
+capture_events 3
review "$ch1,1" --abandon
result_type "$GROUP" "$type"
@@ -271,7 +271,7 @@
result_type "$GROUP" "$type" 2
type=change-merged
-events_count=3
+events_count=4
# If reviewnotes plugin is installed, an extra event of type 'ref-updated'
# on 'refs/notes/review' is fired when a change is merged.
is_plugin_installed reviewnotes && events_count="$((events_count+1))"