Merge branch 'stable-3.4' * stable-3.4: Doc: make explicit that remoteNameStyle is for non-Gerrit repos Doc: remoteNameStyle might result in a repo name clashes Change-Id: I77b698b5976dda82160ca967ce52e510791e651f
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java index 8ef21d0..00a46de 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.ImmutableSet.Builder; import com.google.common.collect.Lists; import com.google.common.io.Files; import com.google.gerrit.entities.AccountGroup; @@ -219,7 +218,7 @@ private void addRecursiveParents( AccountGroup.UUID g, - Builder<AccountGroup.UUID> builder, + ImmutableSet.Builder<AccountGroup.UUID> builder, GroupIncludeCache groupIncludeCache) { for (AccountGroup.UUID p : groupIncludeCache.parentGroupsOf(g)) { if (builder.build().contains(p)) { @@ -775,6 +774,10 @@ return config.getSlowLatencyThreshold(); } + int getPushBatchSize() { + return config.getPushBatchSize(); + } + private static boolean matches(URIish uri, String urlMatch) { if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) { return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java index 1b39374..a74d198 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -14,10 +14,14 @@ package com.googlesource.gerrit.plugins.replication; +import static com.google.common.base.Suppliers.memoize; +import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog; + import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.gerrit.server.config.ConfigUtil; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.eclipse.jgit.lib.Config; import org.eclipse.jgit.transport.RemoteConfig; @@ -45,6 +49,7 @@ private final RemoteConfig remoteConfig; private final int maxRetries; private final int slowLatencyThreshold; + private final Supplier<Integer> pushBatchSize; protected DestinationConfiguration(RemoteConfig remoteConfig, Config cfg) { this.remoteConfig = remoteConfig; @@ -84,6 +89,31 @@ "slowLatencyThreshold", DEFAULT_SLOW_LATENCY_THRESHOLD_SECS, TimeUnit.SECONDS); + + pushBatchSize = + memoize( + () -> { + int configuredBatchSize = + Math.max( + 0, + getInt( + remoteConfig, + cfg, + "pushBatchSize", + cfg.getInt("gerrit", "pushBatchSize", 0))); + if (configuredBatchSize > 0) { + int distributionInterval = cfg.getInt("replication", "distributionInterval", 0); + if (distributionInterval > 0) { + repLog.atWarning().log( + "Push in batches cannot be turned on for remote (%s) when 'Cluster" + + " Replication' (replication.distributionInterval) is configured", + name); + return 0; + } + return configuredBatchSize; + } + return 0; + }); } @Override @@ -173,4 +203,9 @@ public int getSlowLatencyThreshold() { return slowLatencyThreshold; } + + @Override + public int getPushBatchSize() { + return pushBatchSize.get(); + } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java index 747c0f6..4957a64 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
@@ -35,7 +35,6 @@ import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; -import com.googlesource.gerrit.plugins.replication.Destination.Factory; import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType; import java.net.URISyntaxException; import java.util.ArrayList; @@ -50,7 +49,7 @@ public class DestinationsCollection implements ReplicationDestinations { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); - private final Factory destinationFactory; + private final Destination.Factory destinationFactory; private final Provider<ReplicationQueue> replicationQueue; private volatile List<Destination> destinations; private boolean shuttingDown;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Nfs.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Nfs.java index a347f3a..09a632d 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/Nfs.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Nfs.java
@@ -22,7 +22,6 @@ /** * Determine if a throwable or a cause in its causal chain is a Stale NFS File Handle * - * @param throwable * @return a boolean true if the throwable or a cause in its causal chain is a Stale NFS File * Handle */ @@ -39,7 +38,6 @@ /** * Determine if an IOException is a Stale NFS File Handle * - * @param ioe * @return a boolean true if the IOException is a Stale NFS FIle Handle */ public static boolean isStaleFileHandle(IOException ioe) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java index 87c35ee..8b3c9e2 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -19,12 +19,14 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; import com.google.common.base.MoreObjects; import com.google.common.base.Throwables; import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.gerrit.entities.Project; import com.google.gerrit.entities.RefNames; @@ -564,7 +566,36 @@ lazy(() -> refUpdatesForLogging(todo.subList(0, replConfig.getMaxRefsToLog())))); } - return tn.push(NullProgressMonitor.INSTANCE, todo); + return pushInBatches(tn, todo); + } + + private PushResult pushInBatches(Transport tn, List<RemoteRefUpdate> todo) + throws NotSupportedException, TransportException { + int batchSize = pool.getPushBatchSize(); + if (batchSize == 0 || todo.size() <= batchSize) { + return tn.push(NullProgressMonitor.INSTANCE, todo); + } + + List<List<RemoteRefUpdate>> batches = Lists.partition(todo, batchSize); + repLog.atInfo().log("Push to %s in %d batches", uri, batches.size()); + AggregatedPushResult result = new AggregatedPushResult(); + int completedBatch = 1; + for (List<RemoteRefUpdate> batch : batches) { + repLog.atInfo().log( + "Pushing %d/%d batches for replication to %s", completedBatch, batches.size(), uri); + result.addResult(tn.push(NullProgressMonitor.INSTANCE, batch)); + + // check if push should be no longer continued + if (wasCanceled()) { + repLog.atInfo().log( + "Push for replication to %s was canceled after %d completed batch and thus won't be" + + " rescheduled", + uri, completedBatch); + break; + } + completedBatch++; + } + return result; } private static String refUpdatesForLogging(List<RemoteRefUpdate> refUpdates) { @@ -836,4 +867,24 @@ super(uri, message); } } + + /** + * 
Internal class used to aggregate PushResult objects from all push batches. See {@link + * PushOne#pushInBatches} for usage. + */ + private static class AggregatedPushResult extends PushResult { + private final List<PushResult> results = new ArrayList<>(); + + void addResult(PushResult result) { + results.add(result); + } + + @Override + public Collection<RemoteRefUpdate> getRemoteUpdates() { + return results.stream() + .map(PushResult::getRemoteUpdates) + .flatMap(Collection::stream) + .collect(toList()); + } + } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java index 5450dd5..4f33937 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
@@ -30,15 +30,7 @@ public interface PushResultProcessing { public static final PushResultProcessing NO_OP = new PushResultProcessing() {}; - /** - * Invoked when a ref has been replicated to one node. - * - * @param project - * @param ref - * @param uri - * @param status - * @param refStatus - */ + /** Invoked when a ref has been replicated to one node. */ default void onRefReplicatedToOneNode( String project, String ref, @@ -46,20 +38,10 @@ RefPushResult status, RemoteRefUpdate.Status refStatus) {} - /** - * Invoked when a ref has been replicated to all nodes. - * - * @param project - * @param ref - * @param nodesCount - */ + /** Invoked when a ref has been replicated to all nodes */ default void onRefReplicatedToAllNodes(String project, String ref, int nodesCount) {} - /** - * Invoked when all refs have been replicated to all nodes. - * - * @param totalPushTasksCount - */ + /** Invoked when all refs have been replicated to all nodes */ default void onAllRefsReplicatedToAllNodes(int totalPushTasksCount) {} /**
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java index b66e73c..5fe0323 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
@@ -98,6 +98,13 @@ int getSlowLatencyThreshold(); /** + * Returns the maximum number of refs that can be pushed in a single push operation. + * + * @return batch size, zero if unlimited. + */ + int getPushBatchSize(); + + /** * Whether the remote configuration is for a single project only * * @return true, when configuration is for a single project, false otherwise
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java index 18ccc66..2fa6c34 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
@@ -54,7 +54,7 @@ */ List<Destination> getDestinations(URIish uriish, Project.NameKey project, String ref); - /** @return true if there are no destinations, false otherwise. */ + /** Returns true if there are no destinations, false otherwise. */ boolean isEmpty(); /**
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java index f5c6185..5458b6c 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -71,16 +71,18 @@ return null; } - /* (non-Javadoc) - * @see com.googlesource.gerrit.plugins.replication.ReplicationConfig#isReplicateAllOnPluginStart() + /** + * See + * {@link com.googlesource.gerrit.plugins.replication.ReplicationConfig#isReplicateAllOnPluginStart()} */ @Override public boolean isReplicateAllOnPluginStart() { return replicateAllOnPluginStart; } - /* (non-Javadoc) - * @see com.googlesource.gerrit.plugins.replication.ReplicationConfig#isDefaultForceUpdate() + /** + * See + * {@link com.googlesource.gerrit.plugins.replication.ReplicationConfig#isDefaultForceUpdate()} */ @Override public boolean isDefaultForceUpdate() {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java index 4abb295..5310c14 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -229,7 +229,7 @@ @Override public void onDone() { if (Prune.TRUE.equals(prune)) { - pruneNoLongerPending(taskNamesByReplicateRefUpdate.values()); + pruneNoLongerPending(new HashSet<>(taskNamesByReplicateRefUpdate.values())); } replaying.set(false); } @@ -242,7 +242,7 @@ } } - private void pruneNoLongerPending(Collection<String> prunableTaskNames) { + private void pruneNoLongerPending(Set<String> prunableTaskNames) { // Queue tasks have wrappers around them so workQueue.getTasks() does not return the PushOnes. // We also cannot access them by taskId since PushOnes don't have a taskId, they do have // an Id, but it is not the id assigned to the task in the queues. The tasks in the queue
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java index 871ed52..fa65803 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
@@ -140,12 +140,12 @@ } private RefReplicationStatus getRefStatus(String project, String ref) { - if (!statusByProjectRef.contains(project, ref)) { - RefReplicationStatus refStatus = new RefReplicationStatus(project, ref); + RefReplicationStatus refStatus = statusByProjectRef.get(project, ref); + if (refStatus == null) { + refStatus = new RefReplicationStatus(project, ref); statusByProjectRef.put(project, ref, refStatus); - return refStatus; } - return statusByProjectRef.get(project, ref); + return refStatus; } public void waitForReplication() throws InterruptedException {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java index fa8b44c..f63df98 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
@@ -60,7 +60,6 @@ } ReplicationState state = new ReplicationState(new CommandProcessing(this)); - Future<?> future = null; ReplicationFilter projectFilter; @@ -70,7 +69,8 @@ projectFilter = new ReplicationFilter(projectPatterns); } - future = pushFactory.create(urlMatch, projectFilter, state, now).schedule(0, TimeUnit.SECONDS); + Future<?> future = + pushFactory.create(urlMatch, projectFilter, state, now).schedule(0, TimeUnit.SECONDS); if (wait) { if (future != null) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicatedEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicatedEvent.java index b0c554e..c8347d1 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicatedEvent.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicatedEvent.java
@@ -16,22 +16,16 @@ import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName; -import com.google.gerrit.entities.Project; -import com.google.gerrit.server.events.RefEvent; import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult; import java.util.Objects; import org.eclipse.jgit.transport.RemoteRefUpdate; import org.eclipse.jgit.transport.RemoteRefUpdate.Status; import org.eclipse.jgit.transport.URIish; -public class RefReplicatedEvent extends RefEvent { +public class RefReplicatedEvent extends RemoteRefReplicationEvent { public static final String TYPE = "ref-replicated"; - public final String project; - public final String ref; @Deprecated public final String targetNode; - public final String targetUri; - public final String status; public final Status refStatus; public RefReplicatedEvent( @@ -40,23 +34,9 @@ URIish targetUri, RefPushResult status, RemoteRefUpdate.Status refStatus) { - super(TYPE); - this.project = project; - this.ref = ref; + super(TYPE, project, ref, targetUri, status.toString()); this.targetNode = resolveNodeName(targetUri); - this.status = status.toString(); this.refStatus = refStatus; - this.targetUri = targetUri.toASCIIString(); - } - - @Override - public Project.NameKey getProjectNameKey() { - return Project.nameKey(project); - } - - @Override - public String getRefName() { - return ref; } @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/events/RemoteRefReplicationEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RemoteRefReplicationEvent.java new file mode 100644 index 0000000..3950c3f --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RemoteRefReplicationEvent.java
@@ -0,0 +1,47 @@ +// Copyright (C) 2021 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.events; + +import com.google.gerrit.common.Nullable; +import com.google.gerrit.entities.Project; +import com.google.gerrit.server.events.RefEvent; +import org.eclipse.jgit.transport.URIish; + +public class RemoteRefReplicationEvent extends RefEvent { + + public final String project; + public final String ref; + public final String status; + public final String targetUri; + + public RemoteRefReplicationEvent( + String type, String project, String ref, URIish targetUri, @Nullable String status) { + super(type); + this.project = project; + this.ref = ref; + this.status = status; + this.targetUri = targetUri.toASCIIString(); + } + + @Override + public Project.NameKey getProjectNameKey() { + return Project.nameKey(project); + } + + @Override + public String getRefName() { + return ref; + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/events/ReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ReplicationScheduledEvent.java index 4a1ade8..a952777 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/events/ReplicationScheduledEvent.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ReplicationScheduledEvent.java
@@ -17,23 +17,16 @@ import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName; import com.google.gerrit.entities.Project; -import com.google.gerrit.server.events.RefEvent; import org.eclipse.jgit.transport.URIish; -public class ReplicationScheduledEvent extends RefEvent { +public class ReplicationScheduledEvent extends RemoteRefReplicationEvent { public static final String TYPE = "ref-replication-scheduled"; - public final String project; - public final String ref; @Deprecated public final String targetNode; - public final String targetUri; public ReplicationScheduledEvent(String project, String ref, URIish targetUri) { - super(TYPE); - this.project = project; - this.ref = ref; + super(TYPE, project, ref, targetUri, null); this.targetNode = resolveNodeName(targetUri); - this.targetUri = targetUri.toASCIIString(); } @Override
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md index ded9d84..deffe5e 100644 --- a/src/main/resources/Documentation/about.md +++ b/src/main/resources/Documentation/about.md
@@ -16,11 +16,9 @@ local path as replication target. This makes e.g. sense if a network share is mounted to which the repositories should be replicated. -In multi-primary scenario, any replication work which is already -in-flight or completed by the other nodes is not performed to -avoid extra work. This is because, the storage for replication -events is shared between multiple primaries.(The storage location -is specified in the config using: `replication.eventsDirectory`). +It is possible to +[configure](config.md#configuring-cluster-replication) the plugin so +that multiple primaries share the replication work approximately evenly. Replication of account data (NoteDb) ------------------------------------ @@ -30,7 +28,6 @@ * `refs/users/*` (user branches) * `refs/meta/external-ids` (external IDs) -* `refs/starred-changes/*` (star labels, not needed for Gerrit slaves) +* `refs/starred-changes/*` (star labels, not needed for Gerrit replicas) * `refs/sequences/accounts` (account sequence numbers, not needed for Gerrit - slaves) - + replicas)
diff --git a/src/main/resources/Documentation/cmd-start.md b/src/main/resources/Documentation/cmd-start.md index e12ec92..1a77c72 100644 --- a/src/main/resources/Documentation/cmd-start.md +++ b/src/main/resources/Documentation/cmd-start.md
@@ -129,10 +129,10 @@ $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start documentation/* ``` -Replicate projects whose path includes a folder named `vendor` to host slave1: +Replicate projects whose path includes a folder named `vendor` to host replica1: ``` - $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start --url slave1 ^(|.*/)vendor(|/.*) + $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start --url replica1 ^(|.*/)vendor(|/.*) ``` Replicate to only one specific destination URL:
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md index af91032..d401030 100644 --- a/src/main/resources/Documentation/config.md +++ b/src/main/resources/Documentation/config.md
@@ -18,9 +18,9 @@ ssh-keygen -m PEM -t rsa -C "your_email@example.com" ``` -<a name="example_file"> +<a name="example_file"></a> Next, create `$site_path/etc/replication.config` as a Git-style config -file, for example to replicate in parallel to four different hosts:</a> +file, for example to replicate in parallel to four different hosts: ``` [remote "host-one"] @@ -46,6 +46,59 @@ To manually trigger replication at runtime, see SSH command [start](cmd-start.md). +<a name="configuring-cluster-replication"></a> +Configuring Cluster Replication +------------------------------- + +The replication plugin is designed to allow multiple primaries in a +cluster to cooperate efficiently via the replication event +persistence subsystem. To enable this cooperation, the directory +pointed to by the replication.eventsDirectory config key must reside on +a shared filesystem, such as NFS. By default, simply pointing multiple +primaries to the same eventsDirectory will enable some cooperation by +preventing the same replication push from being duplicated by more +than one primary. + +To further improve cooperation across the cluster, the +replication.distributionInterval config value can be set. With +distribution enabled, the replication queues for all the nodes sharing +the same eventsDirectory will reflect approximately the same outstanding +replication work (i.e. tasks waiting in the queue). Replication pushes +which are running will continue to only be visible in the queue of the +node on which the push is actually happening. This feature helps +administrators get a cluster-wide view of outstanding replication +tasks, while allowing replication tasks triggered by one primary to be +fulfilled by another node which is less busy. + +This enhanced replication work distribution allows the amount of +replication work a cluster can handle to scale more evenly and linearly +with the number of primaries in the cluster.
Adding more nodes to a +cluster without distribution enabled will generally not allow the thread +count per remote to be reduced without impacting service levels to those +remotes. This is because without distribution, all events triggered by a +node will only be fulfilled by the node which triggered the event, even +if all the other nodes in the cluster are idle. This behavior implies +that each node should be configured in a way that allows it alone to +provide the level of service which each remote requires. However, with +distribution enabled, it becomes possible to reduce the number of +replication threads configured per remote proportionally to the number +of nodes in the cluster, while maintaining approximately the same service +level as before adding new nodes. + +Reducing the thread count per remote without impacting service is possible with +distribution, because when configuring a node it can be expected that +other nodes will pick up some of the work it triggers. Then the node no +longer needs to be configured as if it were the only node in the +cluster. For example, if a remote requires 6 threads with one node to +achieve acceptable service, it should only take 2 threads on 3 +equivalently powered nodes to provide the same service level with +distribution enabled. Scaling down such thread requirements per remote +results in a reduced memory footprint per remote on each node in the +cluster. This enables the nodes in the cluster to scale to handle +more remotes, at approximately the same service level, than without +distribution. The number of extra supported remotes then also scales +approximately linearly with the extra nodes in a cluster. + File `replication.config` ------------------------- @@ -95,6 +148,20 @@ `(retry 1) push aaa.com:/git/test.git [refs/heads/b1 refs/heads/b2 (+2)]` +gerrit.pushBatchSize +: Max number of refs that are pushed in a single push operation.
If more + than pushBatchSize refs are to be pushed then they are divided into batches + and pushed sequentially one-by-one. + + Can be overridden at the remote level by setting `remote.NAME.pushBatchSize`. + + By default, `0`, which means that there is no limit on the number of + refs to be transferred in a single push operation. Note that negative + values are treated as `0`. + + Note that `pushBatchSize` is ignored when *Cluster Replication* is configured + (i.e. when `replication.distributionInterval` has a value > 0). + gerrit.sshCommandTimeout : Timeout for SSH command execution. If 0, there is no timeout and the client waits indefinitely. By default, 0. @@ -507,6 +574,19 @@ default: 15 minutes +remote.NAME.pushBatchSize +: Max number of refs that are pushed in a single push operation to this + destination. If more than `pushBatchSize` refs are to be pushed then they are + divided into batches and pushed sequentially one-by-one. + + By default it falls back to the `gerrit.pushBatchSize` value (which is `0` if + not set, meaning that there is no limit on the number of refs to + be transferred in a single push operation). Note that negative values are + treated as `0`. + + Note that `pushBatchSize` is ignored when *Cluster Replication* is configured + (i.e. when `replication.distributionInterval` has a value > 0). + Directory `replication` -------------------- The optional directory `$site_path/etc/replication` contains Git-style
diff --git a/src/main/resources/Documentation/extension-point.md b/src/main/resources/Documentation/extension-point.md index 076aded..83e7e45 100644 --- a/src/main/resources/Documentation/extension-point.md +++ b/src/main/resources/Documentation/extension-point.md
@@ -2,7 +2,7 @@ ============== The replication plugin exposes an extension point to allow influencing its behaviour from another plugin or a script. -Extension points can be defined from the replication plugin only when it is loaded as [libModule](/config-gerrit.html#gerrit.installModule) and +Extension points can be defined from the replication plugin only when it is loaded as [libModule](../../../Documentation/config-gerrit.html#gerrit.installModule) and implemented by another plugin by declaring a `provided` dependency from the replication plugin. ### Install extension libModule
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/DestinationConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/DestinationConfigurationTest.java new file mode 100644 index 0000000..646f915 --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/DestinationConfigurationTest.java
@@ -0,0 +1,101 @@ +// Copyright (C) 2021 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.when; + +import org.eclipse.jgit.lib.Config; +import org.eclipse.jgit.transport.RemoteConfig; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class DestinationConfigurationTest { + private static final String REMOTE = "foo"; + + @Mock private RemoteConfig remoteConfigMock; + @Mock private Config cfgMock; + + private DestinationConfiguration objectUnderTest; + + @Before + public void setUp() { + when(remoteConfigMock.getName()).thenReturn(REMOTE); + when(cfgMock.getStringList(Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) + .thenReturn(new String[] {}); + objectUnderTest = new DestinationConfiguration(remoteConfigMock, cfgMock); + } + + @Test + public void shouldIgnoreRemotePushBatchSizeWhenClusterReplicationIsConfigured() { + // given + when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", 0)).thenReturn(1); + when(cfgMock.getInt("replication", "distributionInterval", 0)).thenReturn(1); + + // when + int actual = objectUnderTest.getPushBatchSize(); + + // then + assertThat(actual).isEqualTo(0); 
+ } + + @Test + public void shouldIgnoreGlobalPushBatchSizeWhenClusterReplicationIsConfigured() { + // given + int globalPushBatchSize = 1; + when(cfgMock.getInt("gerrit", "pushBatchSize", 0)).thenReturn(globalPushBatchSize); + when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", globalPushBatchSize)) + .thenReturn(globalPushBatchSize); + when(cfgMock.getInt("replication", "distributionInterval", 0)).thenReturn(1); + + // when + int actual = objectUnderTest.getPushBatchSize(); + + // then + assertThat(actual).isEqualTo(0); + } + + @Test + public void shouldReturnRemotePushBatchSizeWhenClusterReplicationIsNotConfigured() { + // given + when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", 0)).thenReturn(1); + + // when + int actual = objectUnderTest.getPushBatchSize(); + + // then + assertThat(actual).isEqualTo(1); + } + + @Test + public void shouldReturnGlobalPushBatchSizeWhenClusterReplicationIsNotConfigured() { + // given + int globalPushBatchSize = 1; + when(cfgMock.getInt("gerrit", "pushBatchSize", 0)).thenReturn(globalPushBatchSize); + when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", globalPushBatchSize)) + .thenReturn(globalPushBatchSize); + + // when + int actual = objectUnderTest.getPushBatchSize(); + + // then + assertThat(actual).isEqualTo(globalPushBatchSize); + } +}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java index a1f61fe..5e6bde8 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
@@ -65,6 +65,7 @@ return implementedByOverrider; } } catch (NoSuchMethodException | SecurityException e) { + return null; /* Method lookup failed: yield the same null result as the fall-through return below. */ } return null; }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java index 94f0dc4..bb3e886 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -17,8 +17,10 @@ import static org.eclipse.jgit.lib.Ref.Storage.NEW; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -36,6 +38,7 @@ import com.google.gerrit.server.util.IdGenerator; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -115,7 +118,8 @@ new ObjectIdRef.Unpeeled( NEW, "foo", ObjectId.fromString("0000000000000000000000000000000000000001")); - localRefs = Arrays.asList(newLocalRef); + localRefs = new ArrayList<>(); /* mutable list: replicateTwoRefs() appends a second ref to it */ + localRefs.add(newLocalRef); Ref remoteRef = new ObjectIdRef.Unpeeled(NEW, "foo", ObjectId.zeroId()); remoteRefs = new HashMap<>(); @@ -223,6 +227,52 @@ verify(transportMock, never()).push(any(), any()); } + @Test + public void shouldPushInSingleOperationWhenPushBatchSizeIsNotConfigured() + throws InterruptedException, IOException { + replicateTwoRefs(createPushOne(null)); + verify(transportMock).push(any(), any()); + } + + @Test + public void shouldPushInBatchesWhenPushBatchSizeIsConfigured() + throws InterruptedException, IOException { + when(destinationMock.getPushBatchSize()).thenReturn(1); + replicateTwoRefs(createPushOne(null)); + verify(transportMock, times(2)).push(any(), any()); + } + + @Test + public void shouldStopPushingInBatchesWhenPushOperationGetsCanceled() + throws InterruptedException, IOException { + when(destinationMock.getPushBatchSize()).thenReturn(1); + PushOne pushOne = createPushOne(null); + + // cancel replication during the first push + doAnswer( + invocation -> { + pushOne.setCanceledWhileRunning(); + return new PushResult(); + }) + .when(transportMock) + .push(any(), any()); + + replicateTwoRefs(pushOne); + 
verify(transportMock, times(1)).push(any(), any()); /* only the first batch is pushed once the operation has been canceled */ + } + + private void replicateTwoRefs(PushOne pushOne) throws InterruptedException { /* Adds local ref "bar" next to "foo", runs a push of ALL_REFS and waits on isCallFinished for up to TEST_PUSH_TIMEOUT_SECS; the await() result is not checked. */ + ObjectIdRef barLocalRef = + new ObjectIdRef.Unpeeled( + NEW, "bar", ObjectId.fromString("0000000000000000000000000000000000000001")); + localRefs.add(barLocalRef); + + pushOne.addRef(PushOne.ALL_REFS); + pushOne.run(); + + isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS); + } + private PushOne createPushOne(DynamicItem<ReplicationPushFilter> replicationPushFilter) { PushOne push = new PushOne(
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java index 9606371..b6a3ed1 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
@@ -91,6 +91,18 @@ } protected void setReplicationDestination( + String remoteName, String replicaSuffix, Optional<String> project, Integer pushBatchSize) /* null leaves pushBatchSize unset (wrapped with Optional.ofNullable below). NOTE(review): an int literal argument would bind to an int-typed 4-arg overload if one exists; pass a boxed Integer to be explicit. */ + throws IOException { + setReplicationDestination( + remoteName, + Arrays.asList(replicaSuffix), + project, + TEST_REPLICATION_DELAY_SECONDS, + false, + Optional.ofNullable(pushBatchSize)); + } + + protected void setReplicationDestination( String remoteName, String replicaSuffix, Optional<String> project, boolean mirror) throws IOException { setReplicationDestination( @@ -125,13 +137,37 @@ protected void setReplicationDestination( String remoteName, + String replicaSuffix, + Optional<String> project, + int replicationDelay, + boolean mirror, + Optional<Integer> pushBatchSize) + throws IOException { + setReplicationDestination( + remoteName, Arrays.asList(replicaSuffix), project, replicationDelay, mirror, pushBatchSize); + } + + protected void setReplicationDestination( + String remoteName, List<String> replicaSuffixes, Optional<String> project, int replicationDelay, boolean mirror) throws IOException { setReplicationDestination( - config, remoteName, replicaSuffixes, project, replicationDelay, mirror); + remoteName, replicaSuffixes, project, replicationDelay, mirror, Optional.empty()); + } + + protected void setReplicationDestination( + String remoteName, + List<String> replicaSuffixes, + Optional<String> project, + int replicationDelay, + boolean mirror, + Optional<Integer> pushBatchSize) + throws IOException { + setReplicationDestination( + config, remoteName, replicaSuffixes, project, replicationDelay, mirror, pushBatchSize); config.setBoolean("gerrit", null, "autoReload", true); config.save(); } @@ -142,7 +178,8 @@ Optional<String> project, int replicationDelay) throws IOException { - setReplicationDestination(config, null, replicaSuffixes, project, replicationDelay, false); + setReplicationDestination( + config, null, replicaSuffixes, project, 
replicationDelay, false, Optional.empty()); } protected void 
setReplicationDestination( @@ -151,7 +188,8 @@ List<String> replicaSuffixes, Optional<String> project, int replicationDelay, - boolean mirror) + boolean mirror, + Optional<Integer> pushBatchSize) throws IOException { List<String> replicaUrls = @@ -163,6 +201,7 @@ remoteConfig.setInt("remote", remoteName, "replicationRetry", TEST_REPLICATION_RETRY_MINUTES); remoteConfig.setBoolean("remote", remoteName, "mirror", mirror); project.ifPresent(prj -> remoteConfig.setString("remote", remoteName, "projects", prj)); + pushBatchSize.ifPresent(pbs -> remoteConfig.setInt("remote", remoteName, "pushBatchSize", pbs)); /* written only when a value is supplied; otherwise the key stays absent from the remote section */ remoteConfig.save(); }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java index 5ade68d..3d0dfc1 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java
@@ -19,6 +19,7 @@ import com.google.gerrit.acceptance.TestPlugin; import com.google.gerrit.acceptance.UseLocalDisk; import com.google.gerrit.acceptance.WaitUtil; +import com.google.gerrit.entities.BranchNameKey; import com.google.gerrit.entities.Project; import com.google.gerrit.server.git.WorkQueue; import java.time.Duration; @@ -89,8 +90,9 @@ reloadConfig(); String newBranch = "refs/heads/foo_branch"; - createBranch(project, "refs/heads/master", newBranch); + createBranch(BranchNameKey.create(project, newBranch)); + assertThat(listWaitingReplicationTasks(newBranch)).hasSize(1); /* exactly one task for the new branch must be waiting before it is deleted below */ deleteWaitingReplicationTasks(newBranch); // This simulates the work being started by other node assertThat(waitForProjectTaskCount(0, Duration.ofSeconds(TEST_DISTRIBUTION_CYCLE_SECONDS)))
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java index 2d69a47..4cd55f9 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java
@@ -15,6 +15,7 @@ package com.googlesource.gerrit.plugins.replication; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertTrue; import com.google.common.base.Objects; import com.google.gerrit.acceptance.PushOneCommit.Result; @@ -42,11 +43,13 @@ import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent; import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent; import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent; +import java.net.URISyntaxException; import java.time.Duration; import java.util.List; import java.util.Optional; import java.util.function.Predicate; import java.util.function.Supplier; +import org.eclipse.jgit.transport.RemoteRefUpdate; import org.eclipse.jgit.transport.URIish; import org.junit.Before; import org.junit.Test; @@ -308,6 +311,35 @@ assertThat(origEvent).isEqualTo(gotEvent); } + @Test + public void shouldSerializeRefReplicatedEvent() throws URISyntaxException { + RefReplicatedEvent origEvent = + new RefReplicatedEvent( + project.get(), + "refs/heads/master", + new URIish(String.format("git://someHost/%s.git", project.get())), + ReplicationState.RefPushResult.SUCCEEDED, + RemoteRefUpdate.Status.OK); + + assertThat(origEvent) + .isEqualTo(eventGson.fromJson(eventGson.toJson(origEvent), RefReplicatedEvent.class)); + } + + @Test + public void shouldSerializeReplicationScheduledEvent() throws URISyntaxException { + ReplicationScheduledEvent origEvent = + new ReplicationScheduledEvent( + project.get(), + "refs/heads/master", + new URIish(String.format("git://someHost/%s.git", project.get()))); + + assertTrue( + "deserialized ReplicationScheduledEvent should match the original", + scheduledEventsEqual( + origEvent, + eventGson.fromJson(eventGson.toJson(origEvent), ReplicationScheduledEvent.class))); + } + private <T extends RefEvent> void waitForRefEvent(Supplier<List<T>> events, String refName) throws InterruptedException { WaitUtil.waitUntil( @@ -352,4 +384,25 @@ return Objects.hashCode(event); } } + + /** Compares project, 
ref, targetUri, status and targetNode of two scheduled events. */ + private 
boolean scheduledEventsEqual(ReplicationScheduledEvent scheduledEvent, Object other) { + if (!(other instanceof ReplicationScheduledEvent)) { + return false; + } + ReplicationScheduledEvent event = (ReplicationScheduledEvent) other; + if (!Objects.equal(event.project, scheduledEvent.project)) { + return false; + } + if (!Objects.equal(event.ref, scheduledEvent.ref)) { + return false; + } + if (!Objects.equal(event.targetUri, scheduledEvent.targetUri)) { + return false; + } + if (!Objects.equal(event.status, scheduledEvent.status)) { + return false; + } + return Objects.equal(event.targetNode, scheduledEvent.targetNode); + } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java index a174e91..33bd91d 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -348,6 +348,29 @@ } } + @Test + public void shouldReplicateWithPushBatchSizeSetForRemote() throws Exception { + Project.NameKey targetProject = createTestProject(project + "replica"); + + // Boxed so the Integer (pushBatchSize) overload wins over any int-typed overload. + setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.valueOf(1)); + reloadConfig(); + + // Creating a change creates 2 refs, which is already enough to require pushing them in two + // batches of size 1 each. + Result pushResult = createChange(); + RevCommit sourceCommit = pushResult.getCommit(); + String sourceRef = pushResult.getPatchSet().refName(); + + try (Repository repo = repoManager.openRepository(targetProject)) { + waitUntil(() -> checkedGetRef(repo, sourceRef) != null); + + Ref targetBranchRef = getRef(repo, sourceRef); + assertThat(targetBranchRef).isNotNull(); + assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId()); + } + } + private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException { WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT); }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationPushInBatchesIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationPushInBatchesIT.java new file mode 100644 index 0000000..cf8dbe3 --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationPushInBatchesIT.java
@@ -0,0 +1,69 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.WaitUtil;
+import com.google.gerrit.entities.Project;
+import java.time.Duration;
+import java.util.Optional;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.junit.Test;
+
+/** Verifies replication when {@code gerrit.pushBatchSize} is configured globally. */
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationPushInBatchesIT extends ReplicationDaemon {
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    initConfig();
+    config.setInt("gerrit", null, "pushBatchSize", 1);
+    config.save();
+    setReplicationDestination(
+        "remote1",
+        "suffix1",
+        Optional.of("not-used-project")); // Simulates a full replication.config initialization
+    super.setUpTestPlugin();
+  }
+
+  @Test
+  public void shouldReplicateWithPushBatchSizeSetGlobally() throws Exception {
+    Project.NameKey targetProject = createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    // Creating a change creates 2 refs, which is already enough to require pushing them in two
+    // batches of size 1 each.
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().refName();
+
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      WaitUtil.waitUntil(() -> checkedGetRef(repo, sourceRef) != null, Duration.ofSeconds(60));
+
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+}