Merge "Avoid starting tasks which are not present under waiting storage area"
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 8d7227f..12f0273 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -583,7 +583,7 @@
if (inFlightOp != null) {
return RunwayStatus.denied(inFlightOp.getId());
}
- replicationTasksStorage.get().start(op);
+ op.setRefs(replicationTasksStorage.get().start(op));
inFlight.put(op.getURI(), op);
}
return RunwayStatus.allowed();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index bdef2fa..565790c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -270,6 +270,12 @@
}
}
+ void setRefs(Set<String> refs) {
+ pushAllRefs = false;
+ delta.clear();
+ addRefs(refs);
+ }
+
void addState(String ref, ReplicationState state) {
stateMap.put(ref, state);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index d6cc8be..4736402 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -32,7 +32,9 @@
import java.nio.file.NotDirectoryException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
+import java.util.HashSet;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Stream;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.transport.URIish;
@@ -93,6 +95,11 @@
public abstract String remote();
+ public String sha1() {
+ return ReplicationTasksStorage.sha1(project() + "\n" + ref() + "\n" + uri() + "\n" + remote())
+ .name();
+ }
+
@Override
public final String toString() {
return "ref-update " + project() + ":" + ref() + " uri:" + uri() + " remote:" + remote();
@@ -127,10 +134,15 @@
return new Task(r).create();
}
- public synchronized void start(UriUpdates uriUpdates) {
+ public synchronized Set<String> start(UriUpdates uriUpdates) {
+ Set<String> startedRefs = new HashSet<>();
for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
- new Task(update).start();
+ Task t = new Task(update);
+ if (t.start()) {
+ startedRefs.add(t.update.ref());
+ }
}
+ return startedRefs;
}
public synchronized void reset(UriUpdates uriUpdates) {
@@ -182,7 +194,7 @@
}
@SuppressWarnings("deprecation")
- private ObjectId sha1(String s) {
+ private static ObjectId sha1(String s) {
return ObjectId.fromRaw(Hashing.sha1().hashString(s, UTF_8).asBytes());
}
@@ -203,9 +215,7 @@
public Task(ReplicateRefUpdate update) {
this.update = update;
- String key =
- update.project() + "\n" + update.ref() + "\n" + update.uri() + "\n" + update.remote();
- taskKey = sha1(key).name();
+ taskKey = update.sha1();
running = createDir(runningUpdates).resolve(taskKey);
waiting = createDir(waitingUpdates).resolve(taskKey);
}
@@ -228,8 +238,8 @@
return taskKey;
}
- public void start() {
- rename(waiting, running);
+ public boolean start() {
+ return rename(waiting, running);
}
public void reset() {
@@ -253,12 +263,14 @@
}
}
- private void rename(Path from, Path to) {
+ private boolean rename(Path from, Path to) {
try {
logger.atFine().log("RENAME %s to %s %s", from, to, updateLog());
Files.move(from, to, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+ return true;
} catch (IOException e) {
logger.atSevere().withCause(e).log("Error while renaming task %s", taskKey);
+ return false;
}
}
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index adb8d4c..ded9d84 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -16,6 +16,12 @@
local path as replication target. This makes e.g. sense if a network
share is mounted to which the repositories should be replicated.
+In multi-primary scenario, any replication work which is already
+in-flight or completed by the other nodes is not performed to
+avoid extra work. This is because, the storage for replication
+events is shared between multiple primaries.(The storage location
+is specified in the config using: `replication.eventsDirectory`).
+
Replication of account data (NoteDb)
------------------------------------
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
index 420cdf8..48d14e2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
@@ -50,11 +50,14 @@
protected static final Optional<String> ALL_PROJECTS = Optional.empty();
protected static final int TEST_REPLICATION_DELAY_SECONDS = 1;
+ protected static final int TEST_LONG_REPLICATION_DELAY_SECONDS = 30;
protected static final int TEST_REPLICATION_RETRY_MINUTES = 1;
protected static final int TEST_PUSH_TIME_SECONDS = 1;
protected static final int TEST_PROJECT_CREATION_SECONDS = 10;
protected static final Duration TEST_PUSH_TIMEOUT =
Duration.ofSeconds(TEST_REPLICATION_DELAY_SECONDS + TEST_PUSH_TIME_SECONDS);
+ protected static final Duration TEST_PUSH_TIMEOUT_LONG =
+ Duration.ofSeconds(TEST_LONG_REPLICATION_DELAY_SECONDS + TEST_PUSH_TIME_SECONDS);
protected static final Duration TEST_NEW_PROJECT_TIMEOUT =
Duration.ofSeconds(
(TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60)
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java
new file mode 100644
index 0000000..bcbbc8a
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java
@@ -0,0 +1,89 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static java.util.stream.Collectors.toList;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+/**
+ * This class can be extended by any ReplicationStorage*IT class and provides common setup and
+ * helper methods.
+ */
+public class ReplicationStorageDaemon extends ReplicationDaemon {
+ protected static final int TEST_TASK_FINISH_SECONDS = 1;
+ protected static final int TEST_REPLICATION_MAX_RETRIES = 1;
+ protected static final Duration TEST_TASK_FINISH_TIMEOUT =
+ Duration.ofSeconds(TEST_TASK_FINISH_SECONDS);
+ protected static final Duration MAX_RETRY_WITH_TOLERANCE_TIMEOUT =
+ Duration.ofSeconds(
+ (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60)
+ * TEST_REPLICATION_MAX_RETRIES
+ + 10);
+ protected ReplicationTasksStorage tasksStorage;
+ protected DestinationsCollection destinationCollection;
+ protected ReplicationConfig replicationConfig;
+
+ @Override
+ public void setUpTestPlugin() throws Exception {
+ initConfig();
+ setReplicationDestination(
+ "remote1",
+ "suffix1",
+ Optional.of("not-used-project")); // Simulates a full replication.config initialization
+ super.setUpTestPlugin();
+ tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
+ destinationCollection = plugin.getSysInjector().getInstance(DestinationsCollection.class);
+ replicationConfig = plugin.getSysInjector().getInstance(ReplicationConfig.class);
+ }
+
+ protected boolean noIncompleteTasks() {
+ Path refUpdates = replicationConfig.getEventsDirectory().resolve("ref-updates");
+ Path runningUpdates = refUpdates.resolve("running");
+ Path waitingUpdates = refUpdates.resolve("waiting");
+ try {
+ return Files.list(runningUpdates).count() == 0 && Files.list(waitingUpdates).count() == 0;
+ } catch (IOException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ protected List<ReplicationTasksStorage.ReplicateRefUpdate> listWaitingReplicationTasks(
+ String refRegex) {
+ Pattern refmaskPattern = Pattern.compile(refRegex);
+ return tasksStorage
+ .streamWaiting()
+ .filter(task -> refmaskPattern.matcher(task.ref()).matches())
+ .collect(toList());
+ }
+
+ protected void deleteWaitingReplicationTasks(String refRegex) {
+ Path refUpdates = replicationConfig.getEventsDirectory().resolve("ref-updates");
+ Path waitingUpdates = refUpdates.resolve("waiting");
+ for (ReplicationTasksStorage.ReplicateRefUpdate r : listWaitingReplicationTasks(refRegex)) {
+ try {
+ Files.deleteIfExists(waitingUpdates.resolve(r.sha1()));
+ } catch (IOException e) {
+ throw new RuntimeException("Couldn't delete waiting task", e);
+ }
+ }
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
index a65b257..753e429 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
@@ -16,7 +16,6 @@
import static com.google.common.truth.Truth.assertThat;
import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.NO_OP;
-import static java.util.stream.Collectors.toList;
import com.google.gerrit.acceptance.TestPlugin;
import com.google.gerrit.acceptance.UseLocalDisk;
@@ -29,7 +28,6 @@
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -51,32 +49,7 @@
@TestPlugin(
name = "replication",
sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
-public class ReplicationStorageIT extends ReplicationDaemon {
- private static final int TEST_TASK_FINISH_SECONDS = 1;
- private static final int TEST_REPLICATION_MAX_RETRIES = 1;
- protected static final Duration TEST_TASK_FINISH_TIMEOUT =
- Duration.ofSeconds(TEST_TASK_FINISH_SECONDS);
- private static final Duration MAX_RETRY_WITH_TOLERANCE_TIMEOUT =
- Duration.ofSeconds(
- (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60)
- * TEST_REPLICATION_MAX_RETRIES
- + 10);
- protected ReplicationTasksStorage tasksStorage;
- private DestinationsCollection destinationCollection;
- private ReplicationConfig replicationConfig;
-
- @Override
- public void setUpTestPlugin() throws Exception {
- initConfig();
- setReplicationDestination(
- "remote1",
- "suffix1",
- Optional.of("not-used-project")); // Simulates a full replication.config initialization
- super.setUpTestPlugin();
- tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
- destinationCollection = plugin.getSysInjector().getInstance(DestinationsCollection.class);
- replicationConfig = plugin.getSysInjector().getInstance(ReplicationConfig.class);
- }
+public class ReplicationStorageIT extends ReplicationStorageDaemon {
@Test
public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
@@ -389,14 +362,6 @@
.filter(task -> remote.equals(task.remote()));
}
- private List<ReplicateRefUpdate> listWaitingReplicationTasks(String refRegex) {
- Pattern refmaskPattern = Pattern.compile(refRegex);
- return tasksStorage
- .streamWaiting()
- .filter(task -> refmaskPattern.matcher(task.ref()).matches())
- .collect(toList());
- }
-
private List<ReplicateRefUpdate> listWaiting() {
return tasksStorage.streamWaiting().collect(Collectors.toList());
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageMPIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageMPIT.java
new file mode 100644
index 0000000..65fc4df
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageMPIT.java
@@ -0,0 +1,74 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.api.projects.BranchInput;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.junit.Test;
+
+/**
+ * The tests in this class ensure that events in the storage are correctly managed under multi-
+ * primary scenarios.
+ *
+ * @see com.googlesource.gerrit.plugins.replication.ReplicationStorageIT
+ */
+@UseLocalDisk
+@TestPlugin(
+ name = "replication",
+ sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationStorageMPIT extends ReplicationStorageDaemon {
+
+ @Test
+ public void workFromOnlyWaitingIsPerformed() throws Exception {
+ Project.NameKey targetProject = createTestProject(project + "replica");
+ setReplicationDestination("foo", "replica", ALL_PROJECTS, TEST_LONG_REPLICATION_DELAY_SECONDS);
+ reloadConfig();
+
+ String newBranchA = "refs/heads/foo_branch_a";
+ String newBranchB = "refs/heads/foo_branch_b";
+ String master = "refs/heads/master";
+ BranchInput input = new BranchInput();
+ input.revision = master;
+ gApi.projects().name(project.get()).branch(newBranchA).create(input);
+ gApi.projects().name(project.get()).branch(newBranchB).create(input);
+
+ deleteWaitingReplicationTasks(
+ newBranchA); // This simulates the work being completed by other node
+ assertThat(listWaitingReplicationTasks("refs/heads/foo_branch_.*")).hasSize(1);
+
+ try (Repository repo = repoManager.openRepository(targetProject);
+ Repository sourceRepo = repoManager.openRepository(project)) {
+ WaitUtil.waitUntil(
+ () -> checkedGetRef(repo, newBranchA) == null && checkedGetRef(repo, newBranchB) != null,
+ TEST_PUSH_TIMEOUT_LONG);
+
+ Ref masterRef = getRef(sourceRepo, master);
+ Ref targetBranchRefA = getRef(repo, newBranchA);
+ Ref targetBranchRefB = getRef(repo, newBranchB);
+ assertThat(targetBranchRefA).isNull();
+ assertThat(targetBranchRefB).isNotNull();
+ assertThat(targetBranchRefB.getObjectId()).isEqualTo(masterRef.getObjectId());
+ }
+
+ assertTrue(noIncompleteTasks());
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
index 25752b4..5cfb2d0 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.replication;
+import static com.google.common.truth.Truth.assertThat;
import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTest.assertNoIncompleteTasks;
import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTest.assertThatStream;
@@ -194,6 +195,18 @@
assertNoIncompleteTasks(persistedView);
}
+ @Test
+ public void duplicateWorkIsNotPerformed() {
+ nodeA.create(REF_UPDATE);
+ nodeB.create(REF_UPDATE);
+
+ assertThat(nodeA.start(URI_UPDATES)).containsExactly(REF_UPDATE.ref());
+ assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
+
+ assertThat(nodeB.start(URI_UPDATES)).isEmpty();
+ assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
+ }
+
public static UriUpdates getUriUpdates(ReplicationTasksStorage.ReplicateRefUpdate refUpdate) {
try {
return TestUriUpdates.create(refUpdate);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
index 48e145b..16a0363 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
@@ -84,7 +84,7 @@
@Test
public void canStartWaitingUpdate() throws Exception {
storage.create(REF_UPDATE);
- storage.start(uriUpdates);
+ assertThat(storage.start(uriUpdates)).containsExactly(REF_UPDATE.ref());
assertThatStream(storage.streamWaiting()).isEmpty();
assertFalse(storage.isWaiting(uriUpdates));
assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);