Avoid starting tasks which are not present under waiting storage area This change only starts the replication tasks which are present under the ../waiting directory and ignores any extra refs which it was orginally intended to push. This will reduce the duplicate work because the absence of tasks under ../waiting directory implies that it has already been started/completed by other nodes. Change-Id: Ifbb7018ec1d960015626c089a4dadf6b0247d278
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java index 8054eb4..ab8c476 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -583,7 +583,7 @@ if (inFlightOp != null) { return RunwayStatus.denied(inFlightOp.getId()); } - replicationTasksStorage.get().start(op); + op.setRefs(replicationTasksStorage.get().start(op)); inFlight.put(op.getURI(), op); } return RunwayStatus.allowed();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java index 3c34fb6..3d06d47 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -259,6 +259,12 @@ } } + void setRefs(Set<String> refs) { + pushAllRefs = false; + delta.clear(); + addRefs(refs); + } + void addState(String ref, ReplicationState state) { stateMap.put(ref, state); }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java index d6cc8be..4736402 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -32,7 +32,9 @@ import java.nio.file.NotDirectoryException; import java.nio.file.Path; import java.nio.file.StandardCopyOption; +import java.util.HashSet; import java.util.Optional; +import java.util.Set; import java.util.stream.Stream; import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.transport.URIish; @@ -93,6 +95,11 @@ public abstract String remote(); + public String sha1() { + return ReplicationTasksStorage.sha1(project() + "\n" + ref() + "\n" + uri() + "\n" + remote()) + .name(); + } + @Override public final String toString() { return "ref-update " + project() + ":" + ref() + " uri:" + uri() + " remote:" + remote(); @@ -127,10 +134,15 @@ return new Task(r).create(); } - public synchronized void start(UriUpdates uriUpdates) { + public synchronized Set<String> start(UriUpdates uriUpdates) { + Set<String> startedRefs = new HashSet<>(); for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) { - new Task(update).start(); + Task t = new Task(update); + if (t.start()) { + startedRefs.add(t.update.ref()); + } } + return startedRefs; } public synchronized void reset(UriUpdates uriUpdates) { @@ -182,7 +194,7 @@ } @SuppressWarnings("deprecation") - private ObjectId sha1(String s) { + private static ObjectId sha1(String s) { return ObjectId.fromRaw(Hashing.sha1().hashString(s, UTF_8).asBytes()); } @@ -203,9 +215,7 @@ public Task(ReplicateRefUpdate update) { this.update = update; - String key = - update.project() + "\n" + update.ref() + "\n" + update.uri() + "\n" + update.remote(); - taskKey = sha1(key).name(); + taskKey = update.sha1(); running = createDir(runningUpdates).resolve(taskKey); waiting = createDir(waitingUpdates).resolve(taskKey); } @@ -228,8 +238,8 @@ return taskKey; } - public void start() { - rename(waiting, running); + public boolean start() { + return rename(waiting, running); } public void reset() { @@ -253,12 +263,14 @@ } } - private void rename(Path from, Path to) { + private boolean rename(Path from, Path to) { try { logger.atFine().log("RENAME %s to %s %s", from, to, updateLog()); Files.move(from, to, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + return true; } catch (IOException e) { logger.atSevere().withCause(e).log("Error while renaming task %s", taskKey); + return false; } }
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md index adb8d4c..ded9d84 100644 --- a/src/main/resources/Documentation/about.md +++ b/src/main/resources/Documentation/about.md
@@ -16,6 +16,12 @@ local path as replication target. This makes e.g. sense if a network share is mounted to which the repositories should be replicated. +In multi-primary scenario, any replication work which is already +in-flight or completed by the other nodes is not performed to +avoid extra work. This is because, the storage for replication +events is shared between multiple primaries.(The storage location +is specified in the config using: `replication.eventsDirectory`). + Replication of account data (NoteDb) ------------------------------------
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java index 420cdf8..48d14e2 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
@@ -50,11 +50,14 @@ protected static final Optional<String> ALL_PROJECTS = Optional.empty(); protected static final int TEST_REPLICATION_DELAY_SECONDS = 1; + protected static final int TEST_LONG_REPLICATION_DELAY_SECONDS = 30; protected static final int TEST_REPLICATION_RETRY_MINUTES = 1; protected static final int TEST_PUSH_TIME_SECONDS = 1; protected static final int TEST_PROJECT_CREATION_SECONDS = 10; protected static final Duration TEST_PUSH_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY_SECONDS + TEST_PUSH_TIME_SECONDS); + protected static final Duration TEST_PUSH_TIMEOUT_LONG = + Duration.ofSeconds(TEST_LONG_REPLICATION_DELAY_SECONDS + TEST_PUSH_TIME_SECONDS); protected static final Duration TEST_NEW_PROJECT_TIMEOUT = Duration.ofSeconds( (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60)
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java new file mode 100644 index 0000000..bcbbc8a --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java
@@ -0,0 +1,89 @@ +// Copyright (C) 2020 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import static java.util.stream.Collectors.toList; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.regex.Pattern; + +/** + * This class can be extended by any ReplicationStorage*IT class and provides common setup and + * helper methods. + */ +public class ReplicationStorageDaemon extends ReplicationDaemon { + protected static final int TEST_TASK_FINISH_SECONDS = 1; + protected static final int TEST_REPLICATION_MAX_RETRIES = 1; + protected static final Duration TEST_TASK_FINISH_TIMEOUT = + Duration.ofSeconds(TEST_TASK_FINISH_SECONDS); + protected static final Duration MAX_RETRY_WITH_TOLERANCE_TIMEOUT = + Duration.ofSeconds( + (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60) + * TEST_REPLICATION_MAX_RETRIES + + 10); + protected ReplicationTasksStorage tasksStorage; + protected DestinationsCollection destinationCollection; + protected ReplicationConfig replicationConfig; + + @Override + public void setUpTestPlugin() throws Exception { + initConfig(); + setReplicationDestination( + "remote1", + "suffix1", + Optional.of("not-used-project")); // Simulates a full replication.config initialization + super.setUpTestPlugin(); + tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class); + destinationCollection = plugin.getSysInjector().getInstance(DestinationsCollection.class); + replicationConfig = plugin.getSysInjector().getInstance(ReplicationConfig.class); + } + + protected boolean noIncompleteTasks() { + Path refUpdates = replicationConfig.getEventsDirectory().resolve("ref-updates"); + Path runningUpdates = refUpdates.resolve("running"); + Path waitingUpdates = refUpdates.resolve("waiting"); + try { + return Files.list(runningUpdates).count() == 0 && Files.list(waitingUpdates).count() == 0; + } catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + protected List<ReplicationTasksStorage.ReplicateRefUpdate> listWaitingReplicationTasks( + String refRegex) { + Pattern refmaskPattern = Pattern.compile(refRegex); + return tasksStorage + .streamWaiting() + .filter(task -> refmaskPattern.matcher(task.ref()).matches()) + .collect(toList()); + } + + protected void deleteWaitingReplicationTasks(String refRegex) { + Path refUpdates = replicationConfig.getEventsDirectory().resolve("ref-updates"); + Path waitingUpdates = refUpdates.resolve("waiting"); + for (ReplicationTasksStorage.ReplicateRefUpdate r : listWaitingReplicationTasks(refRegex)) { + try { + Files.deleteIfExists(waitingUpdates.resolve(r.sha1())); + } catch (IOException e) { + throw new RuntimeException("Couldn't delete waiting task", e); + } + } + } +}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java index a65b257..753e429 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
@@ -16,7 +16,6 @@ import static com.google.common.truth.Truth.assertThat; import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.NO_OP; -import static java.util.stream.Collectors.toList; import com.google.gerrit.acceptance.TestPlugin; import com.google.gerrit.acceptance.UseLocalDisk; @@ -29,7 +28,6 @@ import java.net.URISyntaxException; import java.nio.file.Files; import java.nio.file.Path; -import java.time.Duration; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -51,32 +49,7 @@ @TestPlugin( name = "replication", sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule") -public class ReplicationStorageIT extends ReplicationDaemon { - private static final int TEST_TASK_FINISH_SECONDS = 1; - private static final int TEST_REPLICATION_MAX_RETRIES = 1; - protected static final Duration TEST_TASK_FINISH_TIMEOUT = - Duration.ofSeconds(TEST_TASK_FINISH_SECONDS); - private static final Duration MAX_RETRY_WITH_TOLERANCE_TIMEOUT = - Duration.ofSeconds( - (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60) - * TEST_REPLICATION_MAX_RETRIES - + 10); - protected ReplicationTasksStorage tasksStorage; - private DestinationsCollection destinationCollection; - private ReplicationConfig replicationConfig; - - @Override - public void setUpTestPlugin() throws Exception { - initConfig(); - setReplicationDestination( - "remote1", - "suffix1", - Optional.of("not-used-project")); // Simulates a full replication.config initialization - super.setUpTestPlugin(); - tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class); - destinationCollection = plugin.getSysInjector().getInstance(DestinationsCollection.class); - replicationConfig = plugin.getSysInjector().getInstance(ReplicationConfig.class); - } +public class ReplicationStorageIT extends ReplicationStorageDaemon { @Test public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception { @@ -389,14 +362,6 @@ .filter(task -> remote.equals(task.remote())); } - private List<ReplicateRefUpdate> listWaitingReplicationTasks(String refRegex) { - Pattern refmaskPattern = Pattern.compile(refRegex); - return tasksStorage - .streamWaiting() - .filter(task -> refmaskPattern.matcher(task.ref()).matches()) - .collect(toList()); - } - private List<ReplicateRefUpdate> listWaiting() { return tasksStorage.streamWaiting().collect(Collectors.toList()); }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageMPIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageMPIT.java new file mode 100644 index 0000000..65fc4df --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageMPIT.java
@@ -0,0 +1,74 @@ +// Copyright (C) 2020 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertTrue; + +import com.google.gerrit.acceptance.TestPlugin; +import com.google.gerrit.acceptance.UseLocalDisk; +import com.google.gerrit.entities.Project; +import com.google.gerrit.extensions.api.projects.BranchInput; +import org.eclipse.jgit.lib.Ref; +import org.eclipse.jgit.lib.Repository; +import org.junit.Test; + +/** + * The tests in this class ensure that events in the storage are correctly managed under multi- + * primary scenarios. + * + * @see com.googlesource.gerrit.plugins.replication.ReplicationStorageIT + */ +@UseLocalDisk +@TestPlugin( + name = "replication", + sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule") +public class ReplicationStorageMPIT extends ReplicationStorageDaemon { + + @Test + public void workFromOnlyWaitingIsPerformed() throws Exception { + Project.NameKey targetProject = createTestProject(project + "replica"); + setReplicationDestination("foo", "replica", ALL_PROJECTS, TEST_LONG_REPLICATION_DELAY_SECONDS); + reloadConfig(); + + String newBranchA = "refs/heads/foo_branch_a"; + String newBranchB = "refs/heads/foo_branch_b"; + String master = "refs/heads/master"; + BranchInput input = new BranchInput(); + input.revision = master; + gApi.projects().name(project.get()).branch(newBranchA).create(input); + gApi.projects().name(project.get()).branch(newBranchB).create(input); + + deleteWaitingReplicationTasks( + newBranchA); // This simulates the work being completed by other node + assertThat(listWaitingReplicationTasks("refs/heads/foo_branch_.*")).hasSize(1); + + try (Repository repo = repoManager.openRepository(targetProject); + Repository sourceRepo = repoManager.openRepository(project)) { + WaitUtil.waitUntil( + () -> checkedGetRef(repo, newBranchA) == null && checkedGetRef(repo, newBranchB) != null, + TEST_PUSH_TIMEOUT_LONG); + + Ref masterRef = getRef(sourceRepo, master); + Ref targetBranchRefA = getRef(repo, newBranchA); + Ref targetBranchRefB = getRef(repo, newBranchB); + assertThat(targetBranchRefA).isNull(); + assertThat(targetBranchRefB).isNotNull(); + assertThat(targetBranchRefB.getObjectId()).isEqualTo(masterRef.getObjectId()); + } + + assertTrue(noIncompleteTasks()); + } +}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java index 25752b4..5cfb2d0 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
@@ -14,6 +14,7 @@ package com.googlesource.gerrit.plugins.replication; +import static com.google.common.truth.Truth.assertThat; import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTest.assertNoIncompleteTasks; import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTest.assertThatStream; @@ -194,6 +195,18 @@ assertNoIncompleteTasks(persistedView); } + @Test + public void duplicateWorkIsNotPerformed() { + nodeA.create(REF_UPDATE); + nodeB.create(REF_UPDATE); + + assertThat(nodeA.start(URI_UPDATES)).containsExactly(REF_UPDATE.ref()); + assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE); + + assertThat(nodeB.start(URI_UPDATES)).isEmpty(); + assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE); + } + public static UriUpdates getUriUpdates(ReplicationTasksStorage.ReplicateRefUpdate refUpdate) { try { return TestUriUpdates.create(refUpdate);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java index 48e145b..16a0363 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
@@ -84,7 +84,7 @@ @Test public void canStartWaitingUpdate() throws Exception { storage.create(REF_UPDATE); - storage.start(uriUpdates); + assertThat(storage.start(uriUpdates)).containsExactly(REF_UPDATE.ref()); assertThatStream(storage.streamWaiting()).isEmpty(); assertFalse(storage.isWaiting(uriUpdates)); assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);