Merge "ReplicationState: Streamline getRefStatus()"
diff --git a/BUILD b/BUILD
index 4c74bf2..ee97660 100644
--- a/BUILD
+++ b/BUILD
@@ -14,9 +14,6 @@
"Gerrit-SshModule: com.googlesource.gerrit.plugins.replication.SshModule",
],
resources = glob(["src/main/resources/**/*"]),
- deps = [
- "//lib/auto:auto-value-gson",
- ],
)
junit_tests(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index e0a9354..8ef21d0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -63,6 +63,7 @@
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.servlet.RequestScoped;
import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionState;
import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
@@ -70,7 +71,6 @@
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -463,7 +463,7 @@
}
void scheduleDeleteProject(URIish uri, Project.NameKey project, ProjectDeletionState state) {
- repLog.atFine().log("scheduling deletion of project {} at {}", project, uri);
+ repLog.atFine().log("scheduling deletion of project %s at %s", project, uri);
@SuppressWarnings("unused")
ScheduledFuture<?> ignored =
pool.schedule(deleteProjectFactory.create(uri, project, state), 0, TimeUnit.SECONDS);
@@ -608,15 +608,15 @@
}
}
- public Set<String> getPrunableTaskNames() {
- Set<String> names = new HashSet<>();
+ /**
+ * Maps each {@link ReplicateRefUpdate} of every {@link PushOne} in {@code pending} to that
+ * push's task name, i.e. {@link PushOne#toString()}. If more than one pending push contains an
+ * equal ref update, the push iterated last wins. The returned map is a new {@link HashMap}.
+ */
+ public Map<ReplicateRefUpdate, String> getTaskNamesByReplicateRefUpdate() {
+ Map<ReplicateRefUpdate, String> taskNameByReplicateRefUpdate = new HashMap<>();
for (PushOne push : pending.values()) {
- if (!replicationTasksStorage.get().isWaiting(push)) {
- repLog.atFine().log("No longer isWaiting, can prune %s", push.getURI());
- names.add(push.toString());
+ // Queue tasks are named after PushOne.toString(), so this name identifies the queued task.
+ String taskName = push.toString();
+ for (ReplicateRefUpdate refUpdate : push.getReplicateRefUpdates()) {
+ taskNameByReplicateRefUpdate.put(refUpdate, taskName);
}
}
- return names;
+ return taskNameByReplicateRefUpdate;
}
boolean wouldPush(URIish uri, Project.NameKey project, String ref) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
index d195aa3..0fa02ef 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
@@ -68,7 +68,8 @@
if (exitCode == 1) {
logger.atInfo().log(
"DeleteProject plugin is not installed on %s;"
- + " will not try to forward this operation to that host");
+ + " will not try to forward this operation to that host",
+ uri);
withoutDeleteProjectPlugin.add(uri);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 404d4bd..87c35ee 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -468,6 +468,7 @@
pool.reschedule(this, Destination.RetryReason.TRANSPORT_ERROR);
}
} else {
+ retryDone();
repLog.atSevere().log(
"Giving up after %d '%s' failures during replication to %s",
updateRefRetryCount, e.getMessage(), uri);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index ed474ae..5310c14 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -41,6 +41,7 @@
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -69,6 +70,11 @@
private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
private Distributor distributor;
+ protected enum Prune {
+ TRUE,
+ FALSE;
+ }
+
@Inject
ReplicationQueue(
ReplicationConfig rc,
@@ -94,7 +100,7 @@
destinations.get().startup(workQueue);
running = true;
replicationTasksStorage.recoverAll();
- firePendingEvents();
+ synchronizePendingEvents(Prune.FALSE);
fireBeforeStartupEvents();
distributor = new Distributor(workQueue);
}
@@ -193,8 +199,14 @@
}
}
- private void firePendingEvents() {
+ private void synchronizePendingEvents(Prune prune) {
if (replaying.compareAndSet(false, true)) {
+ final Map<ReplicateRefUpdate, String> taskNamesByReplicateRefUpdate = new ConcurrentHashMap<>();
+ if (Prune.TRUE.equals(prune)) {
+ for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
+ taskNamesByReplicateRefUpdate.putAll(destination.getTaskNamesByReplicateRefUpdate());
+ }
+ }
new ChainedScheduler.StreamScheduler<>(
workQueue.getDefaultQueue(),
replicationTasksStorage.streamWaiting(),
@@ -203,6 +215,9 @@
public void run(ReplicationTasksStorage.ReplicateRefUpdate u) {
try {
fire(new URIish(u.uri()), Project.nameKey(u.project()), u.ref());
+ if (Prune.TRUE.equals(prune)) {
+ taskNamesByReplicateRefUpdate.remove(u);
+ }
} catch (URISyntaxException e) {
repLog.atSevere().withCause(e).log(
"Encountered malformed URI for persisted event %s", u);
@@ -213,6 +228,9 @@
@Override
public void onDone() {
+ if (Prune.TRUE.equals(prune)) {
+ pruneNoLongerPending(new HashSet<>(taskNamesByReplicateRefUpdate.values()));
+ }
replaying.set(false);
}
@@ -224,17 +242,12 @@
}
}
- private void pruneCompleted() {
+ private void pruneNoLongerPending(Set<String> prunableTaskNames) {
// Queue tasks have wrappers around them so workQueue.getTasks() does not return the PushOnes.
// We also cannot access them by taskId since PushOnes don't have a taskId, they do have
- // and Id, but it not the id assigned to the task in the queues. The tasks in the queue
- // do use the same name as returned by toString() though, so that be used to correlate
+ // an Id, but it is not the id assigned to the task in the queues. The tasks in the queue
+ // do use the same name as returned by toString() though, so that can be used to correlate
// PushOnes with queue tasks despite their wrappers.
- Set<String> prunableTaskNames = new HashSet<>();
- for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
- prunableTaskNames.addAll(destination.getPrunableTaskNames());
- }
-
for (WorkQueue.Task<?> task : workQueue.getTasks()) {
WorkQueue.Task.State state = task.getState();
if (state == WorkQueue.Task.State.SLEEPING || state == WorkQueue.Task.State.READY) {
@@ -310,8 +323,7 @@
return;
}
try {
- firePendingEvents();
- pruneCompleted();
+ synchronizePendingEvents(Prune.TRUE);
} catch (Exception e) {
repLog.atSevere().withCause(e).log("error distributing tasks");
}
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index ded9d84..d216366 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -16,11 +16,9 @@
local path as replication target. This makes e.g. sense if a network
share is mounted to which the repositories should be replicated.
-In multi-primary scenario, any replication work which is already
-in-flight or completed by the other nodes is not performed to
-avoid extra work. This is because, the storage for replication
-events is shared between multiple primaries.(The storage location
-is specified in the config using: `replication.eventsDirectory`).
+It is possible to
+[configure](config.html#configuring-cluster-replication) the plugin so
+that multiple primaries share the replication work approximately evenly.
Replication of account data (NoteDb)
------------------------------------
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index f4ea9d6..8843671 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -46,6 +46,59 @@
To manually trigger replication at runtime, see
SSH command [start](cmd-start.md).
+<a name="configuring-cluster-replication"></a>
+Configuring Cluster Replication
+-------------------------------
+
+The replication plugin is designed to allow multiple primaries in a
+cluster to cooperate efficiently. This cooperation is based on the
+replication event persistence subsystem, and thus the directory
+pointed to by the `replication.eventsDirectory` config key must reside
+on a shared filesystem, such as NFS, to enable this cooperation. By
+default, simply pointing multiple primaries at the same `eventsDirectory`
+enables some cooperation by preventing the same replication push from
+being duplicated by more than one primary.
+
+To further improve cooperation across the cluster, the
+`replication.distributionInterval` config value can be set. With
+distribution enabled, the replication queues for all the nodes sharing
+the same `eventsDirectory` will reflect approximately the same outstanding
+replication work (i.e., tasks waiting in the queue). Replication pushes
+which are running will continue to only be visible in the queue of the
+node on which the push is actually happening. This feature not only
+helps administrators get a cluster-wide view of outstanding replication
+tasks, but also allows replication tasks triggered by one primary to be
+fulfilled by another, less busy node.
+
+This enhanced replication work distribution allows the amount of
+replication work a cluster can handle to scale more evenly and linearly
+with the number of primaries in the cluster. Adding more nodes to a
+cluster without distribution enabled will generally not allow the thread
+count per remote to be reduced without impacting service levels to those
+remotes. This is because without distribution, all events triggered by a
+node will only be fulfilled by the node which triggered the event, even
+if all the other nodes in the cluster are idle. This behavior implies
+that each node should be configured in a way that allows it alone to
+provide the level of service which each remote requires. However, with
+distribution enabled, it becomes possible to reduce the number of
+replication threads configured per remote proportionally to the number
+of nodes in the cluster while maintaining approximately the same service
+level as before adding new nodes.
+
+Reducing the threads per remote without impacting service is possible
+with distribution because, when configuring a node, it can be expected
+that other nodes will pick up some of the work it triggers, so it no
+longer needs to be configured as if it were the only node in the
+cluster. For example, if a remote requires 6 threads with one node to
+achieve acceptable service, it should only take 2 threads on 3
+equivalently powered nodes to provide the same service level with
+distribution enabled. Scaling down the thread requirements per remote
+results in a reduced memory footprint per remote on each node in the
+cluster, and this enables the nodes in the cluster to scale to handle
+more remotes with approximately the same service level as without
+distribution. The number of extra supported remotes then also scales
+approximately linearly with the number of extra nodes in the cluster.
+
File `replication.config`
-------------------------
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
index 202d77e..9606371 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
@@ -26,6 +26,7 @@
import com.google.gerrit.extensions.api.changes.NotifyHandling;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.server.git.LocalDiskRepositoryManager;
import com.google.inject.Inject;
import java.io.IOException;
import java.nio.file.Path;
@@ -71,6 +72,18 @@
protected Path gitPath;
protected FileBasedConfig config;
+ /**
+ * Sets {@code replication.distributionInterval} in the test replication config and saves it.
+ *
+ * @param interval value to store; presumably seconds (ReplicationDistributorIT passes a
+ * {@code *_SECONDS} constant) - confirm against the plugin's config parsing
+ * @throws IOException if saving the config fails
+ */
+ protected void setDistributionInterval(int interval) throws IOException {
+ config.setInt("replication", null, "distributionInterval", interval);
+ config.save();
+ }
+
+  /**
+   * Returns {@code <base path>/<project name>.git} for {@code project}, as a string.
+   *
+   * <p>Only usable when {@code repoManager} is a {@link LocalDiskRepositoryManager} (e.g. tests
+   * annotated with {@code @UseLocalDisk}); otherwise the cast throws {@link ClassCastException}.
+   */
+  protected String getProjectUri(Project.NameKey project) throws Exception {
+    LocalDiskRepositoryManager localDisk = (LocalDiskRepositoryManager) repoManager;
+    Path repoPath = localDisk.getBasePath(project).resolve(project.get() + ".git");
+    return repoPath.toString();
+  }
+
protected void setReplicationDestination(
String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
setReplicationDestination(
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java
new file mode 100644
index 0000000..3d0dfc1
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java
@@ -0,0 +1,147 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.WaitUtil;
+import com.google.gerrit.entities.BranchNameKey;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.git.WorkQueue;
+import java.time.Duration;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.Test;
+
+/**
+ * The tests in this class ensure the correctness of {@link
+ * com.googlesource.gerrit.plugins.replication.ReplicationQueue.Distributor}.
+ *
+ * <p>The plugin is configured with a distribution interval of {@code
+ * TEST_DISTRIBUTION_INTERVAL_SECONDS}; waits for distributor-driven queue changes are bounded by
+ * {@code TEST_DISTRIBUTION_CYCLE_SECONDS}.
+ */
+@UseLocalDisk
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationDistributorIT extends ReplicationStorageDaemon {
+  private static final int TEST_DISTRIBUTION_INTERVAL_SECONDS = 3;
+  // NOTE(review): presumably an allowance for one distribution pass to finish - confirm.
+  private static final int TEST_DISTRIBUTION_DURATION_SECONDS = 1;
+  // Interval plus pass allowance; used as the wait bound for distributor-driven changes.
+  private static final int TEST_DISTRIBUTION_CYCLE_SECONDS =
+      TEST_DISTRIBUTION_INTERVAL_SECONDS + TEST_DISTRIBUTION_DURATION_SECONDS;
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    // Written before super.setUpTestPlugin() so the plugin presumably picks it up at startup.
+    initConfig();
+    setDistributionInterval(TEST_DISTRIBUTION_INTERVAL_SECONDS);
+    super.setUpTestPlugin();
+  }
+
+  @Test
+  public void distributorAddingTaskFromStorage() throws Exception {
+    String remote = "foo";
+    String replica = "replica";
+    String master = "refs/heads/master";
+    String newBranch = "refs/heads/foo_branch";
+    Project.NameKey targetProject = createTestProject(project + replica);
+    ReplicationTasksStorage.ReplicateRefUpdate ref =
+        ReplicationTasksStorage.ReplicateRefUpdate.create(
+            project.get(), newBranch, new URIish(getProjectUri(targetProject)), remote);
+    createBranch(project, master, newBranch);
+    setReplicationDestination(remote, replica, ALL_PROJECTS);
+    reloadConfig();
+
+    tasksStorage.create(ref); // Mimics RefUpdate inserted into storage by other Primary
+    WaitUtil.waitUntil(
+        () -> !getProjectTasks().isEmpty(), Duration.ofSeconds(TEST_DISTRIBUTION_CYCLE_SECONDS));
+
+    List<WorkQueue.Task<?>> tasks = getProjectTasks();
+    assertThat(tasks).hasSize(1); // ReplicationTask for the created ref in queue
+    assertThat(waitForProjectTaskCount(0, TEST_PUSH_TIMEOUT)).isTrue();
+
+    try (Repository targetRepo = repoManager.openRepository(targetProject);
+        Repository sourceRepo = repoManager.openRepository(project)) {
+      Ref masterRef = getRef(sourceRepo, master);
+      Ref targetBranchRef = getRef(targetRepo, newBranch);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(masterRef.getObjectId());
+    }
+  }
+
+  @Test
+  public void distributorPrunesTaskFromWorkQueue() throws Exception {
+    createTestProject(project + "replica");
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    String newBranch = "refs/heads/foo_branch";
+    createBranch(BranchNameKey.create(project, newBranch));
+
+    assertThat(listWaitingReplicationTasks(newBranch)).hasSize(1);
+    deleteWaitingReplicationTasks(newBranch); // This simulates the work being started by other node
+
+    assertThat(waitForProjectTaskCount(0, Duration.ofSeconds(TEST_DISTRIBUTION_CYCLE_SECONDS)))
+        .isTrue();
+  }
+
+  /** Returns the queued work-queue tasks that are {@link WorkQueue.ProjectTask}s. */
+  private List<WorkQueue.Task<?>> getProjectTasks() {
+    return getInstance(WorkQueue.class).getTasks().stream()
+        .filter(t -> t instanceof WorkQueue.ProjectTask)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Creates {@code refToCreate} in {@code projectName}, pointing at the object currently
+   * referenced by {@code fromRef}. Fails the test if {@code fromRef} does not exist or the ref
+   * update does not report {@link RefUpdate.Result#NEW}.
+   */
+  private void createBranch(Project.NameKey projectName, String fromRef, String refToCreate)
+      throws Exception {
+    try (Repository repo = repoManager.openRepository(projectName)) {
+      Ref from = repo.exactRef(fromRef);
+      // Fail fast instead of an NPE below or a silently missing branch in later assertions.
+      assertThat(from).isNotNull();
+      RefUpdate update = repo.updateRef(refToCreate);
+      update.setNewObjectId(from.getObjectId());
+      assertThat(update.update()).isEqualTo(RefUpdate.Result.NEW);
+    }
+  }
+
+  /**
+   * Waits until exactly {@code count} project tasks are queued.
+   *
+   * @return {@code true} once the count is reached; {@code false} if {@link WaitUtil#waitUntil}
+   *     throws {@link InterruptedException} (presumably how it reports a timeout - confirm)
+   */
+  private boolean waitForProjectTaskCount(int count, Duration duration) {
+    try {
+      WaitUtil.waitUntil(() -> getProjectTasks().size() == count, duration);
+      return true;
+    } catch (InterruptedException e) {
+      return false;
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
index e508b32..e2e1e21 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
@@ -250,26 +250,6 @@
replicateBranchDeletion(false);
}
- private void replicateBranchDeletion(boolean mirror) throws Exception {
- setReplicationDestination("foo", "replica", ALL_PROJECTS);
- reloadConfig();
-
- Project.NameKey targetProject = createTestProject(project + "replica");
- String branchToDelete = "refs/heads/todelete";
- String master = "refs/heads/master";
- BranchInput input = new BranchInput();
- input.revision = master;
- gApi.projects().name(project.get()).branch(branchToDelete).create(input);
- isPushCompleted(targetProject, branchToDelete, TEST_PUSH_TIMEOUT);
-
- setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE, mirror);
- reloadConfig();
-
- gApi.projects().name(project.get()).branch(branchToDelete).delete();
-
- assertThat(listWaitingReplicationTasks(branchToDelete)).hasSize(1);
- }
-
@Test
public void shouldCleanupTasksAfterNewProjectReplication() throws Exception {
setReplicationDestination("task_cleanup_project", "replica", ALL_PROJECTS);
@@ -332,6 +312,26 @@
WaitUtil.waitUntil(() -> isTaskCleanedUp(), TEST_TASK_FINISH_TIMEOUT);
}
+ /**
+ * Creates and replicates {@code refs/heads/todelete}, then reconfigures the "foo" destination
+ * (passing {@code Integer.MAX_VALUE} and {@code mirror}) and deletes the branch, asserting that
+ * {@code listWaitingReplicationTasks} reports exactly one task for the deleted branch.
+ *
+ * <p>NOTE(review): {@code Integer.MAX_VALUE} is presumably a replication delay that keeps the
+ * deletion task waiting - confirm against setReplicationDestination.
+ *
+ * @param mirror forwarded as the mirror flag of the second destination configuration
+ */
+ private void replicateBranchDeletion(boolean mirror) throws Exception {
+ setReplicationDestination("foo", "replica", ALL_PROJECTS);
+ reloadConfig();
+
+ Project.NameKey targetProject = createTestProject(project + "replica");
+ String branchToDelete = "refs/heads/todelete";
+ String master = "refs/heads/master";
+ BranchInput input = new BranchInput();
+ input.revision = master;
+ gApi.projects().name(project.get()).branch(branchToDelete).create(input);
+ // NOTE(review): the result of isPushCompleted(...) is not checked here - confirm that the
+ // initial push is intentionally not asserted.
+ isPushCompleted(targetProject, branchToDelete, TEST_PUSH_TIMEOUT);
+
+ setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE, mirror);
+ reloadConfig();
+
+ gApi.projects().name(project.get()).branch(branchToDelete).delete();
+
+ assertThat(listWaitingReplicationTasks(branchToDelete)).hasSize(1);
+ }
+
private boolean isTaskRescheduled(QueueInfo queue, URIish uri) {
PushOne pushOne = queue.pending.get(uri);
return pushOne == null ? false : pushOne.isRetrying();