Don't lose ref-updated events on plugin restart

When a ref-updated event is received, persist the event in the directory
defined by replication.eventsDirectory. When the updated ref has been
replicated, delete the persisted event.

If the replication queue is non-empty when the plugin gets stopped, the
pending ref updates will not be replicated and, therefore, their
persisted events will not get deleted. When the plugin starts again, it
will schedule replication for all persisted events and delete them.

This change provides two benefits:
* no ref-updated events are lost on plugin restart
* eliminates the need for the replicateOnStartup=true setting, which
  schedules replication of all refs for all projects and typically
  creates a humongous replication queue on every plugin restart.

Change-Id: Ieacd084fabe703333241ffda11c8b6c78cced37a
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index 8b6b8fc..766be73 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -14,11 +14,13 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.gerrit.common.FileUtil;
+import com.google.gerrit.extensions.annotations.PluginData;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.List;
 import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.slf4j.Logger;
@@ -33,13 +35,18 @@
   private final SitePaths site;
   private final WorkQueue workQueue;
   private final DestinationFactory destinationFactory;
+  private final Path pluginDataDir;
 
   @Inject
   public AutoReloadConfigDecorator(
-      SitePaths site, WorkQueue workQueue, DestinationFactory destinationFactory)
+      SitePaths site,
+      WorkQueue workQueue,
+      DestinationFactory destinationFactory,
+      @PluginData Path pluginDataDir)
       throws ConfigInvalidException, IOException {
     this.site = site;
     this.destinationFactory = destinationFactory;
+    this.pluginDataDir = pluginDataDir;
     this.currentConfig = loadConfig();
     this.currentConfigTs = getLastModified(currentConfig);
     this.workQueue = workQueue;
@@ -50,7 +57,7 @@
   }
 
   private ReplicationFileBasedConfig loadConfig() throws ConfigInvalidException, IOException {
-    return new ReplicationFileBasedConfig(site, destinationFactory);
+    return new ReplicationFileBasedConfig(site, destinationFactory, pluginDataDir);
   }
 
   private synchronized boolean isAutoReload() {
@@ -102,6 +109,11 @@
   }
 
   @Override
+  public Path getEventsDirectory() {
+    return currentConfig.getEventsDirectory();
+  }
+
+  @Override
   public synchronized int shutdown() {
     return currentConfig.shutdown();
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java
new file mode 100644
index 0000000..dc2e6e5
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java
@@ -0,0 +1,169 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.hash.Hashing;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import com.google.inject.Inject;
+import com.google.inject.ProvisionException;
+import com.google.inject.Singleton;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import org.eclipse.jgit.lib.ObjectId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * File-based store of ref-update events that still await replication.
+ *
+ * <p>Each event is written as a small JSON file under the {@code ref-updates} subdirectory of
+ * {@link ReplicationConfig#getEventsDirectory()}. The file name is the SHA-1 of the JSON content,
+ * so persisting the same project/ref pair twice yields the same key and a single file.
+ */
+@Singleton
+public class EventsStorage {
+  private static final Logger log = LoggerFactory.getLogger(EventsStorage.class);
+
+  /** JSON-serialized form of a pending ref update. */
+  public static class ReplicateRefUpdate {
+    public String project;
+    public String ref;
+
+    @Override
+    public String toString() {
+      return "ref-update " + project + ":" + ref;
+    }
+  }
+
+  private static final Gson GSON = new Gson();
+
+  private final Path refUpdates;
+
+  @Inject
+  EventsStorage(ReplicationConfig config) {
+    refUpdates = config.getEventsDirectory().resolve("ref-updates");
+  }
+
+  /**
+   * Persists a ref-update event unless an identical one is already stored.
+   *
+   * <p>A write failure is logged and otherwise ignored; the key is returned either way.
+   *
+   * @param project name of the project whose ref was updated
+   * @param ref name of the updated ref
+   * @return key of the event, to be passed to {@link #delete(String)} once it is replicated
+   */
+  public String persist(String project, String ref) {
+    ReplicateRefUpdate r = new ReplicateRefUpdate();
+    r.project = project;
+    r.ref = ref;
+
+    String json = GSON.toJson(r);
+    String eventKey = sha1(json).name();
+    Path file = refUpdates().resolve(eventKey);
+
+    if (Files.exists(file)) {
+      return eventKey;
+    }
+
+    try {
+      Files.write(file, json.getBytes(UTF_8));
+    } catch (IOException e) {
+      log.warn("Couldn't persist event {}", json, e);
+    }
+    return eventKey;
+  }
+
+  /**
+   * Deletes a persisted event.
+   *
+   * <p>{@link ReplicationState} requests the deletion on every push result it is notified of, so
+   * a file that is already gone is not treated as an error.
+   *
+   * @param eventKey key returned by {@link #persist(String, String)}; {@code null} is ignored
+   */
+  public void delete(String eventKey) {
+    if (eventKey != null) {
+      try {
+        Files.deleteIfExists(refUpdates().resolve(eventKey));
+      } catch (IOException e) {
+        log.error("Error while deleting event {}", eventKey, e);
+      }
+    }
+  }
+
+  /**
+   * Reads and removes all persisted events.
+   *
+   * <p>A file that does not hold a complete event (for example, one truncated by a crash in the
+   * middle of a write) is logged and removed instead of failing the whole listing.
+   *
+   * @return the events read before any I/O error occurred; never {@code null}
+   */
+  public List<ReplicateRefUpdate> list() {
+    List<ReplicateRefUpdate> result = new ArrayList<>();
+    try (DirectoryStream<Path> events = Files.newDirectoryStream(refUpdates())) {
+      for (Path e : events) {
+        if (Files.isRegularFile(e)) {
+          ReplicateRefUpdate update = read(e);
+          if (update != null) {
+            result.add(update);
+          }
+          Files.delete(e);
+        }
+      }
+    } catch (IOException e) {
+      log.error("Error when firing pending events", e);
+    }
+    return result;
+  }
+
+  /** Parses one event file; returns {@code null} if it holds no complete event. */
+  private static ReplicateRefUpdate read(Path file) throws IOException {
+    String json = new String(Files.readAllBytes(file), UTF_8);
+    try {
+      ReplicateRefUpdate update = GSON.fromJson(json, ReplicateRefUpdate.class);
+      // Gson yields null for empty input and leaves absent fields unset.
+      if (update != null && update.project != null && update.ref != null) {
+        return update;
+      }
+      log.warn("Discarding incomplete event {}", file);
+    } catch (JsonSyntaxException e) {
+      log.warn("Discarding malformed event {}", file, e);
+    }
+    return null;
+  }
+
+  /** Returns the SHA-1 of the given string's UTF-8 bytes. */
+  private ObjectId sha1(String s) {
+    return ObjectId.fromRaw(Hashing.sha1().hashString(s, UTF_8).asBytes());
+  }
+
+  /** Returns the events directory, creating it first if necessary. */
+  private Path refUpdates() {
+    try {
+      return Files.createDirectories(refUpdates);
+    } catch (IOException e) {
+      throw new ProvisionException(String.format("Couldn't create %s", refUpdates), e);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
index 8b0aa3d..ac0262d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
@@ -21,6 +21,7 @@
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
+import com.googlesource.gerrit.plugins.replication.ReplicationState.Factory;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -31,17 +32,20 @@
   private final PushAll.Factory pushAll;
   private final ReplicationConfig config;
   private final DynamicItem<EventDispatcher> eventDispatcher;
+  private final Factory replicationStateFactory;
 
   @Inject
   protected OnStartStop(
       ServerInformation srvInfo,
       PushAll.Factory pushAll,
       ReplicationConfig config,
-      DynamicItem<EventDispatcher> eventDispatcher) {
+      DynamicItem<EventDispatcher> eventDispatcher,
+      ReplicationState.Factory replicationStateFactory) {
     this.srvInfo = srvInfo;
     this.pushAll = pushAll;
     this.config = config;
     this.eventDispatcher = eventDispatcher;
+    this.replicationStateFactory = replicationStateFactory;
     this.pushAllFuture = Atomics.newReference();
   }
 
@@ -49,7 +53,8 @@
   public void start() {
     if (srvInfo.getState() == ServerInformation.State.STARTUP
         && config.isReplicateAllOnPluginStart()) {
-      ReplicationState state = new ReplicationState(new GitUpdateProcessing(eventDispatcher.get()));
+      ReplicationState state =
+          replicationStateFactory.create(new GitUpdateProcessing(eventDispatcher.get()));
       pushAllFuture.set(
           pushAll
               .create(null, ReplicationFilter.all(), state, false)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index e94abbd..9693e2d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -14,6 +14,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.gerrit.server.git.WorkQueue;
+import java.nio.file.Path;
 import java.util.List;
 
 public interface ReplicationConfig {
@@ -32,6 +33,8 @@
 
   boolean isEmpty();
 
+  Path getEventsDirectory();
+
   int shutdown();
 
   void startup(WorkQueue workQueue);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index bec4f20..2990155 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -15,8 +15,10 @@
 
 import static java.util.stream.Collectors.toList;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.gerrit.extensions.annotations.PluginData;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
@@ -42,17 +44,22 @@
 public class ReplicationFileBasedConfig implements ReplicationConfig {
   static final Logger log = LoggerFactory.getLogger(ReplicationFileBasedConfig.class);
   private List<Destination> destinations;
+  private final SitePaths site;
   private Path cfgPath;
   private boolean replicateAllOnPluginStart;
   private boolean defaultForceUpdate;
   private final FileBasedConfig config;
+  private final Path pluginDataDir;
 
   @Inject
-  public ReplicationFileBasedConfig(SitePaths site, DestinationFactory destinationFactory)
+  public ReplicationFileBasedConfig(
+      SitePaths site, DestinationFactory destinationFactory, @PluginData Path pluginDataDir)
       throws ConfigInvalidException, IOException {
+    this.site = site;
     this.cfgPath = site.etc_dir.resolve("replication.config");
     this.config = new FileBasedConfig(cfgPath.toFile(), FS.DETECTED);
     this.destinations = allDestinations(destinationFactory);
+    this.pluginDataDir = pluginDataDir;
   }
 
   /*
@@ -180,6 +187,15 @@
     return destinations.isEmpty();
   }
 
+  @Override
+  public Path getEventsDirectory() {
+    String eventsDirectory = config.getString("replication", null, "eventsDirectory");
+    if (!Strings.isNullOrEmpty(eventsDirectory)) {
+      return site.resolve(eventsDirectory);
+    }
+    return pluginDataDir;
+  }
+
   Path getCfgPath() {
     return cfgPath;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
index 2247229..b989827 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -59,6 +59,7 @@
 
     install(new FactoryModuleBuilder().build(PushAll.Factory.class));
     install(new FactoryModuleBuilder().build(RemoteSiteUser.Factory.class));
+    install(new FactoryModuleBuilder().build(ReplicationState.Factory.class));
 
     bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class);
     bind(ReplicationStateListener.class).to(ReplicationStateLogger.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index d73733a..8e74a5f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -72,6 +72,8 @@
   private final DynamicItem<EventDispatcher> dispatcher;
   private final ReplicationConfig config;
   private final GerritSshApi gerritAdmin;
+  private final ReplicationState.Factory replicationStateFactory;
+  private final EventsStorage eventsStorage;
   private volatile boolean running;
 
   @Inject
@@ -81,19 +83,24 @@
       GerritSshApi ga,
       ReplicationConfig rc,
       DynamicItem<EventDispatcher> dis,
-      ReplicationStateListener sl) {
+      ReplicationStateListener sl,
+      ReplicationState.Factory rsf,
+      EventsStorage es) {
     workQueue = wq;
     sshHelper = sh;
     dispatcher = dis;
     config = rc;
     stateLog = sl;
     gerritAdmin = ga;
+    replicationStateFactory = rsf;
+    eventsStorage = es;
   }
 
   @Override
   public void start() {
     config.startup(workQueue);
     running = true;
+    firePendingEvents();
   }
 
   @Override
@@ -127,23 +134,37 @@
 
   @Override
   public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
-    ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
+    onGitReferenceUpdated(event.getProjectName(), event.getRefName());
+  }
+
+  private void onGitReferenceUpdated(String projectName, String refName) {
+    ReplicationState state =
+        replicationStateFactory.create(new GitUpdateProcessing(dispatcher.get()));
     if (!running) {
       stateLog.warn("Replication plugin did not finish startup before event", state);
       return;
     }
 
-    Project.NameKey project = new Project.NameKey(event.getProjectName());
+    Project.NameKey project = new Project.NameKey(projectName);
     for (Destination cfg : config.getDestinations(FilterType.ALL)) {
-      if (cfg.wouldPushProject(project) && cfg.wouldPushRef(event.getRefName())) {
+      if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
+        String eventKey = eventsStorage.persist(projectName, refName);
+        state.setEventKey(eventKey);
         for (URIish uri : cfg.getURIs(project, null)) {
-          cfg.schedule(project, event.getRefName(), uri, state);
+          cfg.schedule(project, refName, uri, state);
         }
       }
     }
     state.markAllPushTasksScheduled();
   }
 
+  private void firePendingEvents() {
+    for (EventsStorage.ReplicateRefUpdate e : eventsStorage.list()) {
+      repLog.info("Firing pending event {}", e);
+      onGitReferenceUpdated(e.project, e.ref);
+    }
+  }
+
   @Override
   public void onNewProjectCreated(NewProjectCreatedListener.Event event) {
     Project.NameKey projectName = new Project.NameKey(event.getProjectName());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
index 86557e2..6f0803a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
@@ -16,6 +16,8 @@
 
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Table;
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.AssistedInject;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -23,7 +25,13 @@
 import org.eclipse.jgit.transport.URIish;
 
 public class ReplicationState {
+
+  public interface Factory {
+    ReplicationState create(PushResultProcessing processing);
+  }
+
   private boolean allScheduled;
+  private final EventsStorage eventsStorage;
   private final PushResultProcessing pushResultProcessing;
 
   private final Lock countingLock = new ReentrantLock();
@@ -49,7 +57,11 @@
   private int totalPushTasksCount;
   private int finishedPushTasksCount;
 
-  public ReplicationState(PushResultProcessing processing) {
+  private String eventKey;
+
+  @AssistedInject
+  ReplicationState(EventsStorage storage, @Assisted PushResultProcessing processing) {
+    eventsStorage = storage;
     pushResultProcessing = processing;
     statusByProjectRef = HashBasedTable.create();
   }
@@ -74,6 +86,7 @@
       URIish uri,
       RefPushResult status,
       RemoteRefUpdate.Status refUpdateStatus) {
+    deleteEvent();
     pushResultProcessing.onRefReplicatedToOneNode(project, ref, uri, status, refUpdateStatus);
 
     RefReplicationStatus completedRefStatus = null;
@@ -103,6 +116,12 @@
     }
   }
 
+  private void deleteEvent() {
+    if (eventKey != null) {
+      eventsStorage.delete(eventKey);
+    }
+  }
+
   public void markAllPushTasksScheduled() {
     countingLock.lock();
     try {
@@ -173,4 +192,8 @@
       return name().toLowerCase().replace("_", "-");
     }
   }
+
+  public void setEventKey(String eventKey) {
+    this.eventKey = eventKey;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
index ec8d1f6..48c28b6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
@@ -52,13 +52,15 @@
 
   @Inject private PushAll.Factory pushFactory;
 
+  @Inject private ReplicationState.Factory replicationStateFactory;
+
   @Override
   protected void run() throws Failure {
     if (all && projectPatterns.size() > 0) {
       throw new UnloggedFailure(1, "error: cannot combine --all and PROJECT");
     }
 
-    ReplicationState state = new ReplicationState(new CommandProcessing(this));
+    ReplicationState state = replicationStateFactory.create(new CommandProcessing(this));
     Future<?> future = null;
 
     ReplicationFilter projectFilter;
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index c066513..a701cf8 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -116,6 +116,17 @@
 
 	By default, pushes are retried indefinitely.
 
+replication.eventsDirectory
+:	Directory where replication events are persisted.
+
+	When scheduling a replication, the replication event is persisted
+	under this directory. When the replication is done, the event is deleted.
+	If the plugin is stopped before all scheduled replications are done, the
+	persisted events will not be deleted. When the plugin is started again,
+	it will trigger all replications found under this directory.
+
+	When not set, defaults to the plugin's data directory.
+
 remote.NAME.url
 :	Address of the remote server to push to.  Multiple URLs may be
 	specified within a single remote block, listing different
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
index 193af1e..cf6715e 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
@@ -32,12 +32,15 @@
 
   private ReplicationState replicationState;
   private PushResultProcessing pushResultProcessingMock;
+  private EventsStorage eventsStorage;
 
   @Before
   public void setUp() throws Exception {
     pushResultProcessingMock = createNiceMock(PushResultProcessing.class);
     replay(pushResultProcessingMock);
-    replicationState = new ReplicationState(pushResultProcessingMock);
+    eventsStorage = createNiceMock(EventsStorage.class);
+    replay(eventsStorage);
+    replicationState = new ReplicationState(eventsStorage, pushResultProcessingMock);
   }
 
   @Test