Persist failed tasks Solves: Jira GER-1730 Change-Id: I3a1f8daf21478a2bed5599cef20362ca74733383
diff --git a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueue.java b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueue.java index 3849289..01d5233 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueue.java
@@ -115,12 +115,16 @@ e.getValue().cancel(true); tasks.add(e.getKey().task); } - persistence.persist(tasks); + persistence.persistQueue(tasks); + persistence.persistFailed(failedTasks); pool.shutDown(); } public void init() { - for (ParsingQueueTask task : persistence.getPersistedTasks()) { + for (ParsingQueueTask task : persistence.getFailedTasks()) { + schedule(task); + } + for (ParsingQueueTask task : persistence.getPersistedQueue()) { schedule(task); } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistence.java b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistence.java index 2852d4d..25874f0 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistence.java +++ b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistence.java
@@ -27,55 +27,70 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Collection; -import java.util.List; import java.util.Set; public class ParsingQueuePersistence { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private static final Gson GSON = new Gson(); - private static final String PERSISTED_FILE_NAME = "persisted-queue.json"; + private static final String PERSISTED_QUEUE_FILE_NAME = "persisted-queue.json"; + private static final String PERSISTED_FAILED_FILE_NAME = "peristed-failed.json"; - private final Path persistenceFile; - private final Path pluginDataDir; - - @Inject - public ParsingQueuePersistence(SitePaths sitePaths, @PluginName String pluginName) { - this.pluginDataDir = sitePaths.data_dir.resolve(pluginName); - this.persistenceFile = pluginDataDir.resolve(PERSISTED_FILE_NAME); - } - - public void persist(Set<ParsingQueueTask> tasks) { + private static void persistTo(Set<ParsingQueueTask> tasks, Path dir, String fileName) { + Path file = dir.resolve(fileName); try { - if (Files.isRegularFile(persistenceFile)) { - Files.delete(persistenceFile); + if (Files.isRegularFile(file)) { + Files.delete(file); } if (tasks == null || tasks.isEmpty()) { return; } - if (!Files.isDirectory(pluginDataDir)) { - Files.createDirectory(pluginDataDir); + if (!Files.isDirectory(dir)) { + Files.createDirectory(dir); } - Files.write(persistenceFile, GSON.toJson(tasks).getBytes(UTF_8)); + Files.write(file, GSON.toJson(tasks).getBytes(UTF_8)); } catch (IOException e) { - logger.atSevere().withCause(e).log("Failed to persist parsing queue"); + logger.atSevere().withCause(e).log("Failed to persist tasks to : %s", file); } } - public Collection<ParsingQueueTask> getPersistedTasks() { + private static Set<ParsingQueueTask> getTasksFrom(Path file) { Set<ParsingQueueTask> persistedTasks = null; try { - if (Files.isRegularFile(persistenceFile)) { + if (Files.isRegularFile(file)) { persistedTasks = GSON.fromJson( - Files.newBufferedReader(persistenceFile, UTF_8), + Files.newBufferedReader(file, UTF_8), new TypeToken<Set<ParsingQueueTask>>() { private static final long serialVersionUID = 1L; }.getType()); } } catch (JsonIOException | JsonSyntaxException | IOException e) { - logger.atSevere().withCause(e).log("Failed to get peristed parsing queue."); + logger.atSevere().withCause(e).log("Failed to get tasks from %s.", file); } - return persistedTasks != null ? persistedTasks : List.of(); + return persistedTasks != null ? persistedTasks : Set.of(); + } + + private final Path pluginDataDir; + + @Inject + public ParsingQueuePersistence(SitePaths sitePaths, @PluginName String pluginName) { + this.pluginDataDir = sitePaths.data_dir.resolve(pluginName); + } + + public void persistQueue(Set<ParsingQueueTask> tasks) { + persistTo(tasks, pluginDataDir, PERSISTED_QUEUE_FILE_NAME); + } + + public void persistFailed(Set<ParsingQueueTask> failedTasks) { + persistTo(failedTasks, pluginDataDir, PERSISTED_FAILED_FILE_NAME); + } + + public Collection<ParsingQueueTask> getPersistedQueue() { + return getTasksFrom(pluginDataDir.resolve(PERSISTED_QUEUE_FILE_NAME)); + } + + public Collection<ParsingQueueTask> getFailedTasks() { + return getTasksFrom(pluginDataDir.resolve(PERSISTED_FAILED_FILE_NAME)); } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueueIT.java b/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueueIT.java index c07a8bb..3de49b8 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueueIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueueIT.java
@@ -46,7 +46,7 @@ @Test public void persistedScsFromBranchIsScheduledAndPushed() throws Exception { - queuePersistence.persist( + queuePersistence.persistQueue( Set.of(ParsingQueueTask.builder(EiffelEventType.SCS, project.get(), getHead()).build())); parsingQueue.init(); assertCorrectEvent(0, eventForHead(EiffelEventType.SCC)); @@ -58,7 +58,7 @@ setScsHandled(); PushOneCommit.Result res = createChange(); merge(res); - queuePersistence.persist( + queuePersistence.persistQueue( Set.of( ParsingQueueTask.builder(EiffelEventType.SCS, project.get(), "refs/heads/master") .commit(res.getCommit().name()) @@ -72,7 +72,7 @@ public void persistedSccFromCommitIsScheduledAndPushed() throws Exception { setScsHandled(); PushOneCommit.Result res = createChange(); - queuePersistence.persist( + queuePersistence.persistQueue( Set.of( ParsingQueueTask.builder(EiffelEventType.SCC, project.get(), "refs/heads/master") .commit(res.getCommit().name()) @@ -86,7 +86,7 @@ setScsHandled(); PushOneCommit.Result res = createChange(); merge(res); - queuePersistence.persist( + queuePersistence.persistQueue( Set.of( ParsingQueueTask.builder(EiffelEventType.SCS, project.get(), "refs/heads/master") .build())); @@ -102,7 +102,7 @@ ArtifactEventKey artc = ArtifactEventKey.create(tagPURL(project.get(), tag, "localhost")); CompositionDefinedEventKey cd = CompositionDefinedEventKey.create(tagCompositionName(project.get(), "localhost"), tag); - queuePersistence.persist( + queuePersistence.persistQueue( Set.of( ParsingQueueTask.builder(EiffelEventType.ARTC, project.get(), tag) .updateTime(EPOCH_MILLIS)
diff --git a/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistenceIT.java b/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistenceIT.java index c0f1457..933ed48 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistenceIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistenceIT.java
@@ -52,31 +52,55 @@ @Before public void setUp() throws Exception { queuePersistence = plugin.getSysInjector().getInstance(ParsingQueuePersistence.class); + /* Purge persistence from previous runs. */ + queuePersistence.persistFailed(Set.of()); + queuePersistence.persistQueue(Set.of()); + parsingQueue = plugin.getSysInjector().getInstance(EiffelEventParsingQueue.class); } @Test - public void tasksArePersisted() throws Exception { + public void queueIsPersisted() throws Exception { Set<ParsingQueueTask> original = createTasks(1, 5); - queuePersistence.persist(original); - assertThat(queuePersistence.getPersistedTasks()).containsExactlyElementsIn(original); + queuePersistence.persistQueue(original); + assertThat(original).containsExactlyElementsIn(queuePersistence.getPersistedQueue()); } @Test - public void multiplePersistenceRuns() throws Exception { + public void failedTasksArePersisted() throws Exception { + Set<ParsingQueueTask> original = createTasks(1, 5); + queuePersistence.persistFailed(original); + assertThat(original).containsExactlyElementsIn(queuePersistence.getFailedTasks()); + } + + @Test + public void multipleQueuePersistenceRuns() throws Exception { Set<ParsingQueueTask> original = createTasks(1, 6); - queuePersistence.persist(original); - assertThat(queuePersistence.getPersistedTasks()).containsExactlyElementsIn(original); + queuePersistence.persistQueue(original); + assertThat(original).containsExactlyElementsIn(queuePersistence.getPersistedQueue()); original = createTasks(6, 10); - queuePersistence.persist(original); - assertThat(queuePersistence.getPersistedTasks()).containsExactlyElementsIn(original); + queuePersistence.persistQueue(original); + assertThat(original).containsExactlyElementsIn(queuePersistence.getPersistedQueue()); } @Test - public void persistedTasksAreScheduledOnInit() throws Exception { + public void persistedQueuedTasksAreScheduledOnInit() throws Exception { Set<ParsingQueueTask> tasks = createTasks(1, 6); - queuePersistence.persist(tasks); + queuePersistence.persistQueue(tasks); + parsingQueue.init(); + assertThat( + TestExecutor.scheduledItems().stream() + .map(p -> p.getProjectNameKey()) + .collect(Collectors.toList())) + .containsExactlyElementsIn( + tasks.stream().map(t -> Project.nameKey(t.repoName)).collect(Collectors.toList())); + } + + @Test + public void persistedFailedTasksAreScheduledOnInit() throws Exception { + Set<ParsingQueueTask> tasks = createTasks(1, 6); + queuePersistence.persistFailed(tasks); parsingQueue.init(); assertThat( TestExecutor.scheduledItems().stream()