Persist failed tasks
Solves: Jira GER-1730
Change-Id: I3a1f8daf21478a2bed5599cef20362ca74733383
diff --git a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueue.java b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueue.java
index 3849289..01d5233 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueue.java
@@ -115,12 +115,16 @@
e.getValue().cancel(true);
tasks.add(e.getKey().task);
}
- persistence.persist(tasks);
+ persistence.persistQueue(tasks);
+ persistence.persistFailed(failedTasks);
pool.shutDown();
}
public void init() {
- for (ParsingQueueTask task : persistence.getPersistedTasks()) {
+ for (ParsingQueueTask task : persistence.getFailedTasks()) {
+ schedule(task);
+ }
+ for (ParsingQueueTask task : persistence.getPersistedQueue()) {
schedule(task);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistence.java b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistence.java
index 2852d4d..25874f0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistence.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistence.java
@@ -27,55 +27,70 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
-import java.util.List;
import java.util.Set;
public class ParsingQueuePersistence {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final Gson GSON = new Gson();
- private static final String PERSISTED_FILE_NAME = "persisted-queue.json";
+ private static final String PERSISTED_QUEUE_FILE_NAME = "persisted-queue.json";
+ private static final String PERSISTED_FAILED_FILE_NAME = "peristed-failed.json";
- private final Path persistenceFile;
- private final Path pluginDataDir;
-
- @Inject
- public ParsingQueuePersistence(SitePaths sitePaths, @PluginName String pluginName) {
- this.pluginDataDir = sitePaths.data_dir.resolve(pluginName);
- this.persistenceFile = pluginDataDir.resolve(PERSISTED_FILE_NAME);
- }
-
- public void persist(Set<ParsingQueueTask> tasks) {
+ private static void persistTo(Set<ParsingQueueTask> tasks, Path dir, String fileName) {
+ Path file = dir.resolve(fileName);
try {
- if (Files.isRegularFile(persistenceFile)) {
- Files.delete(persistenceFile);
+ if (Files.isRegularFile(file)) {
+ Files.delete(file);
}
if (tasks == null || tasks.isEmpty()) {
return;
}
- if (!Files.isDirectory(pluginDataDir)) {
- Files.createDirectory(pluginDataDir);
+ if (!Files.isDirectory(dir)) {
+ Files.createDirectory(dir);
}
- Files.write(persistenceFile, GSON.toJson(tasks).getBytes(UTF_8));
+ Files.write(file, GSON.toJson(tasks).getBytes(UTF_8));
} catch (IOException e) {
- logger.atSevere().withCause(e).log("Failed to persist parsing queue");
+ logger.atSevere().withCause(e).log("Failed to persist tasks to : %s", file);
}
}
- public Collection<ParsingQueueTask> getPersistedTasks() {
+ private static Set<ParsingQueueTask> getTasksFrom(Path file) {
Set<ParsingQueueTask> persistedTasks = null;
try {
- if (Files.isRegularFile(persistenceFile)) {
+ if (Files.isRegularFile(file)) {
persistedTasks =
GSON.fromJson(
- Files.newBufferedReader(persistenceFile, UTF_8),
+ Files.newBufferedReader(file, UTF_8),
new TypeToken<Set<ParsingQueueTask>>() {
private static final long serialVersionUID = 1L;
}.getType());
}
} catch (JsonIOException | JsonSyntaxException | IOException e) {
- logger.atSevere().withCause(e).log("Failed to get peristed parsing queue.");
+ logger.atSevere().withCause(e).log("Failed to get tasks from %s.", file);
}
- return persistedTasks != null ? persistedTasks : List.of();
+ return persistedTasks != null ? persistedTasks : Set.of();
+ }
+
+ private final Path pluginDataDir;
+
+ @Inject
+ public ParsingQueuePersistence(SitePaths sitePaths, @PluginName String pluginName) {
+ this.pluginDataDir = sitePaths.data_dir.resolve(pluginName);
+ }
+
+ public void persistQueue(Set<ParsingQueueTask> tasks) {
+ persistTo(tasks, pluginDataDir, PERSISTED_QUEUE_FILE_NAME);
+ }
+
+ public void persistFailed(Set<ParsingQueueTask> failedTasks) {
+ persistTo(failedTasks, pluginDataDir, PERSISTED_FAILED_FILE_NAME);
+ }
+
+ public Collection<ParsingQueueTask> getPersistedQueue() {
+ return getTasksFrom(pluginDataDir.resolve(PERSISTED_QUEUE_FILE_NAME));
+ }
+
+ public Collection<ParsingQueueTask> getFailedTasks() {
+ return getTasksFrom(pluginDataDir.resolve(PERSISTED_FAILED_FILE_NAME));
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueueIT.java b/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueueIT.java
index c07a8bb..3de49b8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueueIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueueIT.java
@@ -46,7 +46,7 @@
@Test
public void persistedScsFromBranchIsScheduledAndPushed() throws Exception {
- queuePersistence.persist(
+ queuePersistence.persistQueue(
Set.of(ParsingQueueTask.builder(EiffelEventType.SCS, project.get(), getHead()).build()));
parsingQueue.init();
assertCorrectEvent(0, eventForHead(EiffelEventType.SCC));
@@ -58,7 +58,7 @@
setScsHandled();
PushOneCommit.Result res = createChange();
merge(res);
- queuePersistence.persist(
+ queuePersistence.persistQueue(
Set.of(
ParsingQueueTask.builder(EiffelEventType.SCS, project.get(), "refs/heads/master")
.commit(res.getCommit().name())
@@ -72,7 +72,7 @@
public void persistedSccFromCommitIsScheduledAndPushed() throws Exception {
setScsHandled();
PushOneCommit.Result res = createChange();
- queuePersistence.persist(
+ queuePersistence.persistQueue(
Set.of(
ParsingQueueTask.builder(EiffelEventType.SCC, project.get(), "refs/heads/master")
.commit(res.getCommit().name())
@@ -86,7 +86,7 @@
setScsHandled();
PushOneCommit.Result res = createChange();
merge(res);
- queuePersistence.persist(
+ queuePersistence.persistQueue(
Set.of(
ParsingQueueTask.builder(EiffelEventType.SCS, project.get(), "refs/heads/master")
.build()));
@@ -102,7 +102,7 @@
ArtifactEventKey artc = ArtifactEventKey.create(tagPURL(project.get(), tag, "localhost"));
CompositionDefinedEventKey cd =
CompositionDefinedEventKey.create(tagCompositionName(project.get(), "localhost"), tag);
- queuePersistence.persist(
+ queuePersistence.persistQueue(
Set.of(
ParsingQueueTask.builder(EiffelEventType.ARTC, project.get(), tag)
.updateTime(EPOCH_MILLIS)
diff --git a/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistenceIT.java b/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistenceIT.java
index c0f1457..933ed48 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistenceIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistenceIT.java
@@ -52,31 +52,55 @@
@Before
public void setUp() throws Exception {
queuePersistence = plugin.getSysInjector().getInstance(ParsingQueuePersistence.class);
+ /* Purge persistence from previous runs. */
+ queuePersistence.persistFailed(Set.of());
+ queuePersistence.persistQueue(Set.of());
+
parsingQueue = plugin.getSysInjector().getInstance(EiffelEventParsingQueue.class);
}
@Test
- public void tasksArePersisted() throws Exception {
+ public void queueIsPersisted() throws Exception {
Set<ParsingQueueTask> original = createTasks(1, 5);
- queuePersistence.persist(original);
- assertThat(queuePersistence.getPersistedTasks()).containsExactlyElementsIn(original);
+ queuePersistence.persistQueue(original);
+ assertThat(original).containsExactlyElementsIn(queuePersistence.getPersistedQueue());
}
@Test
- public void multiplePersistenceRuns() throws Exception {
+ public void failedTasksArePersisted() throws Exception {
+ Set<ParsingQueueTask> original = createTasks(1, 5);
+ queuePersistence.persistFailed(original);
+ assertThat(original).containsExactlyElementsIn(queuePersistence.getFailedTasks());
+ }
+
+ @Test
+ public void multipleQueuePersistenceRuns() throws Exception {
Set<ParsingQueueTask> original = createTasks(1, 6);
- queuePersistence.persist(original);
- assertThat(queuePersistence.getPersistedTasks()).containsExactlyElementsIn(original);
+ queuePersistence.persistQueue(original);
+ assertThat(original).containsExactlyElementsIn(queuePersistence.getPersistedQueue());
original = createTasks(6, 10);
- queuePersistence.persist(original);
- assertThat(queuePersistence.getPersistedTasks()).containsExactlyElementsIn(original);
+ queuePersistence.persistQueue(original);
+ assertThat(original).containsExactlyElementsIn(queuePersistence.getPersistedQueue());
}
@Test
- public void persistedTasksAreScheduledOnInit() throws Exception {
+ public void persistedQueuedTasksAreScheduledOnInit() throws Exception {
Set<ParsingQueueTask> tasks = createTasks(1, 6);
- queuePersistence.persist(tasks);
+ queuePersistence.persistQueue(tasks);
+ parsingQueue.init();
+ assertThat(
+ TestExecutor.scheduledItems().stream()
+ .map(p -> p.getProjectNameKey())
+ .collect(Collectors.toList()))
+ .containsExactlyElementsIn(
+ tasks.stream().map(t -> Project.nameKey(t.repoName)).collect(Collectors.toList()));
+ }
+
+ @Test
+ public void persistedFailedTasksAreScheduledOnInit() throws Exception {
+ Set<ParsingQueueTask> tasks = createTasks(1, 6);
+ queuePersistence.persistFailed(tasks);
parsingQueue.init();
assertThat(
TestExecutor.scheduledItems().stream()