Merge branch 'stable-3.0' into stable-3.1
* stable-3.0:
Only fire the specified pending event URI
Adapt to the new refactored code of the config
and destinations in stable-3.1 and remove unused
methods coming from the merge with stable-3.0.
Change-Id: Ica9da7c735bbd965bc6704dec35419dba126ab66
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 85c3917..985056b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -595,6 +595,10 @@
}
}
+ boolean wouldPush(URIish uri, Project.NameKey project, String ref) {
+ return matches(uri, project) && wouldPushProject(project) && wouldPushRef(ref);
+ }
+
boolean wouldPushProject(Project.NameKey project) {
if (!shouldReplicate(project)) {
repLog.debug("Skipping replication of project {}", project.get());
@@ -647,6 +651,16 @@
return config.replicateProjectDeletions();
}
+ private boolean matches(URIish uri, Project.NameKey project) {
+ for (URIish configUri : config.getRemoteConfig().getURIs()) {
+ URIish projectUri = getURI(configUri, project);
+ if (uri.equals(projectUri)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
List<URIish> getURIs(Project.NameKey project, String urlMatch) {
List<URIish> r = Lists.newArrayListWithCapacity(config.getRemoteConfig().getURIs().size());
for (URIish configUri : config.getRemoteConfig().getURIs()) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
index fcf5dfa..20c5bf3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
@@ -38,6 +38,7 @@
import com.googlesource.gerrit.plugins.replication.Destination.Factory;
import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -156,6 +157,17 @@
}
@Override
+ public List<Destination> getDestinations(URIish uri, Project.NameKey project, String ref) {
+ List<Destination> dests = new ArrayList<>();
+ for (Destination dest : getAll(FilterType.ALL)) {
+ if (dest.wouldPush(uri, project, ref)) {
+ dests.add(dest);
+ }
+ }
+ return dests;
+ }
+
+ @Override
public boolean isEmpty() {
return destinations.isEmpty();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
index b191d7d..18ccc66 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
@@ -44,6 +44,16 @@
*/
List<Destination> getAll(FilterType filterType);
+ /**
+ * Return the active replication destinations for a uri/project/ref triplet.
+ *
+ * @param uriish uri of the destinations
+ * @param project name of the project
+ * @param ref ref name
+ * @return the list of active destinations
+ */
+ List<Destination> getDestinations(URIish uriish, Project.NameKey project, String ref);
+
/** @return true if there are no destinations, false otherwise. */
boolean isEmpty();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 5fb97bd..6a89e80 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -31,6 +31,7 @@
import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.Optional;
import java.util.Queue;
@@ -144,6 +145,14 @@
}
}
+ private void fire(URIish uri, Project.NameKey project, String refName) {
+ ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
+ for (Destination dest : destinations.get().getDestinations(uri, project, refName)) {
+ dest.schedule(project, refName, uri, state);
+ }
+ state.markAllPushTasksScheduled();
+ }
+
@UsedAt(UsedAt.Project.COLLABNET)
public void pushReference(Destination cfg, Project.NameKey project, String refName) {
pushReference(cfg, project, null, refName, null, true);
@@ -177,14 +186,16 @@
private void firePendingEvents() {
replaying = true;
try {
- Set<String> eventsReplayed = new HashSet<>();
replaying = true;
for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) {
- String eventKey = String.format("%s:%s", t.project, t.ref);
- if (!eventsReplayed.contains(eventKey)) {
- repLog.info("Firing pending task {}", eventKey);
- fire(t.project, t.ref);
- eventsReplayed.add(eventKey);
+ if (t == null) {
+ repLog.warn("Encountered null replication event in ReplicationTasksStorage");
+ continue;
+ }
+ try {
+ fire(new URIish(t.uri), Project.nameKey(t.project), t.ref);
+ } catch (URISyntaxException e) {
+ repLog.error("Encountered malformed URI for persisted event %s", t);
}
}
} finally {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index d15f02d..98bc40f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -109,6 +109,11 @@
this.disableDeleteForTesting = deleteDisabled;
}
+ @VisibleForTesting
+ public void delete(ReplicateRefUpdate r) {
+ new Task(r).delete();
+ }
+
public synchronized void start(PushOne push) {
for (String ref : push.getRefs()) {
new Task(new ReplicateRefUpdate(push, ref)).start();
@@ -250,6 +255,15 @@
}
}
+ public void delete() {
+ try {
+ Files.deleteIfExists(waiting);
+ Files.deleteIfExists(running);
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
+ }
+ }
+
private void rename(Path from, Path to) {
try {
logger.atFine().log("RENAME %s to %s %s", from, to, updateLog());
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index d607d86..60f55ac 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -447,6 +447,58 @@
waitUntil(() -> tasksStorage.listRunning().size() == 0);
}
+ @Test
+ public void shouldFirePendingOnlyToStoredUri() throws Exception {
+ String suffix1 = "replica1";
+ String suffix2 = "replica2";
+ Project.NameKey target1 = createTestProject(project + suffix1);
+ Project.NameKey target2 = createTestProject(project + suffix2);
+ String remote1 = "foo1";
+ String remote2 = "foo2";
+ setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
+ setReplicationDestination(remote2, suffix2, ALL_PROJECTS);
+ reloadConfig();
+
+ Result pushResult = createChange();
+ String sourceRef = pushResult.getPatchSet().refName();
+
+ replicationQueueStop();
+
+ tasksStorage.disableDeleteForTesting(false);
+ listReplicationTasks("refs/changes/\\d*/\\d*/\\d*").stream()
+ .filter(task -> remote1.equals(task.remote))
+ .forEach(u -> tasksStorage.delete(u));
+ tasksStorage.disableDeleteForTesting(true);
+
+ assertThat(
+ listReplicationTasks("refs/changes/\\d*/\\d*/\\d*").stream()
+ .filter(task -> remote2.equals(task.remote))
+ .collect(toList()))
+ .hasSize(1);
+
+ assertThat(
+ listReplicationTasks("refs/changes/\\d*/\\d*/\\d*").stream()
+ .filter(task -> remote1.equals(task.remote))
+ .collect(toList()))
+ .hasSize(0);
+
+ replicationQueueStart();
+
+ assertThat(isPushCompleted(target2, sourceRef, TEST_TIMEOUT)).isEqualTo(true);
+ assertThat(isPushCompleted(target1, sourceRef, TEST_TIMEOUT)).isEqualTo(false);
+ }
+
+ public boolean isPushCompleted(Project.NameKey project, String ref, Duration timeOut) {
+ try (Repository repo = repoManager.openRepository(project)) {
+ WaitUtil.waitUntil(() -> checkedGetRef(repo, ref) != null, timeOut);
+ return true;
+ } catch (InterruptedException e) {
+ return false;
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot open repo for project" + project, e);
+ }
+ }
+
private Ref getRef(Repository repo, String branchName) throws IOException {
return repo.getRefDatabase().exactRef(branchName);
}