Merge branch 'stable-3.4'
* stable-3.4:
Ensure states are updated for canceled replication tasks
Change-Id: Ifc514a151819f5d809a1834d382857baf334a8a2
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
index 89b97e9..c61a123 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
@@ -85,7 +85,7 @@
super(
threadPool,
stream.iterator(),
- new ForwardingRunner<T>(runner) {
+ new ForwardingRunner<>(runner) {
@Override
public void onDone() {
stream.close();
@@ -109,7 +109,7 @@
try {
runner.run(item);
} catch (RuntimeException e) { // catch to prevent chain from breaking
- logger.atSevere().withCause(e).log("Error while running: " + item);
+ logger.atSevere().withCause(e).log("Error while running: %s", item);
}
if (!scheduledNext) {
runner.onDone();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index baf0328..a6a162b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -23,7 +23,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.google.gerrit.entities.AccountGroup;
@@ -219,7 +218,7 @@
private void addRecursiveParents(
AccountGroup.UUID g,
- Builder<AccountGroup.UUID> builder,
+ ImmutableSet.Builder<AccountGroup.UUID> builder,
GroupIncludeCache groupIncludeCache) {
for (AccountGroup.UUID p : groupIncludeCache.parentGroupsOf(g)) {
if (builder.build().contains(p)) {
@@ -276,6 +275,8 @@
}
private void foreachPushOp(Map<URIish, PushOne> opsMap, Function<PushOne, Void> pushOneFunction) {
+ // Callers may modify the provided opsMap concurrently, hence make a defensive copy of the
+ // values to loop over them.
for (PushOne pushOne : ImmutableList.copyOf(opsMap.values())) {
pushOneFunction.apply(pushOne);
}
@@ -636,7 +637,7 @@
return true;
}
- boolean matches = (new ReplicationFilter(projects)).matches(project);
+ boolean matches = new ReplicationFilter(projects).matches(project);
if (!matches) {
repLog.atFine().log(
"Skipping replication of project %s; does not match filter", project.get());
@@ -776,6 +777,10 @@
return config.getSlowLatencyThreshold();
}
+ int getPushBatchSize() {
+ return config.getPushBatchSize();
+ }
+
private static boolean matches(URIish uri, String urlMatch) {
if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
index 1b39374..a74d198 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -14,10 +14,14 @@
package com.googlesource.gerrit.plugins.replication;
+import static com.google.common.base.Suppliers.memoize;
+import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.gerrit.server.config.ConfigUtil;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.transport.RemoteConfig;
@@ -45,6 +49,7 @@
private final RemoteConfig remoteConfig;
private final int maxRetries;
private final int slowLatencyThreshold;
+ private final Supplier<Integer> pushBatchSize;
protected DestinationConfiguration(RemoteConfig remoteConfig, Config cfg) {
this.remoteConfig = remoteConfig;
@@ -84,6 +89,31 @@
"slowLatencyThreshold",
DEFAULT_SLOW_LATENCY_THRESHOLD_SECS,
TimeUnit.SECONDS);
+
+ pushBatchSize =
+ memoize(
+ () -> {
+ int configuredBatchSize =
+ Math.max(
+ 0,
+ getInt(
+ remoteConfig,
+ cfg,
+ "pushBatchSize",
+ cfg.getInt("gerrit", "pushBatchSize", 0)));
+ if (configuredBatchSize > 0) {
+ int distributionInterval = cfg.getInt("replication", "distributionInterval", 0);
+ if (distributionInterval > 0) {
+ repLog.atWarning().log(
+ "Push in batches cannot be turned on for remote (%s) when 'Cluster"
+ + " Replication' (replication.distributionInterval) is configured",
+ name);
+ return 0;
+ }
+ return configuredBatchSize;
+ }
+ return 0;
+ });
}
@Override
@@ -173,4 +203,9 @@
public int getSlowLatencyThreshold() {
return slowLatencyThreshold;
}
+
+ @Override
+ public int getPushBatchSize() {
+ return pushBatchSize.get();
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
index 747c0f6..4957a64 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
@@ -35,7 +35,6 @@
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.replication.Destination.Factory;
import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -50,7 +49,7 @@
public class DestinationsCollection implements ReplicationDestinations {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
- private final Factory destinationFactory;
+ private final Destination.Factory destinationFactory;
private final Provider<ReplicationQueue> replicationQueue;
private volatile List<Destination> destinations;
private boolean shuttingDown;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Nfs.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Nfs.java
index a347f3a..09a632d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Nfs.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Nfs.java
@@ -22,7 +22,6 @@
/**
* Determine if a throwable or a cause in its causal chain is a Stale NFS File Handle
*
- * @param throwable
* @return a boolean true if the throwable or a cause in its causal chain is a Stale NFS File
* Handle
*/
@@ -39,7 +38,6 @@
/**
* Determine if an IOException is a Stale NFS File Handle
*
- * @param ioe
* @return a boolean true if the IOException is a Stale NFS FIle Handle
*/
public static boolean isStaleFileHandle(IOException ioe) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 87c35ee..8afcf9b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -19,12 +19,14 @@
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gerrit.entities.Project;
import com.google.gerrit.entities.RefNames;
@@ -49,7 +51,6 @@
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
-import com.jcraft.jsch.JSchException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -453,10 +454,7 @@
} catch (NotSupportedException e) {
stateLog.error("Cannot replicate to " + uri, e, getStatesAsArray());
} catch (TransportException e) {
- Throwable cause = e.getCause();
- if (cause instanceof JSchException && cause.getMessage().startsWith("UnknownHostKey:")) {
- repLog.atSevere().log("Cannot replicate to %s: %s", uri, cause.getMessage());
- } else if (e instanceof UpdateRefFailureException) {
+ if (e instanceof UpdateRefFailureException) {
updateRefRetryCount++;
repLog.atSevere().log("Cannot replicate to %s due to a lock or write ref failure", uri);
@@ -564,7 +562,36 @@
lazy(() -> refUpdatesForLogging(todo.subList(0, replConfig.getMaxRefsToLog()))));
}
- return tn.push(NullProgressMonitor.INSTANCE, todo);
+ return pushInBatches(tn, todo);
+ }
+
+ private PushResult pushInBatches(Transport tn, List<RemoteRefUpdate> todo)
+ throws NotSupportedException, TransportException {
+ int batchSize = pool.getPushBatchSize();
+ if (batchSize == 0 || todo.size() <= batchSize) {
+ return tn.push(NullProgressMonitor.INSTANCE, todo);
+ }
+
+ List<List<RemoteRefUpdate>> batches = Lists.partition(todo, batchSize);
+ repLog.atInfo().log("Push to %s in %d batches", uri, batches.size());
+ AggregatedPushResult result = new AggregatedPushResult();
+ int completedBatch = 1;
+ for (List<RemoteRefUpdate> batch : batches) {
+ repLog.atInfo().log(
+ "Pushing %d/%d batches for replication to %s", completedBatch, batches.size(), uri);
+ result.addResult(tn.push(NullProgressMonitor.INSTANCE, batch));
+
+ // check if push should be no longer continued
+ if (wasCanceled()) {
+ repLog.atInfo().log(
+ "Push for replication to %s was canceled after %d completed batch and thus won't be"
+ + " rescheduled",
+ uri, completedBatch);
+ break;
+ }
+ completedBatch++;
+ }
+ return result;
}
private static String refUpdatesForLogging(List<RemoteRefUpdate> refUpdates) {
@@ -836,4 +863,24 @@
super(uri, message);
}
}
+
+ /**
+ * Internal class used to aggregate PushResult objects from all push batches. See {@link
+ * PushOne#pushInBatches} for usage.
+ */
+ private static class AggregatedPushResult extends PushResult {
+ private final List<PushResult> results = new ArrayList<>();
+
+ void addResult(PushResult result) {
+ results.add(result);
+ }
+
+ @Override
+ public Collection<RemoteRefUpdate> getRemoteUpdates() {
+ return results.stream()
+ .map(PushResult::getRemoteUpdates)
+ .flatMap(Collection::stream)
+ .collect(toList());
+ }
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
index 5450dd5..bac599f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
@@ -33,11 +33,11 @@
/**
* Invoked when a ref has been replicated to one node.
*
- * @param project
- * @param ref
- * @param uri
- * @param status
- * @param refStatus
+ * @param project the project name
+ * @param ref the ref name
+ * @param uri the URI
+ * @param status the status of the push
+ * @param refStatus the status for the ref
*/
default void onRefReplicatedToOneNode(
String project,
@@ -49,16 +49,16 @@
/**
* Invoked when a ref has been replicated to all nodes.
*
- * @param project
- * @param ref
- * @param nodesCount
+ * @param project the project name
+ * @param ref the ref name
+ * @param nodesCount the number of nodes
*/
default void onRefReplicatedToAllNodes(String project, String ref, int nodesCount) {}
/**
* Invoked when all refs have been replicated to all nodes.
*
- * @param totalPushTasksCount
+ * @param totalPushTasksCount total number of push tasks
*/
default void onAllRefsReplicatedToAllNodes(int totalPushTasksCount) {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
index b66e73c..5fe0323 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
@@ -98,6 +98,13 @@
int getSlowLatencyThreshold();
/**
+ * Returns the maximum number of refs that can be pushed in a single push operation.
+ *
+ * @return batch size, zero if unlimited.
+ */
+ int getPushBatchSize();
+
+ /**
* Whether the remote configuration is for a single project only
*
* @return true, when configuration is for a single project, false otherwise
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
index 18ccc66..2fa6c34 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
@@ -54,7 +54,7 @@
*/
List<Destination> getDestinations(URIish uriish, Project.NameKey project, String ref);
- /** @return true if there are no destinations, false otherwise. */
+ /** Returns true if there are no destinations, false otherwise. */
boolean isEmpty();
/**
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index f5c6185..2424e71 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -71,16 +71,18 @@
return null;
}
- /* (non-Javadoc)
- * @see com.googlesource.gerrit.plugins.replication.ReplicationConfig#isReplicateAllOnPluginStart()
+ /**
+ * See {@link
+ * com.googlesource.gerrit.plugins.replication.ReplicationConfig#isReplicateAllOnPluginStart()}
*/
@Override
public boolean isReplicateAllOnPluginStart() {
return replicateAllOnPluginStart;
}
- /* (non-Javadoc)
- * @see com.googlesource.gerrit.plugins.replication.ReplicationConfig#isDefaultForceUpdate()
+ /**
+ * See {@link
+ * com.googlesource.gerrit.plugins.replication.ReplicationConfig#isDefaultForceUpdate()}
*/
@Override
public boolean isDefaultForceUpdate() {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 4abb295..791b387 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -201,7 +201,8 @@
private void synchronizePendingEvents(Prune prune) {
if (replaying.compareAndSet(false, true)) {
- final Map<ReplicateRefUpdate, String> taskNamesByReplicateRefUpdate = new ConcurrentHashMap<>();
+ final Map<ReplicateRefUpdate, String> taskNamesByReplicateRefUpdate =
+ new ConcurrentHashMap<>();
if (Prune.TRUE.equals(prune)) {
for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
taskNamesByReplicateRefUpdate.putAll(destination.getTaskNamesByReplicateRefUpdate());
@@ -229,7 +230,7 @@
@Override
public void onDone() {
if (Prune.TRUE.equals(prune)) {
- pruneNoLongerPending(taskNamesByReplicateRefUpdate.values());
+ pruneNoLongerPending(new HashSet<>(taskNamesByReplicateRefUpdate.values()));
}
replaying.set(false);
}
@@ -242,7 +243,7 @@
}
}
- private void pruneNoLongerPending(Collection<String> prunableTaskNames) {
+ private void pruneNoLongerPending(Set<String> prunableTaskNames) {
// Queue tasks have wrappers around them so workQueue.getTasks() does not return the PushOnes.
// We also cannot access them by taskId since PushOnes don't have a taskId, they do have
// an Id, but it is not the id assigned to the task in the queues. The tasks in the queue
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
index 871ed52..fa65803 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
@@ -140,12 +140,12 @@
}
private RefReplicationStatus getRefStatus(String project, String ref) {
- if (!statusByProjectRef.contains(project, ref)) {
- RefReplicationStatus refStatus = new RefReplicationStatus(project, ref);
+ RefReplicationStatus refStatus = statusByProjectRef.get(project, ref);
+ if (refStatus == null) {
+ refStatus = new RefReplicationStatus(project, ref);
statusByProjectRef.put(project, ref, refStatus);
- return refStatus;
}
- return statusByProjectRef.get(project, ref);
+ return refStatus;
}
public void waitForReplication() throws InterruptedException {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
index fa8b44c..f63df98 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
@@ -60,7 +60,6 @@
}
ReplicationState state = new ReplicationState(new CommandProcessing(this));
- Future<?> future = null;
ReplicationFilter projectFilter;
@@ -70,7 +69,8 @@
projectFilter = new ReplicationFilter(projectPatterns);
}
- future = pushFactory.create(urlMatch, projectFilter, state, now).schedule(0, TimeUnit.SECONDS);
+ Future<?> future =
+ pushFactory.create(urlMatch, projectFilter, state, now).schedule(0, TimeUnit.SECONDS);
if (wait) {
if (future != null) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicatedEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicatedEvent.java
index b0c554e..c8347d1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicatedEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicatedEvent.java
@@ -16,22 +16,16 @@
import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName;
-import com.google.gerrit.entities.Project;
-import com.google.gerrit.server.events.RefEvent;
import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
import java.util.Objects;
import org.eclipse.jgit.transport.RemoteRefUpdate;
import org.eclipse.jgit.transport.RemoteRefUpdate.Status;
import org.eclipse.jgit.transport.URIish;
-public class RefReplicatedEvent extends RefEvent {
+public class RefReplicatedEvent extends RemoteRefReplicationEvent {
public static final String TYPE = "ref-replicated";
- public final String project;
- public final String ref;
@Deprecated public final String targetNode;
- public final String targetUri;
- public final String status;
public final Status refStatus;
public RefReplicatedEvent(
@@ -40,23 +34,9 @@
URIish targetUri,
RefPushResult status,
RemoteRefUpdate.Status refStatus) {
- super(TYPE);
- this.project = project;
- this.ref = ref;
+ super(TYPE, project, ref, targetUri, status.toString());
this.targetNode = resolveNodeName(targetUri);
- this.status = status.toString();
this.refStatus = refStatus;
- this.targetUri = targetUri.toASCIIString();
- }
-
- @Override
- public Project.NameKey getProjectNameKey() {
- return Project.nameKey(project);
- }
-
- @Override
- public String getRefName() {
- return ref;
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/events/RemoteRefReplicationEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RemoteRefReplicationEvent.java
new file mode 100644
index 0000000..3950c3f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RemoteRefReplicationEvent.java
@@ -0,0 +1,47 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.events;
+
+import com.google.gerrit.common.Nullable;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.events.RefEvent;
+import org.eclipse.jgit.transport.URIish;
+
+public class RemoteRefReplicationEvent extends RefEvent {
+
+ public final String project;
+ public final String ref;
+ public final String status;
+ public final String targetUri;
+
+ public RemoteRefReplicationEvent(
+ String type, String project, String ref, URIish targetUri, @Nullable String status) {
+ super(type);
+ this.project = project;
+ this.ref = ref;
+ this.status = status;
+ this.targetUri = targetUri.toASCIIString();
+ }
+
+ @Override
+ public Project.NameKey getProjectNameKey() {
+ return Project.nameKey(project);
+ }
+
+ @Override
+ public String getRefName() {
+ return ref;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/events/ReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ReplicationScheduledEvent.java
index 4a1ade8..a952777 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/events/ReplicationScheduledEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ReplicationScheduledEvent.java
@@ -17,23 +17,16 @@
import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName;
import com.google.gerrit.entities.Project;
-import com.google.gerrit.server.events.RefEvent;
import org.eclipse.jgit.transport.URIish;
-public class ReplicationScheduledEvent extends RefEvent {
+public class ReplicationScheduledEvent extends RemoteRefReplicationEvent {
public static final String TYPE = "ref-replication-scheduled";
- public final String project;
- public final String ref;
@Deprecated public final String targetNode;
- public final String targetUri;
public ReplicationScheduledEvent(String project, String ref, URIish targetUri) {
- super(TYPE);
- this.project = project;
- this.ref = ref;
+ super(TYPE, project, ref, targetUri, null);
this.targetNode = resolveNodeName(targetUri);
- this.targetUri = targetUri.toASCIIString();
}
@Override
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index ded9d84..deffe5e 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -16,11 +16,9 @@
local path as replication target. This makes e.g. sense if a network
share is mounted to which the repositories should be replicated.
-In multi-primary scenario, any replication work which is already
-in-flight or completed by the other nodes is not performed to
-avoid extra work. This is because, the storage for replication
-events is shared between multiple primaries.(The storage location
-is specified in the config using: `replication.eventsDirectory`).
+It is possible to
+[configure](config.md#configuring-cluster-replication) the plugin so
+that multiple primaries share the replication work approximately evenly.
Replication of account data (NoteDb)
------------------------------------
@@ -30,7 +28,6 @@
* `refs/users/*` (user branches)
* `refs/meta/external-ids` (external IDs)
-* `refs/starred-changes/*` (star labels, not needed for Gerrit slaves)
+* `refs/starred-changes/*` (star labels, not needed for Gerrit replicas)
* `refs/sequences/accounts` (account sequence numbers, not needed for Gerrit
- slaves)
-
+ replicas)
diff --git a/src/main/resources/Documentation/cmd-list.md b/src/main/resources/Documentation/cmd-list.md
index b6688e0..e815e08 100644
--- a/src/main/resources/Documentation/cmd-list.md
+++ b/src/main/resources/Documentation/cmd-list.md
@@ -30,15 +30,15 @@
-------
`--remote <PATTERN>`
-: Only print information for destinations whose remote name matches
- the `PATTERN`.
+: Only print information for destinations whose remote name matches
+the `PATTERN`.
`--detail`
-: Print additional detailed information: AdminUrl, AuthGroup, Project
- and queue (pending and in-flight).
+: Print additional detailed information: AdminUrl, AuthGroup, Project
+and queue (pending and in-flight).
`--json`
-: Output in json format.
+: Output in json format.
EXAMPLES
--------
diff --git a/src/main/resources/Documentation/cmd-start.md b/src/main/resources/Documentation/cmd-start.md
index e12ec92..1a77c72 100644
--- a/src/main/resources/Documentation/cmd-start.md
+++ b/src/main/resources/Documentation/cmd-start.md
@@ -129,10 +129,10 @@
$ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start documentation/*
```
-Replicate projects whose path includes a folder named `vendor` to host slave1:
+Replicate projects whose path includes a folder named `vendor` to host replica1:
```
- $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start --url slave1 ^(|.*/)vendor(|/.*)
+ $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start --url replica1 ^(|.*/)vendor(|/.*)
```
Replicate to only one specific destination URL:
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index af91032..d401030 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -18,9 +18,9 @@
ssh-keygen -m PEM -t rsa -C "your_email@example.com"
```
-<a name="example_file">
+<a name="example_file"></a>
Next, create `$site_path/etc/replication.config` as a Git-style config
-file, for example to replicate in parallel to four different hosts:</a>
+file, for example to replicate in parallel to four different hosts:
```
[remote "host-one"]
@@ -46,6 +46,59 @@
To manually trigger replication at runtime, see
SSH command [start](cmd-start.md).
+<a name="configuring-cluster-replication"></a>
+Configuring Cluster Replication
+-------------------------------
+
+The replication plugin is designed to allow multiple primaries in a
+cluster to efficiently cooperate together via the replication event
+persistence subsystem. To enable this cooperation, the directory
+pointed to by the replication.eventsDirectory config key must reside on
+a shared filesystem, such as NFS. By default, simply pointing multiple
+primaries to the same eventsDirectory will enable some cooperation by
+preventing the same replication push from being duplicated by more
+than one primary.
+
+To further improve cooperation across the cluster, the
+replication.distributionInterval config value can be set. With
+distribution enabled, the replication queues for all the nodes sharing
+the same eventsDirectory will reflect approximately the same outstanding
+replication work (i.e. tasks waiting in the queue). Replication pushes
+which are running will continue to only be visible in the queue of the
+node on which the push is actually happening. This feature helps
+administrators get a cluster wide view of outstanding replication
+tasks, while allowing replication tasks triggered by one primary to be
+fulfilled by another node which is less busy.
+
+This enhanced replication work distribution allows the amount of
+replication work a cluster can handle to scale more evenly and linearly
+with the amount of primaries in the cluster. Adding more nodes to a
+cluster without distribution enabled will generally not allow the thread
+count per remote to be reduced without impacting service levels to those
+remotes. This is because without distribution, all events triggered by a
+node will only be fulfilled by the node which triggered the event, even
+if all the other nodes in the cluster are idle. This behavior implies
+that each node should be configured in a way that allows it alone to
+provide the level of service which each remote requires. However, with
+distribution enabled, it becomes possible to reduce the amount of
+replication threads configured per remote proportionally to the amount
+of nodes in the cluster, while maintaining the same approximate service
+level as before adding new nodes.
+
+Threads per remote reduction without service impacts is possible with
+distribution, because when configuring a node it can be expected that
+other nodes will pick up some of the work it triggers. Then the node no
+longer needs to be configured as if it were the only node in the
+cluster. For example, if a remote requires 6 threads with one node to
+achieve acceptable service, it should only take 2 threads on 3
+equivalently powered nodes to provide the same service level with
+distribution enabled. Scaling down such thread requirements per remote
+results in a reduced memory footprint per remote on each node in the
+cluster. This enables the nodes in the cluster to now scale to handle
+more remotes with the approximate same service level than without
+distribution. The amount of extra supported remotes then also scales
+approximately linearly with the extra nodes in a cluster.
+
File `replication.config`
-------------------------
@@ -95,6 +148,20 @@
`(retry 1) push aaa.com:/git/test.git [refs/heads/b1 refs/heads/b2 (+2)]`
+gerrit.pushBatchSize
+: Max number of refs that are pushed in a single push operation. If more
+ than pushBatchSize are to be pushed then they are divided into batches
+ and pushed sequentially one-by-one.
+
+ Can be overridden at remote-level by setting pushBatchSize.
+
+ By default, `0`, which means that there are no limitations on number of
+ refs to be transferred in a single push operation. Note that negative
+ values are treated as `0`.
+
+ Note that `pushBatchSize` is ignored when *Cluster Replication* is configured
+ - when `replication.distributionInterval` has value > 0.
+
gerrit.sshCommandTimeout
: Timeout for SSH command execution. If 0, there is no timeout and
the client waits indefinitely. By default, 0.
@@ -507,6 +574,19 @@
default: 15 minutes
+remote.NAME.pushBatchSize
+: Max number of refs that are pushed in a single push operation to this
+ destination. If more than `pushBatchSize` are to be pushed then they are
+ divided into batches and pushed sequentially one-by-one.
+
+ By default it falls back to `gerrit.pushBatchSize` value (which is `0` if
+ not set, which means that there are no limitations on number of refs to
+ be transferred in a single push operation). Note that negative values are
+ treated as `0`.
+
+ Note that `pushBatchSize` is ignored when *Cluster Replication* is configured
+ - when `replication.distributionInterval` has value > 0.
+
Directory `replication`
--------------------
The optional directory `$site_path/etc/replication` contains Git-style
diff --git a/src/main/resources/Documentation/extension-point.md b/src/main/resources/Documentation/extension-point.md
index 076aded..83e7e45 100644
--- a/src/main/resources/Documentation/extension-point.md
+++ b/src/main/resources/Documentation/extension-point.md
@@ -2,7 +2,7 @@
==============
The replication plugin exposes an extension point to allow influencing its behaviour from another plugin or a script.
-Extension points can be defined from the replication plugin only when it is loaded as [libModule](/config-gerrit.html#gerrit.installModule) and
+Extension points can be defined from the replication plugin only when it is loaded as [libModule](../../../Documentation/config-gerrit.html#gerrit.installModule) and
implemented by another plugin by declaring a `provided` dependency from the replication plugin.
### Install extension libModule
diff --git a/src/main/resources/Documentation/metrics.md b/src/main/resources/Documentation/metrics.md
index ef8b150..db11d23 100644
--- a/src/main/resources/Documentation/metrics.md
+++ b/src/main/resources/Documentation/metrics.md
@@ -1,4 +1,5 @@
-# Metrics
+Metrics
+=======
Some metrics are emitted when replication occurs to a remote destination.
The granularity of the metrics recorded is at destination level, however when a particular project replication is flagged
@@ -7,17 +8,21 @@
The reason only slow metrics are published, rather than all, is to contain their number, which, on a big Gerrit installation
could potentially be considerably big.
-### Project level
+Project level
+-------------
* plugins_replication_latency_slower_than_<threshold>_<destinationName>_<ProjectName> - Time spent pushing <ProjectName> to remote <destinationName> (in ms)
-### Destination level
+Destination level
+-----------------
* plugins_replication_replication_delay_<destinationName> - Time spent waiting before pushing to remote <destinationName> (in ms)
* plugins_replication_replication_retries_<destinationName> - Number of retries when pushing to remote <destinationName>
* plugins_replication_replication_latency_<destinationName> - Time spent pushing to remote <destinationName> (in ms)
-### Example
+Example
+-------
+
```
# HELP plugins_replication_replication_delay_destination Generated from Dropwizard metric import (metric=plugins/replication/replication_delay/destination, type=com.codahale.metrics.Histogram)
# TYPE plugins_replication_replication_delay_destination summary
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
index e7339d9..725052c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
@@ -82,7 +82,7 @@
}
private Provider<ReplicationConfig> newVersionConfigProvider() {
- return new Provider<ReplicationConfig>() {
+ return new Provider<>() {
@Override
public ReplicationConfig get() {
return new ReplicationFileBasedConfig(sitePaths, sitePaths.data_dir) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
index 90191f2..242922d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
@@ -19,9 +19,9 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import com.google.common.collect.ForwardingIterator;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -367,7 +367,7 @@
WaitingRunner runner = new WaitingRunner();
int batchSize = 5; // how many tasks are started concurrently
- Queue<CountDownLatch> batches = new LinkedList<>();
+ Queue<CountDownLatch> batches = new ArrayDeque<>();
for (int b = 0; b < blockSize; b++) {
batches.add(executeWaitingRunnableBatch(batchSize, executor));
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/DestinationConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/DestinationConfigurationTest.java
new file mode 100644
index 0000000..646f915
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/DestinationConfigurationTest.java
@@ -0,0 +1,101 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.when;
+
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DestinationConfigurationTest {
+ private static final String REMOTE = "foo";
+
+ @Mock private RemoteConfig remoteConfigMock;
+ @Mock private Config cfgMock;
+
+ private DestinationConfiguration objectUnderTest;
+
+ @Before
+ public void setUp() {
+ when(remoteConfigMock.getName()).thenReturn(REMOTE);
+ when(cfgMock.getStringList(Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
+ .thenReturn(new String[] {});
+ objectUnderTest = new DestinationConfiguration(remoteConfigMock, cfgMock);
+ }
+
+ @Test
+ public void shouldIgnoreRemotePushBatchSizeWhenClusterReplicationIsConfigured() {
+ // given
+ when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", 0)).thenReturn(1);
+ when(cfgMock.getInt("replication", "distributionInterval", 0)).thenReturn(1);
+
+ // when
+ int actual = objectUnderTest.getPushBatchSize();
+
+ // then
+ assertThat(actual).isEqualTo(0);
+ }
+
+ @Test
+ public void shouldIgnoreGlobalPushBatchSizeWhenClusterReplicationIsConfigured() {
+ // given
+ int globalPushBatchSize = 1;
+ when(cfgMock.getInt("gerrit", "pushBatchSize", 0)).thenReturn(globalPushBatchSize);
+ when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", globalPushBatchSize))
+ .thenReturn(globalPushBatchSize);
+ when(cfgMock.getInt("replication", "distributionInterval", 0)).thenReturn(1);
+
+ // when
+ int actual = objectUnderTest.getPushBatchSize();
+
+ // then
+ assertThat(actual).isEqualTo(0);
+ }
+
+ @Test
+ public void shouldReturnRemotePushBatchSizeWhenClusterReplicationIsNotConfigured() {
+ // given
+ when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", 0)).thenReturn(1);
+
+ // when
+ int actual = objectUnderTest.getPushBatchSize();
+
+ // then
+ assertThat(actual).isEqualTo(1);
+ }
+
+ @Test
+ public void shouldReturnGlobalPushBatchSizeWhenClusterReplicationIsNotConfigured() {
+ // given
+ int globalPushBatchSize = 1;
+ when(cfgMock.getInt("gerrit", "pushBatchSize", 0)).thenReturn(globalPushBatchSize);
+ when(cfgMock.getInt("remote", REMOTE, "pushBatchSize", globalPushBatchSize))
+ .thenReturn(globalPushBatchSize);
+
+ // when
+ int actual = objectUnderTest.getPushBatchSize();
+
+ // then
+ assertThat(actual).isEqualTo(globalPushBatchSize);
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
index a1f61fe..5e6bde8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
@@ -65,6 +65,7 @@
return implementedByOverrider;
}
} catch (NoSuchMethodException | SecurityException e) {
+ return null;
}
return null;
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
index 94f0dc4..cfe9002 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -17,8 +17,10 @@
import static org.eclipse.jgit.lib.Ref.Storage.NEW;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -36,6 +38,7 @@
import com.google.gerrit.server.util.IdGenerator;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -115,7 +118,8 @@
new ObjectIdRef.Unpeeled(
NEW, "foo", ObjectId.fromString("0000000000000000000000000000000000000001"));
- localRefs = Arrays.asList(newLocalRef);
+ localRefs = new ArrayList<>();
+ localRefs.add(newLocalRef);
Ref remoteRef = new ObjectIdRef.Unpeeled(NEW, "foo", ObjectId.zeroId());
remoteRefs = new HashMap<>();
@@ -223,6 +227,52 @@
verify(transportMock, never()).push(any(), any());
}
+ @Test
+ public void shouldPushInSingleOperationWhenPushBatchSizeIsNotConfigured()
+ throws InterruptedException, IOException {
+ replicateTwoRefs(createPushOne(null));
+ verify(transportMock).push(any(), any());
+ }
+
+ @Test
+ public void shouldPushInBatchesWhenPushBatchSizeIsConfigured()
+ throws InterruptedException, IOException {
+ when(destinationMock.getPushBatchSize()).thenReturn(1);
+ replicateTwoRefs(createPushOne(null));
+ verify(transportMock, times(2)).push(any(), any());
+ }
+
+ @Test
+ public void shouldStopPushingInBatchesWhenPushOperationGetsCanceled()
+ throws InterruptedException, IOException {
+ when(destinationMock.getPushBatchSize()).thenReturn(1);
+ PushOne pushOne = createPushOne(null);
+
+ // cancel replication during the first push
+ doAnswer(
+ invocation -> {
+ pushOne.setCanceledWhileRunning();
+ return new PushResult();
+ })
+ .when(transportMock)
+ .push(any(), any());
+
+ replicateTwoRefs(pushOne);
+ verify(transportMock, times(1)).push(any(), any());
+ }
+
+ private void replicateTwoRefs(PushOne pushOne) throws InterruptedException {
+ ObjectIdRef barLocalRef =
+ new ObjectIdRef.Unpeeled(
+ NEW, "bar", ObjectId.fromString("0000000000000000000000000000000000000001"));
+ localRefs.add(barLocalRef);
+
+ pushOne.addRef(PushOne.ALL_REFS);
+ pushOne.run();
+
+ isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);
+ }
+
private PushOne createPushOne(DynamicItem<ReplicationPushFilter> replicationPushFilter) {
PushOne push =
new PushOne(
@@ -272,7 +322,7 @@
@Override
public Callable<Object> answer(InvocationOnMock invocation) throws Throwable {
Callable<Object> originalCall = (Callable<Object>) invocation.getArguments()[0];
- return new Callable<Object>() {
+ return new Callable<>() {
@Override
public Object call() throws Exception {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
index 9606371..b6a3ed1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
@@ -91,6 +91,18 @@
}
protected void setReplicationDestination(
+ String remoteName, String replicaSuffix, Optional<String> project, Integer pushBatchSize)
+ throws IOException {
+ setReplicationDestination(
+ remoteName,
+ Arrays.asList(replicaSuffix),
+ project,
+ TEST_REPLICATION_DELAY_SECONDS,
+ false,
+ Optional.ofNullable(pushBatchSize));
+ }
+
+ protected void setReplicationDestination(
String remoteName, String replicaSuffix, Optional<String> project, boolean mirror)
throws IOException {
setReplicationDestination(
@@ -125,13 +137,37 @@
protected void setReplicationDestination(
String remoteName,
+ String replicaSuffix,
+ Optional<String> project,
+ int replicationDelay,
+ boolean mirror,
+ Optional<Integer> pushBatchSize)
+ throws IOException {
+ setReplicationDestination(
+ remoteName, Arrays.asList(replicaSuffix), project, replicationDelay, mirror, pushBatchSize);
+ }
+
+ protected void setReplicationDestination(
+ String remoteName,
List<String> replicaSuffixes,
Optional<String> project,
int replicationDelay,
boolean mirror)
throws IOException {
setReplicationDestination(
- config, remoteName, replicaSuffixes, project, replicationDelay, mirror);
+ remoteName, replicaSuffixes, project, replicationDelay, mirror, Optional.empty());
+ }
+
+ protected void setReplicationDestination(
+ String remoteName,
+ List<String> replicaSuffixes,
+ Optional<String> project,
+ int replicationDelay,
+ boolean mirror,
+ Optional<Integer> pushBatchSize)
+ throws IOException {
+ setReplicationDestination(
+ config, remoteName, replicaSuffixes, project, replicationDelay, mirror, pushBatchSize);
config.setBoolean("gerrit", null, "autoReload", true);
config.save();
}
@@ -142,7 +178,8 @@
Optional<String> project,
int replicationDelay)
throws IOException {
- setReplicationDestination(config, null, replicaSuffixes, project, replicationDelay, false);
+ setReplicationDestination(
+ config, null, replicaSuffixes, project, replicationDelay, false, Optional.empty());
}
protected void setReplicationDestination(
@@ -151,7 +188,8 @@
List<String> replicaSuffixes,
Optional<String> project,
int replicationDelay,
- boolean mirror)
+ boolean mirror,
+ Optional<Integer> pushBatchSize)
throws IOException {
List<String> replicaUrls =
@@ -163,6 +201,7 @@
remoteConfig.setInt("remote", remoteName, "replicationRetry", TEST_REPLICATION_RETRY_MINUTES);
remoteConfig.setBoolean("remote", remoteName, "mirror", mirror);
project.ifPresent(prj -> remoteConfig.setString("remote", remoteName, "projects", prj));
+ pushBatchSize.ifPresent(pbs -> remoteConfig.setInt("remote", remoteName, "pushBatchSize", pbs));
remoteConfig.save();
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java
index 5ade68d..3d0dfc1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java
@@ -19,6 +19,7 @@
import com.google.gerrit.acceptance.TestPlugin;
import com.google.gerrit.acceptance.UseLocalDisk;
import com.google.gerrit.acceptance.WaitUtil;
+import com.google.gerrit.entities.BranchNameKey;
import com.google.gerrit.entities.Project;
import com.google.gerrit.server.git.WorkQueue;
import java.time.Duration;
@@ -89,8 +90,9 @@
reloadConfig();
String newBranch = "refs/heads/foo_branch";
- createBranch(project, "refs/heads/master", newBranch);
+ createBranch(BranchNameKey.create(project, newBranch));
+ assertThat(listWaitingReplicationTasks(newBranch)).hasSize(1);
deleteWaitingReplicationTasks(newBranch); // This simulates the work being started by other node
assertThat(waitForProjectTaskCount(0, Duration.ofSeconds(TEST_DISTRIBUTION_CYCLE_SECONDS)))
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java
index 2d69a47..b32829c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java
@@ -15,6 +15,7 @@
package com.googlesource.gerrit.plugins.replication;
import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertTrue;
import com.google.common.base.Objects;
import com.google.gerrit.acceptance.PushOneCommit.Result;
@@ -42,11 +43,13 @@
import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
+import java.net.URISyntaxException;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
import org.eclipse.jgit.transport.URIish;
import org.junit.Before;
import org.junit.Test;
@@ -308,6 +311,34 @@
assertThat(origEvent).isEqualTo(gotEvent);
}
+ @Test
+ public void shouldSerializeRefReplicatedEvent() throws URISyntaxException {
+ RefReplicatedEvent origEvent =
+ new RefReplicatedEvent(
+ project.get(),
+ "refs/heads/master",
+ new URIish(String.format("git://someHost/%s.git", project.get())),
+ ReplicationState.RefPushResult.SUCCEEDED,
+ RemoteRefUpdate.Status.OK);
+
+ assertThat(origEvent)
+ .isEqualTo(eventGson.fromJson(eventGson.toJson(origEvent), RefReplicatedEvent.class));
+ }
+
+ @Test
+ public void shouldSerializeReplicationScheduledEvent() throws URISyntaxException {
+ ReplicationScheduledEvent origEvent =
+ new ReplicationScheduledEvent(
+ project.get(),
+ "refs/heads/master",
+ new URIish(String.format("git://someHost/%s.git", project.get())));
+
+ assertTrue(
+ equals(
+ origEvent,
+ eventGson.fromJson(eventGson.toJson(origEvent), ReplicationScheduledEvent.class)));
+ }
+
private <T extends RefEvent> void waitForRefEvent(Supplier<List<T>> events, String refName)
throws InterruptedException {
WaitUtil.waitUntil(
@@ -352,4 +383,25 @@
return Objects.hashCode(event);
}
}
+
+ @SuppressWarnings("deprecation")
+ private boolean equals(ReplicationScheduledEvent scheduledEvent, Object other) {
+ if (!(other instanceof ReplicationScheduledEvent)) {
+ return false;
+ }
+ ReplicationScheduledEvent event = (ReplicationScheduledEvent) other;
+ if (!Objects.equal(event.project, scheduledEvent.project)) {
+ return false;
+ }
+ if (!Objects.equal(event.ref, scheduledEvent.ref)) {
+ return false;
+ }
+ if (!Objects.equal(event.targetUri, scheduledEvent.targetUri)) {
+ return false;
+ }
+ if (!Objects.equal(event.status, scheduledEvent.status)) {
+ return false;
+ }
+ return Objects.equal(event.targetNode, scheduledEvent.targetNode);
+ }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 17c8933..eb2b999 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -426,6 +426,28 @@
}
}
+ @Test
+ public void shouldReplicateWithPushBatchSizeSetForRemote() throws Exception {
+ Project.NameKey targetProject = createTestProject(project + "replica");
+
+ setReplicationDestination("foo", "replica", ALL_PROJECTS, 1);
+ reloadConfig();
+
+ // creating a change results in 2 refs creation therefore it already qualifies for push in two
+ // batches of size 1 each
+ Result pushResult = createChange();
+ RevCommit sourceCommit = pushResult.getCommit();
+ String sourceRef = pushResult.getPatchSet().refName();
+
+ try (Repository repo = repoManager.openRepository(targetProject)) {
+ WaitUtil.waitUntil(() -> checkedGetRef(repo, sourceRef) != null, Duration.ofSeconds(60));
+
+ Ref targetBranchRef = getRef(repo, sourceRef);
+ assertThat(targetBranchRef).isNotNull();
+ assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+ }
+ }
+
private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationPushInBatchesIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationPushInBatchesIT.java
new file mode 100644
index 0000000..cf8dbe3
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationPushInBatchesIT.java
@@ -0,0 +1,68 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.WaitUtil;
+import com.google.gerrit.entities.Project;
+import java.time.Duration;
+import java.util.Optional;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.junit.Test;
+
+@TestPlugin(
+ name = "replication",
+ sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationPushInBatchesIT extends ReplicationDaemon {
+
+ @Override
+ public void setUpTestPlugin() throws Exception {
+ initConfig();
+ config.setInt("gerrit", null, "pushBatchSize", 1);
+ config.save();
+ setReplicationDestination(
+ "remote1",
+ "suffix1",
+ Optional.of("not-used-project")); // Simulates a full replication.config initialization
+ super.setUpTestPlugin();
+ }
+
+ @Test
+ public void shouldReplicateWithPushBatchSizeSetGlobaly() throws Exception {
+ Project.NameKey targetProject = createTestProject(project + "replica");
+
+ setReplicationDestination("foo", "replica", ALL_PROJECTS);
+ reloadConfig();
+
+ // creating a change results in 2 refs creation therefore it already qualifies for push in two
+ // batches of size 1 each
+ Result pushResult = createChange();
+ RevCommit sourceCommit = pushResult.getCommit();
+ String sourceRef = pushResult.getPatchSet().refName();
+
+ try (Repository repo = repoManager.openRepository(targetProject)) {
+ WaitUtil.waitUntil(() -> checkedGetRef(repo, sourceRef) != null, Duration.ofSeconds(60));
+
+ Ref targetBranchRef = getRef(repo, sourceRef);
+ assertThat(targetBranchRef).isNotNull();
+ assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+ }
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java b/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java
index 901200b..5881ea7 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java
@@ -22,14 +22,14 @@
import com.google.gerrit.server.events.EventDispatcher;
import com.google.gerrit.server.events.ProjectEvent;
import com.google.gerrit.server.events.RefEvent;
-import java.util.LinkedList;
+import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class TestDispatcher implements EventDispatcher {
- private final List<ProjectEvent> projectEvents = new LinkedList<>();
- private final List<RefEvent> refEvents = new LinkedList<>();
- private final List<Event> events = new LinkedList<>();
+ private final List<ProjectEvent> projectEvents = new ArrayList<>();
+ private final List<RefEvent> refEvents = new ArrayList<>();
+ private final List<Event> events = new ArrayList<>();
@Override
public void postEvent(Change change, ChangeEvent event) {} // Not used in replication
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
index f61114e..080f279 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
@@ -15,6 +15,7 @@
package com.googlesource.gerrit.plugins.replication;
import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableSet;
import com.google.gerrit.entities.Project;
import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
import java.net.URISyntaxException;
@@ -34,7 +35,7 @@
public static TestUriUpdates create(
Project.NameKey project, URIish uri, String remote, Set<String> refs) {
- return new AutoValue_TestUriUpdates(project, uri, remote, refs);
+ return new AutoValue_TestUriUpdates(project, uri, remote, ImmutableSet.copyOf(refs));
}
@Override
@@ -47,5 +48,5 @@
public abstract String getRemoteName();
@Override
- public abstract Set<String> getRefs();
+ public abstract ImmutableSet<String> getRefs();
}