Merge branch 'stable-3.2'
* stable-3.2:
Revert "Get a URI lock before running tasks."
Change-Id: I855a004147fd895f51b4349bb368cb9ae6a63060
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 936bb2c..adc3133 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -583,9 +583,7 @@
if (inFlightOp != null) {
return RunwayStatus.denied(inFlightOp.getId());
}
- if (!replicationTasksStorage.get().start(op)) {
- return RunwayStatus.deniedExternal();
- }
+ replicationTasksStorage.get().start(op);
inFlight.put(op.getURI(), op);
}
return RunwayStatus.allowed();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 9ccb70c..d598925 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -335,8 +335,6 @@
if (status.isCanceled()) {
repLog.atInfo().log(
"PushOp for replication to %s was canceled and thus won't be rescheduled", uri);
- } else if (status.isExternalInflight()) {
- repLog.atInfo().log("PushOp for replication to %s was denied externally", uri);
} else {
repLog.atInfo().log(
"Rescheduling replication to %s to avoid collision with the in-flight push [%s].",
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 5af0983..506b175 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -26,7 +26,6 @@
import java.io.IOException;
import java.nio.file.DirectoryIteratorException;
import java.nio.file.DirectoryStream;
-import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
@@ -50,13 +49,10 @@
*
* <p><code>
* .../building/<tmp_name> new replication tasks under construction
- * .../running/<uri_sha1>/ lock for URI
- * .../running/<uri_sha1>/<task_sha1> running replication tasks
+ * .../running/<sha1> running replication tasks
* .../waiting/<task_sha1_NN_shard>/<task_sha1> outstanding replication tasks
* </code>
*
- * <p>The URI lock is acquired by creating the directory and released by removing it.
- *
* <p>Tasks are moved atomically via a rename between those directories to indicate the current
* state of each task.
*
@@ -69,50 +65,26 @@
private boolean disableDeleteForTesting;
- public static class ReplicateRefUpdate extends UriUpdate {
- public final String ref;
-
- public ReplicateRefUpdate(UriUpdate update, String ref) {
- this(update.project, ref, update.uri, update.remote);
- }
-
- public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
- this(project, ref, uri.toASCIIString(), remote);
- }
-
- protected ReplicateRefUpdate(String project, String ref, String uri, String remote) {
- super(project, uri, remote);
- this.ref = ref;
- }
-
- @Override
- public String toString() {
- return "ref-update " + ref + " (" + super.toString() + ")";
- }
- }
-
- public static class UriUpdate {
+ public static class ReplicateRefUpdate {
public final String project;
+ public final String ref;
public final String uri;
public final String remote;
- public UriUpdate(PushOne push) {
- this(push.getProjectNameKey().get(), push.getURI(), push.getRemoteName());
+ public ReplicateRefUpdate(PushOne push, String ref) {
+ this(push.getProjectNameKey().get(), ref, push.getURI(), push.getRemoteName());
}
- public UriUpdate(String project, URIish uri, String remote) {
- this(project, uri.toASCIIString(), remote);
- }
-
- public UriUpdate(String project, String uri, String remote) {
+ public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
this.project = project;
- this.uri = uri;
+ this.ref = ref;
+ this.uri = uri.toASCIIString();
this.remote = remote;
}
@Override
public String toString() {
- return "uri-update " + project + " uri:" + uri + " remote:" + remote;
+ return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote;
}
}
@@ -140,52 +112,21 @@
this.disableDeleteForTesting = deleteDisabled;
}
- public synchronized boolean start(PushOne push) {
- UriLock lock = new UriLock(push);
- if (!lock.acquire()) {
- return false;
- }
-
- boolean started = false;
+ public synchronized void start(PushOne push) {
for (String ref : push.getRefs()) {
- started = new Task(lock, ref).start() || started;
+ new Task(new ReplicateRefUpdate(push, ref)).start();
}
-
- if (!started) { // No tasks left, likely replicated externally
- lock.release();
- }
- return started;
}
public synchronized void reset(PushOne push) {
- UriLock lock = new UriLock(push);
for (String ref : push.getRefs()) {
- new Task(lock, ref).reset();
+ new Task(new ReplicateRefUpdate(push, ref)).reset();
}
- lock.release();
}
public synchronized void resetAll() {
- try (DirectoryStream<Path> dirs = Files.newDirectoryStream(createDir(runningUpdates))) {
- for (Path dir : dirs) {
- UriLock lock = null;
- try {
- for (ReplicateRefUpdate u : list(dir)) {
- if (lock == null) {
- lock = new UriLock(u);
- }
- new Task(u).reset();
- }
- } catch (DirectoryIteratorException d) {
- // iterating over the sub-directories is expected to have dirs disappear
- Nfs.throwIfNotStaleFileHandle(d.getCause());
- }
- if (lock != null) {
- lock.release();
- }
- }
- } catch (IOException e) {
- logger.atSevere().withCause(e).log("Error while aborting running tasks");
+ for (ReplicateRefUpdate r : list(createDir(runningUpdates))) {
+ new Task(r).reset();
}
}
@@ -194,11 +135,9 @@
}
public void finish(PushOne push) {
- UriLock lock = new UriLock(push);
- for (ReplicateRefUpdate r : list(lock.runningDir)) {
- new Task(lock, r.ref).finish();
+ for (String ref : push.getRefs()) {
+ new Task(new ReplicateRefUpdate(push, ref)).finish();
}
- lock.release();
}
public synchronized List<ReplicateRefUpdate> listWaiting() {
@@ -263,73 +202,22 @@
}
}
- private class UriLock {
- public final UriUpdate update;
- public final String uriKey;
- public final Path runningDir;
-
- public UriLock(PushOne push) {
- this(new UriUpdate(push));
- }
-
- public UriLock(UriUpdate update) {
- this.update = update;
- uriKey = sha1(update.uri).name();
- runningDir = createDir(runningUpdates).resolve(uriKey);
- }
-
- public boolean acquire() {
- try {
- logger.atFine().log("MKDIR %s %s", runningDir, updateLog());
- Files.createDirectory(runningDir);
- return true;
- } catch (FileAlreadyExistsException e) {
- return false; // already running, likely externally
- } catch (IOException e) {
- logger.atSevere().withCause(e).log("Error while starting uri %s", uriKey);
- return true; // safer to risk a duplicate than to skip it
- }
- }
-
- public void release() {
- try {
- if (disableDeleteForTesting && Files.list(runningDir).findFirst().isPresent()) {
- logger.atFine().log("DELETE %s %s DISABLED", runningDir, updateLog());
- return;
- }
-
- logger.atFine().log("DELETE %s %s", runningDir, updateLog());
- Files.delete(runningDir);
- } catch (IOException e) {
- logger.atSevere().withCause(e).log("Error while releasing uri %s", uriKey);
- }
- }
-
- private String updateLog() {
- return String.format("(%s => %s)", update.project, update.uri);
- }
- }
-
private class Task {
public final ReplicateRefUpdate update;
public final String taskKey;
public final Path running;
public final Path waiting;
- public Task(ReplicateRefUpdate r) {
- this(new UriLock(r), r.ref);
- }
-
public Task(PushOne push, String ref) {
- this(new UriLock(push), ref);
+ this(new ReplicateRefUpdate(push, ref));
}
- public Task(UriLock lock, String ref) {
- update = new ReplicateRefUpdate(lock.update, ref);
+ public Task(ReplicateRefUpdate update) {
+ this.update = update;
String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote;
taskKey = sha1(key).name();
- running = lock.runningDir.resolve(taskKey);
- waiting = createDir(waitingUpdates.resolve(taskKey.substring(0, 2))).resolve(taskKey);
+ running = createDir(runningUpdates).resolve(taskKey);
+ waiting = createDir(waitingUpdates).resolve(taskKey);
}
public String create() {
@@ -350,9 +238,8 @@
return taskKey;
}
- public boolean start() {
+ public void start() {
rename(waiting, running);
- return Files.exists(running);
}
public void reset() {
@@ -373,7 +260,7 @@
logger.atFine().log("DELETE %s %s", running, updateLog());
Files.delete(running);
} catch (IOException e) {
- logger.atSevere().withCause(e).log("Error while finishing task %s", taskKey);
+ logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
index f7071d8..bcb1e2f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
@@ -27,10 +27,6 @@
return new RunwayStatus(false, inFlightPushId);
}
- public static RunwayStatus deniedExternal() {
- return new RunwayStatus(false, -1);
- }
-
private final boolean allowed;
private final int inFlightPushId;
@@ -47,10 +43,6 @@
return !allowed && inFlightPushId == 0;
}
- public boolean isExternalInflight() {
- return !allowed && inFlightPushId == -1;
- }
-
public int getInFlightPushId() {
return inFlightPushId;
}