Merge branch 'stable-2.16' into stable-3.0

* stable-2.16:
  Only fire the specified pending event URI

Change-Id: Ib800603d830c9b4ba688b0222ac5642ad50f17a0
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index 945f869..a43d7d9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -87,6 +87,12 @@
   }
 
   @Override
+  public List<Destination> getDestinations(URIish uri, Project.NameKey project, String ref) {
+    reloadIfNeeded();
+    return currentConfig.getDestinations(uri, project, ref);
+  }
+
+  @Override
   public synchronized List<Destination> getDestinations(FilterType filterType) {
     return currentConfig.getDestinations(filterType);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 432f8a5..d68739a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -610,6 +610,10 @@
     return op != null && op.getRefs().contains(ref);
   }
 
+  boolean wouldPush(URIish uri, Project.NameKey project, String ref) {
+    return matches(uri, project) && wouldPushProject(project) && wouldPushRef(ref);
+  }
+
   boolean wouldPushProject(Project.NameKey project) {
     if (!shouldReplicate(project)) {
       repLog.debug("Skipping replication of project {}", project.get());
@@ -676,6 +680,16 @@
     return config.replicateProjectDeletions();
   }
 
+  private boolean matches(URIish uri, Project.NameKey project) {
+    for (URIish configUri : config.getRemoteConfig().getURIs()) {
+      URIish projectUri = getURI(configUri, project);
+      if (uri.equals(projectUri)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   List<URIish> getURIs(Project.NameKey project, String urlMatch) {
     List<URIish> r = Lists.newArrayListWithCapacity(config.getRemoteConfig().getURIs().size());
     for (URIish configUri : config.getRemoteConfig().getURIs()) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index 929c538..ccdead8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -29,6 +29,8 @@
     ALL
   }
 
+  List<Destination> getDestinations(URIish uriish, Project.NameKey project, String ref);
+
   List<Destination> getDestinations(FilterType filterType);
 
   Multimap<Destination, URIish> getURIs(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index 45fa150..4e6299a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -39,6 +39,7 @@
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -79,6 +80,17 @@
     this.pluginDataDir = pluginDataDir;
   }
 
+  @Override
+  public List<Destination> getDestinations(URIish uri, Project.NameKey project, String ref) {
+    List<Destination> dests = new ArrayList<>();
+    for (Destination dest : getDestinations(FilterType.ALL)) {
+      if (dest.wouldPush(uri, project, ref)) {
+        dests.add(dest);
+      }
+    }
+    return dests;
+  }
+
   /*
    * (non-Javadoc)
    * @see
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index cad7623..cf344f8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -30,6 +30,7 @@
 import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
 import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.net.URISyntaxException;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Queue;
@@ -145,6 +146,14 @@
     state.markAllPushTasksScheduled();
   }
 
+  private void fire(URIish uri, Project.NameKey project, String refName) {
+    ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
+    for (Destination dest : config.getDestinations(uri, project, refName)) {
+      dest.schedule(project, refName, uri, state);
+    }
+    state.markAllPushTasksScheduled();
+  }
+
   @UsedAt(UsedAt.Project.COLLABNET)
   public void pushReference(Destination cfg, Project.NameKey project, String refName) {
     pushReference(cfg, project, refName, null);
@@ -175,18 +184,16 @@
   private void firePendingEvents() {
     replaying = true;
     try {
-      Set<String> eventsReplayed = new HashSet<>();
       replaying = true;
       for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.list()) {
         if (t == null) {
           repLog.warn("Encountered null replication event in ReplicationTasksStorage");
           continue;
         }
-        String eventKey = String.format("%s:%s", t.project, t.ref);
-        if (!eventsReplayed.contains(eventKey)) {
-          repLog.info("Firing pending task {}", eventKey);
-          onGitReferenceUpdated(t.project, t.ref);
-          eventsReplayed.add(eventKey);
+        try {
+          fire(new URIish(t.uri), new Project.NameKey(t.project), t.ref);
+        } catch (URISyntaxException e) {
+          repLog.error("Encountered malformed URI for persisted event %s", t);
         }
       }
     } finally {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 7991216..b7841f5 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -431,6 +431,58 @@
     }
   }
 
+  @Test
+  public void shouldFirePendingOnlyToStoredUri() throws Exception {
+    String suffix1 = "replica1";
+    String suffix2 = "replica2";
+    Project.NameKey target1 = createTestProject(project + suffix1);
+    Project.NameKey target2 = createTestProject(project + suffix2);
+    String remote1 = "foo1";
+    String remote2 = "foo2";
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS);
+    reloadConfig();
+
+    Result pushResult = createChange();
+    String sourceRef = pushResult.getPatchSet().getRefName();
+
+    replicationQueueStop();
+
+    tasksStorage.disableDeleteForTesting(false);
+    listReplicationTasks("refs/changes/\\d*/\\d*/\\d*").stream()
+        .filter(task -> remote1.equals(task.remote))
+        .forEach(u -> tasksStorage.delete(u));
+    tasksStorage.disableDeleteForTesting(true);
+
+    assertThat(
+            listReplicationTasks("refs/changes/\\d*/\\d*/\\d*").stream()
+                .filter(task -> remote2.equals(task.remote))
+                .collect(toList()))
+        .hasSize(1);
+
+    assertThat(
+            listReplicationTasks("refs/changes/\\d*/\\d*/\\d*").stream()
+                .filter(task -> remote1.equals(task.remote))
+                .collect(toList()))
+        .hasSize(0);
+
+    replicationQueueStart();
+
+    assertThat(isPushCompleted(target2, sourceRef, TEST_TIMEOUT)).isEqualTo(true);
+    assertThat(isPushCompleted(target1, sourceRef, TEST_TIMEOUT)).isEqualTo(false);
+  }
+
+  public boolean isPushCompleted(Project.NameKey project, String ref, Duration timeOut) {
+    try (Repository repo = repoManager.openRepository(project)) {
+      WaitUtil.waitUntil(() -> checkedGetRef(repo, ref) != null, timeOut);
+      return true;
+    } catch (InterruptedException e) {
+      return false;
+    } catch (Exception e) {
+      throw new RuntimeException("Cannot open repo for project" + project, e);
+    }
+  }
+
   private Ref getRef(Repository repo, String branchName) throws IOException {
     return repo.getRefDatabase().exactRef(branchName);
   }