Merge branch 'stable-2.15' into stable-2.16

* stable-2.15:
  Fix issue with dropping events on start

Change-Id: I8d1b52bcf8f8c288892492b4375178756c784ec3
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 91f7ae1..d73ab7b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -18,7 +18,9 @@
 import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
 import com.google.common.base.Strings;
+import com.google.common.collect.Queues;
 import com.google.gerrit.common.Nullable;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
 import com.google.gerrit.extensions.events.HeadUpdatedListener;
@@ -36,6 +38,7 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
 import org.eclipse.jgit.transport.URIish;
 import org.slf4j.Logger;
@@ -71,6 +74,7 @@
   private final ReplicationTasksStorage replicationTasksStorage;
   private volatile boolean running;
   private volatile boolean replaying;
+  private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
 
   @Inject
   ReplicationQueue(
@@ -86,6 +90,7 @@
     stateLog = sl;
     adminApiFactory = aaf;
     replicationTasksStorage = rts;
+    beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue();
   }
 
   @Override
@@ -94,6 +99,7 @@
       config.startup(workQueue);
       running = true;
       firePendingEvents();
+      fireBeforeStartupEvents();
     }
   }
 
@@ -146,7 +152,10 @@
   private void onGitReferenceUpdated(String projectName, String refName) {
     ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
     if (!running) {
-      stateLog.warn("Replication plugin did not finish startup before event", state);
+      stateLog.warn(
+          "Replication plugin did not finish startup before event, event replication is postponed",
+          state);
+      beforeStartupEventsQueue.add(new ReferenceUpdatedEvent(projectName, refName));
       return;
     }
 
@@ -196,6 +205,18 @@
     }
   }
 
+  private void fireBeforeStartupEvents() {
+    Set<ReferenceUpdatedEvent> eventsReplayed = new HashSet<>();
+    // Drain with poll() so a later start() does not replay these events; duplicates fire once.
+    ReferenceUpdatedEvent event;
+    while ((event = beforeStartupEventsQueue.poll()) != null) {
+      if (eventsReplayed.add(event)) {
+        repLog.info("Firing pending task {}:{}", event.getProjectName(), event.getRefName());
+        onGitReferenceUpdated(event.getProjectName(), event.getRefName());
+      }
+    }
+  }
+
   private Set<URIish> getURIs(
       @Nullable String remoteName, Project.NameKey projectName, FilterType filterType) {
     if (config.getDestinations(filterType).isEmpty()) {
@@ -299,4 +320,34 @@
   private void warnCannotPerform(String op, URIish uri) {
     repLog.warn("Cannot {} on remote site {}.", op, uri);
   }
+
+  private static final class ReferenceUpdatedEvent { // immutable (project, ref) value pair
+    private final String projectName;
+    private final String refName;
+
+    public ReferenceUpdatedEvent(String projectName, String refName) {
+      this.projectName = projectName;
+      this.refName = refName;
+    }
+
+    public String getProjectName() {
+      return projectName;
+    }
+
+    public String getRefName() {
+      return refName;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(projectName, refName);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return (obj instanceof ReferenceUpdatedEvent)
+          && Objects.equal(projectName, ((ReferenceUpdatedEvent) obj).projectName)
+          && Objects.equal(refName, ((ReferenceUpdatedEvent) obj).refName);
+    }
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java
new file mode 100644
index 0000000..2a395ca
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java
@@ -0,0 +1,134 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.base.Stopwatch;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@UseLocalDisk
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationQueueIT extends LightweightPluginDaemonTest {
+  private static final Logger logger = LoggerFactory.getLogger(ReplicationQueueIT.class);
+  // Written to replication.config as "replicationDelay"; TEST_TIMEOUT is twice this, in seconds.
+  private static final int TEST_REPLICATION_DELAY = 1;
+  private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2);
+
+  @Inject private SitePaths sitePaths;
+  private Path gitPath;
+  private FileBasedConfig config;
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    gitPath = sitePaths.site_path.resolve("git");
+    config =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+    // Save the destination before the superclass set-up (presumably so the plugin loads it).
+    setReplicationDestination("foo", "replica");
+    super.setUpTestPlugin();
+  }
+
+  @Test
+  public void shouldNotDropEventsWhenStarting() throws Exception {
+    Project.NameKey targetProject = createProject("projectreplica");
+    // Create a change while the queue is stopped, then start the queue again.
+    replicationQueueStop();
+    Result pushResult = createChange();
+    replicationQueueStart();
+
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().getRefName();
+    // The change ref must appear in the target repository, pointing at the pushed commit.
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
+  private Ref getRef(Repository repo, String branchName) throws IOException {
+    return repo.getRefDatabase().exactRef(branchName);
+  }
+
+  private Ref checkedGetRef(Repository repo, String branchName) {
+    try {
+      return repo.getRefDatabase().exactRef(branchName);
+    } catch (Exception e) {
+      logger.error("failed to get ref {} in repo {}", branchName, repo, e);
+      return null;
+    }
+  }
+
+  private void setReplicationDestination(String remoteName, String replicaSuffix)
+      throws IOException {
+    setReplicationDestination(remoteName, Arrays.asList(replicaSuffix));
+  }
+
+  private void setReplicationDestination(String remoteName, List<String> replicaSuffixes)
+      throws IOException {
+    // Each URL is "<gitPath>/${name}<suffix>.git"; "${name}" is written literally to the config.
+    List<String> replicaUrls =
+        replicaSuffixes.stream()
+            .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
+            .collect(toList());
+    config.setStringList("remote", remoteName, "url", replicaUrls);
+    config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+    config.save();
+  }
+
+  private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    while (!waitCondition.get()) {
+      if (stopwatch.elapsed().compareTo(TEST_TIMEOUT) > 0) {
+        throw new AssertionError("condition not met within " + TEST_TIMEOUT);
+      }
+      TimeUnit.MILLISECONDS.sleep(50);
+    }
+  }
+
+  private void replicationQueueStart() {
+    plugin.getSysInjector().getInstance(ReplicationQueue.class).start();
+  }
+
+  private void replicationQueueStop() {
+    plugin.getSysInjector().getInstance(ReplicationQueue.class).stop();
+  }
+}