Only fire the specified pending event URI

Previously the startup firing of pending events would fire every URI for
a project ref combination on startup. To avoid duplicates, it only ever
fired one round of every URI per project/ref combination. This had the
side effect that if only a single URI were stored, presumably because
the other URIs were completed before shutdown, this would result in the
creation of way more replication events than necessary, presumably many
duplicates of already completed pushes. Fix this behavior by only firing
to the specific stored URI, and remove the duplicate project/ref
filtering since that now would prevent firing to more than one URI for
the same project/ref combination when there actually are stored events
for multiple URIs. Add a test to confirm the correct new more limiting
behavior.

Bug: Issue 12779
Change-Id: I56d314af2ecbf84362dda099fa28f1b8f82cefa7
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index 02daa6d..8b2301b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -17,6 +17,7 @@
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.common.FileUtil;
 import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
@@ -26,6 +27,7 @@
 import java.nio.file.Path;
 import java.util.List;
 import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.transport.URIish;
 
 @Singleton
 public class AutoReloadConfigDecorator implements ReplicationConfig {
@@ -72,6 +74,12 @@
   }
 
   @Override
+  public List<Destination> getDestinations(URIish uri, Project.NameKey project, String ref) {
+    reloadIfNeeded();
+    return currentConfig.getDestinations(uri, project, ref);
+  }
+
+  @Override
   public synchronized List<Destination> getDestinations(FilterType filterType) {
     reloadIfNeeded();
     return currentConfig.getDestinations(filterType);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index e966751..545b537 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -588,6 +588,10 @@
     return op != null && op.getRefs().contains(ref);
   }
 
+  boolean wouldPush(URIish uri, Project.NameKey project, String ref) {
+    return matches(uri, project) && wouldPushProject(project) && wouldPushRef(ref);
+  }
+
   boolean wouldPushProject(Project.NameKey project) {
     if (!shouldReplicate(project)) {
       repLog.debug("Skipping replication of project {}", project.get());
@@ -654,6 +658,16 @@
     return config.replicateProjectDeletions();
   }
 
+  private boolean matches(URIish uri, Project.NameKey project) {
+    for (URIish configUri : config.getRemoteConfig().getURIs()) {
+      URIish projectUri = getURI(configUri, project);
+      if (uri.equals(projectUri)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   List<URIish> getURIs(Project.NameKey project, String urlMatch) {
     List<URIish> r = Lists.newArrayListWithCapacity(config.getRemoteConfig().getURIs().size());
     for (URIish configUri : config.getRemoteConfig().getURIs()) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index c9531e3..ee671f5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -13,9 +13,11 @@
 // limitations under the License.
 package com.googlesource.gerrit.plugins.replication;
 
+import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.git.WorkQueue;
 import java.nio.file.Path;
 import java.util.List;
+import org.eclipse.jgit.transport.URIish;
 
 public interface ReplicationConfig {
 
@@ -25,6 +27,8 @@
     ALL
   }
 
+  List<Destination> getDestinations(URIish uriish, Project.NameKey project, String ref);
+
   List<Destination> getDestinations(FilterType filterType);
 
   boolean isReplicateAllOnPluginStart();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index 683fb9b..68db6c6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.config.ConfigUtil;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.gerrit.server.git.WorkQueue;
@@ -30,6 +31,7 @@
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -68,6 +70,17 @@
     this.pluginDataDir = pluginDataDir;
   }
 
+  @Override
+  public List<Destination> getDestinations(URIish uri, Project.NameKey project, String ref) {
+    List<Destination> dests = new ArrayList<>();
+    for (Destination dest : getDestinations(FilterType.ALL)) {
+      if (dest.wouldPush(uri, project, ref)) {
+        dests.add(dest);
+      }
+    }
+    return dests;
+  }
+
   /*
    * (non-Javadoc)
    * @see
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 936d704..e704978 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -167,6 +167,14 @@
     state.markAllPushTasksScheduled();
   }
 
+  private void fire(URIish uri, Project.NameKey project, String refName) {
+    ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
+    for (Destination dest : config.getDestinations(uri, project, refName)) {
+      dest.schedule(project, refName, uri, state);
+    }
+    state.markAllPushTasksScheduled();
+  }
+
   @UsedAt(UsedAt.Project.COLLABNET)
   public void pushReference(Destination cfg, Project.NameKey project, String refName) {
     pushReference(cfg, project, refName, null);
@@ -196,18 +204,16 @@
 
   private void firePendingEvents() {
     try {
-      Set<String> eventsReplayed = new HashSet<>();
       replaying = true;
       for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.list()) {
         if (t == null) {
           repLog.warn("Encountered null replication event in ReplicationTasksStorage");
           continue;
         }
-        String eventKey = String.format("%s:%s", t.project, t.ref);
-        if (!eventsReplayed.contains(eventKey)) {
-          repLog.info("Firing pending task {}", eventKey);
-          onGitReferenceUpdated(t.project, t.ref);
-          eventsReplayed.add(eventKey);
+        try {
+          fire(new URIish(t.uri), new Project.NameKey(t.project), t.ref);
+        } catch (URISyntaxException e) {
+          repLog.error("Encountered malformed URI for persisted event %s", t);
         }
       }
     } finally {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 64430cf..aa15e3a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -362,10 +362,62 @@
     }
   }
 
+  @Test
+  public void shouldFirePendingOnlyToStoredUri() throws Exception {
+    String suffix1 = "replica1";
+    String suffix2 = "replica2";
+    Project.NameKey target1 = createTestProject("project" + suffix1);
+    Project.NameKey target2 = createTestProject("project" + suffix2);
+    String remote1 = "foo1";
+    String remote2 = "foo2";
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS);
+    reloadConfig();
+
+    Result pushResult = createChange();
+    String sourceRef = pushResult.getPatchSet().getRefName();
+
+    replicationQueueStop();
+
+    tasksStorage.disableDeleteForTesting(false);
+    listReplicationTasks("refs/changes/\\d*/\\d*/\\d*").stream()
+        .filter(task -> remote1.equals(task.remote))
+        .forEach(u -> tasksStorage.delete(u));
+    tasksStorage.disableDeleteForTesting(true);
+
+    assertThat(
+            listReplicationTasks("refs/changes/\\d*/\\d*/\\d*").stream()
+                .filter(task -> remote2.equals(task.remote))
+                .collect(toList()))
+        .hasSize(1);
+
+    assertThat(
+            listReplicationTasks("refs/changes/\\d*/\\d*/\\d*").stream()
+                .filter(task -> remote1.equals(task.remote))
+                .collect(toList()))
+        .hasSize(0);
+
+    replicationQueueStart();
+
+    assertThat(isPushCompleted(target2, sourceRef, TEST_TIMEOUT)).isEqualTo(true);
+    assertThat(isPushCompleted(target1, sourceRef, TEST_TIMEOUT)).isEqualTo(false);
+  }
+
   private Project.NameKey createTestProject(String name) throws Exception {
     return createProject(name);
   }
 
+  public boolean isPushCompleted(Project.NameKey project, String ref, Duration timeOut) {
+    try (Repository repo = repoManager.openRepository(project)) {
+      WaitUtil.waitUntil(() -> checkedGetRef(repo, ref) != null, timeOut);
+      return true;
+    } catch (InterruptedException e) {
+      return false;
+    } catch (Exception e) {
+      throw new RuntimeException("Cannot open repo for project" + project, e);
+    }
+  }
+
   private Ref getRef(Repository repo, String branchName) throws IOException {
     return repo.getRefDatabase().exactRef(branchName);
   }
@@ -417,6 +469,22 @@
     plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).shutdown();
   }
 
+  private void replicationQueueStart() {
+    getReplicationQueueInstance().start();
+  }
+
+  private void replicationQueueStop() {
+    getReplicationQueueInstance().stop();
+  }
+
+  private ReplicationQueue getReplicationQueueInstance() {
+    return getInstance(ReplicationQueue.class);
+  }
+
+  private <T> T getInstance(Class<T> classObj) {
+    return plugin.getSysInjector().getInstance(classObj);
+  }
+
   private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
     Pattern refmaskPattern = Pattern.compile(refRegex);
     return tasksStorage.list().stream()