Merge branch 'stable-3.3' into master
* stable-3.3: (23 commits)
Move shouldCleanupBothTasksAndLocks* ITs to ReplicationStorageIT
ReplicationStorageIT: Wait for all pushes without order
Replication*IT: Share getRef method
ReplicationFanoutIT: Share setReplicationDestination
ReplicationFanoutIT: Split shouldReplicateNewBranch tests
ReplicationFanoutIT: Remove generic waitUntil helper
ReplicationFanoutIT: Inherit from ReplicationDaemon
ReplicationFanoutIT: Refactor setRemoteReplicationDestination
ReplicationFanoutIT: Rename setReplicationDestination
ReplicationFanoutIT: Cleanup shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair
Move shouldCleanupTasksAfterNewProjectReplication test
Fix documentation issue
Move storage portion of replicateBranchDeletion ITs
Refactor Replication*IT tests to share a base class
ReplicationIT: Add shouldMatch* e2e tests
ReplicationStorageIT: Move shouldMatch* tests from ReplicationIT
ReplicationTasksStorage: Add multi-primary unit tests
ReplicationTasksStorage: Add multi-primary unit tests
ReplicationStorageIT: Add shouldFire*ChangeRefs tests
Move storage-based ITs into ReplicationStorageIT
...
Change-Id: Ie32364ed45f4d19467fc8fd529290d62275698e2
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index e277cb0..9932628 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -17,7 +17,6 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import com.google.auto.value.AutoValue;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
import com.google.gerrit.common.UsedAt;
import com.google.gerrit.entities.Project;
@@ -114,11 +113,6 @@
return replaying;
}
- void scheduleFullSync(Project.NameKey project, String urlMatch, ReplicationState state) {
- scheduleFullSync(project, urlMatch, state, false);
- }
-
- @VisibleForTesting
public void scheduleFullSync(
Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
fire(project, urlMatch, PushOne.ALL_REFS, state, now, false);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 5b993dd..7c35613 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -52,14 +52,11 @@
* <p><code>
* .../building/<tmp_name> new replication tasks under construction
* .../running/<sha1> running replication tasks
- * .../waiting/<task_sha1_NN_shard>/<task_sha1> outstanding replication tasks
+ * .../waiting/<task_sha1> outstanding replication tasks
* </code>
*
* <p>Tasks are moved atomically via a rename between those directories to indicate the current
* state of each task.
- *
- * <p>Note: The .../waiting/<task_sha1_NN_shard> directories are never removed. This helps prevent
- * failures when moving tasks to and from the shard directories from different hosts concurrently.
*/
@Singleton
public class ReplicationTasksStorage {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
new file mode 100644
index 0000000..420cdf8
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
@@ -0,0 +1,232 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+
+/**
+ * This class can be extended by any Replication*IT class and provides common setup and helper
+ * methods.
+ */
+@UseLocalDisk
+@TestPlugin(
+ name = "replication",
+ sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationDaemon extends LightweightPluginDaemonTest {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+ protected static final Optional<String> ALL_PROJECTS = Optional.empty();
+
+ protected static final int TEST_REPLICATION_DELAY_SECONDS = 1;
+ protected static final int TEST_REPLICATION_RETRY_MINUTES = 1;
+ protected static final int TEST_PUSH_TIME_SECONDS = 1;
+ protected static final int TEST_PROJECT_CREATION_SECONDS = 10;
+ protected static final Duration TEST_PUSH_TIMEOUT =
+ Duration.ofSeconds(TEST_REPLICATION_DELAY_SECONDS + TEST_PUSH_TIME_SECONDS);
+ protected static final Duration TEST_NEW_PROJECT_TIMEOUT =
+ Duration.ofSeconds(
+ (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60)
+ + TEST_PROJECT_CREATION_SECONDS);
+
+ @Inject protected SitePaths sitePaths;
+ @Inject private ProjectOperations projectOperations;
+ protected Path gitPath;
+ protected FileBasedConfig config;
+
+ protected void setReplicationDestination(
+ String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
+ setReplicationDestination(
+ remoteName, Arrays.asList(replicaSuffix), project, TEST_REPLICATION_DELAY_SECONDS);
+ }
+
+ protected void setReplicationDestination(
+ String remoteName, String replicaSuffix, Optional<String> project, boolean mirror)
+ throws IOException {
+ setReplicationDestination(
+ remoteName, Arrays.asList(replicaSuffix), project, TEST_REPLICATION_DELAY_SECONDS, mirror);
+ }
+
+ protected void setReplicationDestination(
+ String remoteName, String replicaSuffix, Optional<String> project, int replicationDelay)
+ throws IOException {
+ setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project, replicationDelay);
+ }
+
+ protected void setReplicationDestination(
+ String remoteName,
+ List<String> replicaSuffixes,
+ Optional<String> project,
+ int replicationDelay)
+ throws IOException {
+ setReplicationDestination(remoteName, replicaSuffixes, project, replicationDelay, false);
+ }
+
+ protected void setReplicationDestination(
+ String remoteName,
+ String replicaSuffix,
+ Optional<String> project,
+ int replicationDelay,
+ boolean mirror)
+ throws IOException {
+ setReplicationDestination(
+ remoteName, Arrays.asList(replicaSuffix), project, replicationDelay, mirror);
+ }
+
+ protected void setReplicationDestination(
+ String remoteName,
+ List<String> replicaSuffixes,
+ Optional<String> project,
+ int replicationDelay,
+ boolean mirror)
+ throws IOException {
+ setReplicationDestination(
+ config, remoteName, replicaSuffixes, project, replicationDelay, mirror);
+ config.setBoolean("gerrit", null, "autoReload", true);
+ config.save();
+ }
+
+ protected void setReplicationDestination(
+ FileBasedConfig config,
+ List<String> replicaSuffixes,
+ Optional<String> project,
+ int replicationDelay)
+ throws IOException {
+ setReplicationDestination(config, null, replicaSuffixes, project, replicationDelay, false);
+ }
+
+ protected void setReplicationDestination(
+ FileBasedConfig remoteConfig,
+ String remoteName,
+ List<String> replicaSuffixes,
+ Optional<String> project,
+ int replicationDelay,
+ boolean mirror)
+ throws IOException {
+
+ List<String> replicaUrls =
+ replicaSuffixes.stream()
+ .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
+ .collect(toList());
+ remoteConfig.setStringList("remote", remoteName, "url", replicaUrls);
+ remoteConfig.setInt("remote", remoteName, "replicationDelay", replicationDelay);
+ remoteConfig.setInt("remote", remoteName, "replicationRetry", TEST_REPLICATION_RETRY_MINUTES);
+ remoteConfig.setBoolean("remote", remoteName, "mirror", mirror);
+ project.ifPresent(prj -> remoteConfig.setString("remote", remoteName, "projects", prj));
+ remoteConfig.save();
+ }
+
+ protected Project.NameKey createTestProject(String name) throws Exception {
+ return projectOperations.newProject().name(name).create();
+ }
+
+  protected boolean isPushCompleted(Project.NameKey project, String ref, Duration timeOut) {
+    try (Repository repo = repoManager.openRepository(project)) {
+      WaitUtil.waitUntil(() -> checkedGetRef(repo, ref) != null, timeOut);
+      return true;
+    } catch (InterruptedException e) { // wait did not finish: report the push as not completed
+      return false;
+    } catch (Exception e) {
+      throw new RuntimeException("Cannot open repo for project " + project, e);
+    }
+  }
+
+  protected boolean isPushCompleted(Map<Project.NameKey, String> refsByProject, Duration timeOut) {
+    try {
+      WaitUtil.waitUntil(
+          () -> {
+            Iterator<Map.Entry<Project.NameKey, String>> iterator =
+                refsByProject.entrySet().iterator();
+            while (iterator.hasNext()) {
+              Map.Entry<Project.NameKey, String> entry = iterator.next();
+              try (Repository repo = repoManager.openRepository(entry.getKey())) {
+                if (checkedGetRef(repo, entry.getValue()) != null) {
+                  iterator.remove(); // ref found: drop the entry (mutates the caller's map)
+                }
+              } catch (IOException e) {
+                throw new RuntimeException("Cannot open repo for project " + entry.getKey(), e);
+              }
+            }
+            return refsByProject.isEmpty(); // done once every expected ref has been seen
+          },
+          timeOut);
+    } catch (InterruptedException e) { // wait did not finish: report the pushes as not completed
+      return false;
+    }
+    return true;
+  }
+
+ protected Ref checkedGetRef(Repository repo, String branchName) {
+ try {
+ return repo.getRefDatabase().exactRef(branchName);
+ } catch (Exception e) {
+ logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
+ return null;
+ }
+ }
+
+ protected Ref getRef(Repository repo, String branchName) throws IOException {
+ return repo.getRefDatabase().exactRef(branchName);
+ }
+
+ protected void reloadConfig() {
+ getAutoReloadConfigDecoratorInstance().reload();
+ }
+
+ protected AutoReloadConfigDecorator getAutoReloadConfigDecoratorInstance() {
+ return getInstance(AutoReloadConfigDecorator.class);
+ }
+
+ protected <T> T getInstance(Class<T> classObj) {
+ return plugin.getSysInjector().getInstance(classObj);
+ }
+
+ protected boolean nonEmptyProjectExists(Project.NameKey name) {
+ try (Repository r = repoManager.openRepository(name)) {
+ return !r.getAllRefsByPeeledObjectId().isEmpty();
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ protected void initConfig() throws IOException {
+ if (config == null) {
+ gitPath = sitePaths.site_path.resolve("git");
+ config =
+ new FileBasedConfig(
+ sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+ config.save();
+ }
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
index 7f8282a..54afa5f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
@@ -17,32 +17,20 @@
import static com.google.common.truth.Truth.assertThat;
import static java.util.stream.Collectors.toList;
-import com.google.common.flogger.FluentLogger;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
-import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
import com.google.gerrit.acceptance.PushOneCommit.Result;
import com.google.gerrit.acceptance.TestPlugin;
import com.google.gerrit.acceptance.UseLocalDisk;
-import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
import com.google.gerrit.entities.Project;
-import com.google.gerrit.extensions.annotations.PluginData;
import com.google.gerrit.extensions.api.projects.BranchInput;
-import com.google.gerrit.server.config.SitePaths;
-import com.google.inject.Inject;
-import com.google.inject.Key;
import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
import java.io.IOException;
-import java.nio.file.DirectoryStream;
import java.nio.file.Files;
-import java.nio.file.Path;
-import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
-import java.util.function.Supplier;
import java.util.regex.Pattern;
-import java.util.stream.Stream;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.revwalk.RevCommit;
@@ -55,37 +43,18 @@
@TestPlugin(
name = "replication",
sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
-public class ReplicationFanoutIT extends LightweightPluginDaemonTest {
- private static final Optional<String> ALL_PROJECTS = Optional.empty();
- private static final FluentLogger logger = FluentLogger.forEnclosingClass();
- private static final int TEST_REPLICATION_DELAY = 1;
- private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2);
-
- @Inject private SitePaths sitePaths;
- @Inject private ProjectOperations projectOperations;
- private Path pluginDataDir;
- private Path gitPath;
- private Path storagePath;
- private FileBasedConfig config;
+public class ReplicationFanoutIT extends ReplicationDaemon {
private ReplicationTasksStorage tasksStorage;
@Override
public void setUpTestPlugin() throws Exception {
- gitPath = sitePaths.site_path.resolve("git");
-
- config =
- new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
- setAutoReload();
+ initConfig();
+ config.setBoolean("gerrit", null, "autoReload", true);
config.save();
-
- setReplicationDestination("remote1", "suffix1", Optional.of("not-used-project"));
+ setReplicationDestinationRemoteConfig("remote1", "suffix1", Optional.of("not-used-project"));
super.setUpTestPlugin();
-
- pluginDataDir = plugin.getSysInjector().getInstance(Key.get(Path.class, PluginData.class));
- storagePath = pluginDataDir.resolve("ref-updates");
tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
- cleanupReplicationTasks();
}
@After
@@ -98,7 +67,7 @@
@Test
public void shouldReplicateNewBranch() throws Exception {
- setReplicationDestination("foo", "replica", ALL_PROJECTS);
+ setReplicationDestinationRemoteConfig("foo", "replica", ALL_PROJECTS);
reloadConfig();
Project.NameKey targetProject = createTestProject(project + "replica");
@@ -108,12 +77,9 @@
input.revision = master;
gApi.projects().name(project.get()).branch(newBranch).create(input);
- assertThat(listIncompleteTasks("refs/heads/(mybranch|master)")).hasSize(2);
-
+ isPushCompleted(targetProject, newBranch, TEST_PUSH_TIMEOUT);
try (Repository repo = repoManager.openRepository(targetProject);
Repository sourceRepo = repoManager.openRepository(project)) {
- waitUntil(() -> checkedGetRef(repo, newBranch) != null);
-
Ref masterRef = getRef(sourceRepo, master);
Ref targetBranchRef = getRef(repo, newBranch);
assertThat(targetBranchRef).isNotNull();
@@ -122,25 +88,39 @@
}
@Test
+ public void shouldReplicateNewBranchStorage() throws Exception {
+ setReplicationDestinationRemoteConfig("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE);
+ reloadConfig();
+
+ createTestProject(project + "replica");
+ String newBranch = "refs/heads/mybranch";
+ String master = "refs/heads/master";
+ BranchInput input = new BranchInput();
+ input.revision = master;
+ gApi.projects().name(project.get()).branch(newBranch).create(input);
+
+ assertThat(listWaitingTasks("refs/heads/(mybranch|master)")).hasSize(2);
+ }
+
+ @Test
public void shouldReplicateNewBranchToTwoRemotes() throws Exception {
Project.NameKey targetProject1 = createTestProject(project + "replica1");
Project.NameKey targetProject2 = createTestProject(project + "replica2");
- setReplicationDestination("foo1", "replica1", ALL_PROJECTS);
- setReplicationDestination("foo2", "replica2", ALL_PROJECTS);
+ setReplicationDestinationRemoteConfig("foo1", "replica1", ALL_PROJECTS);
+ setReplicationDestinationRemoteConfig("foo2", "replica2", ALL_PROJECTS);
reloadConfig();
Result pushResult = createChange();
RevCommit sourceCommit = pushResult.getCommit();
String sourceRef = pushResult.getPatchSet().refName();
- assertThat(listIncompleteTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
-
try (Repository repo1 = repoManager.openRepository(targetProject1);
Repository repo2 = repoManager.openRepository(targetProject2)) {
- waitUntil(
+ WaitUtil.waitUntil(
() ->
- (checkedGetRef(repo1, sourceRef) != null && checkedGetRef(repo2, sourceRef) != null));
+ (checkedGetRef(repo1, sourceRef) != null && checkedGetRef(repo2, sourceRef) != null),
+ TEST_PUSH_TIMEOUT);
Ref targetBranchRef1 = getRef(repo1, sourceRef);
assertThat(targetBranchRef1).isNotNull();
@@ -153,119 +133,64 @@
}
@Test
- public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
- List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
+ public void shouldReplicateNewBranchToTwoRemotesStorage() throws Exception {
+ createTestProject(project + "replica1");
+ createTestProject(project + "replica2");
- FileBasedConfig dest1 = setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
- FileBasedConfig dest2 = setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
- dest1.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
- dest2.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
- dest1.save();
- dest2.save();
+ setReplicationDestinationRemoteConfig("foo1", "replica1", ALL_PROJECTS, Integer.MAX_VALUE);
+ setReplicationDestinationRemoteConfig("foo2", "replica2", ALL_PROJECTS, Integer.MAX_VALUE);
reloadConfig();
createChange();
- assertThat(listIncompleteTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
-
- setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
- setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+ assertThat(listWaitingTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
}
- private Ref getRef(Repository repo, String branchName) throws IOException {
- return repo.getRefDatabase().exactRef(branchName);
+ @Test
+ public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
+ List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
+
+ setReplicationDestinationRemoteConfig("foo1", replicaSuffixes, ALL_PROJECTS, Integer.MAX_VALUE);
+ setReplicationDestinationRemoteConfig("foo2", replicaSuffixes, ALL_PROJECTS, Integer.MAX_VALUE);
+ reloadConfig();
+
+ createChange();
+
+ assertThat(listWaitingTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
}
- private Ref checkedGetRef(Repository repo, String branchName) {
- try {
- return repo.getRefDatabase().exactRef(branchName);
- } catch (Exception e) {
- logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
- return null;
- }
- }
-
- private void setReplicationDestination(
+ private void setReplicationDestinationRemoteConfig(
String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
- setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project);
+ setReplicationDestinationRemoteConfig(
+ remoteName, Arrays.asList(replicaSuffix), project, TEST_REPLICATION_DELAY_SECONDS);
}
- private FileBasedConfig setReplicationDestination(
- String remoteName, List<String> replicaSuffixes, Optional<String> allProjects)
+ private void setReplicationDestinationRemoteConfig(
+ String remoteName, String replicaSuffix, Optional<String> project, int replicationDelay)
+ throws IOException {
+ setReplicationDestinationRemoteConfig(
+ remoteName, Arrays.asList(replicaSuffix), project, replicationDelay);
+ }
+
+ private void setReplicationDestinationRemoteConfig(
+ String remoteName,
+ List<String> replicaSuffixes,
+ Optional<String> allProjects,
+ int replicationDelay)
throws IOException {
FileBasedConfig remoteConfig =
new FileBasedConfig(
sitePaths.etc_dir.resolve("replication/" + remoteName + ".config").toFile(),
FS.DETECTED);
- setReplicationDestination(remoteConfig, replicaSuffixes, allProjects);
- return remoteConfig;
+ setReplicationDestination(remoteConfig, replicaSuffixes, allProjects, replicationDelay);
}
- private void setAutoReload() throws IOException {
- config.setBoolean("gerrit", null, "autoReload", true);
- config.save();
- }
-
- private void setReplicationDestination(
- FileBasedConfig config, List<String> replicaSuffixes, Optional<String> project)
- throws IOException {
-
- List<String> replicaUrls =
- replicaSuffixes.stream()
- .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
- .collect(toList());
- config.setStringList("remote", null, "url", replicaUrls);
- config.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY);
- project.ifPresent(prj -> config.setString("remote", null, "projects", prj));
-
- config.save();
- }
-
- private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
- WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
- }
-
- private void reloadConfig() {
- getAutoReloadConfigDecoratorInstance().reload();
- }
-
- private AutoReloadConfigDecorator getAutoReloadConfigDecoratorInstance() {
- return getInstance(AutoReloadConfigDecorator.class);
- }
-
- private <T> T getInstance(Class<T> classObj) {
- return plugin.getSysInjector().getInstance(classObj);
- }
-
- private Project.NameKey createTestProject(String name) throws Exception {
- return projectOperations.newProject().name(name).create();
- }
-
- @SuppressWarnings(
- "SynchronizeOnNonFinalField") // tasksStorage is non-final but only set in setUpTestPlugin()
- private List<ReplicateRefUpdate> listIncompleteTasks(String refRegex) {
+ private List<ReplicateRefUpdate> listWaitingTasks(String refRegex) {
Pattern refmaskPattern = Pattern.compile(refRegex);
- synchronized (tasksStorage) {
- return Stream.concat(tasksStorage.streamWaiting(), tasksStorage.streamRunning())
- .filter(task -> refmaskPattern.matcher(task.ref()).matches())
- .collect(toList());
- }
- }
-
- public void cleanupReplicationTasks() throws IOException {
- cleanupReplicationTasks(storagePath);
- }
-
- private void cleanupReplicationTasks(Path basePath) throws IOException {
- try (DirectoryStream<Path> files = Files.newDirectoryStream(basePath)) {
- for (Path path : files) {
- if (Files.isDirectory(path)) {
- cleanupReplicationTasks(path);
- } else {
- path.toFile().delete();
- }
- }
- }
+ return tasksStorage
+ .streamWaiting()
+ .filter(task -> refmaskPattern.matcher(task.ref()).matches())
+ .collect(toList());
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 55c6a9c..fdea8d0 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -17,103 +17,50 @@
import static com.google.common.truth.Truth.assertThat;
import static com.google.gerrit.testing.GerritJUnit.assertThrows;
import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.NO_OP;
-import static java.util.stream.Collectors.toList;
-import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
import com.google.gerrit.acceptance.PushOneCommit.Result;
import com.google.gerrit.acceptance.TestPlugin;
import com.google.gerrit.acceptance.UseLocalDisk;
-import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
import com.google.gerrit.entities.Project;
-import com.google.gerrit.extensions.annotations.PluginData;
import com.google.gerrit.extensions.api.changes.NotifyHandling;
import com.google.gerrit.extensions.api.projects.BranchInput;
import com.google.gerrit.extensions.common.ProjectInfo;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.config.SitePaths;
import com.google.inject.Inject;
-import com.google.inject.Key;
-import com.googlesource.gerrit.plugins.replication.Destination.QueueInfo;
-import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
-import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
import java.io.IOException;
-import java.net.URISyntaxException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.eclipse.jgit.lib.Constants;
+import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefUpdate;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.revwalk.RevCommit;
-import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.transport.URIish;
-import org.eclipse.jgit.util.FS;
+import org.eclipse.jgit.revwalk.RevWalk;
import org.junit.Test;
@UseLocalDisk
@TestPlugin(
name = "replication",
sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
-public class ReplicationIT extends LightweightPluginDaemonTest {
- private static final Optional<String> ALL_PROJECTS = Optional.empty();
- private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+public class ReplicationIT extends ReplicationDaemon {
private static final int TEST_REPLICATION_DELAY = 1;
private static final int TEST_REPLICATION_RETRY = 1;
- private static final int TEST_REPLICATION_MAX_RETRIES = 1;
private static final Duration TEST_TIMEOUT =
Duration.ofSeconds((TEST_REPLICATION_DELAY + TEST_REPLICATION_RETRY * 60) + 1);
- private static final Duration MAX_RETRY_WITH_TOLERANCE_TIMEOUT =
- Duration.ofSeconds(
- (TEST_REPLICATION_DELAY + TEST_REPLICATION_RETRY * 60) * TEST_REPLICATION_MAX_RETRIES
- + 10);
- private static final int TEST_PROJECT_CREATION_SECONDS = 10;
-
- private static final Duration TEST_NEW_PROJECT_TIMEOUT =
- Duration.ofSeconds(
- (TEST_REPLICATION_DELAY + TEST_REPLICATION_RETRY * 60) + TEST_PROJECT_CREATION_SECONDS);
-
- @Inject private SitePaths sitePaths;
- @Inject private ProjectOperations projectOperations;
@Inject private DynamicSet<ProjectDeletedListener> deletedListeners;
- private DestinationsCollection destinationCollection;
- private Path pluginDataDir;
- private Path gitPath;
- private Path storagePath;
- private FileBasedConfig config;
- private ReplicationConfig replicationConfig;
- private ReplicationTasksStorage tasksStorage;
@Override
public void setUpTestPlugin() throws Exception {
- gitPath = sitePaths.site_path.resolve("git");
-
- config =
- new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+ initConfig();
setReplicationDestination(
"remote1",
"suffix1",
Optional.of("not-used-project")); // Simulates a full replication.config initialization
- config.save();
-
super.setUpTestPlugin();
-
- pluginDataDir = plugin.getSysInjector().getInstance(Key.get(Path.class, PluginData.class));
- replicationConfig = plugin.getSysInjector().getInstance(ReplicationConfig.class);
- storagePath = pluginDataDir.resolve("ref-updates");
- tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
- destinationCollection = plugin.getSysInjector().getInstance(DestinationsCollection.class);
- cleanupReplicationTasks();
}
@Test
@@ -232,78 +179,55 @@
}
@Test
- public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
- List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
- createTestProject(project + "replica1");
- createTestProject(project + "replica2");
-
- FileBasedConfig dest1 = setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
- FileBasedConfig dest2 = setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
- dest1.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
- dest2.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY * 100);
- dest1.save();
- dest2.save();
- reloadConfig();
-
- createChange();
-
- assertThat(listIncompleteTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
-
- setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
- setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
- }
-
- @Test
- public void shouldCreateOneReplicationTaskWhenSchedulingRepoFullSync() throws Exception {
- createTestProject(project + "replica");
-
- setReplicationDestination("foo", "replica", ALL_PROJECTS);
- reloadConfig();
-
- plugin
- .getSysInjector()
- .getInstance(ReplicationQueue.class)
- .scheduleFullSync(project, null, new ReplicationState(NO_OP), true);
-
- assertThat(listIncompleteTasks(Pattern.quote(PushOne.ALL_REFS))).hasSize(1);
- }
-
- @Test
public void shouldMatchTemplatedURL() throws Exception {
- createTestProject(project + "replica");
+ Project.NameKey targetProject = createTestProject(project + "replica");
setReplicationDestination("foo", "replica", ALL_PROJECTS);
reloadConfig();
+ String newRef = "refs/heads/newForTest";
+ ObjectId newRefTip = createNewBranchWithoutPush("refs/heads/master", newRef);
+
String urlMatch = gitPath.resolve("${name}" + "replica" + ".git").toString();
- String expectedURI = gitPath.resolve(project + "replica" + ".git").toString();
plugin
.getSysInjector()
.getInstance(ReplicationQueue.class)
.scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), true);
- assertThat(listIncompleteTasks(Pattern.quote(PushOne.ALL_REFS))).hasSize(1);
- streamIncompleteTasks().forEach((task) -> assertThat(task.uri()).isEqualTo(expectedURI));
+ try (Repository repo = repoManager.openRepository(targetProject)) {
+ waitUntil(() -> checkedGetRef(repo, newRef) != null);
+
+ Ref targetBranchRef = getRef(repo, newRef);
+ assertThat(targetBranchRef).isNotNull();
+ assertThat(targetBranchRef.getObjectId()).isEqualTo(newRefTip);
+ }
}
@Test
public void shouldMatchRealURL() throws Exception {
- createTestProject(project + "replica");
+ Project.NameKey targetProject = createTestProject(project + "replica");
setReplicationDestination("foo", "replica", ALL_PROJECTS);
reloadConfig();
+ String newRef = "refs/heads/newForTest";
+ ObjectId newRefTip = createNewBranchWithoutPush("refs/heads/master", newRef);
+
String urlMatch = gitPath.resolve(project + "replica" + ".git").toString();
- String expectedURI = urlMatch;
plugin
.getSysInjector()
.getInstance(ReplicationQueue.class)
.scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), true);
- assertThat(listIncompleteTasks()).hasSize(1);
- streamIncompleteTasks().forEach((task) -> assertThat(task.uri()).isEqualTo(expectedURI));
+ try (Repository repo = repoManager.openRepository(targetProject)) {
+ waitUntil(() -> checkedGetRef(repo, newRef) != null);
+
+ Ref targetBranchRef = getRef(repo, newRef);
+ assertThat(targetBranchRef).isNotNull();
+ assertThat(targetBranchRef.getObjectId()).isEqualTo(newRefTip);
+ }
}
@Test
@@ -349,16 +273,12 @@
input.revision = master;
gApi.projects().name(project.get()).branch(branchToDelete).create(input);
- assertThat(listIncompleteTasks("refs/heads/(todelete|master)")).hasSize(2);
-
try (Repository repo = repoManager.openRepository(targetProject)) {
waitUntil(() -> checkedGetRef(repo, branchToDelete) != null);
}
gApi.projects().name(project.get()).branch(branchToDelete).delete();
- assertThat(listIncompleteTasks(branchToDelete)).hasSize(1);
-
try (Repository repo = repoManager.openRepository(targetProject)) {
if (mirror) {
waitUntil(() -> checkedGetRef(repo, branchToDelete) == null);
@@ -442,182 +362,6 @@
}
}
- @Test
- public void shouldCleanupTasksAfterNewProjectReplication() throws Exception {
- setReplicationDestination("task_cleanup_project", "replica", ALL_PROJECTS);
- config.setInt("remote", "task_cleanup_project", "replicationRetry", 0);
- config.save();
- reloadConfig();
- assertThat(listRunning()).hasSize(0);
- Project.NameKey sourceProject = createTestProject("task_cleanup_project");
-
- waitUntil(() -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")));
- waitUntil(() -> listRunning().size() == 0);
- }
-
- @Test
- public void shouldCleanupBothTasksAndLocksAfterNewProjectReplication() throws Exception {
- setReplicationDestination("task_cleanup_locks_project", "replica", ALL_PROJECTS);
- config.setInt("remote", "task_cleanup_locks_project", "replicationRetry", 0);
- config.save();
- reloadConfig();
- assertThat(listRunning()).hasSize(0);
- Project.NameKey sourceProject = createTestProject("task_cleanup_locks_project");
-
- waitUntil(() -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")));
- waitUntil(() -> isTaskCleanedUp());
- }
-
- @Test
- public void shouldCleanupBothTasksAndLocksAfterReplicationCancelledAfterMaxRetries()
- throws Exception {
- String projectName = "task_cleanup_locks_project_cancelled";
- String remoteDestination = "http://invalidurl:9090/";
- URIish urish = new URIish(remoteDestination + projectName + ".git");
-
- setReplicationDestination(projectName, "replica", Optional.of(projectName));
- // replace correct urls with invalid one to trigger retry
- config.setString("remote", projectName, "url", remoteDestination + "${name}.git");
- config.setInt("remote", projectName, "replicationMaxRetries", TEST_REPLICATION_MAX_RETRIES);
- config.save();
- reloadConfig();
- Destination destination =
- destinationCollection.getAll(FilterType.ALL).stream()
- .filter(dest -> dest.getProjects().contains(projectName))
- .findFirst()
- .get();
-
- waitUntil(() -> listRunning().size() == 0);
-
- createTestProject(projectName);
-
- waitUntil(() -> isTaskRescheduled(destination.getQueueInfo(), urish));
- // replicationRetry is set to 1 minute which is the minimum value. That's why
- // should be safe to get the pushOne object from pending because it should be
- // here for one minute
- PushOne pushOp = destination.getQueueInfo().pending.get(urish);
-
- WaitUtil.waitUntil(() -> pushOp.wasCanceled(), MAX_RETRY_WITH_TOLERANCE_TIMEOUT);
- waitUntil(() -> isTaskCleanedUp());
- }
-
- private boolean isTaskRescheduled(QueueInfo queue, URIish uri) {
- PushOne pushOne = queue.pending.get(uri);
- return pushOne == null ? false : pushOne.isRetrying();
- }
-
- @Test
- public void shouldFirePendingOnlyToRemainingUris() throws Exception {
- String suffix1 = "replica1";
- String suffix2 = "replica2";
- Project.NameKey target1 = createTestProject(project + suffix1);
- Project.NameKey target2 = createTestProject(project + suffix2);
- String remote1 = "foo1";
- String remote2 = "foo2";
- setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE, false);
- setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE, false);
- reloadConfig();
-
- String changeRef = createChange().getPatchSet().refName();
-
- changeReplicationTasksForRemote(tasksStorage.streamWaiting(), changeRef, remote1)
- .forEach(
- (update) -> {
- try {
- UriUpdates uriUpdates = TestUriUpdates.create(update);
- tasksStorage.start(uriUpdates);
- tasksStorage.finish(uriUpdates);
- } catch (URISyntaxException e) {
- }
- });
-
- setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
- setReplicationDestination(remote2, suffix2, ALL_PROJECTS);
- reloadConfig();
-
- assertThat(changeReplicationTasksForRemote(changeRef, remote2).count()).isEqualTo(1);
- assertThat(changeReplicationTasksForRemote(changeRef, remote1).count()).isEqualTo(0);
-
- assertThat(isPushCompleted(target2, changeRef, TEST_TIMEOUT)).isEqualTo(true);
- assertThat(isPushCompleted(target1, changeRef, TEST_TIMEOUT)).isEqualTo(false);
- }
-
- public boolean isPushCompleted(Project.NameKey project, String ref, Duration timeOut) {
- try (Repository repo = repoManager.openRepository(project)) {
- WaitUtil.waitUntil(() -> checkedGetRef(repo, ref) != null, timeOut);
- return true;
- } catch (InterruptedException e) {
- return false;
- } catch (Exception e) {
- throw new RuntimeException("Cannot open repo for project" + project, e);
- }
- }
-
- private Ref getRef(Repository repo, String branchName) throws IOException {
- return repo.getRefDatabase().exactRef(branchName);
- }
-
- private Ref checkedGetRef(Repository repo, String branchName) {
- try {
- return repo.getRefDatabase().exactRef(branchName);
- } catch (Exception e) {
- logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
- return null;
- }
- }
-
- private void setReplicationDestination(
- String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
- setReplicationDestination(
- remoteName, Arrays.asList(replicaSuffix), project, TEST_REPLICATION_DELAY, false);
- }
-
- private void setReplicationDestination(
- String remoteName, String replicaSuffix, Optional<String> project, boolean mirror)
- throws IOException {
- setReplicationDestination(
- remoteName, Arrays.asList(replicaSuffix), project, TEST_REPLICATION_DELAY, mirror);
- }
-
- private void setReplicationDestination(
- String remoteName,
- String replicaSuffix,
- Optional<String> project,
- int replicationDelay,
- boolean mirror)
- throws IOException {
- setReplicationDestination(
- remoteName, Arrays.asList(replicaSuffix), project, replicationDelay, mirror);
- }
-
- private FileBasedConfig setReplicationDestination(
- String remoteName, List<String> replicaSuffixes, Optional<String> project)
- throws IOException {
- return setReplicationDestination(
- remoteName, replicaSuffixes, project, TEST_REPLICATION_DELAY, false);
- }
-
- private FileBasedConfig setReplicationDestination(
- String remoteName,
- List<String> replicaSuffixes,
- Optional<String> project,
- int replicationDelay,
- boolean mirror)
- throws IOException {
- List<String> replicaUrls =
- replicaSuffixes.stream()
- .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
- .collect(toList());
- config.setStringList("remote", remoteName, "url", replicaUrls);
- config.setInt("remote", remoteName, "replicationDelay", replicationDelay);
- config.setInt("remote", remoteName, "replicationRetry", TEST_REPLICATION_RETRY);
- config.setBoolean("remote", remoteName, "mirror", mirror);
- project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
- config.setBoolean("gerrit", null, "autoReload", true);
- config.save();
- return config;
- }
-
private void setProjectDeletionReplication(String remoteName, boolean replicateProjectDeletion)
throws IOException {
config.setBoolean("remote", remoteName, "replicateProjectDeletions", replicateProjectDeletion);
@@ -628,10 +372,6 @@
WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
}
- private void reloadConfig() {
- getAutoReloadConfigDecoratorInstance().reload();
- }
-
private void shutdownDestinations() {
getInstance(DestinationsCollection.class).shutdown();
}
@@ -644,92 +384,23 @@
getReplicationQueueInstance().stop();
}
- private AutoReloadConfigDecorator getAutoReloadConfigDecoratorInstance() {
- return getInstance(AutoReloadConfigDecorator.class);
- }
-
private ReplicationQueue getReplicationQueueInstance() {
return getInstance(ReplicationQueue.class);
}
- private <T> T getInstance(Class<T> classObj) {
- return plugin.getSysInjector().getInstance(classObj);
- }
-
- private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
- String changeRef, String remote) {
- return changeReplicationTasksForRemote(streamIncompleteTasks(), changeRef, remote);
- }
-
- private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
- Stream<ReplicateRefUpdate> updates, String changeRef, String remote) {
- return updates
- .filter(task -> changeRef.equals(task.ref()))
- .filter(task -> remote.equals(task.remote()));
- }
-
- private Project.NameKey createTestProject(String name) throws Exception {
- return projectOperations.newProject().name(name).create();
- }
-
- public List<ReplicateRefUpdate> listRunning() {
- return tasksStorage.streamRunning().collect(Collectors.toList());
- }
-
- @SuppressWarnings(
- "SynchronizeOnNonFinalField") // tasksStorage is non-final but only set in setUpTestPlugin()
- private List<ReplicateRefUpdate> listIncompleteTasks(String refRegex) {
- Pattern refmaskPattern = Pattern.compile(refRegex);
- synchronized (tasksStorage) {
- return streamIncompleteTasks()
- .filter(task -> refmaskPattern.matcher(task.ref()).matches())
- .collect(toList());
- }
- }
-
- @SuppressWarnings(
- "SynchronizeOnNonFinalField") // tasksStorage is non-final but only set in setUpTestPlugin()
- private List<ReplicateRefUpdate> listIncompleteTasks() {
- synchronized (tasksStorage) {
- return streamIncompleteTasks().collect(toList());
- }
- }
-
- private Stream<ReplicateRefUpdate> streamIncompleteTasks() {
- return Stream.concat(tasksStorage.streamWaiting(), tasksStorage.streamRunning());
- }
-
- public void cleanupReplicationTasks() throws IOException {
- cleanupReplicationTasks(storagePath);
- }
-
- private void cleanupReplicationTasks(Path basePath) throws IOException {
- try (DirectoryStream<Path> files = Files.newDirectoryStream(basePath)) {
- for (Path path : files) {
- if (Files.isDirectory(path)) {
- cleanupReplicationTasks(path);
- } else {
- path.toFile().delete();
- }
+ /**
+ * Creates {@code newBranch} in the repository of {@code project} by updating the ref directly
+ * on the repository, pointing it at the commit that {@code fromBranch} resolves to.
+ *
+ * @param fromBranch full name of the existing ref whose tip is used for the new branch
+ * @param newBranch full name of the ref to create or update
+ * @return the new object id recorded on the ref update
+ */
+ private ObjectId createNewBranchWithoutPush(String fromBranch, String newBranch)
+ throws Exception {
+ try (Repository repo = repoManager.openRepository(project);
+ RevWalk walk = new RevWalk(repo)) {
+ Ref ref = repo.exactRef(fromBranch);
+ // NOTE(review): when fromBranch does not exist, tip stays null and is handed to
+ // setNewObjectId() below as-is - confirm callers always pass an existing branch.
+ RevCommit tip = null;
+ if (ref != null) {
+ tip = walk.parseCommit(ref.getObjectId());
}
- }
- }
-
- private boolean isTaskCleanedUp() {
- Path refUpdates = replicationConfig.getEventsDirectory().resolve("ref-updates");
- Path runningUpdates = refUpdates.resolve("running");
- try {
- return Files.list(runningUpdates).count() == 0;
- } catch (IOException e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- private boolean nonEmptyProjectExists(Project.NameKey name) {
- try (Repository r = repoManager.openRepository(name)) {
- return !r.getAllRefsByPeeledObjectId().isEmpty();
- } catch (Exception e) {
- return false;
+ RefUpdate update = repo.updateRef(newBranch);
+ update.setNewObjectId(tip);
+ // The value returned by update() is not inspected here.
+ update.update(walk);
+ return update.getNewObjectId();
}
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
new file mode 100644
index 0000000..a65b257
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
@@ -0,0 +1,407 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.NO_OP;
+import static java.util.stream.Collectors.toList;
+
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.api.projects.BranchInput;
+import com.googlesource.gerrit.plugins.replication.Destination.QueueInfo;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.Test;
+
+/**
+ * The tests in this class aim to ensure events are correctly written and read from storage. They
+ * typically do this by setting up replication destinations with long delays, performing actions
+ * that are expected to write to storage, reloading the configuration (which should read from
+ * storage), and then confirming the actions complete as expected.
+ */
+@UseLocalDisk
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationStorageIT extends ReplicationDaemon {
+  private static final int TEST_TASK_FINISH_SECONDS = 1;
+  private static final int TEST_REPLICATION_MAX_RETRIES = 1;
+  protected static final Duration TEST_TASK_FINISH_TIMEOUT =
+      Duration.ofSeconds(TEST_TASK_FINISH_SECONDS);
+
+  // (replication delay + retry interval) for every allowed retry, plus 10 seconds on top.
+  private static final Duration MAX_RETRY_WITH_TOLERANCE_TIMEOUT =
+      Duration.ofSeconds(
+          (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60)
+                  * TEST_REPLICATION_MAX_RETRIES
+              + 10);
+
+  protected ReplicationTasksStorage tasksStorage;
+  private DestinationsCollection destinationCollection;
+  private ReplicationConfig replicationConfig;
+
+  /**
+   * Writes an initial replication configuration before the plugin is set up, then looks up the
+   * storage, destinations and configuration instances from the plugin's injector.
+   */
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    initConfig();
+    setReplicationDestination(
+        "remote1",
+        "suffix1",
+        Optional.of("not-used-project")); // Simulates a full replication.config initialization
+    super.setUpTestPlugin();
+    tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
+    destinationCollection = plugin.getSysInjector().getInstance(DestinationsCollection.class);
+    replicationConfig = plugin.getSysInjector().getInstance(ReplicationConfig.class);
+  }
+
+  @Test
+  public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
+    List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
+    createTestProject(project + "replica1");
+    createTestProject(project + "replica2");
+
+    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    createChange();
+
+    // 2 remotes x 2 urls each = 4 waiting tasks for the change ref.
+    assertThat(listWaitingReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
+  }
+
+  @Test
+  public void shouldCreateOneReplicationTaskWhenSchedulingRepoFullSync() throws Exception {
+    createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    plugin
+        .getSysInjector()
+        .getInstance(ReplicationQueue.class)
+        .scheduleFullSync(project, null, new ReplicationState(NO_OP), false);
+
+    assertThat(listWaitingReplicationTasks(Pattern.quote(PushOne.ALL_REFS))).hasSize(1);
+  }
+
+  @Test
+  public void shouldFirePendingOnlyToIncompleteUri() throws Exception {
+    String suffix1 = "replica1";
+    String suffix2 = "replica2";
+    String remote1 = "foo1";
+    String remote2 = "foo2";
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String changeRef = createChange().getPatchSet().refName();
+    completeWaitingTasksForRemote(changeRef, remote1);
+    reloadConfig();
+
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef, remote2).count()).isEqualTo(1);
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef, remote1).count()).isEqualTo(0);
+  }
+
+  @Test
+  public void shouldFireAndCompletePendingOnlyToIncompleteUri() throws Exception {
+    String suffix1 = "replica1";
+    String suffix2 = "replica2";
+    Project.NameKey target1 = createTestProject(project + suffix1);
+    Project.NameKey target2 = createTestProject(project + suffix2);
+    String remote1 = "foo1";
+    String remote2 = "foo2";
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String changeRef = createChange().getPatchSet().refName();
+    completeWaitingTasksForRemote(changeRef, remote1);
+
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS);
+    reloadConfig();
+
+    assertThat(isPushCompleted(target2, changeRef, TEST_PUSH_TIMEOUT)).isEqualTo(true);
+    assertThat(isPushCompleted(target1, changeRef, TEST_PUSH_TIMEOUT)).isEqualTo(false);
+  }
+
+  @Test
+  public void shouldFirePendingChangeRefs() throws Exception {
+    String suffix1 = "replica1";
+    String suffix2 = "replica2";
+    String remote1 = "foo1";
+    String remote2 = "foo2";
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String changeRef1 = createChange().getPatchSet().refName();
+    String changeRef2 = createChange().getPatchSet().refName();
+    reloadConfig();
+
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef1, remote1).count()).isEqualTo(1);
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef1, remote2).count()).isEqualTo(1);
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef2, remote1).count()).isEqualTo(1);
+    assertThat(waitingChangeReplicationTasksForRemote(changeRef2, remote2).count()).isEqualTo(1);
+  }
+
+  @Test
+  public void shouldFireAndCompletePendingChangeRefs() throws Exception {
+    String suffix1 = "replica1";
+    String suffix2 = "replica2";
+    Project.NameKey target1 = createTestProject(project + suffix1);
+    Project.NameKey target2 = createTestProject(project + suffix2);
+    String remote1 = "foo1";
+    String remote2 = "foo2";
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS, Integer.MAX_VALUE);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String changeRef1 = createChange().getPatchSet().refName();
+    String changeRef2 = createChange().getPatchSet().refName();
+
+    setReplicationDestination(remote1, suffix1, ALL_PROJECTS);
+    setReplicationDestination(remote2, suffix2, ALL_PROJECTS);
+    reloadConfig();
+
+    // Wait for completion within the time 2 pushes should take because each remote only has 1
+    // thread and needs to push 2 events
+    Duration twoPushesTimeout = TEST_PUSH_TIMEOUT.plus(TEST_PUSH_TIMEOUT);
+
+    // A map holds only one ref per target project, so each change ref is verified with its own
+    // map; putting both refs into a single map would silently drop the first one. The second
+    // check starts with a fresh timeout, which only makes the wait more lenient.
+    for (String changeRef : Arrays.asList(changeRef1, changeRef2)) {
+      Map<Project.NameKey, String> refsByProject = new HashMap<>();
+      refsByProject.put(target1, changeRef);
+      refsByProject.put(target2, changeRef);
+      assertThat(isPushCompleted(refsByProject, twoPushesTimeout)).isEqualTo(true);
+    }
+  }
+
+  @Test
+  public void shouldMatchTemplatedUrl() throws Exception {
+    createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String urlMatch = gitPath.resolve("${name}" + "replica" + ".git").toString();
+    String expectedURI = gitPath.resolve(project + "replica" + ".git").toString();
+
+    plugin
+        .getSysInjector()
+        .getInstance(ReplicationQueue.class)
+        .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), false);
+
+    assertThat(listWaiting()).hasSize(1);
+    tasksStorage
+        .streamWaiting()
+        .forEach(
+            (task) -> {
+              assertThat(task.uri()).isEqualTo(expectedURI);
+              assertThat(task.ref()).isEqualTo(PushOne.ALL_REFS);
+            });
+  }
+
+  @Test
+  public void shouldMatchRealUrl() throws Exception {
+    createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String urlMatch = gitPath.resolve(project + "replica" + ".git").toString();
+    String expectedURI = urlMatch;
+
+    plugin
+        .getSysInjector()
+        .getInstance(ReplicationQueue.class)
+        .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), false);
+
+    assertThat(listWaiting()).hasSize(1);
+    tasksStorage
+        .streamWaiting()
+        .forEach(
+            (task) -> {
+              assertThat(task.uri()).isEqualTo(expectedURI);
+              assertThat(task.ref()).isEqualTo(PushOne.ALL_REFS);
+            });
+  }
+
+  @Test
+  public void shouldReplicateBranchDeletionWhenMirror() throws Exception {
+    replicateBranchDeletion(true);
+  }
+
+  @Test
+  public void shouldNotReplicateBranchDeletionWhenNotMirror() throws Exception {
+    replicateBranchDeletion(false);
+  }
+
+  /**
+   * Creates a branch, gives it time to replicate, then deletes it under a destination configured
+   * with {@code Integer.MAX_VALUE} and the given {@code mirror} flag, and checks that exactly one
+   * waiting task was stored for the deleted branch.
+   */
+  private void replicateBranchDeletion(boolean mirror) throws Exception {
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Project.NameKey targetProject = createTestProject(project + "replica");
+    String branchToDelete = "refs/heads/todelete";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(project.get()).branch(branchToDelete).create(input);
+    // Only used to wait for the branch creation; the outcome is not asserted here.
+    isPushCompleted(targetProject, branchToDelete, TEST_PUSH_TIMEOUT);
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE, mirror);
+    reloadConfig();
+
+    gApi.projects().name(project.get()).branch(branchToDelete).delete();
+
+    assertThat(listWaitingReplicationTasks(branchToDelete)).hasSize(1);
+  }
+
+  @Test
+  public void shouldCleanupTasksAfterNewProjectReplication() throws Exception {
+    setReplicationDestination("task_cleanup_project", "replica", ALL_PROJECTS);
+    config.setInt("remote", "task_cleanup_project", "replicationRetry", 0);
+    config.save();
+    reloadConfig();
+    assertThat(listRunning()).hasSize(0);
+    Project.NameKey sourceProject = createTestProject("task_cleanup_project");
+
+    WaitUtil.waitUntil(
+        () -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")),
+        TEST_NEW_PROJECT_TIMEOUT);
+    WaitUtil.waitUntil(() -> listRunning().size() == 0, TEST_TASK_FINISH_TIMEOUT);
+  }
+
+  @Test
+  public void shouldCleanupBothTasksAndLocksAfterNewProjectReplication() throws Exception {
+    setReplicationDestination("task_cleanup_locks_project", "replica", ALL_PROJECTS);
+    config.setInt("remote", "task_cleanup_locks_project", "replicationRetry", 0);
+    config.save();
+    reloadConfig();
+    assertThat(listRunning()).hasSize(0);
+    Project.NameKey sourceProject = createTestProject("task_cleanup_locks_project");
+
+    WaitUtil.waitUntil(
+        () -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")),
+        TEST_NEW_PROJECT_TIMEOUT);
+    WaitUtil.waitUntil(() -> isTaskCleanedUp(), TEST_TASK_FINISH_TIMEOUT);
+  }
+
+  @Test
+  public void shouldCleanupBothTasksAndLocksAfterReplicationCancelledAfterMaxRetries()
+      throws Exception {
+    String projectName = "task_cleanup_locks_project_cancelled";
+    String remoteDestination = "http://invalidurl:9090/";
+    URIish urish = new URIish(remoteDestination + projectName + ".git");
+
+    setReplicationDestination(projectName, "replica", Optional.of(projectName));
+    // replace correct urls with invalid one to trigger retry
+    config.setString("remote", projectName, "url", remoteDestination + "${name}.git");
+    config.setInt("remote", projectName, "replicationMaxRetries", TEST_REPLICATION_MAX_RETRIES);
+    config.save();
+    reloadConfig();
+    Destination destination =
+        destinationCollection.getAll(FilterType.ALL).stream()
+            .filter(dest -> dest.getProjects().contains(projectName))
+            .findFirst()
+            .get();
+
+    createTestProject(projectName);
+
+    WaitUtil.waitUntil(
+        () -> isTaskRescheduled(destination.getQueueInfo(), urish), TEST_NEW_PROJECT_TIMEOUT);
+    // replicationRetry is set to 1 minute, which is the minimum value. That is why it
+    // should be safe to get the pushOne object from pending: it should stay
+    // there for one minute
+    PushOne pushOp = destination.getQueueInfo().pending.get(urish);
+
+    WaitUtil.waitUntil(() -> pushOp.wasCanceled(), MAX_RETRY_WITH_TOLERANCE_TIMEOUT);
+    WaitUtil.waitUntil(() -> isTaskCleanedUp(), TEST_TASK_FINISH_TIMEOUT);
+  }
+
+  /** Returns whether a push for {@code uri} is pending in {@code queue} and marked as retrying. */
+  private boolean isTaskRescheduled(QueueInfo queue, URIish uri) {
+    PushOne pushOne = queue.pending.get(uri);
+    return pushOne == null ? false : pushOne.isRetrying();
+  }
+
+  /**
+   * Returns whether the {@code ref-updates/running} directory under the events directory is
+   * empty.
+   */
+  private boolean isTaskCleanedUp() {
+    Path refUpdates = replicationConfig.getEventsDirectory().resolve("ref-updates");
+    Path runningUpdates = refUpdates.resolve("running");
+    // Files.list() keeps the directory open until the stream is closed; this method is polled
+    // repeatedly, so the stream is closed on every call.
+    try (Stream<Path> running = Files.list(runningUpdates)) {
+      return running.count() == 0;
+    } catch (IOException e) {
+      throw new RuntimeException(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Passes every waiting task for {@code changeRef} on {@code remote} through {@code start} and
+   * then {@code finish} on the tasks storage.
+   */
+  private void completeWaitingTasksForRemote(String changeRef, String remote) {
+    changeReplicationTasksForRemote(tasksStorage.streamWaiting(), changeRef, remote)
+        .forEach(
+            (update) -> {
+              try {
+                UriUpdates uriUpdates = TestUriUpdates.create(update);
+                tasksStorage.start(uriUpdates);
+                tasksStorage.finish(uriUpdates);
+              } catch (URISyntaxException e) {
+                // Fail loudly instead of silently leaving the task untouched, which would only
+                // surface later as a confusing assertion failure.
+                throw new RuntimeException("Cannot instantiate UriUpdates object", e);
+              }
+            });
+  }
+
+  /** Streams the waiting tasks in storage that match both {@code changeRef} and {@code remote}. */
+  private Stream<ReplicateRefUpdate> waitingChangeReplicationTasksForRemote(
+      String changeRef, String remote) {
+    return changeReplicationTasksForRemote(tasksStorage.streamWaiting(), changeRef, remote);
+  }
+
+  /** Narrows {@code updates} to the tasks that match both {@code changeRef} and {@code remote}. */
+  private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
+      Stream<ReplicateRefUpdate> updates, String changeRef, String remote) {
+    return updates
+        .filter(task -> changeRef.equals(task.ref()))
+        .filter(task -> remote.equals(task.remote()));
+  }
+
+  /** Lists the waiting tasks whose ref fully matches the regular expression {@code refRegex}. */
+  private List<ReplicateRefUpdate> listWaitingReplicationTasks(String refRegex) {
+    Pattern refmaskPattern = Pattern.compile(refRegex);
+    return tasksStorage
+        .streamWaiting()
+        .filter(task -> refmaskPattern.matcher(task.ref()).matches())
+        .collect(toList());
+  }
+
+  private List<ReplicateRefUpdate> listWaiting() {
+    return tasksStorage.streamWaiting().collect(Collectors.toList());
+  }
+
+  private List<ReplicateRefUpdate> listRunning() {
+    return tasksStorage.streamRunning().collect(Collectors.toList());
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
new file mode 100644
index 0000000..b5c01b9
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
@@ -0,0 +1,204 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTest.assertNoIncompleteTasks;
+import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTest.assertThatStream;
+
+import com.google.common.jimfs.Configuration;
+import com.google.common.jimfs.Jimfs;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.net.URISyntaxException;
+import java.nio.file.FileSystem;
+import java.nio.file.Path;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Multi-primary tests for {@link ReplicationTasksStorage}. Three storage instances are created
+ * on the same path of an in-memory file system: {@code nodeA} and {@code nodeB} perform the
+ * operations, and {@code persistedView} is only used to check the resulting state.
+ */
+public class ReplicationTasksStorageMPTest {
+  protected static final String PROJECT = "myProject";
+  protected static final String REF = "myRef";
+  protected static final String REMOTE = "myDest";
+  protected static final URIish URISH =
+      ReplicationTasksStorageTest.getUrish("http://example.com/" + PROJECT + ".git");
+  protected static final ReplicateRefUpdate REF_UPDATE =
+      ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE);
+  protected static final UriUpdates URI_UPDATES = getUriUpdates(REF_UPDATE);
+
+  protected ReplicationTasksStorage nodeA;
+  protected ReplicationTasksStorage nodeB;
+  protected ReplicationTasksStorage persistedView;
+  protected FileSystem fileSystem;
+
+  @Before
+  public void setUp() throws Exception {
+    fileSystem = Jimfs.newFileSystem(Configuration.unix());
+    Path storageSite = fileSystem.getPath("replication_site");
+    // All three instances share the same storage location.
+    nodeA = new ReplicationTasksStorage(storageSite);
+    nodeB = new ReplicationTasksStorage(storageSite);
+    persistedView = new ReplicationTasksStorage(storageSite);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    nodeA = null;
+    nodeB = null;
+    persistedView = null;
+    fileSystem.close();
+  }
+
+  @Test
+  public void sameTaskCreatedByOtherNodeIsDeduped() {
+    nodeA.create(REF_UPDATE);
+
+    nodeB.create(REF_UPDATE);
+    assertPersistedAsWaiting();
+  }
+
+  @Test
+  public void waitingTaskCanBeCompletedByOtherNode() {
+    nodeA.create(REF_UPDATE);
+
+    nodeB.start(URI_UPDATES);
+    assertPersistedAsRunning();
+
+    nodeB.finish(URI_UPDATES);
+    assertNoIncompleteTasks(persistedView);
+  }
+
+  @Test
+  public void resetTaskCanBeCompletedByOtherNode() {
+    nodeA.create(REF_UPDATE);
+    nodeA.start(URI_UPDATES);
+
+    nodeA.reset(URI_UPDATES);
+    assertPersistedAsWaiting();
+
+    nodeB.start(URI_UPDATES);
+    assertPersistedAsRunning();
+
+    nodeB.finish(URI_UPDATES);
+    assertNoIncompleteTasks(persistedView);
+  }
+
+  @Test
+  public void resetTaskCanBeResetAndCompletedByOtherNode() {
+    nodeA.create(REF_UPDATE);
+    nodeA.start(URI_UPDATES);
+    nodeA.reset(URI_UPDATES);
+    nodeB.start(URI_UPDATES);
+
+    nodeB.reset(URI_UPDATES);
+    assertPersistedAsWaiting();
+
+    nodeB.start(URI_UPDATES);
+    assertPersistedAsRunning();
+
+    nodeB.finish(URI_UPDATES);
+    assertNoIncompleteTasks(persistedView);
+  }
+
+  @Test
+  public void resetTaskCanBeResetByOtherNodeAndCompletedByOriginalNode() {
+    nodeA.create(REF_UPDATE);
+    nodeA.start(URI_UPDATES);
+    nodeA.reset(URI_UPDATES);
+    nodeB.start(URI_UPDATES);
+    nodeB.reset(URI_UPDATES);
+
+    nodeA.start(URI_UPDATES);
+    assertPersistedAsRunning();
+
+    nodeA.finish(URI_UPDATES);
+    assertNoIncompleteTasks(persistedView);
+  }
+
+  @Test
+  public void canBeResetAllAndCompletedByOtherNode() {
+    nodeA.create(REF_UPDATE);
+    nodeA.start(URI_UPDATES);
+
+    nodeB.resetAll();
+    assertPersistedAsWaiting();
+
+    nodeB.start(URI_UPDATES);
+    assertPersistedAsRunning();
+
+    nodeA.finish(URI_UPDATES);
+    // Check left disabled, see bug: https://crbug.com/gerrit/12973
+    // assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+
+    nodeB.finish(URI_UPDATES);
+    assertNoIncompleteTasks(persistedView);
+  }
+
+  @Test
+  public void canBeResetAllAndCompletedByOtherNodeFastOriginalNode() {
+    nodeA.create(REF_UPDATE);
+    nodeA.start(URI_UPDATES);
+    nodeB.resetAll();
+
+    nodeA.finish(URI_UPDATES);
+    assertPersistedAsWaiting();
+
+    nodeB.start(URI_UPDATES);
+    assertPersistedAsRunning();
+
+    nodeB.finish(URI_UPDATES);
+    assertNoIncompleteTasks(persistedView);
+  }
+
+  @Test
+  public void canBeResetAllAndCompletedByOtherNodeSlowOriginalNode() {
+    nodeA.create(REF_UPDATE);
+    nodeA.start(URI_UPDATES);
+    nodeB.resetAll();
+
+    nodeB.start(URI_UPDATES);
+    assertPersistedAsRunning();
+
+    nodeB.finish(URI_UPDATES);
+    assertNoIncompleteTasks(persistedView);
+
+    nodeA.finish(URI_UPDATES);
+    assertNoIncompleteTasks(persistedView);
+  }
+
+  @Test
+  public void multipleNodesCanReplicateSameRef() {
+    nodeA.create(REF_UPDATE);
+    nodeA.start(URI_UPDATES);
+    assertPersistedAsRunning();
+
+    nodeA.finish(URI_UPDATES);
+    assertNoIncompleteTasks(persistedView);
+
+    nodeB.create(REF_UPDATE);
+    nodeB.start(URI_UPDATES);
+    assertPersistedAsRunning();
+
+    nodeB.finish(URI_UPDATES);
+    assertNoIncompleteTasks(persistedView);
+  }
+
+  /**
+   * Builds the {@link UriUpdates} for {@code refUpdate}, rethrowing a {@link URISyntaxException}
+   * as an unchecked exception.
+   */
+  public static UriUpdates getUriUpdates(ReplicateRefUpdate refUpdate) {
+    try {
+      return TestUriUpdates.create(refUpdate);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("Cannot instantiate UriUpdates object", e);
+    }
+  }
+
+  /** Asserts that the waiting tasks seen through {@code persistedView} are exactly REF_UPDATE. */
+  private void assertPersistedAsWaiting() {
+    assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE);
+  }
+
+  /** Asserts that the running tasks seen through {@code persistedView} are exactly REF_UPDATE. */
+  private void assertPersistedAsRunning() {
+    assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java
new file mode 100644
index 0000000..b646bc6
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java
@@ -0,0 +1,195 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTaskTest.assertIsRunning;
+import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTaskTest.assertIsWaiting;
+import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTaskTest.assertNotRunning;
+import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTaskTest.assertNotWaiting;
+
+import com.google.common.jimfs.Configuration;
+import com.google.common.jimfs.Jimfs;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.nio.file.FileSystem;
+import java.nio.file.Path;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ReplicationTasksStorageTaskMPTest {
+ protected static final String PROJECT = "myProject";
+ protected static final String REF = "myRef";
+ protected static final String REMOTE = "myDest";
+ protected static final URIish URISH =
+ ReplicationTasksStorageTest.getUrish("http://example.com/" + PROJECT + ".git");
+ protected static final ReplicateRefUpdate REF_UPDATE =
+ ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE);
+ // nodeA and nodeB are both constructed on the same storageSite path in setUp().
+ protected FileSystem fileSystem;
+ protected Path storageSite;
+ protected ReplicationTasksStorage nodeA;
+ protected ReplicationTasksStorage nodeB;
+ protected ReplicationTasksStorage.Task taskA;
+ protected ReplicationTasksStorage.Task taskB;
+
+ @Before
+ public void setUp() throws Exception {
+ fileSystem = Jimfs.newFileSystem(Configuration.unix()); // closed again in tearDown()
+ storageSite = fileSystem.getPath("replication_site");
+ nodeA = new ReplicationTasksStorage(storageSite);
+ nodeB = new ReplicationTasksStorage(storageSite); // same storage path as nodeA
+ taskA = nodeA.new Task(REF_UPDATE);
+ taskB = nodeB.new Task(REF_UPDATE); // same ref update as taskA, but created from nodeB
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ fileSystem.close(); // closes the file system created in setUp()
+ }
+
+ @Test
+ public void waitingTaskCanBeCompletedByOtherNode() {
+ taskA.create();
+ // taskB starts the task that taskA created; taskA must see it as running.
+ taskB.start();
+ assertIsRunning(taskA);
+ // After taskB finishes, neither task object may see it as running or waiting.
+ taskB.finish();
+ assertNotRunning(taskA);
+ assertNotWaiting(taskA);
+ assertNotRunning(taskB);
+ assertNotWaiting(taskB);
+ }
+
+ @Test
+ public void resetTaskCanBeCompletedByOtherNode() {
+ taskA.create();
+ taskA.start();
+ // Resetting the started task must put it back into waiting.
+ taskA.reset();
+ assertIsWaiting(taskA);
+ // taskB starts it; both task objects must then see it as running.
+ taskB.start();
+ assertIsRunning(taskA);
+ assertIsRunning(taskB);
+ // After taskB finishes, neither task object may see it as running or waiting.
+ taskB.finish();
+ assertNotRunning(taskA);
+ assertNotWaiting(taskA);
+ assertNotRunning(taskB);
+ assertNotWaiting(taskB);
+ }
+
+ @Test
+ public void retryCanBeRetriedAndCompletedByOtherNode() {
+ taskA.create();
+ taskA.start();
+ taskA.reset();
+ taskB.start();
+ // taskB resets the task it had started; taskA must see it as waiting again.
+ taskB.reset();
+ assertIsWaiting(taskA);
+ // taskB starts it a second time; taskA must see it as running.
+ taskB.start();
+ assertIsRunning(taskA);
+ // After taskB finishes, neither task object may see it as running or waiting.
+ taskB.finish();
+ assertNotRunning(taskA);
+ assertNotWaiting(taskA);
+ assertNotRunning(taskB);
+ assertNotWaiting(taskB);
+ }
+
+ @Test
+ public void retryCanBeRetriedOtherNodeAndCompletedByOriginalNode() {
+ taskA.create();
+ taskA.start();
+ taskA.reset();
+ taskB.start();
+ taskB.reset();
+ // After a reset by each task object in turn, taskA starts the task again.
+ taskA.start();
+ assertIsRunning(taskA);
+ // After taskA finishes, neither task object may see it as running or waiting.
+ taskA.finish();
+ assertNotRunning(taskA);
+ assertNotWaiting(taskA);
+ assertNotRunning(taskB);
+ assertNotWaiting(taskB);
+ }
+
+ @Test
+ public void canBeResetAllAndCompletedByOtherNode() {
+ taskA.create();
+ taskA.start();
+ // nodeB's resetAll() must put the task started by taskA back into waiting.
+ nodeB.resetAll();
+ assertIsWaiting(taskA);
+ // taskB creates and starts the same ref update; taskA must see it as running.
+ taskB.create();
+ taskB.start();
+ assertIsRunning(taskA);
+ // taskA finishes while taskB has not finished yet.
+ taskA.finish();
+ // The assertion below is disabled; see bug https://crbug.com/gerrit/12973
+ // assertIsRunning(taskB);
+ // After taskB finishes too, neither task object may see it as running or waiting.
+ taskB.finish();
+ assertNotRunning(taskA);
+ assertNotWaiting(taskA);
+ assertNotRunning(taskB);
+ assertNotWaiting(taskB);
+ }
+
+ @Test
+ public void resetAllAndCompletedByOtherNodeWhenTaskAFinishesBeforeTaskB() {
+ taskA.create();
+ taskA.start();
+ nodeB.resetAll();
+ // taskA finishes after nodeB's resetAll(); the task must still be waiting.
+ taskA.finish();
+ assertIsWaiting(taskA);
+ // taskB then starts it; taskA must see it as running.
+ taskB.start();
+ assertIsRunning(taskA);
+ // After taskB finishes, neither task object may see it as running or waiting.
+ taskB.finish();
+ assertNotRunning(taskA);
+ assertNotWaiting(taskA);
+ assertNotRunning(taskB);
+ assertNotWaiting(taskB);
+ }
+
+ @Test
+ public void resetAllAndCompletedByOtherNodeWhenTaskAFinishesAfterTaskB() {
+ taskA.create();
+ taskA.start();
+ nodeB.resetAll();
+ // taskB starts the task while taskA has not finished yet.
+ taskB.start();
+ assertIsRunning(taskA);
+ // taskB finishes first; taskA must see the task as neither waiting nor running.
+ taskB.finish();
+ assertNotWaiting(taskA);
+ assertNotRunning(taskA);
+ // taskA's late finish must leave the task neither waiting nor running for both task objects.
+ taskA.finish();
+ assertNotWaiting(taskA);
+ assertNotRunning(taskA);
+ assertNotRunning(taskB);
+ assertNotWaiting(taskB);
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
index af084f1..a2e5e4d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
@@ -346,30 +346,26 @@
assertIsWaiting(persistedView);
}
- private void assertIsWaiting(Task task) {
- assertTrue(whiteBoxIsWaiting(task));
+ protected static void assertIsWaiting(Task task) {
+ assertTrue(task.isWaiting());
}
- private void assertNotWaiting(Task task) {
- assertFalse(whiteBoxIsWaiting(task));
+ protected static void assertNotWaiting(Task task) {
+ assertFalse(task.isWaiting());
}
- private void assertIsRunning(Task task) {
+ protected static void assertIsRunning(Task task) {
assertTrue(whiteBoxIsRunning(task));
}
- private void assertNotRunning(Task task) {
+ protected static void assertNotRunning(Task task) {
assertFalse(whiteBoxIsRunning(task));
}
- private boolean whiteBoxIsRunning(Task task) {
+ private static boolean whiteBoxIsRunning(Task task) {
return Files.exists(task.running);
}
- private boolean whiteBoxIsWaiting(Task task) {
- return Files.exists(task.waiting);
- }
-
public static URIish getUrish(String uri) {
try {
return new URIish(uri);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
index 4b09d52..4b1a827 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
@@ -16,7 +16,9 @@
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;
@@ -71,10 +73,20 @@
}
@Test
+ public void canCheckIfUpdateIsWaiting() {
+ storage.create(REF_UPDATE);
+ assertTrue(storage.isWaiting(uriUpdates));
+ // Once the update is started, isWaiting() must report false.
+ storage.start(uriUpdates);
+ assertFalse(storage.isWaiting(uriUpdates));
+ }
+
+ @Test
public void canStartWaitingUpdate() throws Exception {
storage.create(REF_UPDATE);
storage.start(uriUpdates);
assertThatStream(storage.streamWaiting()).isEmpty();
+ assertFalse(storage.isWaiting(uriUpdates));
assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
}
@@ -128,6 +140,8 @@
String keyA = storage.create(REF_UPDATE);
String keyB = storage.create(updateB);
assertThatStream(storage.streamWaiting()).hasSize(2);
+ assertTrue(storage.isWaiting(uriUpdates));
+ assertTrue(storage.isWaiting(TestUriUpdates.create(updateB)));
assertNotEquals(keyA, keyB);
}
@@ -187,6 +201,8 @@
storage.create(REF_UPDATE);
storage.create(updateB);
assertThatStream(storage.streamWaiting()).hasSize(2);
+ assertTrue(storage.isWaiting(uriUpdates));
+ assertTrue(storage.isWaiting(TestUriUpdates.create(updateB)));
}
@Test
@@ -198,6 +214,8 @@
String keyB = storage.create(refB);
assertThatStream(storage.streamWaiting()).hasSize(2);
assertNotEquals(keyA, keyB);
+ assertTrue(storage.isWaiting(TestUriUpdates.create(refA)));
+ assertTrue(storage.isWaiting(TestUriUpdates.create(refB)));
}
@Test
@@ -237,6 +255,7 @@
storage.start(uriUpdates);
assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
assertThatStream(storage.streamWaiting()).isEmpty();
+ assertFalse(storage.isWaiting(uriUpdates));
storage.finish(uriUpdates);
assertNoIncompleteTasks(storage);
@@ -256,6 +275,7 @@
storage.resetAll();
assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
assertThatStream(storage.streamRunning()).isEmpty();
+ assertTrue(storage.isWaiting(uriUpdates));
}
@Test
@@ -267,6 +287,7 @@
storage.start(uriUpdates);
assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
assertThatStream(storage.streamWaiting()).isEmpty();
+ assertFalse(storage.isWaiting(uriUpdates));
storage.finish(uriUpdates);
assertNoIncompleteTasks(storage);
@@ -353,12 +374,12 @@
assertThatStream(storage.streamRunning()).isEmpty();
}
- private void assertNoIncompleteTasks(ReplicationTasksStorage storage) {
+ protected static void assertNoIncompleteTasks(ReplicationTasksStorage storage) {
assertThatStream(storage.streamWaiting()).isEmpty();
assertThatStream(storage.streamRunning()).isEmpty();
}
- private IterableSubject assertThatStream(Stream<?> stream) {
+ protected static IterableSubject assertThatStream(Stream<?> stream) {
return assertThat(stream.collect(Collectors.toList()));
}