Merge branch 'stable-3.1'
* stable-3.1:
Synchronize access to ReplicationTasksStorage
Change-Id: I43047730142e2d2324ca8cc929a4e69e61c995c5
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index e827d4f..a557eb7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -113,19 +113,20 @@
private static Gson GSON = new Gson();
+ private final Path refUpdates;
private final Path buildingUpdates;
private final Path runningUpdates;
private final Path waitingUpdates;
@Inject
ReplicationTasksStorage(ReplicationConfig config) {
- Path refUpdates = config.getEventsDirectory().resolve("ref-updates");
+ refUpdates = config.getEventsDirectory().resolve("ref-updates");
buildingUpdates = refUpdates.resolve("building");
runningUpdates = refUpdates.resolve("running");
waitingUpdates = refUpdates.resolve("waiting");
}
- public String create(ReplicateRefUpdate r) {
+ public synchronized String create(ReplicateRefUpdate r) {
return new Task(r).create();
}
@@ -134,7 +135,7 @@
this.disableDeleteForTesting = deleteDisabled;
}
- public boolean start(PushOne push) {
+ public synchronized boolean start(PushOne push) {
UriLock lock = new UriLock(push);
if (!lock.acquire()) {
return false;
@@ -151,7 +152,7 @@
return started;
}
- public void reset(PushOne push) {
+ public synchronized void reset(PushOne push) {
UriLock lock = new UriLock(push);
for (String ref : push.getRefs()) {
new Task(lock, ref).reset();
@@ -159,7 +160,7 @@
lock.release();
}
- public void resetAll() {
+ public synchronized void resetAll() {
try (DirectoryStream<Path> dirs = Files.newDirectoryStream(createDir(runningUpdates))) {
for (Path dir : dirs) {
UriLock lock = null;
@@ -190,20 +191,25 @@
lock.release();
}
- public List<ReplicateRefUpdate> listWaiting() {
+ public synchronized List<ReplicateRefUpdate> listWaiting() {
return list(createDir(waitingUpdates));
}
@VisibleForTesting
- public List<ReplicateRefUpdate> listRunning() {
+ public synchronized List<ReplicateRefUpdate> listRunning() {
return list(createDir(runningUpdates));
}
@VisibleForTesting
- public List<ReplicateRefUpdate> listBuilding() {
+ public synchronized List<ReplicateRefUpdate> listBuilding() {
return list(createDir(buildingUpdates));
}
+ @VisibleForTesting
+ public synchronized List<ReplicateRefUpdate> list() {
+ return list(createDir(refUpdates));
+ }
+
private List<ReplicateRefUpdate> list(Path tasks) {
List<ReplicateRefUpdate> results = new ArrayList<>();
try (DirectoryStream<Path> events = Files.newDirectoryStream(tasks)) {
@@ -211,6 +217,8 @@
if (Files.isRegularFile(e)) {
String json = new String(Files.readAllBytes(e), UTF_8);
results.add(GSON.fromJson(json, ReplicateRefUpdate.class));
+ } else if (Files.isDirectory(e)) {
+ results.addAll(list(e));
}
}
} catch (IOException e) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 41f93b3..9cf5489 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -42,7 +42,6 @@
import java.util.Optional;
import java.util.function.Supplier;
import java.util.regex.Pattern;
-import java.util.stream.Stream;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
@@ -400,18 +399,23 @@
private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
Pattern refmaskPattern = Pattern.compile(refRegex);
- return Stream.concat(
- tasksStorage.listWaiting().stream(),
- Stream.concat(
- tasksStorage.listBuilding().stream(), tasksStorage.listRunning().stream()))
+ return tasksStorage.list().stream()
.filter(task -> refmaskPattern.matcher(task.ref).matches())
.collect(toList());
}
- private void cleanupReplicationTasks() throws IOException {
- try (DirectoryStream<Path> files = Files.newDirectoryStream(storagePath)) {
+ public void cleanupReplicationTasks() throws IOException {
+ cleanupReplicationTasks(storagePath);
+ }
+
+ private void cleanupReplicationTasks(Path basePath) throws IOException {
+ try (DirectoryStream<Path> files = Files.newDirectoryStream(basePath)) {
for (Path path : files) {
- path.toFile().delete();
+ if (Files.isDirectory(path)) {
+ cleanupReplicationTasks(path);
+ } else {
+ path.toFile().delete();
+ }
}
}
}