Merge "Move HttpClientProvider to the ReplicationModule"
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
index 89b97e9..c61a123 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
@@ -85,7 +85,7 @@
super(
threadPool,
stream.iterator(),
- new ForwardingRunner<T>(runner) {
+ new ForwardingRunner<>(runner) {
@Override
public void onDone() {
stream.close();
@@ -109,7 +109,7 @@
try {
runner.run(item);
} catch (RuntimeException e) { // catch to prevent chain from breaking
- logger.atSevere().withCause(e).log("Error while running: " + item);
+ logger.atSevere().withCause(e).log("Error while running: %s", item);
}
if (!scheduledNext) {
runner.onDone();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 151e584..e5f125b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -270,6 +270,8 @@
}
private void foreachPushOp(Map<URIish, PushOne> opsMap, Function<PushOne, Void> pushOneFunction) {
+ // Callers may modify the provided opsMap concurrently, hence make a defensive copy of the
+ // values to loop over them.
for (PushOne pushOne : ImmutableList.copyOf(opsMap.values())) {
pushOneFunction.apply(pushOne);
}
@@ -453,6 +455,7 @@
synchronized (stateLock) {
URIish uri = pushOp.getURI();
pending.remove(uri);
+ pushOp.notifyNotAttempted(pushOp.getRefs());
}
}
@@ -629,7 +632,7 @@
return true;
}
- boolean matches = (new ReplicationFilter(projects)).matches(project);
+ boolean matches = new ReplicationFilter(projects).matches(project);
if (!matches) {
repLog.atFine().log(
"Skipping replication of project %s; does not match filter", project.get());
@@ -769,6 +772,10 @@
return config.getSlowLatencyThreshold();
}
+ int getPushBatchSize() {
+ return config.getPushBatchSize();
+ }
+
private static boolean matches(URIish uri, String urlMatch) {
if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
index 1b39374..a74d198 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -14,10 +14,14 @@
package com.googlesource.gerrit.plugins.replication;
+import static com.google.common.base.Suppliers.memoize;
+import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.gerrit.server.config.ConfigUtil;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.transport.RemoteConfig;
@@ -45,6 +49,7 @@
private final RemoteConfig remoteConfig;
private final int maxRetries;
private final int slowLatencyThreshold;
+ private final Supplier<Integer> pushBatchSize;
protected DestinationConfiguration(RemoteConfig remoteConfig, Config cfg) {
this.remoteConfig = remoteConfig;
@@ -84,6 +89,31 @@
"slowLatencyThreshold",
DEFAULT_SLOW_LATENCY_THRESHOLD_SECS,
TimeUnit.SECONDS);
+
+ pushBatchSize =
+ memoize(
+ () -> {
+ int configuredBatchSize =
+ Math.max(
+ 0,
+ getInt(
+ remoteConfig,
+ cfg,
+ "pushBatchSize",
+ cfg.getInt("gerrit", "pushBatchSize", 0)));
+ if (configuredBatchSize > 0) {
+ int distributionInterval = cfg.getInt("replication", "distributionInterval", 0);
+ if (distributionInterval > 0) {
+ repLog.atWarning().log(
+ "Push in batches cannot be turned on for remote (%s) when 'Cluster"
+ + " Replication' (replication.distributionInterval) is configured",
+ name);
+ return 0;
+ }
+ return configuredBatchSize;
+ }
+ return 0;
+ });
}
@Override
@@ -173,4 +203,9 @@
public int getSlowLatencyThreshold() {
return slowLatencyThreshold;
}
+
+ @Override
+ public int getPushBatchSize() {
+ return pushBatchSize.get();
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 87c35ee..8afcf9b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -19,12 +19,14 @@
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gerrit.entities.Project;
import com.google.gerrit.entities.RefNames;
@@ -49,7 +51,6 @@
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
-import com.jcraft.jsch.JSchException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -453,10 +454,7 @@
} catch (NotSupportedException e) {
stateLog.error("Cannot replicate to " + uri, e, getStatesAsArray());
} catch (TransportException e) {
- Throwable cause = e.getCause();
- if (cause instanceof JSchException && cause.getMessage().startsWith("UnknownHostKey:")) {
- repLog.atSevere().log("Cannot replicate to %s: %s", uri, cause.getMessage());
- } else if (e instanceof UpdateRefFailureException) {
+ if (e instanceof UpdateRefFailureException) {
updateRefRetryCount++;
repLog.atSevere().log("Cannot replicate to %s due to a lock or write ref failure", uri);
@@ -564,7 +562,36 @@
lazy(() -> refUpdatesForLogging(todo.subList(0, replConfig.getMaxRefsToLog()))));
}
- return tn.push(NullProgressMonitor.INSTANCE, todo);
+ return pushInBatches(tn, todo);
+ }
+
+ private PushResult pushInBatches(Transport tn, List<RemoteRefUpdate> todo)
+ throws NotSupportedException, TransportException {
+ int batchSize = pool.getPushBatchSize();
+ if (batchSize == 0 || todo.size() <= batchSize) {
+ return tn.push(NullProgressMonitor.INSTANCE, todo);
+ }
+
+ List<List<RemoteRefUpdate>> batches = Lists.partition(todo, batchSize);
+ repLog.atInfo().log("Push to %s in %d batches", uri, batches.size());
+ AggregatedPushResult result = new AggregatedPushResult();
+ int completedBatch = 1;
+ for (List<RemoteRefUpdate> batch : batches) {
+ repLog.atInfo().log(
+ "Pushing %d/%d batches for replication to %s", completedBatch, batches.size(), uri);
+ result.addResult(tn.push(NullProgressMonitor.INSTANCE, batch));
+
+ // check if push should be no longer continued
+ if (wasCanceled()) {
+ repLog.atInfo().log(
+ "Push for replication to %s was canceled after %d completed batch and thus won't be"
+ + " rescheduled",
+ uri, completedBatch);
+ break;
+ }
+ completedBatch++;
+ }
+ return result;
}
private static String refUpdatesForLogging(List<RemoteRefUpdate> refUpdates) {
@@ -836,4 +863,24 @@
super(uri, message);
}
}
+
+ /**
+ * Internal class used to aggregate PushResult objects from all push batches. See {@link
+ * PushOne#pushInBatches} for usage.
+ */
+ private static class AggregatedPushResult extends PushResult {
+ private final List<PushResult> results = new ArrayList<>();
+
+ void addResult(PushResult result) {
+ results.add(result);
+ }
+
+ @Override
+ public Collection<RemoteRefUpdate> getRemoteUpdates() {
+ return results.stream()
+ .map(PushResult::getRemoteUpdates)
+ .flatMap(Collection::stream)
+ .collect(toList());
+ }
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
index 4f33937..bac599f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
@@ -30,7 +30,15 @@
public interface PushResultProcessing {
public static final PushResultProcessing NO_OP = new PushResultProcessing() {};
- /** Invoked when a ref has been replicated to one node. */
+ /**
+ * Invoked when a ref has been replicated to one node.
+ *
+ * @param project the project name
+ * @param ref the ref name
+ * @param uri the URI
+ * @param status the status of the push
+ * @param refStatus the status for the ref
+ */
default void onRefReplicatedToOneNode(
String project,
String ref,
@@ -38,10 +46,20 @@
RefPushResult status,
RemoteRefUpdate.Status refStatus) {}
- /** Invoked when a ref has been replicated to all nodes */
+ /**
+ * Invoked when a ref has been replicated to all nodes.
+ *
+ * @param project the project name
+ * @param ref the ref name
+ * @param nodesCount the number of nodes
+ */
default void onRefReplicatedToAllNodes(String project, String ref, int nodesCount) {}
- /** Invoked when all refs have been replicated to all nodes */
+ /**
+ * Invoked when all refs have been replicated to all nodes.
+ *
+ * @param totalPushTasksCount total number of push tasks
+ */
default void onAllRefsReplicatedToAllNodes(int totalPushTasksCount) {}
/**
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
index b66e73c..5fe0323 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
@@ -98,6 +98,13 @@
int getSlowLatencyThreshold();
/**
+ * Returns the maximum number of refs that can be pushed in a single push operation.
+ *
+ * @return batch size, zero if unlimited.
+ */
+ int getPushBatchSize();
+
+ /**
* Whether the remote configuration is for a single project only
*
* @return true, when configuration is for a single project, false otherwise
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index 5458b6c..2424e71 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -72,8 +72,8 @@
}
/**
- * See
- * {@link com.googlesource.gerrit.plugins.replication.ReplicationConfig#isReplicateAllOnPluginStart()}
+ * See {@link
+ * com.googlesource.gerrit.plugins.replication.ReplicationConfig#isReplicateAllOnPluginStart()}
*/
@Override
public boolean isReplicateAllOnPluginStart() {
@@ -81,8 +81,8 @@
}
/**
- * See
- * {@link com.googlesource.gerrit.plugins.replication.ReplicationConfig#isDefaultForceUpdate()}
+ * See {@link
+ * com.googlesource.gerrit.plugins.replication.ReplicationConfig#isDefaultForceUpdate()}
*/
@Override
public boolean isDefaultForceUpdate() {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 5310c14..791b387 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -201,7 +201,8 @@
private void synchronizePendingEvents(Prune prune) {
if (replaying.compareAndSet(false, true)) {
- final Map<ReplicateRefUpdate, String> taskNamesByReplicateRefUpdate = new ConcurrentHashMap<>();
+ final Map<ReplicateRefUpdate, String> taskNamesByReplicateRefUpdate =
+ new ConcurrentHashMap<>();
if (Prune.TRUE.equals(prune)) {
for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
taskNamesByReplicateRefUpdate.putAll(destination.getTaskNamesByReplicateRefUpdate());
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index d216366..deffe5e 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -17,7 +17,7 @@
share is mounted to which the repositories should be replicated.
It is possible to
-[configure](config.html#configuring-cluster-replication) the plugin so
+[configure](config.md#configuring-cluster-replication) the plugin so
that multiple primaries share the replication work approximately evenly.
Replication of account data (NoteDb)
@@ -28,7 +28,6 @@
* `refs/users/*` (user branches)
* `refs/meta/external-ids` (external IDs)
-* `refs/starred-changes/*` (star labels, not needed for Gerrit slaves)
+* `refs/starred-changes/*` (star labels, not needed for Gerrit replicas)
* `refs/sequences/accounts` (account sequence numbers, not needed for Gerrit
- slaves)
-
+ replicas)
diff --git a/src/main/resources/Documentation/cmd-list.md b/src/main/resources/Documentation/cmd-list.md
index b6688e0..e815e08 100644
--- a/src/main/resources/Documentation/cmd-list.md
+++ b/src/main/resources/Documentation/cmd-list.md
@@ -30,15 +30,15 @@
-------
`--remote <PATTERN>`
-: Only print information for destinations whose remote name matches
- the `PATTERN`.
+: Only print information for destinations whose remote name matches
+the `PATTERN`.
`--detail`
-: Print additional detailed information: AdminUrl, AuthGroup, Project
- and queue (pending and in-flight).
+: Print additional detailed information: AdminUrl, AuthGroup, Project
+and queue (pending and in-flight).
`--json`
-: Output in json format.
+: Output in json format.
EXAMPLES
--------
diff --git a/src/main/resources/Documentation/cmd-start.md b/src/main/resources/Documentation/cmd-start.md
index e12ec92..1a77c72 100644
--- a/src/main/resources/Documentation/cmd-start.md
+++ b/src/main/resources/Documentation/cmd-start.md
@@ -129,10 +129,10 @@
$ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start documentation/*
```
-Replicate projects whose path includes a folder named `vendor` to host slave1:
+Replicate projects whose path includes a folder named `vendor` to host replica1:
```
- $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start --url slave1 ^(|.*/)vendor(|/.*)
+ $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start --url replica1 ^(|.*/)vendor(|/.*)
```
Replicate to only one specific destination URL:
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 7fe9e15..d401030 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -18,9 +18,9 @@
ssh-keygen -m PEM -t rsa -C "your_email@example.com"
```
-<a name="example_file">
+<a name="example_file"></a>
Next, create `$site_path/etc/replication.config` as a Git-style config
-file, for example to replicate in parallel to four different hosts:</a>
+file, for example to replicate in parallel to four different hosts:
```
[remote "host-one"]
@@ -148,6 +148,20 @@
`(retry 1) push aaa.com:/git/test.git [refs/heads/b1 refs/heads/b2 (+2)]`
+gerrit.pushBatchSize
+: Max number of refs that are pushed in a single push operation. If more
+ than pushBatchSize are to be pushed then they are divided into batches
+ and pushed sequentially one-by-one.
+
+ Can be overridden at remote-level by setting pushBatchSize.
+
+ By default, `0`, which means that there are no limitations on number of
+ refs to be transferred in a single push operation. Note that negative
+ values are treated as `0`.
+
+ Note that `pushBatchSize` is ignored when *Cluster Replication* is configured
+ - when `replication.distributionInterval` has value > 0.
+
gerrit.sshCommandTimeout
: Timeout for SSH command execution. If 0, there is no timeout and
the client waits indefinitely. By default, 0.
@@ -496,7 +510,8 @@
remote.NAME.remoteNameStyle
: Provides possibilities to influence the name of the target
repository, e.g. by replacing slashes in the `${name}`
- placeholder.
+ placeholder, when the target remote repository is not served
+ by Gerrit.
Github and Gitorious do not permit slashes "/" in repository
names and will change them to dashes "-" at repository creation
@@ -511,6 +526,14 @@
Gerrit server, e.g. `${name}` of `foo/bar/my-repo.git` would
be `my-repo`.
+ > **NOTE**: The use of repository name translation using `remoteNameStyle`
+ > may lead to dangerous situations if there are multiple repositories
+ > that may be mapped to the same target name. For instance when
+ > mapping `/foo/my-repo.git` to `my-repo` using "basenameOnly"
+ > would also map `/bar/my-repo.git` to the same `my-repo` leading
+ > to conflicts where commits can be lost between the two repositories
+ > replicating to the same target `my-repo`.
+
By default, "slash", i.e. remote names will contain slashes as
they do in Gerrit.
@@ -551,6 +574,19 @@
default: 15 minutes
+remote.NAME.pushBatchSize
+: Max number of refs that are pushed in a single push operation to this
+ destination. If more than `pushBatchSize` are to be pushed then they are
+ divided into batches and pushed sequentially one-by-one.
+
+ By default it falls back to `gerrit.pushBatchSize` value (which is `0` if
+ not set, which means that there are no limitations on number of refs to
+ be transferred in a single push operation). Note that negative values are
+ treated as `0`.
+
+ Note that `pushBatchSize` is ignored when *Cluster Replication* is configured
+ - when `replication.distributionInterval` has value > 0.
+
Directory `replication`
--------------------
The optional directory `$site_path/etc/replication` contains Git-style
diff --git a/src/main/resources/Documentation/extension-point.md b/src/main/resources/Documentation/extension-point.md
index 076aded..83e7e45 100644
--- a/src/main/resources/Documentation/extension-point.md
+++ b/src/main/resources/Documentation/extension-point.md
@@ -2,7 +2,7 @@
==============
The replication plugin exposes an extension point to allow influencing its behaviour from another plugin or a script.
-Extension points can be defined from the replication plugin only when it is loaded as [libModule](/config-gerrit.html#gerrit.installModule) and
+Extension points can be defined from the replication plugin only when it is loaded as [libModule](../../../Documentation/config-gerrit.html#gerrit.installModule) and
implemented by another plugin by declaring a `provided` dependency from the replication plugin.
### Install extension libModule
diff --git a/src/main/resources/Documentation/metrics.md b/src/main/resources/Documentation/metrics.md
index ef8b150..db11d23 100644
--- a/src/main/resources/Documentation/metrics.md
+++ b/src/main/resources/Documentation/metrics.md
@@ -1,4 +1,5 @@
-# Metrics
+Metrics
+=======
Some metrics are emitted when replication occurs to a remote destination.
The granularity of the metrics recorded is at destination level, however when a particular project replication is flagged
@@ -7,17 +8,21 @@
The reason only slow metrics are published, rather than all, is to contain their number, which, on a big Gerrit installation
could potentially be considerably big.
-### Project level
+Project level
+-------------
* plugins_replication_latency_slower_than_<threshold>_<destinationName>_<ProjectName> - Time spent pushing <ProjectName> to remote <destinationName> (in ms)
-### Destination level
+Destination level
+-----------------
* plugins_replication_replication_delay_<destinationName> - Time spent waiting before pushing to remote <destinationName> (in ms)
* plugins_replication_replication_retries_<destinationName> - Number of retries when pushing to remote <destinationName>
* plugins_replication_replication_latency_<destinationName> - Time spent pushing to remote <destinationName> (in ms)
-### Example
+Example
+-------
+
```
# HELP plugins_replication_replication_delay_destination Generated from Dropwizard metric import (metric=plugins/replication/replication_delay/destination, type=com.codahale.metrics.Histogram)
# TYPE plugins_replication_replication_delay_destination summary
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
index e7339d9..725052c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
@@ -82,7 +82,7 @@
}
private Provider<ReplicationConfig> newVersionConfigProvider() {
- return new Provider<ReplicationConfig>() {
+ return new Provider<>() {
@Override
public ReplicationConfig get() {
return new ReplicationFileBasedConfig(sitePaths, sitePaths.data_dir) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
index 90191f2..242922d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
@@ -19,9 +19,9 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import com.google.common.collect.ForwardingIterator;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -367,7 +367,7 @@
WaitingRunner runner = new WaitingRunner();
int batchSize = 5; // how many tasks are started concurrently
- Queue<CountDownLatch> batches = new LinkedList<>();
+ Queue<CountDownLatch> batches = new ArrayDeque<>();
for (int b = 0; b < blockSize; b++) {
batches.add(executeWaitingRunnableBatch(batchSize, executor));
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/DestinationConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/DestinationConfigurationTest.java
new file mode 100644
index 0000000..646f915
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/DestinationConfigurationTest.java
@@ -0,0 +1,101 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.when;
+
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DestinationConfigurationTest {
+ private static final String REMOTE = "foo";
+
+ @Mock private RemoteConfig remoteConfigMock;
+ @Mock private Config cfgMock;
+
+ private DestinationConfiguration objectUnderTest;
+
+ @Before
+ public void setUp() {
+ when(remoteConfigMock.getName()).thenReturn(REMOTE);
+ when(cfgMock.getStringList(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
+ .thenReturn(new String[] {});
+ objectUnderTest = new DestinationConfiguration(remoteConfigMock, cfgMock);
+ }
+
+ @Test
+ public void shouldIgnoreRemotePushBatchSizeWhenClusterReplicationIsConfigured() {
+ // given
+ when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", 0)).thenReturn(1);
+ when(cfgMock.getInt("replication", "distributionInterval", 0)).thenReturn(1);
+
+ // when
+ int actual = objectUnderTest.getPushBatchSize();
+
+ // then
+ assertThat(actual).isEqualTo(0);
+ }
+
+ @Test
+ public void shouldIgnoreGlobalPushBatchSizeWhenClusterReplicationIsConfigured() {
+ // given
+ int globalPushBatchSize = 1;
+ when(cfgMock.getInt("gerrit", "pushBatchSize", 0)).thenReturn(globalPushBatchSize);
+ when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", globalPushBatchSize))
+ .thenReturn(globalPushBatchSize);
+ when(cfgMock.getInt("replication", "distributionInterval", 0)).thenReturn(1);
+
+ // when
+ int actual = objectUnderTest.getPushBatchSize();
+
+ // then
+ assertThat(actual).isEqualTo(0);
+ }
+
+ @Test
+ public void shouldReturnRemotePushBatchSizeWhenClusterReplicationIsNotConfigured() {
+ // given
+ when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", 0)).thenReturn(1);
+
+ // when
+ int actual = objectUnderTest.getPushBatchSize();
+
+ // then
+ assertThat(actual).isEqualTo(1);
+ }
+
+ @Test
+ public void shouldReturnGlobalPushBatchSizeWhenClusterReplicationIsNotConfigured() {
+ // given
+ int globalPushBatchSize = 1;
+ when(cfgMock.getInt("gerrit", "pushBatchSize", 0)).thenReturn(globalPushBatchSize);
+ when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", globalPushBatchSize))
+ .thenReturn(globalPushBatchSize);
+
+ // when
+ int actual = objectUnderTest.getPushBatchSize();
+
+ // then
+ assertThat(actual).isEqualTo(globalPushBatchSize);
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
index a1f61fe..5e6bde8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
@@ -65,6 +65,7 @@
return implementedByOverrider;
}
} catch (NoSuchMethodException | SecurityException e) {
+ return null;
}
return null;
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
index 94f0dc4..cfe9002 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -17,8 +17,10 @@
import static org.eclipse.jgit.lib.Ref.Storage.NEW;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -36,6 +38,7 @@
import com.google.gerrit.server.util.IdGenerator;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -115,7 +118,8 @@
new ObjectIdRef.Unpeeled(
NEW, "foo", ObjectId.fromString("0000000000000000000000000000000000000001"));
- localRefs = Arrays.asList(newLocalRef);
+ localRefs = new ArrayList<>();
+ localRefs.add(newLocalRef);
Ref remoteRef = new ObjectIdRef.Unpeeled(NEW, "foo", ObjectId.zeroId());
remoteRefs = new HashMap<>();
@@ -223,6 +227,52 @@
verify(transportMock, never()).push(any(), any());
}
+ @Test
+ public void shouldPushInSingleOperationWhenPushBatchSizeIsNotConfigured()
+ throws InterruptedException, IOException {
+ replicateTwoRefs(createPushOne(null));
+ verify(transportMock).push(any(), any());
+ }
+
+ @Test
+ public void shouldPushInBatchesWhenPushBatchSizeIsConfigured()
+ throws InterruptedException, IOException {
+ when(destinationMock.getPushBatchSize()).thenReturn(1);
+ replicateTwoRefs(createPushOne(null));
+ verify(transportMock, times(2)).push(any(), any());
+ }
+
+ @Test
+ public void shouldStopPushingInBatchesWhenPushOperationGetsCanceled()
+ throws InterruptedException, IOException {
+ when(destinationMock.getPushBatchSize()).thenReturn(1);
+ PushOne pushOne = createPushOne(null);
+
+ // cancel replication during the first push
+ doAnswer(
+ invocation -> {
+ pushOne.setCanceledWhileRunning();
+ return new PushResult();
+ })
+ .when(transportMock)
+ .push(any(), any());
+
+ replicateTwoRefs(pushOne);
+ verify(transportMock, times(1)).push(any(), any());
+ }
+
+ private void replicateTwoRefs(PushOne pushOne) throws InterruptedException {
+ ObjectIdRef barLocalRef =
+ new ObjectIdRef.Unpeeled(
+ NEW, "bar", ObjectId.fromString("0000000000000000000000000000000000000001"));
+ localRefs.add(barLocalRef);
+
+ pushOne.addRef(PushOne.ALL_REFS);
+ pushOne.run();
+
+ isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);
+ }
+
private PushOne createPushOne(DynamicItem<ReplicationPushFilter> replicationPushFilter) {
PushOne push =
new PushOne(
@@ -272,7 +322,7 @@
@Override
public Callable<Object> answer(InvocationOnMock invocation) throws Throwable {
Callable<Object> originalCall = (Callable<Object>) invocation.getArguments()[0];
- return new Callable<Object>() {
+ return new Callable<>() {
@Override
public Object call() throws Exception {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
index 9606371..b6a3ed1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
@@ -91,6 +91,18 @@
}
protected void setReplicationDestination(
+ String remoteName, String replicaSuffix, Optional<String> project, Integer pushBatchSize)
+ throws IOException {
+ setReplicationDestination(
+ remoteName,
+ Arrays.asList(replicaSuffix),
+ project,
+ TEST_REPLICATION_DELAY_SECONDS,
+ false,
+ Optional.ofNullable(pushBatchSize));
+ }
+
+ protected void setReplicationDestination(
String remoteName, String replicaSuffix, Optional<String> project, boolean mirror)
throws IOException {
setReplicationDestination(
@@ -125,13 +137,37 @@
protected void setReplicationDestination(
String remoteName,
+ String replicaSuffix,
+ Optional<String> project,
+ int replicationDelay,
+ boolean mirror,
+ Optional<Integer> pushBatchSize)
+ throws IOException {
+ setReplicationDestination(
+ remoteName, Arrays.asList(replicaSuffix), project, replicationDelay, mirror, pushBatchSize);
+ }
+
+ protected void setReplicationDestination(
+ String remoteName,
List<String> replicaSuffixes,
Optional<String> project,
int replicationDelay,
boolean mirror)
throws IOException {
setReplicationDestination(
- config, remoteName, replicaSuffixes, project, replicationDelay, mirror);
+ remoteName, replicaSuffixes, project, replicationDelay, mirror, Optional.empty());
+ }
+
+ protected void setReplicationDestination(
+ String remoteName,
+ List<String> replicaSuffixes,
+ Optional<String> project,
+ int replicationDelay,
+ boolean mirror,
+ Optional<Integer> pushBatchSize)
+ throws IOException {
+ setReplicationDestination(
+ config, remoteName, replicaSuffixes, project, replicationDelay, mirror, pushBatchSize);
config.setBoolean("gerrit", null, "autoReload", true);
config.save();
}
@@ -142,7 +178,8 @@
Optional<String> project,
int replicationDelay)
throws IOException {
- setReplicationDestination(config, null, replicaSuffixes, project, replicationDelay, false);
+ setReplicationDestination(
+ config, null, replicaSuffixes, project, replicationDelay, false, Optional.empty());
}
protected void setReplicationDestination(
@@ -151,7 +188,8 @@
List<String> replicaSuffixes,
Optional<String> project,
int replicationDelay,
- boolean mirror)
+ boolean mirror,
+ Optional<Integer> pushBatchSize)
throws IOException {
List<String> replicaUrls =
@@ -163,6 +201,7 @@
remoteConfig.setInt("remote", remoteName, "replicationRetry", TEST_REPLICATION_RETRY_MINUTES);
remoteConfig.setBoolean("remote", remoteName, "mirror", mirror);
project.ifPresent(prj -> remoteConfig.setString("remote", remoteName, "projects", prj));
+ pushBatchSize.ifPresent(pbs -> remoteConfig.setInt("remote", remoteName, "pushBatchSize", pbs));
remoteConfig.save();
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java
index 4cd55f9..b32829c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java
@@ -384,6 +384,7 @@
}
}
+ @SuppressWarnings("deprecation")
private boolean equals(ReplicationScheduledEvent scheduledEvent, Object other) {
if (!(other instanceof ReplicationScheduledEvent)) {
return false;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index a174e91..eb2b999 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -27,10 +27,19 @@
import com.google.gerrit.extensions.common.ProjectInfo;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Ref;
@@ -217,6 +226,75 @@
}
@Test
+ public void pushAllWait() throws Exception {
+ createTestProject(project + "replica");
+
+ setReplicationDestination("foo", "replica", ALL_PROJECTS);
+ reloadConfig();
+
+ ReplicationState state = new ReplicationState(NO_OP);
+
+ Future<?> future =
+ plugin
+ .getSysInjector()
+ .getInstance(PushAll.Factory.class)
+ .create(null, new ReplicationFilter(Arrays.asList(project.get())), state, false)
+ .schedule(0, TimeUnit.SECONDS);
+
+ future.get();
+ state.waitForReplication();
+ }
+
+ @Test
+ public void pushAllWaitCancelNotRunningTask() throws Exception {
+ createTestProject(project + "replica");
+
+ setReplicationDestination("foo", "replica", ALL_PROJECTS);
+ reloadConfig();
+
+ ReplicationState state = new ReplicationState(NO_OP);
+
+ Future<?> future =
+ plugin
+ .getSysInjector()
+ .getInstance(PushAll.Factory.class)
+ .create(null, new ReplicationFilter(Arrays.asList(project.get())), state, false)
+ .schedule(0, TimeUnit.SECONDS);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ Executor service = Executors.newSingleThreadExecutor();
+ service.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ future.get();
+ state.waitForReplication();
+ latch.countDown();
+ } catch (Exception e) {
+ // fails the test because we don't countDown
+ }
+ }
+ });
+
+ // Cancel the replication task
+ waitUntil(() -> getProjectTasks().size() != 0);
+ WorkQueue.Task<?> task = getProjectTasks().get(0);
+ assertThat(task.getState()).isAnyOf(WorkQueue.Task.State.READY, WorkQueue.Task.State.SLEEPING);
+ task.cancel(false);
+
+ // Confirm our waiting thread completed
+ boolean receivedSignal = latch.await(5, TimeUnit.SECONDS); // FIXME Choose a good timeout
+ assertThat(receivedSignal).isTrue();
+ }
+
+ private List<WorkQueue.Task<?>> getProjectTasks() {
+ return getInstance(WorkQueue.class).getTasks().stream()
+ .filter(t -> t instanceof WorkQueue.ProjectTask)
+ .collect(Collectors.toList());
+ }
+
+ @Test
public void shouldReplicateHeadUpdate() throws Exception {
setReplicationDestination("foo", "replica", ALL_PROJECTS);
reloadConfig();
@@ -348,6 +426,28 @@
}
}
+ @Test
+ public void shouldReplicateWithPushBatchSizeSetForRemote() throws Exception {
+ Project.NameKey targetProject = createTestProject(project + "replica");
+
+ setReplicationDestination("foo", "replica", ALL_PROJECTS, 1);
+ reloadConfig();
+
+ // creating a change results in 2 refs creation therefore it already qualifies for push in two
+ // batches of size 1 each
+ Result pushResult = createChange();
+ RevCommit sourceCommit = pushResult.getCommit();
+ String sourceRef = pushResult.getPatchSet().refName();
+
+ try (Repository repo = repoManager.openRepository(targetProject)) {
+ WaitUtil.waitUntil(() -> checkedGetRef(repo, sourceRef) != null, Duration.ofSeconds(60));
+
+ Ref targetBranchRef = getRef(repo, sourceRef);
+ assertThat(targetBranchRef).isNotNull();
+ assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+ }
+ }
+
private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationPushInBatchesIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationPushInBatchesIT.java
new file mode 100644
index 0000000..cf8dbe3
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationPushInBatchesIT.java
@@ -0,0 +1,68 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.WaitUtil;
+import com.google.gerrit.entities.Project;
+import java.time.Duration;
+import java.util.Optional;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.junit.Test;
+
+@TestPlugin(
+ name = "replication",
+ sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationPushInBatchesIT extends ReplicationDaemon {
+
+ @Override
+ public void setUpTestPlugin() throws Exception {
+ initConfig();
+ config.setInt("gerrit", null, "pushBatchSize", 1);
+ config.save();
+ setReplicationDestination(
+ "remote1",
+ "suffix1",
+ Optional.of("not-used-project")); // Simulates a full replication.config initialization
+ super.setUpTestPlugin();
+ }
+
+ @Test
+ public void shouldReplicateWithPushBatchSizeSetGlobaly() throws Exception {
+ Project.NameKey targetProject = createTestProject(project + "replica");
+
+ setReplicationDestination("foo", "replica", ALL_PROJECTS);
+ reloadConfig();
+
+ // creating a change results in 2 refs creation therefore it already qualifies for push in two
+ // batches of size 1 each
+ Result pushResult = createChange();
+ RevCommit sourceCommit = pushResult.getCommit();
+ String sourceRef = pushResult.getPatchSet().refName();
+
+ try (Repository repo = repoManager.openRepository(targetProject)) {
+ WaitUtil.waitUntil(() -> checkedGetRef(repo, sourceRef) != null, Duration.ofSeconds(60));
+
+ Ref targetBranchRef = getRef(repo, sourceRef);
+ assertThat(targetBranchRef).isNotNull();
+ assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+ }
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java b/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java
index 901200b..5881ea7 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java
@@ -22,14 +22,14 @@
import com.google.gerrit.server.events.EventDispatcher;
import com.google.gerrit.server.events.ProjectEvent;
import com.google.gerrit.server.events.RefEvent;
-import java.util.LinkedList;
+import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class TestDispatcher implements EventDispatcher {
- private final List<ProjectEvent> projectEvents = new LinkedList<>();
- private final List<RefEvent> refEvents = new LinkedList<>();
- private final List<Event> events = new LinkedList<>();
+ private final List<ProjectEvent> projectEvents = new ArrayList<>();
+ private final List<RefEvent> refEvents = new ArrayList<>();
+ private final List<Event> events = new ArrayList<>();
@Override
public void postEvent(Change change, ChangeEvent event) {} // Not used in replication
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
index f61114e..080f279 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
@@ -15,6 +15,7 @@
package com.googlesource.gerrit.plugins.replication;
import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableSet;
import com.google.gerrit.entities.Project;
import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
import java.net.URISyntaxException;
@@ -34,7 +35,7 @@
public static TestUriUpdates create(
Project.NameKey project, URIish uri, String remote, Set<String> refs) {
- return new AutoValue_TestUriUpdates(project, uri, remote, refs);
+ return new AutoValue_TestUriUpdates(project, uri, remote, ImmutableSet.copyOf(refs));
}
@Override
@@ -47,5 +48,5 @@
public abstract String getRemoteName();
@Override
- public abstract Set<String> getRefs();
+ public abstract ImmutableSet<String> getRefs();
}