Merge "ReplicationTasksStorage: Add multi-primary unit tests" into stable-3.1
diff --git a/BUILD b/BUILD
index 50615d8..72a3fc8 100644
--- a/BUILD
+++ b/BUILD
@@ -23,6 +23,18 @@
name = "replication_tests",
srcs = glob([
"src/test/java/**/*Test.java",
+ ]),
+ tags = ["replication"],
+ visibility = ["//visibility:public"],
+ deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
+ ":replication__plugin",
+ ":replication_util",
+ ],
+)
+
+junit_tests(
+ name = "replication_it",
+ srcs = glob([
"src/test/java/**/*IT.java",
]),
tags = ["replication"],
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 6a89e80..0a34d11 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -81,7 +81,9 @@
destinations.get().startup(workQueue);
running = true;
replicationTasksStorage.resetAll();
- firePendingEvents();
+ Thread t = new Thread(this::firePendingEvents, "firePendingEvents");
+ t.setDaemon(true);
+ t.start();
fireBeforeStartupEvents();
}
}
@@ -198,6 +200,8 @@
repLog.error("Encountered malformed URI for persisted event %s", t);
}
}
+ } catch (Throwable e) {
+ repLog.error("Unexpected error while firing pending events", e);
} finally {
replaying = false;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 048767f..ead218b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -60,8 +60,6 @@
public class ReplicationTasksStorage {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
- private boolean disableDeleteForTesting;
-
public static class ReplicateRefUpdate {
public final String project;
public final String ref;
@@ -103,11 +101,6 @@
return new Task(r).create();
}
- @VisibleForTesting
- public void disableDeleteForTesting(boolean deleteDisabled) {
- this.disableDeleteForTesting = deleteDisabled;
- }
-
public synchronized void start(UriUpdates uriUpdates) {
for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
new Task(update).start();
@@ -132,11 +125,11 @@
}
}
- public synchronized List<ReplicateRefUpdate> listWaiting() {
+ public List<ReplicateRefUpdate> listWaiting() {
return list(createDir(waitingUpdates));
}
- public synchronized List<ReplicateRefUpdate> listRunning() {
+ public List<ReplicateRefUpdate> listRunning() {
return list(createDir(runningUpdates));
}
@@ -226,11 +219,6 @@
}
public void finish() {
- if (disableDeleteForTesting) {
- logger.atFine().log("DELETE %s %s DISABLED", running, updateLog());
- return;
- }
-
try {
logger.atFine().log("DELETE %s %s", running, updateLog());
Files.delete(running);
diff --git a/src/main/resources/Documentation/cmd-start.md b/src/main/resources/Documentation/cmd-start.md
index 8291421..e12ec92 100644
--- a/src/main/resources/Documentation/cmd-start.md
+++ b/src/main/resources/Documentation/cmd-start.md
@@ -11,8 +11,7 @@
ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start
[--now]
[--wait]
- [--url <PATTERN>]
- {--all | <PROJECT PATTERN> ...}
+ {--url <PATTERN> | [--url <PATTERN>] --all | [--url <PATTERN>] <PROJECT PATTERN> ...}
```
DESCRIPTION
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
index f06b933..c32a55d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
@@ -86,7 +86,6 @@
storagePath = pluginDataDir.resolve("ref-updates");
tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
cleanupReplicationTasks();
- tasksStorage.disableDeleteForTesting(true);
}
@After
@@ -156,13 +155,13 @@
@Test
public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
- createTestProject(project + "replica1");
- createTestProject(project + "replica2");
- setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
- setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
- config.setInt("remote", "foo1", "replicationDelay", TEST_REPLICATION_DELAY * 100);
- config.setInt("remote", "foo2", "replicationDelay", TEST_REPLICATION_DELAY * 100);
+ FileBasedConfig dest1 = setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+ FileBasedConfig dest2 = setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+ dest1.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
+ dest2.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
+ dest1.save();
+ dest2.save();
reloadConfig();
createChange();
@@ -191,7 +190,7 @@
setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project);
}
- private void setReplicationDestination(
+ private FileBasedConfig setReplicationDestination(
String remoteName, List<String> replicaSuffixes, Optional<String> allProjects)
throws IOException {
FileBasedConfig remoteConfig =
@@ -200,6 +199,7 @@
FS.DETECTED);
setReplicationDestination(remoteConfig, replicaSuffixes, allProjects);
+ return remoteConfig;
}
private void setAutoReload() throws IOException {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index bfabc9f..3c6ab0c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -65,9 +65,14 @@
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final int TEST_REPLICATION_DELAY = 1;
private static final int TEST_REPLICATION_RETRY = 1;
+ private static final int TEST_PROJECT_CREATION_SECONDS = 10;
private static final Duration TEST_TIMEOUT =
Duration.ofSeconds((TEST_REPLICATION_DELAY + TEST_REPLICATION_RETRY * 60) + 1);
+ private static final Duration TEST_NEW_PROJECT_TIMEOUT =
+ Duration.ofSeconds(
+ (TEST_REPLICATION_DELAY + TEST_REPLICATION_RETRY * 60) + TEST_PROJECT_CREATION_SECONDS);
+
@Inject private SitePaths sitePaths;
@Inject private ProjectOperations projectOperations;
@Inject private DynamicSet<ProjectDeletedListener> deletedListeners;
@@ -95,7 +100,6 @@
storagePath = pluginDataDir.resolve("ref-updates");
tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
cleanupReplicationTasks();
- tasksStorage.disableDeleteForTesting(true);
}
@Test
@@ -105,9 +109,9 @@
Project.NameKey sourceProject = createTestProject("foo");
- assertThat(listIncompleteTasks("refs/meta/config")).hasSize(1);
-
- waitUntil(() -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")));
+ WaitUtil.waitUntil(
+ () -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")),
+ TEST_NEW_PROJECT_TIMEOUT);
ProjectInfo replicaProject = gApi.projects().name(sourceProject + "replica").get();
assertThat(replicaProject).isNotNull();
@@ -152,8 +156,6 @@
RevCommit sourceCommit = pushResult.getCommit();
String sourceRef = pushResult.getPatchSet().refName();
- assertThat(listIncompleteTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(1);
-
try (Repository repo = repoManager.openRepository(targetProject)) {
waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
@@ -175,8 +177,6 @@
input.revision = master;
gApi.projects().name(project.get()).branch(newBranch).create(input);
- assertThat(listIncompleteTasks("refs/heads/(mybranch|master)")).hasSize(2);
-
try (Repository repo = repoManager.openRepository(targetProject);
Repository sourceRepo = repoManager.openRepository(project)) {
waitUntil(() -> checkedGetRef(repo, newBranch) != null);
@@ -201,8 +201,6 @@
RevCommit sourceCommit = pushResult.getCommit();
String sourceRef = pushResult.getPatchSet().refName();
- assertThat(listIncompleteTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
-
try (Repository repo1 = repoManager.openRepository(targetProject1);
Repository repo2 = repoManager.openRepository(targetProject2)) {
waitUntil(
@@ -225,10 +223,12 @@
createTestProject(project + "replica1");
createTestProject(project + "replica2");
- setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
- setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
- config.setInt("remote", "foo1", "replicationDelay", TEST_REPLICATION_DELAY * 100);
- config.setInt("remote", "foo2", "replicationDelay", TEST_REPLICATION_DELAY * 100);
+ FileBasedConfig dest1 = setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+ FileBasedConfig dest2 = setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+ dest1.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
+ dest2.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
+ dest1.save();
+ dest2.save();
reloadConfig();
createChange();
@@ -251,7 +251,7 @@
.getInstance(ReplicationQueue.class)
.scheduleFullSync(project, null, new ReplicationState(NO_OP), true);
- assertThat(listIncompleteTasks(".*all.*")).hasSize(1);
+ assertThat(listIncompleteTasks(Pattern.quote(PushOne.ALL_REFS))).hasSize(1);
}
@Test
@@ -269,10 +269,8 @@
.getInstance(ReplicationQueue.class)
.scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), true);
- assertThat(listIncompleteTasks(".*all.*")).hasSize(1);
- for (ReplicationTasksStorage.ReplicateRefUpdate task : listIncompleteTasks()) {
- assertThat(task.uri).isEqualTo(expectedURI);
- }
+ assertThat(listIncompleteTasks(Pattern.quote(PushOne.ALL_REFS))).hasSize(1);
+ streamIncompleteTasks().forEach((task) -> assertThat(task.uri).isEqualTo(expectedURI));
}
@Test
@@ -291,9 +289,7 @@
.scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), true);
assertThat(listIncompleteTasks()).hasSize(1);
- for (ReplicationTasksStorage.ReplicateRefUpdate task : listIncompleteTasks()) {
- assertThat(task.uri).isEqualTo(expectedURI);
- }
+ streamIncompleteTasks().forEach((task) -> assertThat(task.uri).isEqualTo(expectedURI));
}
@Test
@@ -329,8 +325,6 @@
}
private void replicateBranchDeletion(boolean mirror) throws Exception {
- tasksStorage.disableDeleteForTesting(false);
-
setReplicationDestination("foo", "replica", ALL_PROJECTS, mirror);
reloadConfig();
@@ -436,7 +430,6 @@
@Test
public void shouldCleanupTasksAfterNewProjectReplication() throws Exception {
- tasksStorage.disableDeleteForTesting(false);
setReplicationDestination("task_cleanup_project", "replica", ALL_PROJECTS);
config.setInt("remote", "task_cleanup_project", "replicationRetry", 0);
config.save();
@@ -462,7 +455,6 @@
String changeRef = createChange().getPatchSet().refName();
- tasksStorage.disableDeleteForTesting(false);
changeReplicationTasksForRemote(tasksStorage.listWaiting().stream(), changeRef, remote1)
.forEach(
(update) -> {
@@ -473,7 +465,6 @@
} catch (URISyntaxException e) {
}
});
- tasksStorage.disableDeleteForTesting(true);
setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
setReplicationDestination(remote2, suffix2, ALL_PROJECTS);
@@ -534,13 +525,14 @@
remoteName, Arrays.asList(replicaSuffix), project, replicationDelay, mirror);
}
- private void setReplicationDestination(
+ private FileBasedConfig setReplicationDestination(
String remoteName, List<String> replicaSuffixes, Optional<String> project)
throws IOException {
- setReplicationDestination(remoteName, replicaSuffixes, project, TEST_REPLICATION_DELAY, false);
+ return setReplicationDestination(
+ remoteName, replicaSuffixes, project, TEST_REPLICATION_DELAY, false);
}
- private void setReplicationDestination(
+ private FileBasedConfig setReplicationDestination(
String remoteName,
List<String> replicaSuffixes,
Optional<String> project,
@@ -558,6 +550,7 @@
project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
config.setBoolean("gerrit", null, "autoReload", true);
config.save();
+ return config;
}
private void setProjectDeletionReplication(String remoteName, boolean replicateProjectDeletion)
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
index fecbd31..97992c8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
@@ -71,6 +71,14 @@
}
@Test
+ public void canStartWaitingUpdate() throws Exception {
+ storage.create(REF_UPDATE);
+ storage.start(uriUpdates);
+ assertThat(storage.listWaiting()).isEmpty();
+ assertContainsExactly(storage.listRunning(), REF_UPDATE);
+ }
+
+ @Test
public void canFinishRunningUpdate() throws Exception {
storage.create(REF_UPDATE);
storage.start(uriUpdates);
@@ -92,6 +100,8 @@
storage.start(uriUpdates);
assertThat(storage.listWaiting()).isEmpty();
assertThat(persistedView.listWaiting()).isEmpty();
+ assertContainsExactly(storage.listRunning(), REF_UPDATE);
+ assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
storage.finish(uriUpdates);
assertThat(storage.listRunning()).isEmpty();