blob: 6f6f922b462ad65696ef160d58560b840504f20a [file] [log] [blame]
// Copyright (C) 2017 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.googlesource.gerrit.plugins.events.fsstore;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* An infrastructure to create updates to a FileValue using a 6 phase transaction which is multi
* node/process (Multi-Master) safe. The 6 phases of the transaction are:
*
* <p>1) Intiate a unique ownerless transaction, locking and preventing the value from changing
* while the transaction is still open. 2) Read the current locked file value 3) Create a proposed
* update file containing the new proposed value based on the value read in phase 2. 4) Close the
* transaction with the new proposed value, locking in the new value in the transaction. 5)
* Atomically commit the new value to the locked file from the transaction. 6) Clean up the
* transaction
*
* <p>Any actor may perform any/all of the above phases.
*/
public abstract class UpdatableFileValue<T> extends FileValue<T> {
public static final Path CLOSED = Paths.get("closed");
public static final Path INIT = Paths.get("init");
public static final Path PRESERVED = Paths.get("preserved");
public static final Path UPDATE = Paths.get("update");
public static final Path VALUE = Paths.get("value");
public static class BasePaths extends FsTransaction.BasePaths {
public final Path update;
public final Path preserved;
public BasePaths(Path base) {
super(base);
update = base.resolve(UPDATE);
preserved = base.resolve(PRESERVED);
}
}
protected static class UpdateBuilder extends FsTransaction.Builder {
String uuid = UUID.randomUUID().toString();
Path udir = dir.resolve(uuid);
public UpdateBuilder(BasePaths paths) throws IOException {
super(paths);
Files.createDirectories(udir);
// build/<tmp>/uuid/
}
}
protected static class NextBuilder extends FsTransaction.Builder {
public NextBuilder(BasePaths paths, String next) throws IOException {
super(paths);
Path closed = dir.resolve(CLOSED);
Files.createDirectory(closed);
FileValue.prepare(closed.resolve(VALUE), next);
// build/<tmp>/closed/value(next)
}
}
protected static class InitBuilder extends FsTransaction.Builder {
public InitBuilder(BasePaths paths, String init) throws IOException {
super(paths);
FileValue.prepare(dir.resolve(INIT), init);
// build/<tmp>/init(init)
}
}
protected static class UpdatePaths {
public final Path udir;
public final Path closed;
public final Path value;
UpdatePaths(Path base, String uuid) {
udir = base.resolve(uuid);
closed = udir.resolve(CLOSED);
value = closed.resolve(VALUE);
}
}
/** This helper class may only be used by one thread. */
protected static class UniqueUpdate<T> {
final UpdatableFileValue<T> updatable;
final String uuid;
final UpdatePaths upaths;
final boolean ours;
final T currentValue;
final T next;
long maxTries;
long tries;
boolean closed;
boolean preserved;
boolean committed;
boolean finished;
boolean myCommit;
UniqueUpdate(UpdatableFileValue<T> updatable, String uuid, boolean ours, long maxTries)
throws IOException {
this.updatable = updatable;
this.uuid = uuid;
this.ours = ours;
this.maxTries = maxTries;
upaths = new UpdatePaths(updatable.paths.update, uuid);
currentValue = spinGet();
next = currentValue == null ? null : updatable.getToValue(currentValue);
}
protected void spinFinish() throws IOException {
for (; tries < maxTries && !finished; tries++) {
finish();
}
}
protected void finish() throws IOException {
createAndProposeNext();
commit();
clean();
}
protected T spinGet() throws IOException {
IOException ioe = new IOException("No chance to read " + updatable.path);
for (; tries < maxTries; tries++) {
try {
return updatable.get();
} catch (IOException e) {
Nfs.throwIfNotStaleFileHandle(e);
finished = !Files.exists(upaths.udir);
if (finished) {
// stale handle must have been caused by completion by another
return null;
}
ioe = e;
}
}
throw ioe;
}
protected void createAndProposeNext() throws IOException {
if (!closed && !ours) {
// In the default fast path (!closed && ours), we would not expect
// it to be closed, so skip this check to get to the building faster.
// Conversely, if not ours, a quick check here might allow us
// to skip the slow building phase
closed = Files.exists(upaths.closed);
}
if (!closed) {
try (NextBuilder b =
new NextBuilder(updatable.paths, updatable.serializer.fromGeneric(next))) {
// build/<tmp>/ -> update/<uuid>/
Fs.tryAtomicMove(b.dir, upaths.udir);
// update/<uuid>/closed/value(next)
}
// Do not use the result of the move to determine if it is closed.
// The move result could provide false positives (a second move
// could succeed after the transaction has been finished and the
// first "closed" has been deleted under the "delete" dir).
// Additionally, this check allows us to be able to detect closes
// by other actors, not just ourselves.
closed = Files.exists(upaths.closed);
}
}
protected void commit() throws IOException {
if (!committed) {
// Safe to perform this block (for performance reasons) even if we
// have not detected "closed yet", since it can only actually succeed
// when closed (operations depend on "closed" in paths).
perserve();
// mv update/<uuid>/closed/value(next) -> value
committed = myCommit = Fs.tryAtomicMove(upaths.value, updatable.path);
}
if (!committed && closed) {
committed = !Files.exists(upaths.value);
}
}
protected void clean() throws IOException {
if (committed) {
FsTransaction.renameAndDeleteUnique(upaths.udir, updatable.paths.delete);
updatable.cleanPreserved();
}
finished = !Files.exists(upaths.udir);
}
/**
* Creating an extra hard link to future "value" files keeps a filesystem reference to them
* after the "value" file is replaced with a new "value" file. Keeping the reference around
* allows readers on other nodes to still read the contents of the file without experiencing a
* stale file handle exception over NFS. This can reduce the amount of spinning required for
* readers.
*/
protected void perserve() {
if (!preserved) {
preserved = Fs.tryCreateLink(updatable.paths.preserved.resolve(uuid), upaths.value);
}
}
}
protected final BasePaths paths;
public UpdatableFileValue(Path base) {
super(base.resolve(VALUE)); // value(val)
this.paths = new BasePaths(base);
}
public void initFs(T init) throws IOException {
super.init(init);
Files.createDirectories(paths.preserved);
while (!Files.exists(path)) {
try (InitBuilder b = new InitBuilder(paths, serializer.fromGeneric(init))) {
Fs.tryAtomicMove(b.dir, paths.update); // mv build/<tmp>/ -> update/
// update/init(init) using a non unique name, "init", to allow recovery
if (!Files.exists(path)) {
// mv update/init(init) -> value
Fs.tryAtomicMove(paths.update.resolve(INIT), path);
}
}
}
Fs.tryDelete(paths.update.resolve(INIT)); // cleanup
}
protected abstract T getToValue(T currentValue);
protected UniqueUpdate<T> completeOngoing(long maxTries) throws IOException {
if (shouldCompleteOngoing()) {
Path ongoing = Nfs.getFirstDirEntry(paths.update);
if (ongoing != null) {
// Attempt to complete previous updates;
return createUniqueUpdate(Fs.basename(ongoing).toString(), false, maxTries);
}
}
return null;
}
protected boolean shouldCompleteOngoing() {
// Collisions are expected, and we don't actually want to
// complete them too often since it affects fairness
// by potentially preventing slower actors from ever
// committing. We do however need to prevent deadlock from
// a stale proposal, so we do need to complete proposals
// that stay around too long.
// Maximum delay incurred due to a server crash.
FileTime expiry = Fs.getFileTimeAgo(10, TimeUnit.SECONDS);
return Fs.isAllEntriesOlderThan(paths.update, expiry);
}
protected abstract UniqueUpdate<T> createUniqueUpdate(String uuid, boolean ours, long maxTries)
throws IOException;
/**
* 1 second seems to be long enough even for slow readers (over a WAN) under high contention
* ("value" file being updated by a fast writer), to avoid spinning on reads most of the time.
*/
protected void cleanPreserved() {
FileTime expiry = Fs.getFileTimeAgo(1, TimeUnit.SECONDS);
Fs.tryRecursiveDeleteEntriesOlderThan(paths.preserved, expiry, 5);
}
}