Persist parsing-queue on plugin stop

Solves: Jira GER-1715
Change-Id: I8218cca768d5c50ea398df9ce3ca0f35aeb6789e
diff --git a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/Manager.java b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/Manager.java
index 519698c..16d61c3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/Manager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/Manager.java
@@ -31,6 +31,7 @@
   @Override
   public void start() {
     eventHub.startPublishing();
+    parsingQueue.init();
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueue.java b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueue.java
index 0808f51..d7bfd68 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueue.java
@@ -18,6 +18,7 @@
 import static com.googlesource.gerrit.plugins.eventseiffel.eiffel.dto.EiffelEventType.SCC;
 import static com.googlesource.gerrit.plugins.eventseiffel.eiffel.dto.EiffelEventType.SCS;
 
+import com.google.common.collect.Lists;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.Project.NameKey;
@@ -30,6 +31,8 @@
 import com.google.gerrit.server.util.time.TimeUtil;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.eventseiffel.eiffel.SourceChangeEventKey;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
 
@@ -38,13 +41,18 @@
 
   private volatile EiffelEventParsingExecutor pool;
   private final EiffelEventParser eventParser;
+  private final ParsingQueuePersistance persistance;
   private final ConcurrentHashMap<EventParsingWorker, ScheduledFuture<?>> pending =
       new ConcurrentHashMap<>();
 
   @Inject
-  public EiffelEventParsingQueue(EiffelEventParsingExecutor pool, EiffelEventParser eventParser) {
+  public EiffelEventParsingQueue(
+      EiffelEventParsingExecutor pool,
+      EiffelEventParser eventParser,
+      ParsingQueuePersistance persistance) {
     this.pool = pool;
     this.eventParser = eventParser;
+    this.persistance = persistance;
   }
 
   public void scheduleSccCreation(RevisionCreatedListener.Event event) {
@@ -149,12 +157,21 @@
   }
 
   public void shutDown() {
-    for (ScheduledFuture<?> f : pending.values()) {
-      f.cancel(true);
+    List<ParsingQueueTask> tasks = Lists.newArrayListWithCapacity(pending.size());
+    for (Map.Entry<EventParsingWorker, ScheduledFuture<?>> e : pending.entrySet()) {
+      e.getValue().cancel(true);
+      tasks.add(e.getKey().task);
     }
+    persistance.persist(tasks);
     pool.shutDown();
   }
 
+  public void init() {
+    for (ParsingQueueTask task : persistance.getPersistedTasks()) {
+      requeueTask(task);
+    }
+  }
+
   protected void markAsCompleted(EventParsingWorker worker) {
     pending.remove(worker);
   }
@@ -221,6 +238,37 @@
     }
   }
 
+  private void requeueTask(ParsingQueueTask task) {
+    switch (task.type) {
+      case SCC:
+        if (task.commitId != null) {
+          scheduleSccCreation(task.repoName, task.branchRefOrTag, task.commitId);
+        } else {
+          scheduleSccCreation(task.repoName, task.branchRefOrTag);
+        }
+        break;
+      case SCS:
+        if (task.commitId != null) {
+          scheduleScsCreation(
+              task.repoName,
+              task.branchRefOrTag,
+              task.commitId,
+              task.updater,
+              task.updateTime,
+              task.previousTip);
+        } else {
+          scheduleScsCreation(task.repoName, task.branchRefOrTag);
+        }
+        break;
+      case ARTC:
+        scheduleArtcCreation(task.repoName, task.branchRefOrTag, task.updateTime, task.force);
+        break;
+      case CD: // CD is never scheduled for creation directly.
+      default:
+        break;
+    }
+  }
+
   abstract class EventParsingWorker implements ProjectRunnable {
     private final ParsingQueueTask task;
     boolean running;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistance.java b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistance.java
new file mode 100644
index 0000000..afe7ca2
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistance.java
@@ -0,0 +1,79 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.eventseiffel.parsing;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.common.reflect.TypeToken;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.gson.Gson;
+import com.google.gson.JsonIOException;
+import com.google.gson.JsonSyntaxException;
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+public class ParsingQueuePersistance {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final Gson GSON = new Gson();
+  private static final String PERSISTED_FILE_NAME = "persisted-queue.json";
+
+  private final Path persistanceFile;
+  private final Path pluginDataDir;
+
+  @Inject
+  public ParsingQueuePersistance(SitePaths sitePaths, @PluginName String pluginName) {
+    this.pluginDataDir = sitePaths.data_dir.resolve(pluginName);
+    this.persistanceFile = pluginDataDir.resolve(PERSISTED_FILE_NAME);
+  }
+
+  public void persist(List<ParsingQueueTask> tasks) {
+    try {
+      if (Files.isRegularFile(persistanceFile)) {
+        Files.delete(persistanceFile);
+      }
+      if (tasks == null || tasks.isEmpty()) {
+        return;
+      }
+      if (!Files.isDirectory(pluginDataDir)) {
+        Files.createDirectory(pluginDataDir);
+      }
+      Files.write(persistanceFile, GSON.toJson(tasks).getBytes(UTF_8));
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Failed to persist parsing queue");
+    }
+  }
+
+  public List<ParsingQueueTask> getPersistedTasks() {
+    List<ParsingQueueTask> persistedTasks = null;
+    try {
+      if (Files.isRegularFile(persistanceFile)) {
+        persistedTasks =
+            GSON.fromJson(
+                Files.newBufferedReader(persistanceFile, UTF_8),
+                new TypeToken<List<ParsingQueueTask>>() {
+
+                  private static final long serialVersionUID = 1L;
+                }.getType());
+      }
+    } catch (JsonIOException | JsonSyntaxException | IOException e) {
+      logger.atSevere().withCause(e).log("Failed to get peristed parsing queue.");
+    }
+    return persistedTasks != null ? persistedTasks : List.of();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueueIT.java b/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueueIT.java
new file mode 100644
index 0000000..6bb1f16
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingQueueIT.java
@@ -0,0 +1,125 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.eventseiffel.parsing;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.gerrit.acceptance.PushOneCommit;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.googlesource.gerrit.plugins.eventseiffel.TestEventHub;
+import com.googlesource.gerrit.plugins.eventseiffel.eiffel.ArtifactEventKey;
+import com.googlesource.gerrit.plugins.eventseiffel.eiffel.CompositionDefinedEventKey;
+import com.googlesource.gerrit.plugins.eventseiffel.eiffel.dto.EiffelEventType;
+import java.util.List;
+import org.eclipse.jgit.lib.Constants;
+import org.junit.Before;
+import org.junit.Test;
+
+@UseLocalDisk
+@TestPlugin(
+    name = "events-eiffel",
+    sysModule =
+        "com.googlesource.gerrit.plugins.eventseiffel.parsing.EiffelEventParsingQueueIT$TestModule")
+public class EiffelEventParsingQueueIT extends EiffelEventParsingTest {
+
+  private ParsingQueuePersistance queuePersistance;
+  private EiffelEventParsingQueue parsingQueue;
+
+  @Before
+  public void setUp() throws Exception {
+    queuePersistance = plugin.getSysInjector().getInstance(ParsingQueuePersistance.class);
+    parsingQueue = plugin.getSysInjector().getInstance(EiffelEventParsingQueue.class);
+    TestEventHub.EVENTS.clear();
+  }
+
+  @Test
+  public void persistedScsFromBranchIsScheduledAndPushed() throws Exception {
+    queuePersistance.persist(
+        List.of(ParsingQueueTask.builder(EiffelEventType.SCS, project.get(), getHead()).build()));
+    parsingQueue.init();
+    assertCorrectEvent(0, eventForHead(EiffelEventType.SCC));
+    assertCorrectEvent(1, eventForHead(EiffelEventType.SCS));
+  }
+
+  @Test
+  public void persistedScsFromCommitIsScheduledAndPushed() throws Exception {
+    setScsHandled();
+    PushOneCommit.Result res = createChange();
+    merge(res);
+    queuePersistance.persist(
+        List.of(
+            ParsingQueueTask.builder(EiffelEventType.SCS, project.get(), "refs/heads/master")
+                .commit(res.getCommit().name())
+                .build()));
+    parsingQueue.init();
+    assertCorrectEvent(0, toSccKey(res));
+    assertCorrectEvent(1, toScsKey(res));
+  }
+
+  @Test
+  public void persistedSccFromCommitIsScheduledAndPushed() throws Exception {
+    setScsHandled();
+    PushOneCommit.Result res = createChange();
+    queuePersistance.persist(
+        List.of(
+            ParsingQueueTask.builder(EiffelEventType.SCC, project.get(), "refs/heads/master")
+                .commit(res.getCommit().name())
+                .build()));
+    parsingQueue.init();
+    assertCorrectEvent(0, toSccKey(res));
+  }
+
+  @Test
+  public void persistedSccFromBranchIsScheduledAndPushed() throws Exception {
+    setScsHandled();
+    PushOneCommit.Result res = createChange();
+    merge(res);
+    queuePersistance.persist(
+        List.of(
+            ParsingQueueTask.builder(EiffelEventType.SCS, project.get(), "refs/heads/master")
+                .build()));
+    parsingQueue.init();
+    assertCorrectEvent(0, toSccKey(res));
+  }
+
+  @Test
+  public void persistedArtcIsScheduledAndPushed() throws Exception {
+    setScsHandled();
+    String tag =
+        createTagRef(getHead(repo(), "HEAD").getName(), true).substring(Constants.R_TAGS.length());
+    ArtifactEventKey artc = ArtifactEventKey.create(tagPURL(project.get(), tag, "localhost"));
+    CompositionDefinedEventKey cd =
+        CompositionDefinedEventKey.create(tagCompositionName(project.get(), "localhost"), tag);
+    queuePersistance.persist(
+        List.of(
+            ParsingQueueTask.builder(EiffelEventType.ARTC, project.get(), tag)
+                .updateTime(EPOCH_MILLIS)
+                .force(false)
+                .build()));
+    parsingQueue.init();
+    assertEquals(2, TestEventHub.EVENTS.size());
+    assertCorrectEvent(0, cd);
+    assertCorrectEvent(1, artc);
+  }
+
+  public static class TestModule extends ParsingTestModule {
+
+    @Override
+    protected void configure() {
+      bind(EiffelEventParsingExecutor.class).to(EiffelEventParsingExecutor.Direct.class);
+      super.configure();
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingTest.java b/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingTest.java
index 7d8c926..a0e8d8a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/EiffelEventParsingTest.java
@@ -6,6 +6,7 @@
 import com.googlesource.gerrit.plugins.eventseiffel.TestEventHub;
 import com.googlesource.gerrit.plugins.eventseiffel.eiffel.EventKey;
 import com.googlesource.gerrit.plugins.eventseiffel.eiffel.SourceChangeEventKey;
+import com.googlesource.gerrit.plugins.eventseiffel.eiffel.dto.EiffelEventType;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import org.junit.Ignore;
@@ -42,9 +43,11 @@
   }
 
   protected void setScsHandled() throws Exception {
-    SourceChangeEventKey scs =
-        SourceChangeEventKey.scsKey(project.get(), getHead(), getHeadRevision());
-    markAsHandled(scs, getHead(repo(), "HEAD"));
+    markAsHandled(eventForHead(EiffelEventType.SCS), getHead(repo(), "HEAD"));
+  }
+
+  protected SourceChangeEventKey eventForHead(EiffelEventType type) throws Exception {
+    return SourceChangeEventKey.create(project.get(), getHead(), getHeadRevision(), type);
   }
 
   protected String getHead() throws Exception {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistanceIT.java b/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistanceIT.java
new file mode 100644
index 0000000..9e00301
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/eventseiffel/parsing/ParsingQueuePersistanceIT.java
@@ -0,0 +1,204 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.eventseiffel.parsing;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertEquals;
+
+import autovaluegson.factory.shaded.com.google.common.collect.Lists;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.common.AccountInfo;
+import com.google.gerrit.extensions.events.RevisionCreatedListener.Event;
+import com.google.gerrit.server.git.ProjectRunnable;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provider;
+import com.googlesource.gerrit.plugins.eventseiffel.eiffel.SourceChangeEventKey;
+import com.googlesource.gerrit.plugins.eventseiffel.eiffel.dto.EiffelEventType;
+import java.util.List;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.junit.Before;
+import org.junit.Test;
+
+@UseLocalDisk
+@TestPlugin(
+    name = "events-eiffel",
+    sysModule =
+        "com.googlesource.gerrit.plugins.eventseiffel.parsing.ParsingQueuePersistanceIT$TestModule")
+public class ParsingQueuePersistanceIT extends LightweightPluginDaemonTest {
+
+  private ParsingQueuePersistance queuePersistance;
+  private EiffelEventParsingQueue parsingQueue;
+
+  @Before
+  public void setUp() throws Exception {
+    queuePersistance = plugin.getSysInjector().getInstance(ParsingQueuePersistance.class);
+    parsingQueue = plugin.getSysInjector().getInstance(EiffelEventParsingQueue.class);
+  }
+
+  @Test
+  public void tasksArePersisted() throws Exception {
+    List<ParsingQueueTask> original = createTasks(1, 5);
+    queuePersistance.persist(original);
+    List<ParsingQueueTask> deserialized = queuePersistance.getPersistedTasks();
+    assertEquals(original, deserialized);
+  }
+
+  @Test
+  public void multiplePersistanceRuns() throws Exception {
+    List<ParsingQueueTask> original = createTasks(1, 6);
+    queuePersistance.persist(original);
+    List<ParsingQueueTask> deserialized = queuePersistance.getPersistedTasks();
+    assertEquals(original, deserialized);
+
+    original = createTasks(6, 10);
+    queuePersistance.persist(original);
+    deserialized = queuePersistance.getPersistedTasks();
+    assertEquals(original, deserialized);
+  }
+
+  @Test
+  public void persistedTasksAreScheduledOnInit() throws Exception {
+    List<ParsingQueueTask> tasks = createTasks(1, 6);
+    queuePersistance.persist(tasks);
+    parsingQueue.init();
+    assertThat(
+            TestExecutor.scheduledItems().stream()
+                .map(p -> p.getProjectNameKey())
+                .collect(Collectors.toList()))
+        .containsExactlyElementsIn(
+            tasks.stream().map(t -> Project.nameKey(t.repoName)).collect(Collectors.toList()));
+  }
+
+  private List<ParsingQueueTask> createTasks(int start, int end) {
+    return IntStream.range(start, end)
+        .mapToObj(ParsingQueuePersistanceIT::createTask)
+        .collect(Collectors.toList());
+  }
+
+  public static ParsingQueueTask createTask(int id) {
+    return ParsingQueueTask.builder(EiffelEventType.SCC, "repo" + id, "branch" + id)
+        .commit("commit-sha1" + id)
+        .build();
+  }
+
+  public static class TestExecutor
+      implements EiffelEventParsingExecutor, Provider<EiffelEventParsingExecutor> {
+    private static TestExecutor INSTANCE;
+
+    public static List<ProjectRunnable> scheduledItems() {
+      return INSTANCE.scheduledItems;
+    }
+
+    private final List<ProjectRunnable> scheduledItems = Lists.newArrayList();
+
+    @Override
+    public ScheduledFuture<?> schedule(ProjectRunnable runnable) {
+      scheduledItems.add(runnable);
+      return new ScheduledFuture<ProjectRunnable>() {
+
+        @Override
+        public long getDelay(TimeUnit arg0) {
+          return 0;
+        }
+
+        @Override
+        public int compareTo(Delayed arg0) {
+          return 0;
+        }
+
+        @Override
+        public boolean cancel(boolean arg0) {
+          return false;
+        }
+
+        @Override
+        public ProjectRunnable get() throws InterruptedException, ExecutionException {
+          return null;
+        }
+
+        @Override
+        public ProjectRunnable get(long arg0, TimeUnit arg1)
+            throws InterruptedException, ExecutionException, TimeoutException {
+          return null;
+        }
+
+        @Override
+        public boolean isCancelled() {
+          return false;
+        }
+
+        @Override
+        public boolean isDone() {
+          return false;
+        }
+      };
+    }
+
+    @Override
+    public void shutDown() {}
+
+    @Override
+    public EiffelEventParsingExecutor get() {
+      INSTANCE = new TestExecutor();
+      return INSTANCE;
+    }
+  }
+
+  public static class NoOpEventParser implements EiffelEventParser {
+
+    public NoOpEventParser() {}
+
+    @Override
+    public void createAndScheduleSccFromEvent(Event event) {}
+
+    @Override
+    public void createAndScheduleSccFromBranch(String repoName, String ref) {}
+
+    @Override
+    public void createAndScheduleSccFromCommit(
+        String repoName, String targetBranch, String commit) {}
+
+    @Override
+    public void createAndScheduleMissingScssFromBranch(String repoName, String branch) {}
+
+    @Override
+    public void createAndScheduleMissingScss(
+        SourceChangeEventKey scs,
+        String commitSha1TransactionEnd,
+        AccountInfo submitter,
+        Long submittedAt) {}
+
+    @Override
+    public void createAndScheduleArtc(
+        String projectName, String tagName, Long creationTime, boolean force) {}
+  }
+
+  public static class TestModule extends AbstractModule {
+    @Override
+    protected void configure() {
+      bind(ParsingQueuePersistance.class);
+      bind(EiffelEventParser.class).to(NoOpEventParser.class);
+      bind(EiffelEventParsingExecutor.class).toProvider(TestExecutor.class);
+    }
+  }
+}