Merge "Decouple observable queue implementation from ReplicationQueue"
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
index d12abc4..4b757ea 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -17,7 +17,6 @@
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.gerrit.server.config.ConfigUtil;
-import java.util.List;
import java.util.concurrent.TimeUnit;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.transport.RemoteConfig;
@@ -167,24 +166,6 @@
return cfg.getInt("remote", rc.getName(), name, defValue);
}
- boolean isSingleProjectMatch() {
- List<String> projects = getProjects();
- boolean ret = (projects.size() == 1);
- if (ret) {
- String projectMatch = projects.get(0);
- if (ReplicationFilter.getPatternType(projectMatch)
- != ReplicationFilter.PatternType.EXACT_MATCH) {
- // projectMatch is either regular expression, or wild-card.
- //
- // Even though they might refer to a single project now, they need not
- // after new projects have been created. Hence, we do not treat them as
- // matching a single project.
- ret = false;
- }
- }
- return ret;
- }
-
@Override
public int getSlowLatencyThreshold() {
return slowLatencyThreshold;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
index aa683ff..b66e73c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.replication;
import com.google.common.collect.ImmutableList;
+import java.util.List;
import org.eclipse.jgit.transport.RemoteConfig;
/** Remote configuration for a replication endpoint */
@@ -95,4 +96,27 @@
* @return the slow latency threshold
*/
int getSlowLatencyThreshold();
+
+ /**
+ * Whether the remote configuration is for a single project only
+ *
+ * @return true, when configuration is for a single project, false otherwise
+ */
+ default boolean isSingleProjectMatch() {
+ List<String> projects = getProjects();
+ boolean ret = (projects.size() == 1);
+ if (ret) {
+ String projectMatch = projects.get(0);
+ if (ReplicationFilter.getPatternType(projectMatch)
+ != ReplicationFilter.PatternType.EXACT_MATCH) {
+ // projectMatch is either regular expression, or wild-card.
+ //
+ // Even though they might refer to a single project now, they need not
+ // after new projects have been created. Hence, we do not treat them as
+ // matching a single project.
+ ret = false;
+ }
+ }
+ return ret;
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 0d15d7c..e02fafc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -109,49 +109,43 @@
@VisibleForTesting
public void scheduleFullSync(
Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
- if (!running) {
- stateLog.warn("Replication plugin did not finish startup before event", state);
- return;
- }
-
- for (Destination cfg : destinations.get().getAll(FilterType.ALL)) {
- if (cfg.wouldPushProject(project)) {
- for (URIish uri : cfg.getURIs(project, urlMatch)) {
- cfg.schedule(project, PushOne.ALL_REFS, uri, state, now);
- replicationTasksStorage.persist(
- new ReplicateRefUpdate(
- project.get(), PushOne.ALL_REFS, uri, cfg.getRemoteConfigName()));
- }
- }
- }
+ fire(project, urlMatch, PushOne.ALL_REFS, state, now);
}
@Override
public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
- onGitReferenceUpdated(event.getProjectName(), event.getRefName());
+ fire(event.getProjectName(), event.getRefName());
}
- private void onGitReferenceUpdated(String projectName, String refName) {
+ private void fire(String projectName, String refName) {
ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
+ fire(Project.nameKey(projectName), null, refName, state, false);
+ state.markAllPushTasksScheduled();
+ }
+
+ private void fire(
+ Project.NameKey project,
+ String urlMatch,
+ String refName,
+ ReplicationState state,
+ boolean now) {
if (!running) {
stateLog.warn(
"Replication plugin did not finish startup before event, event replication is postponed",
state);
- beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(projectName, refName));
+ beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(project.get(), refName));
return;
}
- Project.NameKey project = Project.nameKey(projectName);
for (Destination cfg : destinations.get().getAll(FilterType.ALL)) {
if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
- for (URIish uri : cfg.getURIs(project, null)) {
+ for (URIish uri : cfg.getURIs(project, urlMatch)) {
replicationTasksStorage.persist(
- new ReplicateRefUpdate(projectName, refName, uri, cfg.getRemoteConfigName()));
- cfg.schedule(project, refName, uri, state);
+ new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
+ cfg.schedule(project, refName, uri, state, now);
}
}
}
- state.markAllPushTasksScheduled();
}
private void firePendingEvents() {
@@ -163,7 +157,7 @@
String eventKey = String.format("%s:%s", t.project, t.ref);
if (!eventsReplayed.contains(eventKey)) {
repLog.info("Firing pending task {}", eventKey);
- onGitReferenceUpdated(t.project, t.ref);
+ fire(t.project, t.ref);
eventsReplayed.add(eventKey);
}
}
@@ -192,7 +186,7 @@
String eventKey = String.format("%s:%s", event.projectName(), event.refName());
if (!eventsReplayed.contains(eventKey)) {
repLog.info("Firing pending task {}", event);
- onGitReferenceUpdated(event.projectName(), event.refName());
+ fire(event.projectName(), event.refName());
eventsReplayed.add(eventKey);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index cc1f10e..3b174a3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -116,7 +116,8 @@
public Task(ReplicateRefUpdate update) {
this.update = update;
json = GSON.toJson(update) + "\n";
- taskKey = sha1(json).name();
+ String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote;
+ taskKey = sha1(key).name();
file = refUpdates().resolve(taskKey);
}