Merge "Decouple observable queue implementation from ReplicationQueue"
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
index d12abc4..4b757ea 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -17,7 +17,6 @@
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.gerrit.server.config.ConfigUtil;
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.transport.RemoteConfig;
@@ -167,24 +166,6 @@
     return cfg.getInt("remote", rc.getName(), name, defValue);
   }
 
-  boolean isSingleProjectMatch() {
-    List<String> projects = getProjects();
-    boolean ret = (projects.size() == 1);
-    if (ret) {
-      String projectMatch = projects.get(0);
-      if (ReplicationFilter.getPatternType(projectMatch)
-          != ReplicationFilter.PatternType.EXACT_MATCH) {
-        // projectMatch is either regular expression, or wild-card.
-        //
-        // Even though they might refer to a single project now, they need not
-        // after new projects have been created. Hence, we do not treat them as
-        // matching a single project.
-        ret = false;
-      }
-    }
-    return ret;
-  }
-
   @Override
   public int getSlowLatencyThreshold() {
     return slowLatencyThreshold;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
index aa683ff..b66e73c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
@@ -14,6 +14,7 @@
 package com.googlesource.gerrit.plugins.replication;
 
 import com.google.common.collect.ImmutableList;
+import java.util.List;
 import org.eclipse.jgit.transport.RemoteConfig;
 
 /** Remote configuration for a replication endpoint */
@@ -95,4 +96,27 @@
    * @return the slow latency threshold
    */
   int getSlowLatencyThreshold();
+
+  /**
+   * Whether the remote configuration lists exactly one project pattern of type EXACT_MATCH
+   *
+   * @return true when getProjects() has a single EXACT_MATCH entry, false otherwise
+   */
+  default boolean isSingleProjectMatch() {
+    List<String> projects = getProjects();
+    boolean ret = (projects.size() == 1);
+    if (ret) {
+      String projectMatch = projects.get(0);
+      if (ReplicationFilter.getPatternType(projectMatch)
+          != ReplicationFilter.PatternType.EXACT_MATCH) {
+        // projectMatch is either a regular expression or a wildcard.
+        //
+        // Even though such a pattern might match a single project now, it need not
+        // after new projects have been created. Hence, we do not treat it as
+        // matching a single project.
+        ret = false;
+      }
+    }
+    return ret;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 0d15d7c..e02fafc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -109,49 +109,43 @@
   @VisibleForTesting
   public void scheduleFullSync(
       Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
-    if (!running) {
-      stateLog.warn("Replication plugin did not finish startup before event", state);
-      return;
-    }
-
-    for (Destination cfg : destinations.get().getAll(FilterType.ALL)) {
-      if (cfg.wouldPushProject(project)) {
-        for (URIish uri : cfg.getURIs(project, urlMatch)) {
-          cfg.schedule(project, PushOne.ALL_REFS, uri, state, now);
-          replicationTasksStorage.persist(
-              new ReplicateRefUpdate(
-                  project.get(), PushOne.ALL_REFS, uri, cfg.getRemoteConfigName()));
-        }
-      }
-    }
+    fire(project, urlMatch, PushOne.ALL_REFS, state, now);
   }
 
   @Override
   public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
-    onGitReferenceUpdated(event.getProjectName(), event.getRefName());
+    fire(event.getProjectName(), event.getRefName());
   }
 
-  private void onGitReferenceUpdated(String projectName, String refName) {
+  private void fire(String projectName, String refName) {
     ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
+    fire(Project.nameKey(projectName), null, refName, state, false);
+    state.markAllPushTasksScheduled();
+  }
+
+  private void fire(
+      Project.NameKey project,
+      String urlMatch,
+      String refName,
+      ReplicationState state,
+      boolean now) {
     if (!running) {
       stateLog.warn(
           "Replication plugin did not finish startup before event, event replication is postponed",
           state);
-      beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(projectName, refName));
+      beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(project.get(), refName));
       return;
     }
 
-    Project.NameKey project = Project.nameKey(projectName);
     for (Destination cfg : destinations.get().getAll(FilterType.ALL)) {
       if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
-        for (URIish uri : cfg.getURIs(project, null)) {
+        for (URIish uri : cfg.getURIs(project, urlMatch)) {
           replicationTasksStorage.persist(
-              new ReplicateRefUpdate(projectName, refName, uri, cfg.getRemoteConfigName()));
-          cfg.schedule(project, refName, uri, state);
+              new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
+          cfg.schedule(project, refName, uri, state, now);
         }
       }
     }
-    state.markAllPushTasksScheduled();
   }
 
   private void firePendingEvents() {
@@ -163,7 +157,7 @@
         String eventKey = String.format("%s:%s", t.project, t.ref);
         if (!eventsReplayed.contains(eventKey)) {
           repLog.info("Firing pending task {}", eventKey);
-          onGitReferenceUpdated(t.project, t.ref);
+          fire(t.project, t.ref);
           eventsReplayed.add(eventKey);
         }
       }
@@ -192,7 +186,7 @@
       String eventKey = String.format("%s:%s", event.projectName(), event.refName());
       if (!eventsReplayed.contains(eventKey)) {
         repLog.info("Firing pending task {}", event);
-        onGitReferenceUpdated(event.projectName(), event.refName());
+        fire(event.projectName(), event.refName());
         eventsReplayed.add(eventKey);
       }
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index cc1f10e..3b174a3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -116,7 +116,8 @@
     public Task(ReplicateRefUpdate update) {
       this.update = update;
       json = GSON.toJson(update) + "\n";
-      taskKey = sha1(json).name();
+      String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote;
+      taskKey = sha1(key).name();
       file = refUpdates().resolve(taskKey);
     }