Merge branch 'stable-3.1' into stable-3.2
* stable-3.1:
Improve readability of shouldFirePendingOnlyToStoredUri test
Fix flakiness in ReplicationIT for pending events firing
Only fire the specified pending event URI
Change-Id: Ie83763e4a9fe13522f356b569fc2360fa5883224
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index adc3133..8f04c74 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -609,6 +609,10 @@
return names;
}
+ /**
+ * Tells whether this destination would push the given ref of the given project to the given uri.
+ *
+ * <p>The answer is true only when {@code matches(uri, project)}, {@code wouldPushProject(project)}
+ * and {@code wouldPushRef(ref)} all hold; they are evaluated in that order and evaluation stops at
+ * the first one that fails.
+ *
+ * @param uri target uri to check against this destination
+ * @param project project the ref belongs to
+ * @param ref name of the ref
+ * @return true if all three checks pass, false otherwise
+ */
+ boolean wouldPush(URIish uri, Project.NameKey project, String ref) {
+ return matches(uri, project) && wouldPushProject(project) && wouldPushRef(ref);
+ }
+
boolean wouldPushProject(Project.NameKey project) {
if (!shouldReplicate(project)) {
repLog.atFine().log("Skipping replication of project %s", project.get());
@@ -662,6 +666,16 @@
return config.replicateProjectDeletions();
}
+ /**
+ * Checks whether {@code uri} is one of the uris this destination resolves for {@code project}.
+ *
+ * <p>Each uri of the remote config is expanded with {@code getURI(configUri, project)} and
+ * compared to {@code uri}; the search stops at the first equal one.
+ *
+ * @param uri uri to look for
+ * @param project project used to expand the configured uris
+ * @return true if an expanded uri equals {@code uri}, false otherwise
+ */
+ private boolean matches(URIish uri, Project.NameKey project) {
+   return config.getRemoteConfig().getURIs().stream()
+       .anyMatch(configUri -> uri.equals(getURI(configUri, project)));
+ }
+
List<URIish> getURIs(Project.NameKey project, String urlMatch) {
List<URIish> r = Lists.newArrayListWithCapacity(config.getRemoteConfig().getURIs().size());
for (URIish configUri : config.getRemoteConfig().getURIs()) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
index ecfbb8e..ac14657 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
@@ -38,6 +38,7 @@
import com.googlesource.gerrit.plugins.replication.Destination.Factory;
import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -157,6 +158,17 @@
}
@Override
+ public List<Destination> getDestinations(URIish uri, Project.NameKey project, String ref) {
+   // Work on a private copy of all destinations and drop the ones that would not push
+   // this uri/project/ref triplet; the relative order of the survivors is kept.
+   List<Destination> candidates = new ArrayList<>(getAll(FilterType.ALL));
+   candidates.removeIf(candidate -> !candidate.wouldPush(uri, project, ref));
+   return candidates;
+ }
+
+ @Override
public boolean isEmpty() {
return destinations.isEmpty();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
index b191d7d..18ccc66 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
@@ -44,6 +44,16 @@
*/
List<Destination> getAll(FilterType filterType);
+ /**
+ * Return the replication destinations that apply to a uri/project/ref triplet.
+ *
+ * @param uriish uri a destination has to replicate to in order to be returned
+ * @param project name of the project
+ * @param ref ref name
+ * @return the list of destinations matching all three arguments
+ */
+ List<Destination> getDestinations(URIish uriish, Project.NameKey project, String ref);
+
/** @return true if there are no destinations, false otherwise. */
boolean isEmpty();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 21e4227..e685237 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -34,6 +34,7 @@
import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.Optional;
import java.util.Queue;
@@ -154,6 +155,14 @@
}
}
+ /**
+ * Schedules a push of {@code refName} in {@code project} to {@code uri} on every destination
+ * returned by {@code getDestinations(uri, project, refName)}.
+ *
+ * <p>All pushes scheduled by one call share a single {@link ReplicationState}.
+ *
+ * @param uri target uri of the push
+ * @param project project the ref belongs to
+ * @param refName name of the ref to push
+ */
+ private void fire(URIish uri, Project.NameKey project, String refName) {
+ ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
+ for (Destination dest : destinations.get().getDestinations(uri, project, refName)) {
+ dest.schedule(project, refName, uri, state);
+ }
+ // Called even when no destination matched, so the state is always closed for scheduling.
+ state.markAllPushTasksScheduled();
+ }
+
@UsedAt(UsedAt.Project.COLLABNET)
public void pushReference(Destination cfg, Project.NameKey project, String refName) {
pushReference(cfg, project, null, refName, null, true, false);
@@ -190,14 +199,16 @@
private void firePendingEvents() {
replaying = true;
try {
- Set<String> eventsReplayed = new HashSet<>();
replaying = true;
for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) {
- String eventKey = String.format("%s:%s", t.project, t.ref);
- if (!eventsReplayed.contains(eventKey)) {
- repLog.atInfo().log("Firing pending task %s", eventKey);
- fire(t.project, t.ref, true);
- eventsReplayed.add(eventKey);
+ if (t == null) {
+ repLog.atWarning().log("Encountered null replication event in ReplicationTasksStorage");
+ continue;
+ }
+ try {
+ fire(new URIish(t.uri), Project.nameKey(t.project), t.ref);
+ } catch (URISyntaxException e) {
+ repLog.atSevere().withCause(e).log("Encountered malformed URI for persisted event %s", t);
}
}
} finally {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 506b175..bfd0886 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -112,6 +112,11 @@
this.disableDeleteForTesting = deleteDisabled;
}
+ /**
+ * Deletes the stored task for the given ref update by wrapping it in a {@code Task} and calling
+ * its {@code delete()}. Annotated as visible for testing only.
+ *
+ * @param r ref update whose stored task is deleted
+ */
+ @VisibleForTesting
+ public void delete(ReplicateRefUpdate r) {
+ new Task(r).delete();
+ }
+
public synchronized void start(PushOne push) {
for (String ref : push.getRefs()) {
new Task(new ReplicateRefUpdate(push, ref)).start();
@@ -264,6 +269,15 @@
}
}
+ /**
+ * Removes the {@code waiting} and {@code running} paths of this task if they exist.
+ *
+ * <p>An {@link IOException} is logged at severe level and not rethrown.
+ */
+ public void delete() {
+ try {
+ Files.deleteIfExists(waiting);
+ // Not reached if deleting the waiting path threw: both deletions share one try block.
+ Files.deleteIfExists(running);
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey);
+ }
+ }
+
private void rename(Path from, Path to) {
try {
logger.atFine().log("RENAME %s to %s %s", from, to, updateLog());
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 653e5a1..08b8c65 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -48,6 +48,7 @@
import java.util.Optional;
import java.util.function.Supplier;
import java.util.regex.Pattern;
+import java.util.stream.Stream;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
@@ -513,6 +514,46 @@
return pushOne == null ? false : pushOne.isRetrying();
}
+ /**
+ * Verifies that a change ref is pushed only to the remote that still has a stored task for it
+ * after the configuration is reloaded.
+ */
+ @Test
+ public void shouldFirePendingOnlyToStoredUri() throws Exception {
+ String suffix1 = "replica1";
+ String suffix2 = "replica2";
+ Project.NameKey target1 = createTestProject(project + suffix1);
+ Project.NameKey target2 = createTestProject(project + suffix2);
+ String remote1 = "foo1";
+ String remote2 = "foo2";
+ // Phase 1: configure both remotes with a replication delay of Integer.MAX_VALUE.
+ setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE, false);
+ setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE, false);
+ reloadConfig();
+
+ String changeRef = createChange().getPatchSet().refName();
+
+ // Phase 2: remove the stored tasks of remote1 only. The surrounding calls toggle the
+ // storage's disable-delete test switch off for this step and back on afterwards.
+ tasksStorage.disableDeleteForTesting(false);
+ changeReplicationTasksForRemote(changeRef, remote1).forEach(tasksStorage::delete);
+ tasksStorage.disableDeleteForTesting(true);
+
+ // Phase 3: re-apply both remotes, this time without the Integer.MAX_VALUE delay, and reload.
+ // NOTE(review): presumably the reload is what fires the stored pending events — confirm.
+ setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
+ setReplicationDestination(remote2, suffix2, ALL_PROJECTS);
+ reloadConfig();
+
+ // Only remote2 still has a stored task for the change ref.
+ assertThat(changeReplicationTasksForRemote(changeRef, remote2).count()).isEqualTo(1);
+ assertThat(changeReplicationTasksForRemote(changeRef, remote1).count()).isEqualTo(0);
+
+ // The ref must reach target2 and must not reach target1 within the timeout.
+ assertThat(isPushCompleted(target2, changeRef, TEST_TIMEOUT)).isEqualTo(true);
+ assertThat(isPushCompleted(target1, changeRef, TEST_TIMEOUT)).isEqualTo(false);
+ }
+
+ /**
+ * Waits for {@code ref} to appear in the repository of {@code project}.
+ *
+ * @param project project whose repository is opened and polled
+ * @param ref name of the ref to wait for
+ * @param timeOut time limit handed to {@code WaitUtil.waitUntil}
+ * @return true once the ref is found, false if the wait ends with an
+ *     {@link InterruptedException}
+ * @throws RuntimeException wrapping any other exception raised while opening or polling the
+ *     repository
+ */
+ public boolean isPushCompleted(Project.NameKey project, String ref, Duration timeOut) {
+   try (Repository repo = repoManager.openRepository(project)) {
+     WaitUtil.waitUntil(() -> checkedGetRef(repo, ref) != null, timeOut);
+     return true;
+   } catch (InterruptedException e) {
+     // NOTE(review): presumably this is how WaitUtil reports an elapsed timeout — confirm.
+     return false;
+   } catch (Exception e) {
+     // Space added after "project" so the project name is not glued to the message text.
+     throw new RuntimeException("Cannot open repo for project " + project, e);
+   }
+ }
+
private Ref getRef(Repository repo, String branchName) throws IOException {
return repo.getRefDatabase().exactRef(branchName);
}
@@ -528,30 +569,47 @@
private void setReplicationDestination(
String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
- setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project, false);
+ setReplicationDestination(
+ remoteName, Arrays.asList(replicaSuffix), project, TEST_REPLICATION_DELAY, false);
}
private void setReplicationDestination(
String remoteName, String replicaSuffix, Optional<String> project, boolean mirror)
throws IOException {
- setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project, mirror);
+ setReplicationDestination(
+ remoteName, Arrays.asList(replicaSuffix), project, TEST_REPLICATION_DELAY, mirror);
+ }
+
+ private void setReplicationDestination(
+ String remoteName,
+ String replicaSuffix,
+ Optional<String> project,
+ int replicationDelay,
+ boolean mirror)
+ throws IOException {
+ setReplicationDestination(
+ remoteName, Arrays.asList(replicaSuffix), project, replicationDelay, mirror);
}
private void setReplicationDestination(
String remoteName, List<String> replicaSuffixes, Optional<String> project)
throws IOException {
- setReplicationDestination(remoteName, replicaSuffixes, project, false);
+ setReplicationDestination(remoteName, replicaSuffixes, project, TEST_REPLICATION_DELAY, false);
}
private void setReplicationDestination(
- String remoteName, List<String> replicaSuffixes, Optional<String> project, boolean mirror)
+ String remoteName,
+ List<String> replicaSuffixes,
+ Optional<String> project,
+ int replicationDelay,
+ boolean mirror)
throws IOException {
List<String> replicaUrls =
replicaSuffixes.stream()
.map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
.collect(toList());
config.setStringList("remote", remoteName, "url", replicaUrls);
- config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+ config.setInt("remote", remoteName, "replicationDelay", replicationDelay);
config.setInt("remote", remoteName, "replicationRetry", TEST_REPLICATION_RETRY);
config.setBoolean("remote", remoteName, "mirror", mirror);
project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
@@ -597,6 +655,13 @@
return plugin.getSysInjector().getInstance(classObj);
}
+ /**
+ * Streams the stored replication tasks whose ref equals {@code changeRef} and whose remote
+ * equals {@code remote}.
+ */
+ private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
+     String changeRef, String remote) {
+   Stream<ReplicateRefUpdate> storedTasks = tasksStorage.list().stream();
+   return storedTasks.filter(
+       stored -> changeRef.equals(stored.ref) && remote.equals(stored.remote));
+ }
+
private Project.NameKey createTestProject(String name) throws Exception {
return projectOperations.newProject().name(name).create();
}