Merge "docs/metrics: fix rendering of text in angle brackets"
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
index 89b97e9..c61a123 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
@@ -85,7 +85,7 @@
super(
threadPool,
stream.iterator(),
- new ForwardingRunner<T>(runner) {
+ new ForwardingRunner<>(runner) {
@Override
public void onDone() {
stream.close();
@@ -109,7 +109,7 @@
try {
runner.run(item);
} catch (RuntimeException e) { // catch to prevent chain from breaking
- logger.atSevere().withCause(e).log("Error while running: " + item);
+ logger.atSevere().withCause(e).log("Error while running: %s", item);
}
if (!scheduledNext) {
runner.onDone();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 00a46de..b205073 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -57,7 +57,6 @@
import com.google.inject.Injector;
import com.google.inject.Provider;
import com.google.inject.Provides;
-import com.google.inject.Scopes;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.servlet.RequestScoped;
@@ -69,7 +68,9 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -80,7 +81,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-import org.apache.http.impl.client.CloseableHttpClient;
+import java.util.stream.Collectors;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
@@ -192,9 +193,6 @@
.to(AdminApiFactory.DefaultAdminApiFactory.class);
install(new FactoryModuleBuilder().build(GerritRestApi.Factory.class));
- bind(CloseableHttpClient.class)
- .toProvider(HttpClientProvider.class)
- .in(Scopes.SINGLETON);
}
@Provides
@@ -275,6 +273,8 @@
}
private void foreachPushOp(Map<URIish, PushOne> opsMap, Function<PushOne, Void> pushOneFunction) {
+ // Callers may modify the provided opsMap concurrently, hence make a defensive copy of the
+ // values to loop over them.
for (PushOne pushOne : ImmutableList.copyOf(opsMap.values())) {
pushOneFunction.apply(pushOne);
}
@@ -386,17 +386,25 @@
return false;
}
- void schedule(Project.NameKey project, String ref, URIish uri, ReplicationState state) {
- schedule(project, ref, uri, state, false);
+ void schedule(Project.NameKey project, Set<String> refs, URIish uri, ReplicationState state) {
+ schedule(project, refs, uri, state, false);
}
void schedule(
- Project.NameKey project, String ref, URIish uri, ReplicationState state, boolean now) {
- if (!shouldReplicate(project, ref, state)) {
- repLog.atFine().log("Not scheduling replication %s:%s => %s", project, ref, uri);
- return;
+ Project.NameKey project, Set<String> refs, URIish uri, ReplicationState state, boolean now) {
+ Set<String> refsToSchedule = new HashSet<>();
+ for (String ref : refs) {
+ if (!shouldReplicate(project, ref, state)) {
+ repLog.atFine().log("Not scheduling replication %s:%s => %s", project, ref, uri);
+ continue;
+ }
+ refsToSchedule.add(ref);
}
- repLog.atInfo().log("scheduling replication %s:%s => %s", project, ref, uri);
+ if (refsToSchedule.isEmpty()) {
+ // Every ref was filtered out: keep the previous behavior and do not create a push task.
+ return;
+ }
+ repLog.atInfo().log("scheduling replication %s:%s => %s", project, refsToSchedule, uri);
if (!config.replicatePermissions()) {
PushOne e;
@@ -427,22 +431,25 @@
PushOne task = getPendingPush(uri);
if (task == null) {
task = opFactory.create(project, uri);
- addRef(task, ref);
- task.addState(ref, state);
+ addRefs(task, ImmutableSet.copyOf(refsToSchedule));
+ task.addState(refsToSchedule, state);
@SuppressWarnings("unused")
ScheduledFuture<?> ignored =
pool.schedule(task, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
pending.put(uri, task);
repLog.atInfo().log(
"scheduled %s:%s => %s to run %s",
- project, ref, task, now ? "now" : "after " + config.getDelay() + "s");
+ project, refsToSchedule, task, now ? "now" : "after " + config.getDelay() + "s");
} else {
- addRef(task, ref);
- task.addState(ref, state);
+ addRefs(task, ImmutableSet.copyOf(refsToSchedule));
+ task.addState(refsToSchedule, state);
repLog.atInfo().log(
- "consolidated %s:%s => %s with an existing pending push", project, ref, task);
+ "consolidated %s:%s => %s with an existing pending push",
+ project, refsToSchedule, task);
}
- state.increasePushTaskCount(project.get(), ref);
+ for (String ref : refsToSchedule) {
+ state.increasePushTaskCount(project.get(), ref);
+ }
}
}
@@ -458,6 +465,7 @@
synchronized (stateLock) {
URIish uri = pushOp.getURI();
pending.remove(uri);
+ pushOp.notifyNotAttempted(pushOp.getRefs());
}
}
@@ -475,9 +483,9 @@
pool.schedule(updateHeadFactory.create(uri, project, newHead), 0, TimeUnit.SECONDS);
}
- private void addRef(PushOne e, String ref) {
- e.addRef(ref);
- postReplicationScheduledEvent(e, ref);
+ private void addRefs(PushOne e, ImmutableSet<String> refs) {
+ e.addRefBatch(refs);
+ postReplicationScheduledEvent(e, refs);
}
/**
@@ -521,7 +529,7 @@
// second one fails, it will also be rescheduled and then,
// here, find out replication to its URI is already pending
// for retry (blocking).
- pendingPushOp.addRefs(pushOp.getRefs());
+ pendingPushOp.addRefBatches(pushOp.getRefs());
pendingPushOp.addStates(pushOp.getStates());
pushOp.removeStates();
@@ -540,7 +548,7 @@
pendingPushOp.canceledByReplication();
pending.remove(uri);
- pushOp.addRefs(pendingPushOp.getRefs());
+ pushOp.addRefBatches(pendingPushOp.getRefs());
pushOp.addStates(pendingPushOp.getStates());
pendingPushOp.removeStates();
}
@@ -634,7 +642,7 @@
return true;
}
- boolean matches = (new ReplicationFilter(projects)).matches(project);
+ boolean matches = new ReplicationFilter(projects).matches(project);
if (!matches) {
repLog.atFine().log(
"Skipping replication of project %s; does not match filter", project.get());
@@ -789,10 +797,10 @@
postReplicationScheduledEvent(pushOp, null);
}
- private void postReplicationScheduledEvent(PushOne pushOp, String inputRef) {
- Set<String> refs = inputRef == null ? pushOp.getRefs() : ImmutableSet.of(inputRef);
+ private void postReplicationScheduledEvent(PushOne pushOp, ImmutableSet<String> inputRefs) {
+ Set<ImmutableSet<String>> refBatches = inputRefs == null ? pushOp.getRefs() : Set.of(inputRefs);
Project.NameKey project = pushOp.getProjectNameKey();
- for (String ref : refs) {
+ for (String ref : flattenSetOfRefBatches(refBatches)) {
ReplicationScheduledEvent event =
new ReplicationScheduledEvent(project.get(), ref, pushOp.getURI());
try {
@@ -805,7 +813,7 @@
private void postReplicationFailedEvent(PushOne pushOp, RemoteRefUpdate.Status status) {
Project.NameKey project = pushOp.getProjectNameKey();
- for (String ref : pushOp.getRefs()) {
+ for (String ref : flattenSetOfRefBatches(pushOp.getRefs())) {
RefReplicatedEvent event =
new RefReplicatedEvent(project.get(), ref, pushOp.getURI(), RefPushResult.FAILED, status);
try {
@@ -815,4 +823,8 @@
}
}
}
+
+ private Set<String> flattenSetOfRefBatches(Set<ImmutableSet<String>> refBatches) {
+ return refBatches.stream().flatMap(Collection::stream).collect(Collectors.toSet());
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
index 4957a64..79a0683 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
@@ -41,6 +41,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.function.Predicate;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.transport.URIish;
@@ -157,11 +158,12 @@
}
@Override
- public List<Destination> getDestinations(URIish uri, Project.NameKey project, String ref) {
+ public List<Destination> getDestinations(URIish uri, Project.NameKey project, Set<String> refs) {
List<Destination> dests = new ArrayList<>();
for (Destination dest : getAll(FilterType.ALL)) {
- if (dest.wouldPush(uri, project, ref)) {
+ // A destination is listed at most once, no matter how many of the refs it would push.
+ if (refs.stream().anyMatch(ref -> dest.wouldPush(uri, project, ref))) {
dests.add(dest);
}
}
return dests;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 8b3c9e2..da3089e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -22,15 +22,17 @@
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gerrit.entities.Project;
import com.google.gerrit.entities.RefNames;
-import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
+import com.google.gerrit.extensions.events.GitBatchRefUpdateListener;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.restapi.AuthException;
import com.google.gerrit.extensions.restapi.ResourceConflictException;
@@ -51,7 +53,6 @@
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
-import com.jcraft.jsch.JSchException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -85,7 +86,7 @@
import org.eclipse.jgit.transport.URIish;
/**
- * A push to remote operation started by {@link GitReferenceUpdatedListener}.
+ * A push to remote operation started by {@link GitBatchRefUpdateListener}.
*
* <p>Instance members are protected by the lock within PushQueue. Callers must take that lock to
* ensure they are working with a current view of the object.
@@ -120,7 +121,7 @@
private final Project.NameKey projectName;
private final URIish uri;
- private final Set<String> delta = Sets.newHashSetWithExpectedSize(4);
+ private final Set<ImmutableSet<String>> refBatchesToPush = Sets.newHashSetWithExpectedSize(4);
private boolean pushAllRefs;
private Repository git;
private boolean isCollision;
@@ -241,12 +242,14 @@
* config.
*/
protected String getLimitedRefs() {
- Set<String> refs = getRefs();
+ // Flatten the ref batches up front so that the display limit and the hidden-refs count are
+ // both expressed in individual refs rather than in batches.
+ Set<String> refs = getRefs().stream().flatMap(Collection::stream).collect(Collectors.toSet());
int maxRefsToShow = replConfig.getMaxRefsToShow();
if (maxRefsToShow == 0) {
maxRefsToShow = refs.size();
}
String refsString = refs.stream().limit(maxRefsToShow).collect(Collectors.joining(" "));
int hiddenRefs = refs.size() - maxRefsToShow;
if (hiddenRefs > 0) {
refsString += " (+" + hiddenRefs + ")";
@@ -282,52 +287,60 @@
}
void addRef(String ref) {
- if (ALL_REFS.equals(ref)) {
- delta.clear();
+ addRefBatch(ImmutableSet.of(ref));
+ }
+
+ void addRefBatch(ImmutableSet<String> refBatch) {
+ if (refBatch.size() == 1 && refBatch.contains(ALL_REFS)) {
+ refBatchesToPush.clear();
pushAllRefs = true;
repLog.atFinest().log("Added all refs for replication to %s", uri);
- } else if (!pushAllRefs && delta.add(ref)) {
- repLog.atFinest().log("Added ref %s for replication to %s", ref, uri);
+ } else if (!pushAllRefs && refBatchesToPush.add(refBatch)) {
+ repLog.atFinest().log("Added ref %s for replication to %s", refBatch, uri);
}
}
@Override
- public Set<String> getRefs() {
- return pushAllRefs ? Sets.newHashSet(ALL_REFS) : delta;
+ public Set<ImmutableSet<String>> getRefs() {
+ return pushAllRefs ? Set.of(ImmutableSet.of(ALL_REFS)) : refBatchesToPush;
}
- void addRefs(Set<String> refs) {
+ void addRefBatches(Set<ImmutableSet<String>> refBatches) {
if (!pushAllRefs) {
- for (String ref : refs) {
- addRef(ref);
+ for (ImmutableSet<String> refBatch : refBatches) {
+ addRefBatch(refBatch);
}
}
}
- Set<String> setStartedRefs(Set<String> startedRefs) {
- Set<String> notAttemptedRefs = Sets.difference(delta, startedRefs);
+ Set<ImmutableSet<String>> setStartedRefs(Set<ImmutableSet<String>> startedRefs) {
+ // Sets.difference returns a live view: snapshot it before refBatchesToPush is cleared below,
+ // otherwise the returned set would always end up empty.
+ Set<ImmutableSet<String>> notAttemptedRefs =
+ Sets.difference(refBatchesToPush, startedRefs).immutableCopy();
pushAllRefs = false;
- delta.clear();
- addRefs(startedRefs);
+ refBatchesToPush.clear();
+ addRefBatches(startedRefs);
return notAttemptedRefs;
}
- void notifyNotAttempted(Set<String> notAttemptedRefs) {
- notAttemptedRefs.forEach(
- ref ->
- Arrays.asList(getStatesByRef(ref))
- .forEach(
- state ->
- state.notifyRefReplicated(
- projectName.get(),
- ref,
- uri,
- RefPushResult.NOT_ATTEMPTED,
- RemoteRefUpdate.Status.UP_TO_DATE)));
+ void notifyNotAttempted(Set<ImmutableSet<String>> notAttemptedRefs) {
+ notAttemptedRefs.stream()
+ .flatMap(Collection::stream)
+ .forEach(
+ ref ->
+ Arrays.asList(getStatesByRef(ref))
+ .forEach(
+ state ->
+ state.notifyRefReplicated(
+ projectName.get(),
+ ref,
+ uri,
+ RefPushResult.NOT_ATTEMPTED,
+ RemoteRefUpdate.Status.UP_TO_DATE)));
}
- void addState(String ref, ReplicationState state) {
- stateMap.put(ref, state);
+ void addState(Set<String> refs, ReplicationState state) {
+ for (String ref : refs) {
+ stateMap.put(ref, state);
+ }
}
ListMultimap<String, ReplicationState> getStates() {
@@ -455,10 +468,7 @@
} catch (NotSupportedException e) {
stateLog.error("Cannot replicate to " + uri, e, getStatesAsArray());
} catch (TransportException e) {
- Throwable cause = e.getCause();
- if (cause instanceof JSchException && cause.getMessage().startsWith("UnknownHostKey:")) {
- repLog.atSevere().log("Cannot replicate to %s: %s", uri, cause.getMessage());
- } else if (e instanceof UpdateRefFailureException) {
+ if (e instanceof UpdateRefFailureException) {
updateRefRetryCount++;
repLog.atSevere().log("Cannot replicate to %s due to a lock or write ref failure", uri);
@@ -649,7 +659,7 @@
// to only the references we will update during this operation.
//
Map<String, Ref> n = new HashMap<>();
- for (String src : delta) {
+ for (String src : flattenRefBatchesToPush()) {
Ref r = local.get(src);
if (r != null) {
n.put(src, r);
@@ -711,7 +721,8 @@
private List<RemoteRefUpdate> doPushDelta(Map<String, Ref> local) throws IOException {
List<RemoteRefUpdate> cmds = new ArrayList<>();
boolean noPerms = !pool.isReplicatePermissions();
- for (String src : delta) {
+ Set<String> refs = flattenRefBatchesToPush();
+ for (String src : refs) {
RefSpec spec = matchSrc(src);
if (spec != null) {
// If the ref still exists locally, send it, otherwise delete it.
@@ -762,7 +773,8 @@
return null;
}
- private void push(List<RemoteRefUpdate> cmds, RefSpec spec, Ref src) throws IOException {
+ @VisibleForTesting
+ void push(List<RemoteRefUpdate> cmds, RefSpec spec, Ref src) throws IOException {
String dst = spec.getDestination();
boolean force = spec.isForceUpdate();
cmds.add(new RemoteRefUpdate(git, src, dst, force, null, null));
@@ -860,6 +872,10 @@
stateMap.clear();
}
+ private Set<String> flattenRefBatchesToPush() {
+ return refBatchesToPush.stream().flatMap(Collection::stream).collect(Collectors.toSet());
+ }
+
public static class UpdateRefFailureException extends TransportException {
private static final long serialVersionUID = 1L;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
index 4f33937..6ed47d4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
@@ -30,7 +30,15 @@
public interface PushResultProcessing {
public static final PushResultProcessing NO_OP = new PushResultProcessing() {};
- /** Invoked when a ref has been replicated to one node. */
+ /**
+ * Invoked when a ref has been replicated to one node.
+ *
+ * @param project the project name
+ * @param ref the ref name
+ * @param uri the URI
+ * @param status the status of the push
+ * @param refStatus the status for the ref
+ */
default void onRefReplicatedToOneNode(
String project,
String ref,
@@ -38,10 +46,20 @@
RefPushResult status,
RemoteRefUpdate.Status refStatus) {}
- /** Invoked when a ref has been replicated to all nodes */
+ /**
+ * Invoked when a ref has been replicated to all nodes.
+ *
+ * @param project the project name
+ * @param ref the ref name
+ * @param nodesCount the number of nodes
+ */
default void onRefReplicatedToAllNodes(String project, String ref, int nodesCount) {}
- /** Invoked when all refs have been replicated to all nodes */
+ /**
+ * Invoked when all refs have been replicated to all nodes.
+ *
+ * @param totalPushTasksCount total number of push tasks
+ */
default void onAllRefsReplicatedToAllNodes(int totalPushTasksCount) {}
/**
@@ -90,7 +108,7 @@
StringBuilder sb = new StringBuilder();
sb.append("Replicate ");
sb.append(project);
- sb.append(" ref ");
+ sb.append(" refs ");
sb.append(ref);
sb.append(" to ");
sb.append(resolveNodeName(uri));
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
index 2fa6c34..78c1a35 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
@@ -20,6 +20,7 @@
import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import org.eclipse.jgit.transport.URIish;
/** Git destinations currently active for replication. */
@@ -45,14 +46,14 @@
List<Destination> getAll(FilterType filterType);
/**
- * Return the active replication destinations for a uri/project/ref triplet.
+ * Return the active replication destinations for a uri/project/refs triplet.
*
* @param uriish uri of the destinations
* @param project name of the project
- * @param ref ref name
+ * @param refs ref names
* @return the list of active destinations
*/
- List<Destination> getDestinations(URIish uriish, Project.NameKey project, String ref);
+ List<Destination> getDestinations(URIish uriish, Project.NameKey project, Set<String> refs);
/** Returns true if there are no destinations, false otherwise. */
boolean isEmpty();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index 5458b6c..2424e71 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -72,8 +72,8 @@
}
/**
- * See
- * {@link com.googlesource.gerrit.plugins.replication.ReplicationConfig#isReplicateAllOnPluginStart()}
+ * See {@link
+ * com.googlesource.gerrit.plugins.replication.ReplicationConfig#isReplicateAllOnPluginStart()}
*/
@Override
public boolean isReplicateAllOnPluginStart() {
@@ -81,8 +81,8 @@
}
/**
- * See
- * {@link com.googlesource.gerrit.plugins.replication.ReplicationConfig#isDefaultForceUpdate()}
+ * See {@link
+ * com.googlesource.gerrit.plugins.replication.ReplicationConfig#isDefaultForceUpdate()}
*/
@Override
public boolean isDefaultForceUpdate() {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
index 75fa5b3..9f331f2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -19,7 +19,7 @@
import com.google.common.eventbus.EventBus;
import com.google.gerrit.extensions.annotations.Exports;
import com.google.gerrit.extensions.config.CapabilityDefinition;
-import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
+import com.google.gerrit.extensions.events.GitBatchRefUpdateListener;
import com.google.gerrit.extensions.events.HeadUpdatedListener;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
@@ -44,6 +44,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import org.apache.http.impl.client.CloseableHttpClient;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.storage.file.FileBasedConfig;
import org.eclipse.jgit.transport.SshSessionFactory;
@@ -68,7 +69,7 @@
.annotatedWith(UniqueAnnotations.create())
.to(ReplicationQueue.class);
- DynamicSet.bind(binder(), GitReferenceUpdatedListener.class).to(ReplicationQueue.class);
+ DynamicSet.bind(binder(), GitBatchRefUpdateListener.class).to(ReplicationQueue.class);
DynamicSet.bind(binder(), ProjectDeletedListener.class).to(ReplicationQueue.class);
DynamicSet.bind(binder(), HeadUpdatedListener.class).to(ReplicationQueue.class);
@@ -122,6 +123,7 @@
bind(SshSessionFactory.class).toProvider(ReplicationSshSessionFactoryProvider.class);
bind(TransportFactory.class).to(TransportFactoryImpl.class).in(Scopes.SINGLETON);
+ bind(CloseableHttpClient.class).toProvider(HttpClientProvider.class).in(Scopes.SINGLETON);
}
private FileBasedConfig getReplicationConfig() {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 5310c14..5691ac2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -17,15 +17,17 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Queues;
import com.google.gerrit.common.UsedAt;
import com.google.gerrit.entities.Project;
-import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
+import com.google.gerrit.extensions.events.GitBatchRefUpdateListener;
import com.google.gerrit.extensions.events.HeadUpdatedListener;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.util.logging.NamedFluentLogger;
import com.google.inject.Inject;
@@ -45,13 +47,14 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import org.eclipse.jgit.transport.URIish;
/** Manages automatic replication to remote repositories. */
public class ReplicationQueue
implements ObservableQueue,
LifecycleListener,
- GitReferenceUpdatedListener,
+ GitBatchRefUpdateListener,
ProjectDeletedListener,
HeadUpdatedListener {
static final String REPLICATION_LOG_NAME = "replication_log";
@@ -67,7 +70,7 @@
private final ProjectDeletionState.Factory projectDeletionStateFactory;
private volatile boolean running;
private final AtomicBoolean replaying = new AtomicBoolean();
- private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
+ private final Queue<ReferencesUpdatedEvent> beforeStartupEventsQueue;
private Distributor distributor;
protected enum Prune {
@@ -128,71 +131,89 @@
public void scheduleFullSync(
Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
- fire(project, urlMatch, PushOne.ALL_REFS, state, now);
+ fire(
+ project,
+ urlMatch,
+ Set.of(new GitReferenceUpdated.UpdatedRef(PushOne.ALL_REFS, null, null, null)),
+ state,
+ now);
}
@Override
- public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
- fire(event.getProjectName(), event.getRefName());
+ public void onGitBatchRefUpdate(GitBatchRefUpdateListener.Event event) {
+ fire(event.getProjectName(), event.getUpdatedRefs());
}
- private void fire(String projectName, String refName) {
+ private void fire(String projectName, Set<UpdatedRef> updatedRefs) {
ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
- fire(Project.nameKey(projectName), null, refName, state, false);
+ fire(Project.nameKey(projectName), null, updatedRefs, state, false);
state.markAllPushTasksScheduled();
}
private void fire(
Project.NameKey project,
String urlMatch,
- String refName,
+ Set<UpdatedRef> updatedRefs,
ReplicationState state,
boolean now) {
if (!running) {
stateLog.warn(
"Replication plugin did not finish startup before event, event replication is postponed",
state);
- beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(project.get(), refName));
+ beforeStartupEventsQueue.add(ReferencesUpdatedEvent.create(project.get(), updatedRefs));
return;
}
for (Destination cfg : destinations.get().getAll(FilterType.ALL)) {
- pushReference(cfg, project, urlMatch, refName, state, now);
+ pushReferences(
+ cfg,
+ project,
+ urlMatch,
+ updatedRefs.stream().map(UpdatedRef::getRefName).collect(Collectors.toSet()),
+ state,
+ now);
}
}
- private void fire(URIish uri, Project.NameKey project, String refName) {
+ private void fire(URIish uri, Project.NameKey project, ImmutableSet<String> refNames) {
ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
- for (Destination dest : destinations.get().getDestinations(uri, project, refName)) {
- dest.schedule(project, refName, uri, state);
+ for (Destination dest : destinations.get().getDestinations(uri, project, refNames)) {
+ dest.schedule(project, refNames, uri, state);
}
state.markAllPushTasksScheduled();
}
@UsedAt(UsedAt.Project.COLLABNET)
public void pushReference(Destination cfg, Project.NameKey project, String refName) {
- pushReference(cfg, project, null, refName, null, true);
+ pushReferences(cfg, project, null, Set.of(refName), null, true);
}
- private void pushReference(
+ private void pushReferences(
Destination cfg,
Project.NameKey project,
String urlMatch,
- String refName,
+ Set<String> refNames,
ReplicationState state,
boolean now) {
boolean withoutState = state == null;
if (withoutState) {
state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
}
- if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
+ Set<String> refNamesToPush = new HashSet<>();
+ for (String refName : refNames) {
+ if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
+ refNamesToPush.add(refName);
+ } else {
+ repLog.atFine().log("Skipping ref %s on project %s", refName, project.get());
+ }
+ }
+ if (!refNamesToPush.isEmpty()) {
for (URIish uri : cfg.getURIs(project, urlMatch)) {
replicationTasksStorage.create(
- ReplicateRefUpdate.create(project.get(), refName, uri, cfg.getRemoteConfigName()));
- cfg.schedule(project, refName, uri, state, now);
+ ReplicateRefUpdate.create(
+ project.get(), refNamesToPush, uri, cfg.getRemoteConfigName()));
+ cfg.schedule(project, refNamesToPush, uri, state, now);
}
- } else {
- repLog.atFine().log("Skipping ref %s on project %s", refName, project.get());
}
if (withoutState) {
state.markAllPushTasksScheduled();
@@ -201,7 +222,8 @@
private void synchronizePendingEvents(Prune prune) {
if (replaying.compareAndSet(false, true)) {
- final Map<ReplicateRefUpdate, String> taskNamesByReplicateRefUpdate = new ConcurrentHashMap<>();
+ final Map<ReplicateRefUpdate, String> taskNamesByReplicateRefUpdate =
+ new ConcurrentHashMap<>();
if (Prune.TRUE.equals(prune)) {
for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
taskNamesByReplicateRefUpdate.putAll(destination.getTaskNamesByReplicateRefUpdate());
@@ -214,7 +236,7 @@
@Override
public void run(ReplicationTasksStorage.ReplicateRefUpdate u) {
try {
- fire(new URIish(u.uri()), Project.nameKey(u.project()), u.ref());
+ fire(new URIish(u.uri()), Project.nameKey(u.project()), u.refs());
if (Prune.TRUE.equals(prune)) {
taskNamesByReplicateRefUpdate.remove(u);
}
@@ -236,7 +258,7 @@
@Override
public String toString(ReplicationTasksStorage.ReplicateRefUpdate u) {
- return "Scheduling push to " + String.format("%s:%s", u.project(), u.ref());
+ return "Scheduling push to " + String.format("%s:%s", u.project(), u.refs());
}
});
}
@@ -281,26 +303,31 @@
private void fireBeforeStartupEvents() {
Set<String> eventsReplayed = new HashSet<>();
- for (ReferenceUpdatedEvent event : beforeStartupEventsQueue) {
- String eventKey = String.format("%s:%s", event.projectName(), event.refName());
+ for (ReferencesUpdatedEvent event : beforeStartupEventsQueue) {
+ String eventKey = String.format("%s:%s", event.projectName(), event.getRefNames());
if (!eventsReplayed.contains(eventKey)) {
repLog.atInfo().log("Firing pending task %s", event);
- fire(event.projectName(), event.refName());
+ fire(event.projectName(), event.updatedRefs());
eventsReplayed.add(eventKey);
}
}
}
@AutoValue
- abstract static class ReferenceUpdatedEvent {
+ abstract static class ReferencesUpdatedEvent {
- static ReferenceUpdatedEvent create(String projectName, String refName) {
- return new AutoValue_ReplicationQueue_ReferenceUpdatedEvent(projectName, refName);
+ static ReferencesUpdatedEvent create(String projectName, Set<UpdatedRef> updatedRefs) {
+ return new AutoValue_ReplicationQueue_ReferencesUpdatedEvent(
+ projectName, ImmutableSet.copyOf(updatedRefs));
}
public abstract String projectName();
- public abstract String refName();
+ public abstract ImmutableSet<UpdatedRef> updatedRefs();
+
+ public Set<String> getRefNames() {
+ return updatedRefs().stream().map(UpdatedRef::getRefName).collect(Collectors.toSet());
+ }
}
protected class Distributor implements WorkQueue.CancelableRunnable {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java
index 3e73033..a2bcf2b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java
@@ -31,19 +31,19 @@
@Override
public void warn(String msg, ReplicationState... states) {
stateWriteErr("Warning: " + msg, states);
- repLog.atWarning().log(msg);
+ repLog.atWarning().log("%s", msg);
}
@Override
public void error(String msg, ReplicationState... states) {
stateWriteErr("Error: " + msg, states);
- repLog.atSevere().log(msg);
+ repLog.atSevere().log("%s", msg);
}
@Override
public void error(String msg, Throwable t, ReplicationState... states) {
stateWriteErr("Error: " + msg, states);
- repLog.atSevere().withCause(t).log(msg);
+ repLog.atSevere().withCause(t).log("%s", msg);
}
private void stateWriteErr(String msg, ReplicationState[] states) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 4736402..3d04cdd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -18,15 +18,22 @@
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.FluentLogger;
import com.google.common.hash.Hashing;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.TypeAdapter;
+import com.google.gson.TypeAdapterFactory;
+import com.google.gson.reflect.TypeToken;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonToken;
+import com.google.gson.stream.JsonWriter;
import com.google.inject.Inject;
import com.google.inject.ProvisionException;
import com.google.inject.Singleton;
import java.io.IOException;
+import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.NotDirectoryException;
@@ -79,30 +86,50 @@
public static ReplicateRefUpdate create(Path file, Gson gson) throws IOException {
String json = new String(Files.readAllBytes(file), UTF_8);
- return gson.fromJson(json, ReplicateRefUpdate.class);
+ return create(gson.fromJson(json, ReplicateRefUpdate.class), file.getFileName().toString());
}
- public static ReplicateRefUpdate create(String project, String ref, URIish uri, String remote) {
+ public static ReplicateRefUpdate create(
+ String project, Set<String> refs, URIish uri, String remote) {
return new AutoValue_ReplicationTasksStorage_ReplicateRefUpdate(
- project, ref, uri.toASCIIString(), remote);
+ project,
+ ImmutableSet.copyOf(refs),
+ uri.toASCIIString(),
+ remote,
+ sha1(project, ImmutableSet.copyOf(refs), uri.toASCIIString(), remote));
+ }
+
+ public static ReplicateRefUpdate create(ReplicateRefUpdate u, String filename) {
+ return new AutoValue_ReplicationTasksStorage_ReplicateRefUpdate(
+ u.project(), u.refs(), u.uri(), u.remote(), filename);
}
public abstract String project();
- public abstract String ref();
+ public abstract ImmutableSet<String> refs();
public abstract String uri();
public abstract String remote();
- public String sha1() {
- return ReplicationTasksStorage.sha1(project() + "\n" + ref() + "\n" + uri() + "\n" + remote())
+ public abstract String sha1();
+
+ private static String sha1(String project, Set<String> refs, String uri, String remote) {
+ return ReplicationTasksStorage.sha1(
+ project + "\n" + refs.toString() + "\n" + uri + "\n" + remote)
.name();
}
@Override
public final String toString() {
- return "ref-update " + project() + ":" + ref() + " uri:" + uri() + " remote:" + remote();
+ return "ref-update "
+ + project()
+ + ":"
+ + refs().toString()
+ + " uri:"
+ + uri()
+ + " remote:"
+ + remote();
}
public static TypeAdapter<ReplicateRefUpdate> typeAdapter(Gson gson) {
@@ -127,19 +154,21 @@
runningUpdates = refUpdates.resolve("running");
waitingUpdates = refUpdates.resolve("waiting");
gson =
- new GsonBuilder().registerTypeAdapterFactory(AutoValueTypeAdapterFactory.create()).create();
+ new GsonBuilder()
+ .registerTypeAdapterFactory(new ReplicateRefUpdateTypeAdapterFactory())
+ .create();
}
public synchronized String create(ReplicateRefUpdate r) {
return new Task(r).create();
}
- public synchronized Set<String> start(UriUpdates uriUpdates) {
- Set<String> startedRefs = new HashSet<>();
+ public synchronized Set<ImmutableSet<String>> start(UriUpdates uriUpdates) {
+ Set<ImmutableSet<String>> startedRefs = new HashSet<>();
for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
Task t = new Task(update);
if (t.start()) {
- startedRefs.add(t.update.ref());
+ startedRefs.add(t.update.refs());
}
}
return startedRefs;
@@ -206,6 +235,102 @@
}
}
+  /**
+   * Gson {@link TypeAdapterFactory} that (de)serializes {@link ReplicateRefUpdate} as a JSON
+   * object with the fields {@code project}, {@code refs}, {@code uri} and {@code remote}.
+   *
+   * <p>The {@code sha1} property is not written; on read it is derived again by {@code
+   * ReplicateRefUpdate.create(project, refs, uri, remote)}. When reading, a single-valued {@code
+   * ref} field is accepted in addition to the {@code refs} array and lands in the same set.
+   */
+  public static final class ReplicateRefUpdateTypeAdapterFactory implements TypeAdapterFactory {
+    static class ReplicateRefUpdateTypeAdapter<T> extends TypeAdapter<ReplicateRefUpdate> {
+
+      /** Writes {@code value} as a JSON object, or as a JSON null when {@code value} is null. */
+      @Override
+      public void write(JsonWriter out, ReplicateRefUpdate value) throws IOException {
+        if (value == null) {
+          out.nullValue();
+          return;
+        }
+        out.beginObject();
+
+        out.name("project");
+        out.value(value.project());
+
+        out.name("refs");
+        out.beginArray();
+        for (String ref : value.refs()) {
+          out.value(ref);
+        }
+        out.endArray();
+
+        out.name("uri");
+        out.value(value.uri());
+
+        out.name("remote");
+        out.value(value.remote());
+
+        out.endObject();
+      }
+
+      /**
+       * Reads one stored task.
+       *
+       * @return the parsed update, or {@code null} when the JSON value is null
+       * @throws IOException if a field is unknown, the {@code uri} cannot be parsed, or one of
+       *     {@code project}, {@code uri} or {@code remote} is absent
+       */
+      @Override
+      public ReplicateRefUpdate read(JsonReader in) throws IOException {
+        if (in.peek() == JsonToken.NULL) {
+          in.nextNull();
+          return null;
+        }
+        String project = null;
+        Set<String> refs = new HashSet<>();
+        URIish uri = null;
+        String remote = null;
+
+        in.beginObject();
+        while (in.hasNext()) {
+          // Every case below consumes its whole value, so the next token here is always a name.
+          String fieldname = in.nextName();
+          switch (fieldname) {
+            case "project":
+              project = in.nextString();
+              break;
+            case "refs":
+              in.beginArray();
+              while (in.hasNext()) {
+                refs.add(in.nextString());
+              }
+              in.endArray();
+              break;
+            case "ref":
+              refs.add(in.nextString());
+              break;
+            case "uri":
+              try {
+                uri = new URIish(in.nextString());
+              } catch (URISyntaxException e) {
+                throw new IOException("Unable to parse remote URI", e);
+              }
+              break;
+            case "remote":
+              remote = in.nextString();
+              break;
+            default:
+              throw new IOException(String.format("Unknown field in stored task: %s", fieldname));
+          }
+        }
+        in.endObject();
+
+        // Report an incomplete task the same way as any other malformed one, instead of letting
+        // a null reach ReplicateRefUpdate.create() and surface as a NullPointerException.
+        requireField("project", project);
+        requireField("uri", uri);
+        requireField("remote", remote);
+        return ReplicateRefUpdate.create(project, refs, uri, remote);
+      }
+
+      /** Throws an {@link IOException} naming {@code field} when {@code value} is null. */
+      private static void requireField(String field, Object value) throws IOException {
+        if (value == null) {
+          throw new IOException(String.format("Missing field in stored task: %s", field));
+        }
+      }
+    }
+
+    /**
+     * Returns the {@link ReplicateRefUpdate} adapter for {@link ReplicateRefUpdate} and its
+     * AutoValue implementation, and {@code null} for every other type.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) {
+      if (type.equals(TypeToken.get(AutoValue_ReplicationTasksStorage_ReplicateRefUpdate.class))
+          || type.equals(TypeToken.get(ReplicateRefUpdate.class))) {
+        return (TypeAdapter<T>) new ReplicateRefUpdateTypeAdapter<T>();
+      }
+      return null;
+    }
+  }
+
@VisibleForTesting
class Task {
public final ReplicateRefUpdate update;
@@ -263,7 +388,8 @@
}
}
- private boolean rename(Path from, Path to) {
+ @VisibleForTesting
+ boolean rename(Path from, Path to) {
try {
logger.atFine().log("RENAME %s to %s %s", from, to, updateLog());
Files.move(from, to, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
@@ -275,7 +401,7 @@
}
private String updateLog() {
- return String.format("(%s:%s => %s)", update.project(), update.ref(), update.uri());
+ return String.format("(%s:%s => %s)", update.project(), update.refs(), update.uri());
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java b/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java
index a9985d2..77a5574 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.replication;
+import com.google.common.collect.ImmutableSet;
import com.google.gerrit.entities.Project;
import java.util.List;
import java.util.Set;
@@ -28,14 +29,15 @@
String getRemoteName();
- Set<String> getRefs();
+ Set<ImmutableSet<String>> getRefs();
default List<ReplicationTasksStorage.ReplicateRefUpdate> getReplicateRefUpdates() {
+ // TODO: keep batch refs together
return getRefs().stream()
.map(
- (ref) ->
+ (refs) ->
ReplicationTasksStorage.ReplicateRefUpdate.create(
- getProjectNameKey().get(), ref, getURI(), getRemoteName()))
+ getProjectNameKey().get(), refs, getURI(), getRemoteName()))
.collect(Collectors.toList());
}
}
diff --git a/src/main/resources/Documentation/cmd-list.md b/src/main/resources/Documentation/cmd-list.md
index b6688e0..e815e08 100644
--- a/src/main/resources/Documentation/cmd-list.md
+++ b/src/main/resources/Documentation/cmd-list.md
@@ -30,15 +30,15 @@
-------
`--remote <PATTERN>`
-: Only print information for destinations whose remote name matches
- the `PATTERN`.
+: Only print information for destinations whose remote name matches
+the `PATTERN`.
`--detail`
-: Print additional detailed information: AdminUrl, AuthGroup, Project
- and queue (pending and in-flight).
+: Print additional detailed information: AdminUrl, AuthGroup, Project
+and queue (pending and in-flight).
`--json`
-: Output in json format.
+: Output in json format.
EXAMPLES
--------
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 5c8e2c7..d401030 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -510,7 +510,8 @@
remote.NAME.remoteNameStyle
: Provides possibilities to influence the name of the target
repository, e.g. by replacing slashes in the `${name}`
- placeholder.
+ placeholder, when the target remote repository is not served
+ by Gerrit.
Github and Gitorious do not permit slashes "/" in repository
names and will change them to dashes "-" at repository creation
@@ -525,6 +526,14 @@
Gerrit server, e.g. `${name}` of `foo/bar/my-repo.git` would
be `my-repo`.
+ > **NOTE**: The use of repository name translation using `remoteNameStyle`
+ > may lead to dangerous situations if there are multiple repositories
+ > that may be mapped to the same target name. For instance, using
+ > "basenameOnly" to map `/foo/my-repo.git` to `my-repo` would also
+ > map `/bar/my-repo.git` to the same `my-repo`, leading to conflicts
+ > where commits can be lost between the two repositories
+ > replicating to the same target `my-repo`.
+
By default, "slash", i.e. remote names will contain slashes as
they do in Gerrit.
diff --git a/src/main/resources/Documentation/metrics.md b/src/main/resources/Documentation/metrics.md
index 541e95e..aeb71fa 100644
--- a/src/main/resources/Documentation/metrics.md
+++ b/src/main/resources/Documentation/metrics.md
@@ -1,4 +1,5 @@
-# Metrics
+Metrics
+=======
Some metrics are emitted when replication occurs to a remote destination.
The granularity of the metrics recorded is at destination level, however when a particular project replication is flagged
@@ -7,17 +8,21 @@
The reason only slow metrics are published, rather than all, is to contain their number, which, on a big Gerrit installation
could potentially be considerably big.
-### Project level
+Project level
+-------------
* `plugins_replication_latency_slower_than_<threshold>_<destinationName>_<ProjectName>` - Time spent pushing `<ProjectName>` to remote `<destinationName>` (in ms)
-### Destination level
+Destination level
+-----------------
* `plugins_replication_replication_delay_<destinationName>` - Time spent waiting before pushing to remote `<destinationName>` (in ms)
* `plugins_replication_replication_retries_<destinationName>` - Number of retries when pushing to remote `<destinationName>`
* `plugins_replication_replication_latency_<destinationName>` - Time spent pushing to remote `<destinationName>` (in ms)
-### Example
+Example
+-------
+
```
# HELP plugins_replication_replication_delay_destination Generated from Dropwizard metric import (metric=plugins/replication/replication_delay/destination, type=com.codahale.metrics.Histogram)
# TYPE plugins_replication_replication_delay_destination summary
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
index e7339d9..725052c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
@@ -82,7 +82,7 @@
}
private Provider<ReplicationConfig> newVersionConfigProvider() {
- return new Provider<ReplicationConfig>() {
+ return new Provider<>() {
@Override
public ReplicationConfig get() {
return new ReplicationFileBasedConfig(sitePaths, sitePaths.data_dir) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
index 90191f2..242922d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
@@ -19,9 +19,9 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import com.google.common.collect.ForwardingIterator;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -367,7 +367,7 @@
WaitingRunner runner = new WaitingRunner();
int batchSize = 5; // how many tasks are started concurrently
- Queue<CountDownLatch> batches = new LinkedList<>();
+ Queue<CountDownLatch> batches = new ArrayDeque<>();
for (int b = 0; b < blockSize; b++) {
batches.add(executeWaitingRunnableBatch(batchSize, executor));
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
index a1f61fe..5e6bde8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
@@ -65,6 +65,7 @@
return implementedByOverrider;
}
} catch (NoSuchMethodException | SecurityException e) {
+ return null;
}
return null;
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
index bb3e886..dc29612 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -17,6 +17,7 @@
import static org.eclipse.jgit.lib.Ref.Storage.NEW;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -25,6 +26,7 @@
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.metrics.Timer1;
@@ -69,6 +71,7 @@
import org.eclipse.jgit.util.FS;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -228,6 +231,55 @@
}
@Test
+  public void shouldPushMetaRefTogetherWithChangeRef() throws InterruptedException, IOException {
+    // Spy on the PushOne so calls to its own push(...) can be counted below.
+    PushOne pushOne = Mockito.spy(createPushOne(null));
+
+    ObjectId patchSetId = ObjectId.fromString("0000000000000000000000000000000000000002");
+    Ref changeRef = new ObjectIdRef.Unpeeled(NEW, "refs/changes/11/11111/1", patchSetId);
+
+    ObjectId metaId = ObjectId.fromString("0000000000000000000000000000000000000003");
+    Ref changeMetaRef = new ObjectIdRef.Unpeeled(NEW, "refs/changes/11/11111/meta", metaId);
+
+    localRefs.add(changeRef);
+    localRefs.add(changeMetaRef);
+
+    // The patch-set ref and its meta ref are handed over as one batch.
+    pushOne.addRefBatch(ImmutableSet.of(changeRef.getName(), changeMetaRef.getName()));
+    pushOne.run();
+
+    isCallFinished.await(10, TimeUnit.SECONDS);
+    verify(transportMock, atLeastOnce()).push(any(), any());
+    verify(pushOne, times(2)).push(any(), any(), any());
+  }
+
+  @Test
+  public void shouldNotAttemptDuplicateRemoteRefUpdate() throws InterruptedException, IOException {
+    // Spy on the PushOne so calls to its own push(...) can be counted below.
+    PushOne pushOne = Mockito.spy(createPushOne(null));
+
+    ObjectId patchSetId = ObjectId.fromString("0000000000000000000000000000000000000002");
+    Ref changeRef = new ObjectIdRef.Unpeeled(NEW, "refs/changes/11/11111/1", patchSetId);
+    localRefs.add(changeRef);
+
+    // The very same ref is submitted in two separate batches.
+    String changeRefName = changeRef.getName();
+    pushOne.addRefBatch(ImmutableSet.of(changeRefName));
+    pushOne.addRefBatch(ImmutableSet.of(changeRefName));
+    pushOne.run();
+
+    isCallFinished.await(10, TimeUnit.SECONDS);
+    verify(transportMock, times(1)).push(any(), any());
+    verify(pushOne, times(1)).push(any(), any(), any());
+  }
+
+ @Test
public void shouldPushInSingleOperationWhenPushBatchSizeIsNotConfigured()
throws InterruptedException, IOException {
replicateTwoRefs(createPushOne(null));
@@ -322,7 +374,7 @@
@Override
public Callable<Object> answer(InvocationOnMock invocation) throws Throwable {
Callable<Object> originalCall = (Callable<Object>) invocation.getArguments()[0];
- return new Callable<Object>() {
+ return new Callable<>() {
@Override
public Object call() throws Exception {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java
index 3d0dfc1..dfcf250 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java
@@ -24,6 +24,7 @@
import com.google.gerrit.server.git.WorkQueue;
import java.time.Duration;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.RefUpdate;
@@ -61,7 +62,7 @@
Project.NameKey targetProject = createTestProject(project + replica);
ReplicationTasksStorage.ReplicateRefUpdate ref =
ReplicationTasksStorage.ReplicateRefUpdate.create(
- project.get(), newBranch, new URIish(getProjectUri(targetProject)), remote);
+ project.get(), Set.of(newBranch), new URIish(getProjectUri(targetProject)), remote);
createBranch(project, master, newBranch);
setReplicationDestination(remote, replica, ALL_PROJECTS);
reloadConfig();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java
index 4cd55f9..b32829c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java
@@ -384,6 +384,7 @@
}
}
+ @SuppressWarnings("deprecation")
private boolean equals(ReplicationScheduledEvent scheduledEvent, Object other) {
if (!(other instanceof ReplicationScheduledEvent)) {
return false;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
index 1c1f983..5f80e8c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
@@ -191,7 +191,7 @@
Pattern refmaskPattern = Pattern.compile(refRegex);
return tasksStorage
.streamWaiting()
- .filter(task -> refmaskPattern.matcher(task.ref()).matches())
+ .filter(task -> task.refs().stream().anyMatch(ref -> refmaskPattern.matcher(ref).matches()))
.collect(toList());
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 33bd91d..9285c58 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -27,10 +27,19 @@
import com.google.gerrit.extensions.common.ProjectInfo;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Ref;
@@ -102,6 +111,7 @@
Result pushResult = createChange();
RevCommit sourceCommit = pushResult.getCommit();
String sourceRef = pushResult.getPatchSet().refName();
+ String metaRef = sourceRef.substring(0, sourceRef.lastIndexOf('/') + 1).concat("meta");
try (Repository repo = repoManager.openRepository(targetProject)) {
waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
@@ -109,6 +119,9 @@
Ref targetBranchRef = getRef(repo, sourceRef);
assertThat(targetBranchRef).isNotNull();
assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+
+ Ref targetBranchMetaRef = getRef(repo, metaRef);
+ assertThat(targetBranchMetaRef).isNotNull();
}
}
@@ -217,6 +230,75 @@
}
@Test
+  public void pushAllWait() throws Exception {
+    // Set up a target project and a replication destination for remote "foo".
+    createTestProject(project + "replica");
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    ReplicationState state = new ReplicationState(NO_OP);
+    PushAll.Factory pushAllFactory = plugin.getSysInjector().getInstance(PushAll.Factory.class);
+    ReplicationFilter onlyThisProject = new ReplicationFilter(Arrays.asList(project.get()));
+
+    Future<?> scheduled =
+        pushAllFactory.create(null, onlyThisProject, state, false).schedule(0, TimeUnit.SECONDS);
+
+    // Block until the scheduled PushAll has run, then wait on the replication state.
+    scheduled.get();
+    state.waitForReplication();
+  }
+
+  @Test
+  public void pushAllWaitCancelNotRunningTask() throws Exception {
+    createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    ReplicationState state = new ReplicationState(NO_OP);
+
+    Future<?> future =
+        plugin
+            .getSysInjector()
+            .getInstance(PushAll.Factory.class)
+            .create(null, new ReplicationFilter(Arrays.asList(project.get())), state, false)
+            .schedule(0, TimeUnit.SECONDS);
+
+    CountDownLatch latch = new CountDownLatch(1);
+    // Held as an ExecutorService (a plain Executor cannot be shut down) so the worker thread is
+    // always released, even when an assertion below fails while the worker is still blocked.
+    java.util.concurrent.ExecutorService service = Executors.newSingleThreadExecutor();
+    try {
+      service.execute(
+          () -> {
+            try {
+              future.get();
+              state.waitForReplication();
+              latch.countDown();
+            } catch (Exception e) {
+              // fails the test because we don't countDown
+            }
+          });
+
+      // Cancel the replication task
+      waitUntil(() -> getProjectTasks().size() != 0);
+      WorkQueue.Task<?> task = getProjectTasks().get(0);
+      assertThat(task.getState())
+          .isAnyOf(WorkQueue.Task.State.READY, WorkQueue.Task.State.SLEEPING);
+      task.cancel(false);
+
+      // Confirm our waiting thread completed
+      boolean receivedSignal = latch.await(5, TimeUnit.SECONDS); // FIXME Choose a good timeout
+      assertThat(receivedSignal).isTrue();
+    } finally {
+      service.shutdownNow();
+    }
+  }
+
+  /** Returns the {@link WorkQueue} tasks that are instances of {@link WorkQueue.ProjectTask}. */
+  private List<WorkQueue.Task<?>> getProjectTasks() {
+    return getInstance(WorkQueue.class).getTasks().stream()
+        .filter(WorkQueue.ProjectTask.class::isInstance)
+        .collect(Collectors.toList());
+  }
+
+ @Test
public void shouldReplicateHeadUpdate() throws Exception {
setReplicationDestination("foo", "replica", ALL_PROJECTS);
reloadConfig();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java
index f549f47..b6d14e8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java
@@ -61,7 +61,7 @@
Pattern refmaskPattern = Pattern.compile(refRegex);
return tasksStorage
.streamWaiting()
- .filter(task -> refmaskPattern.matcher(task.ref()).matches())
+ .filter(task -> refmaskPattern.matcher(task.refs().toArray()[0].toString()).matches())
.collect(toList());
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
index e2e1e21..9390798 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
@@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.eclipse.jgit.transport.URIish;
@@ -96,7 +97,7 @@
.forEach(
(update) -> {
try {
- UriUpdates uriUpdates = TestUriUpdates.create(update);
+ UriUpdates uriUpdates = new TestUriUpdates(update);
tasksStorage.start(uriUpdates);
tasksStorage.finish(uriUpdates);
} catch (URISyntaxException e) {
@@ -125,7 +126,7 @@
.forEach(
(update) -> {
try {
- UriUpdates uriUpdates = TestUriUpdates.create(update);
+ UriUpdates uriUpdates = new TestUriUpdates(update);
tasksStorage.start(uriUpdates);
tasksStorage.finish(uriUpdates);
} catch (URISyntaxException e) {
@@ -211,7 +212,7 @@
.forEach(
(task) -> {
assertThat(task.uri()).isEqualTo(expectedURI);
- assertThat(task.ref()).isEqualTo(PushOne.ALL_REFS);
+ assertThat(task.refs()).isEqualTo(Set.of(PushOne.ALL_REFS));
});
}
@@ -236,7 +237,7 @@
.forEach(
(task) -> {
assertThat(task.uri()).isEqualTo(expectedURI);
- assertThat(task.ref()).isEqualTo(PushOne.ALL_REFS);
+ assertThat(task.refs()).isEqualTo(Set.of(PushOne.ALL_REFS));
});
}
@@ -351,14 +352,14 @@
String changeRef, String remote) {
return tasksStorage
.streamWaiting()
- .filter(task -> changeRef.equals(task.ref()))
+ .filter(task -> task.refs().stream().anyMatch(ref -> changeRef.equals(ref)))
.filter(task -> remote.equals(task.remote()));
}
private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
Stream<ReplicateRefUpdate> updates, String changeRef, String remote) {
return updates
- .filter(task -> changeRef.equals(task.ref()))
+ .filter(task -> task.refs().stream().anyMatch(ref -> changeRef.equals(ref)))
.filter(task -> remote.equals(task.remote()));
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
index 5cfb2d0..cf5168f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
@@ -24,6 +24,7 @@
import java.net.URISyntaxException;
import java.nio.file.FileSystem;
import java.nio.file.Path;
+import java.util.Set;
import org.eclipse.jgit.transport.URIish;
import org.junit.After;
import org.junit.Before;
@@ -36,7 +37,9 @@
protected static final URIish URISH =
ReplicationTasksStorageTest.getUrish("http://example.com/" + PROJECT + ".git");
protected static final ReplicateRefUpdate REF_UPDATE =
- ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE);
+ ReplicateRefUpdate.create(PROJECT, Set.of(REF), URISH, REMOTE);
+ protected static final ReplicateRefUpdate STORED_REF_UPDATE =
+ ReplicateRefUpdate.create(REF_UPDATE, REF_UPDATE.sha1());
protected static final UriUpdates URI_UPDATES = getUriUpdates(REF_UPDATE);
protected ReplicationTasksStorage nodeA;
@@ -66,7 +69,7 @@
nodeA.create(REF_UPDATE);
nodeB.create(REF_UPDATE);
- assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE);
+ assertThatStream(persistedView.streamWaiting()).containsExactly(STORED_REF_UPDATE);
}
@Test
@@ -74,7 +77,7 @@
nodeA.create(REF_UPDATE);
nodeB.start(URI_UPDATES);
- assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(STORED_REF_UPDATE);
nodeB.finish(URI_UPDATES);
assertNoIncompleteTasks(persistedView);
@@ -86,10 +89,10 @@
nodeA.start(URI_UPDATES);
nodeA.reset(URI_UPDATES);
- assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE);
+ assertThatStream(persistedView.streamWaiting()).containsExactly(STORED_REF_UPDATE);
nodeB.start(URI_UPDATES);
- assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(STORED_REF_UPDATE);
nodeB.finish(URI_UPDATES);
assertNoIncompleteTasks(persistedView);
@@ -103,10 +106,10 @@
nodeB.start(URI_UPDATES);
nodeB.reset(URI_UPDATES);
- assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE);
+ assertThatStream(persistedView.streamWaiting()).containsExactly(STORED_REF_UPDATE);
nodeB.start(URI_UPDATES);
- assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(STORED_REF_UPDATE);
nodeB.finish(URI_UPDATES);
assertNoIncompleteTasks(persistedView);
@@ -121,7 +124,7 @@
nodeB.reset(URI_UPDATES);
nodeA.start(URI_UPDATES);
- assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(STORED_REF_UPDATE);
nodeA.finish(URI_UPDATES);
assertNoIncompleteTasks(persistedView);
@@ -133,10 +136,10 @@
nodeA.start(URI_UPDATES);
nodeB.recoverAll();
- assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE);
+ assertThatStream(persistedView.streamWaiting()).containsExactly(STORED_REF_UPDATE);
nodeB.start(URI_UPDATES);
- assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(STORED_REF_UPDATE);
nodeA.finish(URI_UPDATES);
// Bug: https://crbug.com/gerrit/12973
@@ -153,10 +156,10 @@
nodeB.recoverAll();
nodeA.finish(URI_UPDATES);
- assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE);
+ assertThatStream(persistedView.streamWaiting()).containsExactly(STORED_REF_UPDATE);
nodeB.start(URI_UPDATES);
- assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(STORED_REF_UPDATE);
nodeB.finish(URI_UPDATES);
assertNoIncompleteTasks(persistedView);
@@ -169,7 +172,7 @@
nodeB.recoverAll();
nodeB.start(URI_UPDATES);
- assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(STORED_REF_UPDATE);
nodeB.finish(URI_UPDATES);
ReplicationTasksStorageTest.assertNoIncompleteTasks(persistedView);
@@ -182,14 +185,14 @@
public void multipleNodesCanReplicateSameRef() {
nodeA.create(REF_UPDATE);
nodeA.start(URI_UPDATES);
- assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(STORED_REF_UPDATE);
nodeA.finish(URI_UPDATES);
assertNoIncompleteTasks(persistedView);
nodeB.create(REF_UPDATE);
nodeB.start(URI_UPDATES);
- assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(STORED_REF_UPDATE);
nodeB.finish(URI_UPDATES);
assertNoIncompleteTasks(persistedView);
@@ -200,16 +203,16 @@
nodeA.create(REF_UPDATE);
nodeB.create(REF_UPDATE);
- assertThat(nodeA.start(URI_UPDATES)).containsExactly(REF_UPDATE.ref());
- assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
+ assertThat(nodeA.start(URI_UPDATES)).containsExactly(REF_UPDATE.refs());
+ assertThatStream(persistedView.streamRunning()).containsExactly(STORED_REF_UPDATE);
assertThat(nodeB.start(URI_UPDATES)).isEmpty();
- assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(STORED_REF_UPDATE);
}
public static UriUpdates getUriUpdates(ReplicationTasksStorage.ReplicateRefUpdate refUpdate) {
try {
- return TestUriUpdates.create(refUpdate);
+ return new TestUriUpdates(refUpdate);
} catch (URISyntaxException e) {
throw new RuntimeException("Cannot instantiate UriUpdates object", e);
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java
index 202cac9..481fa9c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java
@@ -24,6 +24,7 @@
import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
import java.nio.file.FileSystem;
import java.nio.file.Path;
+import java.util.Set;
import org.eclipse.jgit.transport.URIish;
import org.junit.After;
import org.junit.Before;
@@ -36,7 +37,7 @@
protected static final URIish URISH =
ReplicationTasksStorageTest.getUrish("http://example.com/" + PROJECT + ".git");
protected static final ReplicateRefUpdate REF_UPDATE =
- ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE);
+ ReplicateRefUpdate.create(PROJECT, Set.of(REF), URISH, REMOTE);
protected FileSystem fileSystem;
protected Path storageSite;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
index a2e5e4d..43c0b3e 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
@@ -14,17 +14,28 @@
package com.googlesource.gerrit.plugins.replication;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.hash.Hashing;
import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonSyntaxException;
import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdateTypeAdapterFactory;
import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.Task;
import java.net.URISyntaxException;
import java.nio.file.FileSystem;
import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Set;
+import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.transport.URIish;
import org.junit.After;
import org.junit.Before;
@@ -36,7 +47,7 @@
protected static final String REMOTE = "myDest";
protected static final URIish URISH = getUrish("http://example.com/" + PROJECT + ".git");
protected static final ReplicateRefUpdate REF_UPDATE =
- ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE);
+ ReplicateRefUpdate.create(PROJECT, Set.of(REF), URISH, REMOTE);
protected ReplicationTasksStorage tasksStorage;
protected FileSystem fileSystem;
@@ -200,8 +211,10 @@
@Test
public void canHaveTwoWaitingTasksForDifferentRefs() throws Exception {
- Task updateA = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, "refA", URISH, REMOTE));
- Task updateB = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, "refB", URISH, REMOTE));
+ Task updateA =
+ tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, Set.of("refA"), URISH, REMOTE));
+ Task updateB =
+ tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, Set.of("refB"), URISH, REMOTE));
updateA.create();
updateB.create();
assertIsWaiting(updateA);
@@ -210,8 +223,10 @@
@Test
public void canHaveTwoRunningTasksForDifferentRefs() throws Exception {
- Task updateA = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, "refA", URISH, REMOTE));
- Task updateB = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, "refB", URISH, REMOTE));
+ Task updateA =
+ tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, Set.of("refA"), URISH, REMOTE));
+ Task updateB =
+ tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, Set.of("refB"), URISH, REMOTE));
updateA.create();
updateB.create();
updateA.start();
@@ -226,12 +241,12 @@
tasksStorage
.new Task(
ReplicateRefUpdate.create(
- "projectA", REF, getUrish("http://example.com/projectA.git"), REMOTE));
+ "projectA", Set.of(REF), getUrish("http://example.com/projectA.git"), REMOTE));
Task updateB =
tasksStorage
.new Task(
ReplicateRefUpdate.create(
- "projectB", REF, getUrish("http://example.com/projectB.git"), REMOTE));
+ "projectB", Set.of(REF), getUrish("http://example.com/projectB.git"), REMOTE));
updateA.create();
updateB.create();
assertIsWaiting(updateA);
@@ -244,12 +259,12 @@
tasksStorage
.new Task(
ReplicateRefUpdate.create(
- "projectA", REF, getUrish("http://example.com/projectA.git"), REMOTE));
+ "projectA", Set.of(REF), getUrish("http://example.com/projectA.git"), REMOTE));
Task updateB =
tasksStorage
.new Task(
ReplicateRefUpdate.create(
- "projectB", REF, getUrish("http://example.com/projectB.git"), REMOTE));
+ "projectB", Set.of(REF), getUrish("http://example.com/projectB.git"), REMOTE));
updateA.create();
updateB.create();
updateA.start();
@@ -260,8 +275,10 @@
@Test
public void canHaveTwoWaitingTasksForDifferentRemotes() throws Exception {
- Task updateA = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, REF, URISH, "remoteA"));
- Task updateB = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, REF, URISH, "remoteB"));
+ Task updateA =
+ tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, Set.of(REF), URISH, "remoteA"));
+ Task updateB =
+ tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, Set.of(REF), URISH, "remoteB"));
updateA.create();
updateB.create();
assertIsWaiting(updateA);
@@ -270,8 +287,10 @@
@Test
public void canHaveTwoRunningTasksForDifferentRemotes() throws Exception {
- Task updateA = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, REF, URISH, "remoteA"));
- Task updateB = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, REF, URISH, "remoteB"));
+ Task updateA =
+ tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, Set.of(REF), URISH, "remoteA"));
+ Task updateB =
+ tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, Set.of(REF), URISH, "remoteB"));
updateA.create();
updateB.create();
updateA.start();
@@ -346,6 +365,130 @@
assertIsWaiting(persistedView);
}
+ @Test
+ public void writeReplicateRefUpdateTypeAdapter() throws Exception {
+ // Pins the JSON text produced by a Gson instance configured with
+ // ReplicateRefUpdateTypeAdapterFactory, for a one-ref and a two-ref update.
+ Gson gson =
+ new GsonBuilder()
+ .registerTypeAdapterFactory(new ReplicateRefUpdateTypeAdapterFactory())
+ .create();
+ ReplicateRefUpdate update =
+ ReplicateRefUpdate.create(
+ "someProject",
+ ImmutableSet.of("ref1"),
+ new URIish("git://host1/someRepo.git"),
+ "someRemote");
+ // assertEquals takes (expected, actual): the literal goes first so that a failure
+ // message reports the JSON text as the expected value.
+ assertEquals(
+ "{\"project\":\"someProject\",\"refs\":[\"ref1\"],\"uri\":\"git://host1/someRepo.git\",\"remote\":\"someRemote\"}",
+ gson.toJson(update));
+ ReplicateRefUpdate update2 =
+ ReplicateRefUpdate.create(
+ "someProject",
+ ImmutableSet.of("ref1", "ref2"),
+ new URIish("git://host1/someRepo.git"),
+ "someRemote");
+ assertEquals(
+ "{\"project\":\"someProject\",\"refs\":[\"ref1\",\"ref2\"],\"uri\":\"git://host1/someRepo.git\",\"remote\":\"someRemote\"}",
+ gson.toJson(update2));
+ }
+
+ @Test
+ public void ReadReplicateRefUpdateTypeAdapter() throws Exception {
+ // Parses JSON carrying a "refs" array and compares the result with updates built
+ // directly through ReplicateRefUpdate.create.
+ Gson gson =
+ new GsonBuilder()
+ .registerTypeAdapterFactory(new ReplicateRefUpdateTypeAdapterFactory())
+ .create();
+ ReplicateRefUpdate update =
+ ReplicateRefUpdate.create(
+ "someProject",
+ ImmutableSet.of("ref1"),
+ new URIish("git://host1/someRepo.git"),
+ "someRemote");
+ // assertEquals takes (expected, actual): the directly built update is the expectation.
+ assertEquals(
+ update,
+ gson.fromJson(
+ "{\"project\":\"someProject\",\"refs\":[\"ref1\"],\"uri\":\"git://host1/someRepo.git\",\"remote\":\"someRemote\"}",
+ ReplicateRefUpdate.class));
+ ReplicateRefUpdate update2 =
+ ReplicateRefUpdate.create(
+ "someProject",
+ ImmutableSet.of("ref1", "ref2"),
+ new URIish("git://host1/someRepo.git"),
+ "someRemote");
+ ReplicateRefUpdate restoredUpdate2 =
+ gson.fromJson(
+ "{\"project\":\"someProject\",\"refs\":[\"ref1\",\"ref2\"],\"uri\":\"git://host1/someRepo.git\",\"remote\":\"someRemote\"}",
+ ReplicateRefUpdate.class);
+ // ReplicateRefUpdate.sha1() might be different for the compared objects, since
+ // the order of refs() might differ, so the remaining properties are compared one by one.
+ assertEquals(update2.project(), restoredUpdate2.project());
+ assertEquals(update2.uri(), restoredUpdate2.uri());
+ assertEquals(update2.remote(), restoredUpdate2.remote());
+ assertEquals(update2.refs(), restoredUpdate2.refs());
+ }
+
+ @Test
+ public void ReplicateRefUpdateTypeAdapter_FailWithUnknownField() throws Exception {
+ // Input whose only key is "unknownKey"; parsing it is expected to throw.
+ String jsonWithUnknownKey = "{\"unknownKey\":\"someValue\"}";
+ GsonBuilder builder = new GsonBuilder();
+ builder.registerTypeAdapterFactory(new ReplicateRefUpdateTypeAdapterFactory());
+ Gson parser = builder.create();
+ assertThrows(
+ JsonSyntaxException.class,
+ () -> parser.fromJson(jsonWithUnknownKey, ReplicateRefUpdate.class));
+ }
+
+ @Test
+ public void ReadOldFormatReplicateRefUpdateTypeAdapter() throws Exception {
+ // Parses JSON that carries a single "ref" string instead of a "refs" array and expects
+ // it to equal an update created with the one-element ref set.
+ Gson gson =
+ new GsonBuilder()
+ .registerTypeAdapterFactory(new ReplicateRefUpdateTypeAdapterFactory())
+ .create();
+ ReplicateRefUpdate update =
+ ReplicateRefUpdate.create(
+ "someProject",
+ ImmutableSet.of("ref1"),
+ new URIish("git://host1/someRepo.git"),
+ "someRemote");
+ // assertEquals takes (expected, actual): the directly built update is the expectation.
+ assertEquals(
+ update,
+ gson.fromJson(
+ "{\"project\":\"someProject\",\"ref\":\"ref1\",\"uri\":\"git://host1/someRepo.git\",\"remote\":\"someRemote\"}",
+ ReplicateRefUpdate.class));
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void schedulingTaskFromOldFormatTasksIsSuccessful() throws Exception {
+ // Deserialize a task whose JSON uses the single "ref" key.
+ Gson gson =
+ new GsonBuilder()
+ .registerTypeAdapterFactory(new ReplicateRefUpdateTypeAdapterFactory())
+ .create();
+ String oldFormatJson =
+ "{\"project\":\"someProject\",\"ref\":\"ref1\",\"uri\":\"git://host1/someRepo.git\",\"remote\":\"someRemote\"}";
+ ReplicateRefUpdate oldFormatUpdate = gson.fromJson(oldFormatJson, ReplicateRefUpdate.class);
+
+ // SHA-1 over the newline-joined project, ref, uri and remote; used below as the
+ // temp-file name prefix.
+ byte[] digest =
+ Hashing.sha1()
+ .hashString("someProject\nref1\ngit://host1/someRepo.git\nsomeRemote", UTF_8)
+ .asBytes();
+ String oldTaskKey = ObjectId.fromRaw(digest).name();
+
+ // Re-serialize the update and write it to a temp file under "building".
+ String serialized = gson.toJson(oldFormatUpdate) + "\n";
+ Path buildingDir =
+ Files.createDirectories(fileSystem.getPath("replication_site").resolve("building"));
+ Path tmp = Files.createTempFile(buildingDir, oldTaskKey, null);
+ Files.write(tmp, serialized.getBytes(UTF_8));
+
+ // Rename the temp file onto the task's waiting path, then start the task.
+ Task task = tasksStorage.new Task(oldFormatUpdate);
+ task.rename(tmp, task.waiting);
+
+ task.start();
+ assertIsRunning(task);
+ }
+
protected static void assertIsWaiting(Task task) {
assertTrue(task.isWaiting());
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
index 16a0363..7061c79 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
@@ -27,6 +27,7 @@
import java.net.URISyntaxException;
import java.nio.file.FileSystem;
import java.nio.file.Path;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.jgit.transport.URIish;
@@ -37,10 +38,17 @@
public class ReplicationTasksStorageTest {
protected static final String PROJECT = "myProject";
protected static final String REF = "myRef";
+ protected static final String REF_2 = "myRef2";
protected static final String REMOTE = "myDest";
protected static final URIish URISH = getUrish("http://example.com/" + PROJECT + ".git");
protected static final ReplicateRefUpdate REF_UPDATE =
- ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE);
+ ReplicateRefUpdate.create(PROJECT, Set.of(REF), URISH, REMOTE);
+ protected static final ReplicateRefUpdate STORED_REF_UPDATE =
+ ReplicateRefUpdate.create(REF_UPDATE, REF_UPDATE.sha1());
+ protected static final ReplicateRefUpdate REFS_UPDATE =
+ ReplicateRefUpdate.create(PROJECT, Set.of(REF, REF_2), URISH, REMOTE);
+ protected static final ReplicateRefUpdate STORED_REFS_UPDATE =
+ ReplicateRefUpdate.create(REFS_UPDATE, REFS_UPDATE.sha1());
protected ReplicationTasksStorage storage;
protected FileSystem fileSystem;
@@ -52,7 +60,7 @@
fileSystem = Jimfs.newFileSystem(Configuration.unix());
storageSite = fileSystem.getPath("replication_site");
storage = new ReplicationTasksStorage(storageSite);
- uriUpdates = TestUriUpdates.create(REF_UPDATE);
+ uriUpdates = new TestUriUpdates(REF_UPDATE);
}
@After
@@ -69,7 +77,7 @@
@Test
public void canListWaitingUpdate() throws Exception {
storage.create(REF_UPDATE);
- assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).containsExactly(STORED_REF_UPDATE);
}
@Test
@@ -84,10 +92,20 @@
@Test
public void canStartWaitingUpdate() throws Exception {
storage.create(REF_UPDATE);
- assertThat(storage.start(uriUpdates)).containsExactly(REF_UPDATE.ref());
+ assertThat(storage.start(uriUpdates)).containsExactly(REF_UPDATE.refs());
assertThatStream(storage.streamWaiting()).isEmpty();
assertFalse(storage.isWaiting(uriUpdates));
- assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamRunning()).containsExactly(STORED_REF_UPDATE);
+ }
+
+ @Test
+ public void canStartWaitingUpdateWithMultipleRefs() throws Exception {
+ // Creates the two-ref REFS_UPDATE, starts it, and checks that it left the waiting
+ // stream and is listed as running.
+ TestUriUpdates multiRefUriUpdates = new TestUriUpdates(REFS_UPDATE);
+ storage.create(REFS_UPDATE);
+ assertThat(storage.start(multiRefUriUpdates)).containsExactly(REFS_UPDATE.refs());
+ // Once started, nothing may be reported as waiting any more...
+ assertThatStream(storage.streamWaiting()).isEmpty();
+ assertFalse(storage.isWaiting(multiRefUriUpdates));
+ // ...and the running stream must hold exactly STORED_REFS_UPDATE.
+ assertThatStream(storage.streamRunning()).containsExactly(STORED_REFS_UPDATE);
}
@Test
@@ -106,14 +124,14 @@
assertThatStream(persistedView.streamWaiting()).isEmpty();
storage.create(REF_UPDATE);
- assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
- assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).containsExactly(STORED_REF_UPDATE);
+ assertThatStream(persistedView.streamWaiting()).containsExactly(STORED_REF_UPDATE);
storage.start(uriUpdates);
assertThatStream(storage.streamWaiting()).isEmpty();
assertThatStream(persistedView.streamWaiting()).isEmpty();
- assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
- assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamRunning()).containsExactly(STORED_REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(STORED_REF_UPDATE);
storage.finish(uriUpdates);
assertThatStream(storage.streamRunning()).isEmpty();
@@ -125,7 +143,7 @@
String key = storage.create(REF_UPDATE);
String secondKey = storage.create(REF_UPDATE);
assertEquals(key, secondKey);
- assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).containsExactly(STORED_REF_UPDATE);
}
@Test
@@ -133,7 +151,7 @@
ReplicateRefUpdate updateB =
ReplicateRefUpdate.create(
PROJECT,
- REF,
+ Set.of(REF),
getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
REMOTE);
@@ -141,7 +159,7 @@
String keyB = storage.create(updateB);
assertThatStream(storage.streamWaiting()).hasSize(2);
assertTrue(storage.isWaiting(uriUpdates));
- assertTrue(storage.isWaiting(TestUriUpdates.create(updateB)));
+ assertTrue(storage.isWaiting(new TestUriUpdates(updateB)));
assertNotEquals(keyA, keyB);
}
@@ -150,20 +168,21 @@
ReplicateRefUpdate updateB =
ReplicateRefUpdate.create(
PROJECT,
- REF,
+ Set.of(REF),
getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
REMOTE);
- UriUpdates uriUpdatesB = TestUriUpdates.create(updateB);
+ UriUpdates uriUpdatesB = new TestUriUpdates(updateB);
storage.create(REF_UPDATE);
storage.create(updateB);
+ ReplicateRefUpdate storedUpdateB = ReplicateRefUpdate.create(updateB, updateB.sha1());
storage.start(uriUpdates);
- assertThatStream(storage.streamWaiting()).containsExactly(updateB);
- assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).containsExactly(storedUpdateB);
+ assertThatStream(storage.streamRunning()).containsExactly(STORED_REF_UPDATE);
storage.start(uriUpdatesB);
assertThatStream(storage.streamWaiting()).isEmpty();
- assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE, updateB);
+ assertThatStream(storage.streamRunning()).containsExactly(STORED_REF_UPDATE, storedUpdateB);
}
@Test
@@ -171,17 +190,18 @@
ReplicateRefUpdate updateB =
ReplicateRefUpdate.create(
PROJECT,
- REF,
+ Set.of(REF),
getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
REMOTE);
- UriUpdates uriUpdatesB = TestUriUpdates.create(updateB);
+ UriUpdates uriUpdatesB = new TestUriUpdates(updateB);
storage.create(REF_UPDATE);
storage.create(updateB);
storage.start(uriUpdates);
storage.start(uriUpdatesB);
storage.finish(uriUpdates);
- assertThatStream(storage.streamRunning()).containsExactly(updateB);
+ assertThatStream(storage.streamRunning())
+ .containsExactly(ReplicateRefUpdate.create(updateB, updateB.sha1()));
storage.finish(uriUpdatesB);
assertThatStream(storage.streamRunning()).isEmpty();
@@ -192,7 +212,7 @@
ReplicateRefUpdate updateB =
ReplicateRefUpdate.create(
PROJECT,
- REF,
+ Set.of(REF),
getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
REMOTE);
@@ -202,35 +222,38 @@
storage.create(updateB);
assertThatStream(storage.streamWaiting()).hasSize(2);
assertTrue(storage.isWaiting(uriUpdates));
- assertTrue(storage.isWaiting(TestUriUpdates.create(updateB)));
+ assertTrue(storage.isWaiting(new TestUriUpdates(updateB)));
}
@Test
public void canCreateMulipleRefsForSameUri() throws Exception {
- ReplicateRefUpdate refA = ReplicateRefUpdate.create(PROJECT, "refA", URISH, REMOTE);
- ReplicateRefUpdate refB = ReplicateRefUpdate.create(PROJECT, "refB", URISH, REMOTE);
+ ReplicateRefUpdate refA = ReplicateRefUpdate.create(PROJECT, Set.of("refA"), URISH, REMOTE);
+ ReplicateRefUpdate refB = ReplicateRefUpdate.create(PROJECT, Set.of("refB"), URISH, REMOTE);
String keyA = storage.create(refA);
String keyB = storage.create(refB);
assertThatStream(storage.streamWaiting()).hasSize(2);
assertNotEquals(keyA, keyB);
- assertTrue(storage.isWaiting(TestUriUpdates.create(refA)));
- assertTrue(storage.isWaiting(TestUriUpdates.create(refB)));
+ assertTrue(storage.isWaiting(new TestUriUpdates(refA)));
+ assertTrue(storage.isWaiting(new TestUriUpdates(refB)));
}
@Test
public void canFinishMulipleRefsForSameUri() throws Exception {
- ReplicateRefUpdate refUpdateA = ReplicateRefUpdate.create(PROJECT, "refA", URISH, REMOTE);
- ReplicateRefUpdate refUpdateB = ReplicateRefUpdate.create(PROJECT, "refB", URISH, REMOTE);
- UriUpdates uriUpdatesA = TestUriUpdates.create(refUpdateA);
- UriUpdates uriUpdatesB = TestUriUpdates.create(refUpdateB);
+ ReplicateRefUpdate refUpdateA =
+ ReplicateRefUpdate.create(PROJECT, Set.of("refA"), URISH, REMOTE);
+ ReplicateRefUpdate refUpdateB =
+ ReplicateRefUpdate.create(PROJECT, Set.of("refB"), URISH, REMOTE);
+ UriUpdates uriUpdatesA = new TestUriUpdates(refUpdateA);
+ UriUpdates uriUpdatesB = new TestUriUpdates(refUpdateB);
storage.create(refUpdateA);
storage.create(refUpdateB);
storage.start(uriUpdatesA);
storage.start(uriUpdatesB);
storage.finish(uriUpdatesA);
- assertThatStream(storage.streamRunning()).containsExactly(refUpdateB);
+ assertThatStream(storage.streamRunning())
+ .containsExactly(ReplicateRefUpdate.create(refUpdateB, refUpdateB.sha1()));
storage.finish(uriUpdatesB);
assertThatStream(storage.streamRunning()).isEmpty();
@@ -242,7 +265,7 @@
storage.start(uriUpdates);
storage.reset(uriUpdates);
- assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).containsExactly(STORED_REF_UPDATE);
assertThatStream(storage.streamRunning()).isEmpty();
}
@@ -253,7 +276,7 @@
storage.reset(uriUpdates);
storage.start(uriUpdates);
- assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamRunning()).containsExactly(STORED_REF_UPDATE);
assertThatStream(storage.streamWaiting()).isEmpty();
assertFalse(storage.isWaiting(uriUpdates));
@@ -273,7 +296,7 @@
storage.start(uriUpdates);
storage.recoverAll();
- assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).containsExactly(STORED_REF_UPDATE);
assertThatStream(storage.streamRunning()).isEmpty();
assertTrue(storage.isWaiting(uriUpdates));
}
@@ -285,7 +308,7 @@
storage.recoverAll();
storage.start(uriUpdates);
- assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamRunning()).containsExactly(STORED_REF_UPDATE);
assertThatStream(storage.streamWaiting()).isEmpty();
assertFalse(storage.isWaiting(uriUpdates));
@@ -298,17 +321,18 @@
ReplicateRefUpdate updateB =
ReplicateRefUpdate.create(
PROJECT,
- REF,
+ Set.of(REF),
getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
REMOTE);
- UriUpdates uriUpdatesB = TestUriUpdates.create(updateB);
+ UriUpdates uriUpdatesB = new TestUriUpdates(updateB);
storage.create(REF_UPDATE);
storage.create(updateB);
storage.start(uriUpdates);
storage.start(uriUpdatesB);
storage.recoverAll();
- assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE, updateB);
+ assertThatStream(storage.streamWaiting())
+ .containsExactly(STORED_REF_UPDATE, ReplicateRefUpdate.create(updateB, updateB.sha1()));
}
@Test
@@ -316,22 +340,23 @@
ReplicateRefUpdate updateB =
ReplicateRefUpdate.create(
PROJECT,
- REF,
+ Set.of(REF),
getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
REMOTE);
- UriUpdates uriUpdatesB = TestUriUpdates.create(updateB);
+ UriUpdates uriUpdatesB = new TestUriUpdates(updateB);
storage.create(REF_UPDATE);
storage.create(updateB);
storage.start(uriUpdates);
storage.start(uriUpdatesB);
storage.recoverAll();
+ ReplicateRefUpdate storedUpdateB = ReplicateRefUpdate.create(updateB, updateB.sha1());
storage.start(uriUpdates);
- assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
- assertThatStream(storage.streamWaiting()).containsExactly(updateB);
+ assertThatStream(storage.streamRunning()).containsExactly(STORED_REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).containsExactly(storedUpdateB);
storage.start(uriUpdatesB);
- assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE, updateB);
+ assertThatStream(storage.streamRunning()).containsExactly(STORED_REF_UPDATE, storedUpdateB);
assertThatStream(storage.streamWaiting()).isEmpty();
storage.finish(uriUpdates);
@@ -358,10 +383,10 @@
ReplicateRefUpdate updateB =
ReplicateRefUpdate.create(
PROJECT,
- REF,
+ Set.of(REF),
getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
REMOTE);
- UriUpdates uriUpdatesB = TestUriUpdates.create(updateB);
+ UriUpdates uriUpdatesB = new TestUriUpdates(updateB);
storage.create(REF_UPDATE);
storage.create(updateB);
storage.start(uriUpdates);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java b/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java
index 901200b..5881ea7 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java
@@ -22,14 +22,14 @@
import com.google.gerrit.server.events.EventDispatcher;
import com.google.gerrit.server.events.ProjectEvent;
import com.google.gerrit.server.events.RefEvent;
-import java.util.LinkedList;
+import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class TestDispatcher implements EventDispatcher {
- private final List<ProjectEvent> projectEvents = new LinkedList<>();
- private final List<RefEvent> refEvents = new LinkedList<>();
- private final List<Event> events = new LinkedList<>();
+ private final List<ProjectEvent> projectEvents = new ArrayList<>();
+ private final List<RefEvent> refEvents = new ArrayList<>();
+ private final List<Event> events = new ArrayList<>();
@Override
public void postEvent(Change change, ChangeEvent event) {} // Not used in replication
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
index f61114e..f6eec83 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
@@ -14,38 +14,43 @@
package com.googlesource.gerrit.plugins.replication;
-import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableSet;
import com.google.gerrit.entities.Project;
import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
import java.net.URISyntaxException;
-import java.util.Collections;
import java.util.Set;
import org.eclipse.jgit.transport.URIish;
-@AutoValue
-public abstract class TestUriUpdates implements UriUpdates {
- public static TestUriUpdates create(ReplicateRefUpdate update) throws URISyntaxException {
- return create(
- Project.nameKey(update.project()),
- new URIish(update.uri()),
- update.remote(),
- Collections.singleton(update.ref()));
- }
+// UriUpdates implementation for tests; every value is copied from a ReplicateRefUpdate.
+public class TestUriUpdates implements UriUpdates {
+ private final Project.NameKey projectNameKey;
+ private final URIish targetUri;
+ private final String remoteName;
+ private final Set<ImmutableSet<String>> refSets;
- public static TestUriUpdates create(
- Project.NameKey project, URIish uri, String remote, Set<String> refs) {
- return new AutoValue_TestUriUpdates(project, uri, remote, refs);
+ // Parses update.uri() into a URIish and wraps update.refs() as the only element of a set.
+ public TestUriUpdates(ReplicateRefUpdate update) throws URISyntaxException {
+ this.projectNameKey = Project.nameKey(update.project());
+ this.targetUri = new URIish(update.uri());
+ this.remoteName = update.remote();
+ this.refSets = Set.of(update.refs());
}
@Override
- public abstract Project.NameKey getProjectNameKey();
+ public Project.NameKey getProjectNameKey() {
+ return this.projectNameKey;
+ }
@Override
- public abstract URIish getURI();
+ public URIish getURI() {
+ return this.targetUri;
+ }
@Override
- public abstract String getRemoteName();
+ public String getRemoteName() {
+ return this.remoteName;
+ }
@Override
- public abstract Set<String> getRefs();
+ public Set<ImmutableSet<String>> getRefs() {
+ return this.refSets;
+ }
}