Merge branch 'stable-3.1' into stable-3.2

* stable-3.1:
  Improve readability of shouldFirePendingOnlyToStoredUri test
  Fix flakiness in ReplicationIT for pending events firing
  Only fire the specified pending event URI

Change-Id: Ie83763e4a9fe13522f356b569fc2360fa5883224
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index adc3133..8f04c74 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -609,6 +609,10 @@
     return names;
   }
 
+  boolean wouldPush(URIish uri, Project.NameKey project, String ref) {
+    return matches(uri, project) && wouldPushProject(project) && wouldPushRef(ref);
+  }
+
   boolean wouldPushProject(Project.NameKey project) {
     if (!shouldReplicate(project)) {
       repLog.atFine().log("Skipping replication of project %s", project.get());
@@ -662,6 +666,16 @@
     return config.replicateProjectDeletions();
   }
 
+  private boolean matches(URIish uri, Project.NameKey project) {
+    for (URIish configUri : config.getRemoteConfig().getURIs()) {
+      URIish projectUri = getURI(configUri, project);
+      if (uri.equals(projectUri)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   List<URIish> getURIs(Project.NameKey project, String urlMatch) {
     List<URIish> r = Lists.newArrayListWithCapacity(config.getRemoteConfig().getURIs().size());
     for (URIish configUri : config.getRemoteConfig().getURIs()) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
index ecfbb8e..ac14657 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
@@ -38,6 +38,7 @@
 import com.googlesource.gerrit.plugins.replication.Destination.Factory;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -157,6 +158,17 @@
   }
 
   @Override
+  public List<Destination> getDestinations(URIish uri, Project.NameKey project, String ref) {
+    List<Destination> dests = new ArrayList<>();
+    for (Destination dest : getAll(FilterType.ALL)) {
+      if (dest.wouldPush(uri, project, ref)) {
+        dests.add(dest);
+      }
+    }
+    return dests;
+  }
+
+  @Override
   public boolean isEmpty() {
     return destinations.isEmpty();
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
index b191d7d..18ccc66 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
@@ -44,6 +44,16 @@
    */
   List<Destination> getAll(FilterType filterType);
 
+  /**
+   * Return the active replication destinations for a uri/project/ref triplet.
+   *
+   * @param uriish uri of the destinations
+   * @param project name of the project
+   * @param ref ref name
+   * @return the list of active destinations
+   */
+  List<Destination> getDestinations(URIish uriish, Project.NameKey project, String ref);
+
   /** @return true if there are no destinations, false otherwise. */
   boolean isEmpty();
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 21e4227..e685237 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -34,6 +34,7 @@
 import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
 import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.net.URISyntaxException;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Queue;
@@ -154,6 +155,14 @@
     }
   }
 
+  private void fire(URIish uri, Project.NameKey project, String refName) {
+    ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
+    for (Destination dest : destinations.get().getDestinations(uri, project, refName)) {
+      dest.schedule(project, refName, uri, state);
+    }
+    state.markAllPushTasksScheduled();
+  }
+
   @UsedAt(UsedAt.Project.COLLABNET)
   public void pushReference(Destination cfg, Project.NameKey project, String refName) {
     pushReference(cfg, project, null, refName, null, true, false);
@@ -190,14 +199,16 @@
   private void firePendingEvents() {
     replaying = true;
     try {
-      Set<String> eventsReplayed = new HashSet<>();
       replaying = true;
       for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) {
-        String eventKey = String.format("%s:%s", t.project, t.ref);
-        if (!eventsReplayed.contains(eventKey)) {
-          repLog.atInfo().log("Firing pending task %s", eventKey);
-          fire(t.project, t.ref, true);
-          eventsReplayed.add(eventKey);
+        if (t == null) {
+          repLog.atWarning().log("Encountered null replication event in ReplicationTasksStorage");
+          continue;
+        }
+        try {
+          fire(new URIish(t.uri), Project.nameKey(t.project), t.ref);
+        } catch (URISyntaxException e) {
+          repLog.atSevere().withCause(e).log("Encountered malformed URI for persisted event %s", t);
         }
       }
     } finally {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 506b175..bfd0886 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -112,6 +112,11 @@
     this.disableDeleteForTesting = deleteDisabled;
   }
 
+  @VisibleForTesting
+  public void delete(ReplicateRefUpdate r) {
+    new Task(r).delete();
+  }
+
   public synchronized void start(PushOne push) {
     for (String ref : push.getRefs()) {
       new Task(new ReplicateRefUpdate(push, ref)).start();
@@ -264,6 +269,15 @@
       }
     }
 
+    public void delete() {
+      try {
+        Files.deleteIfExists(waiting);
+        Files.deleteIfExists(running);
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
+      }
+    }
+
     private void rename(Path from, Path to) {
       try {
         logger.atFine().log("RENAME %s to %s %s", from, to, updateLog());
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 653e5a1..08b8c65 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -48,6 +48,7 @@
 import java.util.Optional;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
+import java.util.stream.Stream;
 import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.Repository;
@@ -513,6 +514,46 @@
     return pushOne == null ? false : pushOne.isRetrying();
   }
 
+  @Test
+  public void shouldFirePendingOnlyToStoredUri() throws Exception {
+    String suffix1 = "replica1";
+    String suffix2 = "replica2";
+    Project.NameKey target1 = createTestProject(project + suffix1);
+    Project.NameKey target2 = createTestProject(project + suffix2);
+    String remote1 = "foo1";
+    String remote2 = "foo2";
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE, false);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE, false);
+    reloadConfig();
+
+    String changeRef = createChange().getPatchSet().refName();
+
+    tasksStorage.disableDeleteForTesting(false);
+    changeReplicationTasksForRemote(changeRef, remote1).forEach(tasksStorage::delete);
+    tasksStorage.disableDeleteForTesting(true);
+
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS);
+    reloadConfig();
+
+    assertThat(changeReplicationTasksForRemote(changeRef, remote2).count()).isEqualTo(1);
+    assertThat(changeReplicationTasksForRemote(changeRef, remote1).count()).isEqualTo(0);
+
+    assertThat(isPushCompleted(target2, changeRef, TEST_TIMEOUT)).isEqualTo(true);
+    assertThat(isPushCompleted(target1, changeRef, TEST_TIMEOUT)).isEqualTo(false);
+  }
+
+  public boolean isPushCompleted(Project.NameKey project, String ref, Duration timeOut) {
+    try (Repository repo = repoManager.openRepository(project)) {
+      WaitUtil.waitUntil(() -> checkedGetRef(repo, ref) != null, timeOut);
+      return true;
+    } catch (InterruptedException e) {
+      return false;
+    } catch (Exception e) {
+      throw new RuntimeException("Cannot open repo for project" + project, e);
+    }
+  }
+
   private Ref getRef(Repository repo, String branchName) throws IOException {
     return repo.getRefDatabase().exactRef(branchName);
   }
@@ -528,30 +569,47 @@
 
   private void setReplicationDestination(
       String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
-    setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project, false);
+    setReplicationDestination(
+        remoteName, Arrays.asList(replicaSuffix), project, TEST_REPLICATION_DELAY, false);
   }
 
   private void setReplicationDestination(
       String remoteName, String replicaSuffix, Optional<String> project, boolean mirror)
       throws IOException {
-    setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project, mirror);
+    setReplicationDestination(
+        remoteName, Arrays.asList(replicaSuffix), project, TEST_REPLICATION_DELAY, mirror);
+  }
+
+  private void setReplicationDestination(
+      String remoteName,
+      String replicaSuffix,
+      Optional<String> project,
+      int replicationDelay,
+      boolean mirror)
+      throws IOException {
+    setReplicationDestination(
+        remoteName, Arrays.asList(replicaSuffix), project, replicationDelay, mirror);
   }
 
   private void setReplicationDestination(
       String remoteName, List<String> replicaSuffixes, Optional<String> project)
       throws IOException {
-    setReplicationDestination(remoteName, replicaSuffixes, project, false);
+    setReplicationDestination(remoteName, replicaSuffixes, project, TEST_REPLICATION_DELAY, false);
   }
 
   private void setReplicationDestination(
-      String remoteName, List<String> replicaSuffixes, Optional<String> project, boolean mirror)
+      String remoteName,
+      List<String> replicaSuffixes,
+      Optional<String> project,
+      int replicationDelay,
+      boolean mirror)
       throws IOException {
     List<String> replicaUrls =
         replicaSuffixes.stream()
             .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
             .collect(toList());
     config.setStringList("remote", remoteName, "url", replicaUrls);
-    config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+    config.setInt("remote", remoteName, "replicationDelay", replicationDelay);
     config.setInt("remote", remoteName, "replicationRetry", TEST_REPLICATION_RETRY);
     config.setBoolean("remote", remoteName, "mirror", mirror);
     project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
@@ -597,6 +655,13 @@
     return plugin.getSysInjector().getInstance(classObj);
   }
 
+  private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
+      String changeRef, String remote) {
+    return tasksStorage.list().stream()
+        .filter(task -> changeRef.equals(task.ref))
+        .filter(task -> remote.equals(task.remote));
+  }
+
   private Project.NameKey createTestProject(String name) throws Exception {
     return projectOperations.newProject().name(name).create();
   }