Persist parsing-queue on plugin stop
Solves: Jira GER-1715
Change-Id: I8218cca768d5c50ea398df9ce3ca0f35aeb6789e
diff --git a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/Manager.java b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/Manager.java
index 519698c..16d61c3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/Manager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/Manager.java
@@ -31,6 +31,7 @@
@Override
public void start() {
eventHub.startPublishing();
+ parsingQueue.init();
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueue.java b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueue.java
index 0808f51..d7bfd68 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueue.java
@@ -18,6 +18,7 @@
import static com.googlesource.gerrit.plugins.eventseiffel.eiffel.dto.EiffelEventType.SCC;
import static com.googlesource.gerrit.plugins.eventseiffel.eiffel.dto.EiffelEventType.SCS;
+import com.google.common.collect.Lists;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.entities.Project;
import com.google.gerrit.entities.Project.NameKey;
@@ -30,6 +31,8 @@
import com.google.gerrit.server.util.time.TimeUtil;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.eventseiffel.eiffel.SourceChangeEventKey;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
@@ -38,13 +41,18 @@
private volatile EiffelEventParsingExecutor pool;
private final EiffelEventParser eventParser;
+ private final ParsingQueuePersistance persistance;
private final ConcurrentHashMap<EventParsingWorker, ScheduledFuture<?>> pending =
new ConcurrentHashMap<>();
@Inject
- public EiffelEventParsingQueue(EiffelEventParsingExecutor pool, EiffelEventParser eventParser) {
+ public EiffelEventParsingQueue(
+ EiffelEventParsingExecutor pool,
+ EiffelEventParser eventParser,
+ ParsingQueuePersistance persistance) {
this.pool = pool;
this.eventParser = eventParser;
+ this.persistance = persistance;
}
public void scheduleSccCreation(RevisionCreatedListener.Event event) {
@@ -149,12 +157,21 @@
}
public void shutDown() {
- for (ScheduledFuture<?> f : pending.values()) {
- f.cancel(true);
+ List<ParsingQueueTask> tasks = Lists.newArrayListWithCapacity(pending.size());
+ for (Map.Entry<EventParsingWorker, ScheduledFuture<?>> e : pending.entrySet()) {
+ e.getValue().cancel(true);
+ tasks.add(e.getKey().task);
}
+ persistance.persist(tasks);
pool.shutDown();
}
+ public void init() {
+ for (ParsingQueueTask task : persistance.getPersistedTasks()) {
+ requeueTask(task);
+ }
+ }
+
protected void markAsCompleted(EventParsingWorker worker) {
pending.remove(worker);
}
@@ -221,6 +238,37 @@
}
}
+ private void requeueTask(ParsingQueueTask task) {
+ switch (task.type) {
+ case SCC:
+ if (task.commitId != null) {
+ scheduleSccCreation(task.repoName, task.branchRefOrTag, task.commitId);
+ } else {
+ scheduleSccCreation(task.repoName, task.branchRefOrTag);
+ }
+ break;
+ case SCS:
+ if (task.commitId != null) {
+ scheduleScsCreation(
+ task.repoName,
+ task.branchRefOrTag,
+ task.commitId,
+ task.updater,
+ task.updateTime,
+ task.previousTip);
+ } else {
+ scheduleScsCreation(task.repoName, task.branchRefOrTag);
+ }
+ break;
+ case ARTC:
+ scheduleArtcCreation(task.repoName, task.branchRefOrTag, task.updateTime, task.force);
+ break;
+ case CD: // CD is never scheduled for creation directly.
+ default:
+ break;
+ }
+ }
+
abstract class EventParsingWorker implements ProjectRunnable {
private final ParsingQueueTask task;
boolean running;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistance.java b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistance.java
new file mode 100644
index 0000000..afe7ca2
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistance.java
@@ -0,0 +1,79 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.eventseiffel.parsing;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.common.reflect.TypeToken;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.gson.Gson;
+import com.google.gson.JsonIOException;
+import com.google.gson.JsonSyntaxException;
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+public class ParsingQueuePersistance {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+ private static final Gson GSON = new Gson();
+ private static final String PERSISTED_FILE_NAME = "persisted-queue.json";
+
+ private final Path persistanceFile;
+ private final Path pluginDataDir;
+
+ @Inject
+ public ParsingQueuePersistance(SitePaths sitePaths, @PluginName String pluginName) {
+ this.pluginDataDir = sitePaths.data_dir.resolve(pluginName);
+ this.persistanceFile = pluginDataDir.resolve(PERSISTED_FILE_NAME);
+ }
+
+ public void persist(List<ParsingQueueTask> tasks) {
+ try {
+ if (Files.isRegularFile(persistanceFile)) {
+ Files.delete(persistanceFile);
+ }
+ if (tasks == null || tasks.isEmpty()) {
+ return;
+ }
+ if (!Files.isDirectory(pluginDataDir)) {
+ Files.createDirectory(pluginDataDir);
+ }
+ Files.write(persistanceFile, GSON.toJson(tasks).getBytes(UTF_8));
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log("Failed to persist parsing queue");
+ }
+ }
+
+ public List<ParsingQueueTask> getPersistedTasks() {
+ List<ParsingQueueTask> persistedTasks = null;
+ try {
+ if (Files.isRegularFile(persistanceFile)) {
+ persistedTasks =
+ GSON.fromJson(
+ Files.newBufferedReader(persistanceFile, UTF_8),
+ new TypeToken<List<ParsingQueueTask>>() {
+
+ private static final long serialVersionUID = 1L;
+ }.getType());
+ }
+ } catch (JsonIOException | JsonSyntaxException | IOException e) {
+ logger.atSevere().withCause(e).log("Failed to get peristed parsing queue.");
+ }
+ return persistedTasks != null ? persistedTasks : List.of();
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueueIT.java b/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueueIT.java
new file mode 100644
index 0000000..6bb1f16
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueueIT.java
@@ -0,0 +1,125 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.eventseiffel.parsing;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.gerrit.acceptance.PushOneCommit;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.googlesource.gerrit.plugins.eventseiffel.TestEventHub;
+import com.googlesource.gerrit.plugins.eventseiffel.eiffel.ArtifactEventKey;
+import com.googlesource.gerrit.plugins.eventseiffel.eiffel.CompositionDefinedEventKey;
+import com.googlesource.gerrit.plugins.eventseiffel.eiffel.dto.EiffelEventType;
+import java.util.List;
+import org.eclipse.jgit.lib.Constants;
+import org.junit.Before;
+import org.junit.Test;
+
+@UseLocalDisk
+@TestPlugin(
+ name = "events-eiffel",
+ sysModule =
+ "com.googlesource.gerrit.plugins.eventseiffel.parsing.EiffelEventParsingQueueIT$TestModule")
+public class EiffelEventParsingQueueIT extends EiffelEventParsingTest {
+
+ private ParsingQueuePersistance queuePersistance;
+ private EiffelEventParsingQueue parsingQueue;
+
+ @Before
+ public void setUp() throws Exception {
+ queuePersistance = plugin.getSysInjector().getInstance(ParsingQueuePersistance.class);
+ parsingQueue = plugin.getSysInjector().getInstance(EiffelEventParsingQueue.class);
+ TestEventHub.EVENTS.clear();
+ }
+
+ @Test
+ public void persistedScsFromBranchIsScheduledAndPushed() throws Exception {
+ queuePersistance.persist(
+ List.of(ParsingQueueTask.builder(EiffelEventType.SCS, project.get(), getHead()).build()));
+ parsingQueue.init();
+ assertCorrectEvent(0, eventForHead(EiffelEventType.SCC));
+ assertCorrectEvent(1, eventForHead(EiffelEventType.SCS));
+ }
+
+ @Test
+ public void persistedScsFromCommitIsScheduledAndPushed() throws Exception {
+ setScsHandled();
+ PushOneCommit.Result res = createChange();
+ merge(res);
+ queuePersistance.persist(
+ List.of(
+ ParsingQueueTask.builder(EiffelEventType.SCS, project.get(), "refs/heads/master")
+ .commit(res.getCommit().name())
+ .build()));
+ parsingQueue.init();
+ assertCorrectEvent(0, toSccKey(res));
+ assertCorrectEvent(1, toScsKey(res));
+ }
+
+ @Test
+ public void persistedSccFromCommitIsScheduledAndPushed() throws Exception {
+ setScsHandled();
+ PushOneCommit.Result res = createChange();
+ queuePersistance.persist(
+ List.of(
+ ParsingQueueTask.builder(EiffelEventType.SCC, project.get(), "refs/heads/master")
+ .commit(res.getCommit().name())
+ .build()));
+ parsingQueue.init();
+ assertCorrectEvent(0, toSccKey(res));
+ }
+
+ @Test
+ public void persistedSccFromBranchIsScheduledAndPushed() throws Exception {
+ setScsHandled();
+ PushOneCommit.Result res = createChange();
+ merge(res);
+ queuePersistance.persist(
+ List.of(
+ ParsingQueueTask.builder(EiffelEventType.SCS, project.get(), "refs/heads/master")
+ .build()));
+ parsingQueue.init();
+ assertCorrectEvent(0, toSccKey(res));
+ }
+
+ @Test
+ public void persistedArtcIsScheduledAndPushed() throws Exception {
+ setScsHandled();
+ String tag =
+ createTagRef(getHead(repo(), "HEAD").getName(), true).substring(Constants.R_TAGS.length());
+ ArtifactEventKey artc = ArtifactEventKey.create(tagPURL(project.get(), tag, "localhost"));
+ CompositionDefinedEventKey cd =
+ CompositionDefinedEventKey.create(tagCompositionName(project.get(), "localhost"), tag);
+ queuePersistance.persist(
+ List.of(
+ ParsingQueueTask.builder(EiffelEventType.ARTC, project.get(), tag)
+ .updateTime(EPOCH_MILLIS)
+ .force(false)
+ .build()));
+ parsingQueue.init();
+ assertEquals(2, TestEventHub.EVENTS.size());
+ assertCorrectEvent(0, cd);
+ assertCorrectEvent(1, artc);
+ }
+
+ public static class TestModule extends ParsingTestModule {
+
+ @Override
+ protected void configure() {
+ bind(EiffelEventParsingExecutor.class).to(EiffelEventParsingExecutor.Direct.class);
+ super.configure();
+ }
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingTest.java b/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingTest.java
index 7d8c926..a0e8d8a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingTest.java
@@ -6,6 +6,7 @@
import com.googlesource.gerrit.plugins.eventseiffel.TestEventHub;
import com.googlesource.gerrit.plugins.eventseiffel.eiffel.EventKey;
import com.googlesource.gerrit.plugins.eventseiffel.eiffel.SourceChangeEventKey;
+import com.googlesource.gerrit.plugins.eventseiffel.eiffel.dto.EiffelEventType;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import org.junit.Ignore;
@@ -42,9 +43,11 @@
}
protected void setScsHandled() throws Exception {
- SourceChangeEventKey scs =
- SourceChangeEventKey.scsKey(project.get(), getHead(), getHeadRevision());
- markAsHandled(scs, getHead(repo(), "HEAD"));
+ markAsHandled(eventForHead(EiffelEventType.SCS), getHead(repo(), "HEAD"));
+ }
+
+ protected SourceChangeEventKey eventForHead(EiffelEventType type) throws Exception {
+ return SourceChangeEventKey.create(project.get(), getHead(), getHeadRevision(), type);
}
protected String getHead() throws Exception {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistanceIT.java b/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistanceIT.java
new file mode 100644
index 0000000..9e00301
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistanceIT.java
@@ -0,0 +1,204 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.eventseiffel.parsing;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertEquals;
+
+import autovaluegson.factory.shaded.com.google.common.collect.Lists;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.common.AccountInfo;
+import com.google.gerrit.extensions.events.RevisionCreatedListener.Event;
+import com.google.gerrit.server.git.ProjectRunnable;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provider;
+import com.googlesource.gerrit.plugins.eventseiffel.eiffel.SourceChangeEventKey;
+import com.googlesource.gerrit.plugins.eventseiffel.eiffel.dto.EiffelEventType;
+import java.util.List;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.junit.Before;
+import org.junit.Test;
+
+@UseLocalDisk
+@TestPlugin(
+ name = "events-eiffel",
+ sysModule =
+ "com.googlesource.gerrit.plugins.eventseiffel.parsing.ParsingQueuePersistanceIT$TestModule")
+public class ParsingQueuePersistanceIT extends LightweightPluginDaemonTest {
+
+ private ParsingQueuePersistance queuePersistance;
+ private EiffelEventParsingQueue parsingQueue;
+
+ @Before
+ public void setUp() throws Exception {
+ queuePersistance = plugin.getSysInjector().getInstance(ParsingQueuePersistance.class);
+ parsingQueue = plugin.getSysInjector().getInstance(EiffelEventParsingQueue.class);
+ }
+
+ @Test
+ public void tasksArePersisted() throws Exception {
+ List<ParsingQueueTask> original = createTasks(1, 5);
+ queuePersistance.persist(original);
+ List<ParsingQueueTask> deserialized = queuePersistance.getPersistedTasks();
+ assertEquals(original, deserialized);
+ }
+
+ @Test
+ public void multiplePersistanceRuns() throws Exception {
+ List<ParsingQueueTask> original = createTasks(1, 6);
+ queuePersistance.persist(original);
+ List<ParsingQueueTask> deserialized = queuePersistance.getPersistedTasks();
+ assertEquals(original, deserialized);
+
+ original = createTasks(6, 10);
+ queuePersistance.persist(original);
+ deserialized = queuePersistance.getPersistedTasks();
+ assertEquals(original, deserialized);
+ }
+
+ @Test
+ public void persistedTasksAreScheduledOnInit() throws Exception {
+ List<ParsingQueueTask> tasks = createTasks(1, 6);
+ queuePersistance.persist(tasks);
+ parsingQueue.init();
+ assertThat(
+ TestExecutor.scheduledItems().stream()
+ .map(p -> p.getProjectNameKey())
+ .collect(Collectors.toList()))
+ .containsExactlyElementsIn(
+ tasks.stream().map(t -> Project.nameKey(t.repoName)).collect(Collectors.toList()));
+ }
+
+ private List<ParsingQueueTask> createTasks(int start, int end) {
+ return IntStream.range(start, end)
+ .mapToObj(ParsingQueuePersistanceIT::createTask)
+ .collect(Collectors.toList());
+ }
+
+ public static ParsingQueueTask createTask(int id) {
+ return ParsingQueueTask.builder(EiffelEventType.SCC, "repo" + id, "branch" + id)
+ .commit("commit-sha1" + id)
+ .build();
+ }
+
+ public static class TestExecutor
+ implements EiffelEventParsingExecutor, Provider<EiffelEventParsingExecutor> {
+ private static TestExecutor INSTANCE;
+
+ public static List<ProjectRunnable> scheduledItems() {
+ return INSTANCE.scheduledItems;
+ }
+
+ private final List<ProjectRunnable> scheduledItems = Lists.newArrayList();
+
+ @Override
+ public ScheduledFuture<?> schedule(ProjectRunnable runnable) {
+ scheduledItems.add(runnable);
+ return new ScheduledFuture<ProjectRunnable>() {
+
+ @Override
+ public long getDelay(TimeUnit arg0) {
+ return 0;
+ }
+
+ @Override
+ public int compareTo(Delayed arg0) {
+ return 0;
+ }
+
+ @Override
+ public boolean cancel(boolean arg0) {
+ return false;
+ }
+
+ @Override
+ public ProjectRunnable get() throws InterruptedException, ExecutionException {
+ return null;
+ }
+
+ @Override
+ public ProjectRunnable get(long arg0, TimeUnit arg1)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return null;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return false;
+ }
+ };
+ }
+
+ @Override
+ public void shutDown() {}
+
+ @Override
+ public EiffelEventParsingExecutor get() {
+ INSTANCE = new TestExecutor();
+ return INSTANCE;
+ }
+ }
+
+ public static class NoOpEventParser implements EiffelEventParser {
+
+ public NoOpEventParser() {}
+
+ @Override
+ public void createAndScheduleSccFromEvent(Event event) {}
+
+ @Override
+ public void createAndScheduleSccFromBranch(String repoName, String ref) {}
+
+ @Override
+ public void createAndScheduleSccFromCommit(
+ String repoName, String targetBranch, String commit) {}
+
+ @Override
+ public void createAndScheduleMissingScssFromBranch(String repoName, String branch) {}
+
+ @Override
+ public void createAndScheduleMissingScss(
+ SourceChangeEventKey scs,
+ String commitSha1TransactionEnd,
+ AccountInfo submitter,
+ Long submittedAt) {}
+
+ @Override
+ public void createAndScheduleArtc(
+ String projectName, String tagName, Long creationTime, boolean force) {}
+ }
+
+ public static class TestModule extends AbstractModule {
+ @Override
+ protected void configure() {
+ bind(ParsingQueuePersistance.class);
+ bind(EiffelEventParser.class).to(NoOpEventParser.class);
+ bind(EiffelEventParsingExecutor.class).toProvider(TestExecutor.class);
+ }
+ }
+}