Merge branch 'stable-3.3'
* stable-3.3:
Call retryDone() when giving up after lock failures
Fix issue with task cleanup after retry
Change-Id: Ib2216e3b06ea62cb06c22ad955a8c252f3bacccc
diff --git a/BUILD b/BUILD
index ee97660..4c74bf2 100644
--- a/BUILD
+++ b/BUILD
@@ -14,6 +14,9 @@
"Gerrit-SshModule: com.googlesource.gerrit.plugins.replication.SshModule",
],
resources = glob(["src/main/resources/**/*"]),
+ deps = [
+ "//lib/auto:auto-value-gson",
+ ],
)
junit_tests(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index 782ff4f..a0d9624 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -77,6 +77,11 @@
}
@Override
+ public int getMaxRefsToShow() {
+ return currentConfig.getMaxRefsToShow();
+ }
+
+ @Override
public Path getEventsDirectory() {
return currentConfig.getEventsDirectory();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoValueTypeAdapterFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoValueTypeAdapterFactory.java
new file mode 100644
index 0000000..5b27176
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoValueTypeAdapterFactory.java
@@ -0,0 +1,25 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gson.TypeAdapterFactory;
+import com.ryanharter.auto.value.gson.GsonTypeAdapterFactory;
+
+@GsonTypeAdapterFactory
+public abstract class AutoValueTypeAdapterFactory implements TypeAdapterFactory {
+ public static TypeAdapterFactory create() {
+ return new AutoValueGson_AutoValueTypeAdapterFactory();
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
new file mode 100644
index 0000000..89b97e9
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
@@ -0,0 +1,179 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.common.flogger.FluentLogger;
+import java.util.Iterator;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Stream;
+
+/**
+ * Non-greedily schedules consecutive tasks in an executor, one for each item returned by an
+ * {@link Iterator}.
+ *
+ * <p>This scheduler is useful when an {@link Iterator} might provide a large amount of items to be
+ * worked on in a non prioritized fashion. This scheduler will scavenge any unused threads in its
+ * executor to do work, however only one item will ever be outstanding and waiting at a time. This
+ * scheduling policy allows large collections to be processed without interfering much with higher
+ * prioritized tasks while still making regular progress on the items provided by the {@link
+ * Iterator}. To keep the level of interference to a minimum, ensure that the amount of work needed
+ * for each item is small and short.
+ */
+public class ChainedScheduler<T> {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ /**
+ * Override to implement a common thread pool task for all the items returned by the {@link
+ * Iterator}
+ */
+ public interface Runner<T> {
+ /** Will get executed in the thread pool task for each item */
+ void run(T item);
+
+ /** Will get called after the last item completes */
+ default void onDone() {}
+
+ /** Will get called to display {@link Runnable} for item in show-queue output */
+ default String toString(T item) {
+ return "Chained " + item.toString();
+ }
+ }
+
+ /** Override to decorate an existing {@link Runner} */
+ public static class ForwardingRunner<T> implements Runner<T> {
+ protected Runner<T> delegateRunner;
+
+ public ForwardingRunner(Runner<T> delegate) {
+ delegateRunner = delegate;
+ }
+
+ @Override
+ public void run(T item) {
+ delegateRunner.run(item);
+ }
+
+ @Override
+ public void onDone() {
+ delegateRunner.onDone();
+ }
+
+ @Override
+ public String toString(T item) {
+ return delegateRunner.toString(item);
+ }
+ }
+
+ /**
+ * Use when a {@link Stream} is needed instead of an {@link Iterator}, it will close the {@link
+ * Stream}
+ */
+ public static class StreamScheduler<T> extends ChainedScheduler<T> {
+ public StreamScheduler(
+ ScheduledExecutorService threadPool, final Stream<T> stream, Runner<T> runner) {
+ super(
+ threadPool,
+ stream.iterator(),
+ new ForwardingRunner<T>(runner) {
+ @Override
+ public void onDone() {
+ stream.close();
+ super.onDone();
+ }
+ });
+ }
+ }
+
+ /** Internal {@link Runnable} containing one item to run and which schedules the next one. */
+ protected class Chainer implements Runnable {
+ protected T item;
+
+ public Chainer(T item) {
+ this.item = item;
+ }
+
+ @Override
+ public void run() {
+ boolean scheduledNext = scheduleNext(); // queue the successor before working on this item
+ try {
+ runner.run(item);
+ } catch (RuntimeException e) { // catch to prevent chain from breaking
+ logger.atSevere().withCause(e).log("Error while running: %s", item);
+ }
+ if (!scheduledNext) { // the iterator had no further items, so this was the last one
+ runner.onDone();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return runner.toString(item);
+ }
+ }
+
+ protected final ScheduledExecutorService threadPool;
+ protected final Iterator<T> iterator;
+ protected final Runner<T> runner;
+
+ /**
+ * Note: The {@link Iterator} passed in will only ever be accessed from one thread at a time, and
+ * the internal state of the {@link Iterator} will be updated after each next() call before
+ * operating on the iterator again.
+ */
+ public ChainedScheduler(
+ ScheduledExecutorService threadPool, Iterator<T> iterator, Runner<T> runner) {
+ this.threadPool = threadPool;
+ this.iterator = iterator;
+ this.runner = runner;
+
+ if (!scheduleNext()) {
+ runner.onDone();
+ }
+ }
+
+ /**
+ * Schedules a task for the next item of the iterator, if there is one.
+ *
+ * <p>Concurrency note: since there is only one chain of tasks and each task submits the next
+ * task to the executor, the calls from here to the iterator.next() call will never be executed
+ * concurrently by more than one thread.
+ *
+ * <p>Data synchronization note: this statement in the [1] javadoc: "Actions in a thread prior
+ * to the submission of a Runnable to an Executor happen-before its execution begins..."
+ * guarantees that, since the iterator.next() call happens before the schedule() call and the
+ * new task's call to hasNext() happens after the submission, the hasNext() call will see the
+ * results of the previous next() call even though it may have happened on a different thread.
+ *
+ * <p>[1]
+ * https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html (section
+ * "Memory Consistency Properties"). That section explains that the methods of all classes in
+ * java.util.concurrent and its subpackages extend the guarantees of the Java memory model to
+ * higher-level synchronization; the guarantee quoted above is the one that applies here.
+ *
+ * @return true if a task was scheduled for another item, false if the iterator has no more
+ *     items
+ */
+ protected boolean scheduleNext() {
+ if (!iterator.hasNext()) {
+ return false;
+ }
+
+ schedule(new Chainer(iterator.next()));
+ return true;
+ }
+
+ protected void schedule(Runnable r) {
+ threadPool.execute(r);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
index 8ea7227..965ca94 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
@@ -22,36 +22,46 @@
import com.google.gerrit.server.util.IdGenerator;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionState;
import java.util.Optional;
import org.eclipse.jgit.transport.URIish;
public class DeleteProjectTask implements Runnable {
interface Factory {
- DeleteProjectTask create(URIish replicateURI, Project.NameKey project);
+
+ DeleteProjectTask create(
+ URIish replicateURI, Project.NameKey project, ProjectDeletionState state);
}
private final DynamicItem<AdminApiFactory> adminApiFactory;
private final int id;
private final URIish replicateURI;
private final Project.NameKey project;
+ private final ProjectDeletionState state;
@Inject
DeleteProjectTask(
DynamicItem<AdminApiFactory> adminApiFactory,
IdGenerator ig,
+ @Assisted ProjectDeletionState state,
@Assisted URIish replicateURI,
@Assisted Project.NameKey project) {
this.adminApiFactory = adminApiFactory;
this.id = ig.next();
this.replicateURI = replicateURI;
this.project = project;
+ this.state = state;
}
@Override
public void run() {
Optional<AdminApi> adminApi = adminApiFactory.get().create(replicateURI);
if (adminApi.isPresent()) {
- adminApi.get().deleteProject(project);
+ if (adminApi.get().deleteProject(project)) {
+ state.setSucceeded(replicateURI);
+ } else {
+ state.setFailed(replicateURI);
+ }
return;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index bd34676..dfe7e79 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -15,7 +15,6 @@
package com.googlesource.gerrit.plugins.replication;
import static com.google.gerrit.server.project.ProjectCache.noSuchProject;
-import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName;
import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName;
import static org.eclipse.jgit.transport.RemoteRefUpdate.Status.NON_EXISTING;
import static org.eclipse.jgit.transport.RemoteRefUpdate.Status.REJECTED_OTHER_REASON;
@@ -64,6 +63,9 @@
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.servlet.RequestScoped;
import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionState;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
@@ -432,13 +434,16 @@
ScheduledFuture<?> ignored =
pool.schedule(task, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
pending.put(uri, task);
+ repLog.atInfo().log(
+ "scheduled %s:%s => %s to run %s",
+ project, ref, task, now ? "now" : "after " + config.getDelay() + "s");
} else {
addRef(task, ref);
task.addState(ref, state);
+ repLog.atInfo().log(
+ "consolidated %s:%s => %s with an existing pending push", project, ref, task);
}
state.increasePushTaskCount(project.get(), ref);
- repLog.atInfo().log(
- "scheduled %s:%s => %s to run after %ds", project, ref, task, config.getDelay());
}
}
@@ -457,10 +462,12 @@
}
}
- void scheduleDeleteProject(URIish uri, Project.NameKey project) {
+ void scheduleDeleteProject(URIish uri, Project.NameKey project, ProjectDeletionState state) {
+ repLog.atFine().log("scheduling deletion of project %s at %s", project, uri);
@SuppressWarnings("unused")
ScheduledFuture<?> ignored =
- pool.schedule(deleteProjectFactory.create(uri, project), 0, TimeUnit.SECONDS);
+ pool.schedule(deleteProjectFactory.create(uri, project, state), 0, TimeUnit.SECONDS);
+ state.setScheduled(uri);
}
void scheduleUpdateHead(URIish uri, Project.NameKey project, String newHead) {
@@ -586,7 +593,7 @@
if (inFlightOp != null) {
return RunwayStatus.denied(inFlightOp.getId());
}
- replicationTasksStorage.get().start(op);
+ op.notifyNotAttempted(op.setStartedRefs(replicationTasksStorage.get().start(op)));
inFlight.put(op.getURI(), op);
}
return RunwayStatus.allowed();
@@ -782,10 +789,9 @@
private void postReplicationScheduledEvent(PushOne pushOp, String inputRef) {
Set<String> refs = inputRef == null ? pushOp.getRefs() : ImmutableSet.of(inputRef);
Project.NameKey project = pushOp.getProjectNameKey();
- String targetNode = resolveNodeName(pushOp.getURI());
for (String ref : refs) {
ReplicationScheduledEvent event =
- new ReplicationScheduledEvent(project.get(), ref, targetNode);
+ new ReplicationScheduledEvent(project.get(), ref, pushOp.getURI());
try {
eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
} catch (PermissionBackendException e) {
@@ -796,10 +802,9 @@
private void postReplicationFailedEvent(PushOne pushOp, RemoteRefUpdate.Status status) {
Project.NameKey project = pushOp.getProjectNameKey();
- String targetNode = resolveNodeName(pushOp.getURI());
for (String ref : pushOp.getRefs()) {
RefReplicatedEvent event =
- new RefReplicatedEvent(project.get(), ref, targetNode, RefPushResult.FAILED, status);
+ new RefReplicatedEvent(project.get(), ref, pushOp.getURI(), RefPushResult.FAILED, status);
try {
eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
} catch (PermissionBackendException e) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
index 5b24bf5..b915d0d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
@@ -137,6 +137,11 @@
}
@Override
+ public int getMaxRefsToShow() {
+ return replicationConfig.getMaxRefsToShow();
+ }
+
+ @Override
public Path getEventsDirectory() {
return replicationConfig.getEventsDirectory();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
index d195aa3..0fa02ef 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
@@ -68,7 +68,8 @@
if (exitCode == 1) {
logger.atInfo().log(
"DeleteProject plugin is not installed on %s;"
- + " will not try to forward this operation to that host");
+ + " will not try to forward this operation to that host",
+ uri);
withoutDeleteProjectPlugin.add(uri);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index ebc8889..87c35ee 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -52,6 +52,7 @@
import com.jcraft.jsch.JSchException;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -61,6 +62,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import org.eclipse.jgit.errors.NoRemoteRepositoryException;
import org.eclipse.jgit.errors.NotSupportedException;
import org.eclipse.jgit.errors.RemoteRepositoryException;
@@ -212,7 +214,7 @@
@Override
public String toString() {
- String print = "[" + HexFormat.fromInt(id) + "] push " + uri;
+ String print = "[" + HexFormat.fromInt(id) + "] push " + uri + " " + getLimitedRefs();
if (retryCount > 0) {
print = "(retry " + retryCount + ") " + print;
@@ -220,6 +222,36 @@
return print;
}
+ /**
+ * Returns a string of refs limited to the maxRefsToShow config with count of total refs hidden
+ * when there are more refs than maxRefsToShow config.
+ *
+ * <ul>
+ * <li>Refs will not be limited when maxRefsToShow config is zero or negative.
+ * <li>By default output will be limited to two refs.
+ * </ul>
+ *
+ * The default value of two is chosen because whenever a new patchset is created there are two
+ * refs to be replicated (change ref and meta ref).
+ *
+ * @return Space separated string of refs (in square bracket) limited to the maxRefsToShow with
+ * count of total refs hidden (in parentheses) when there are more refs than maxRefsToShow
+ * config.
+ */
+ protected String getLimitedRefs() {
+ Set<String> refs = getRefs();
+ int maxRefsToShow = replConfig.getMaxRefsToShow();
+ if (maxRefsToShow <= 0) { // Stream.limit() rejects negative sizes; treat them as "no limit"
+ maxRefsToShow = refs.size();
+ }
+ String refsString = refs.stream().limit(maxRefsToShow).collect(Collectors.joining(" "));
+ int hiddenRefs = refs.size() - maxRefsToShow;
+ if (hiddenRefs > 0) {
+ refsString += " (+" + hiddenRefs + ")";
+ }
+ return "[" + refsString + "]";
+ }
+
boolean isRetrying() {
return retrying;
}
@@ -270,6 +302,28 @@
}
}
+ Set<String> setStartedRefs(Set<String> startedRefs) {
+ Set<String> notAttemptedRefs = Sets.difference(delta, startedRefs).immutableCopy(); // snapshot
+ pushAllRefs = false;
+ delta.clear(); // would also empty a live Sets.difference() view, hence the copy above
+ addRefs(startedRefs);
+ return notAttemptedRefs;
+ }
+
+ void notifyNotAttempted(Set<String> notAttemptedRefs) {
+ notAttemptedRefs.forEach(
+ ref ->
+ Arrays.asList(getStatesByRef(ref))
+ .forEach(
+ state ->
+ state.notifyRefReplicated(
+ projectName.get(),
+ ref,
+ uri,
+ RefPushResult.NOT_ATTEMPTED,
+ RemoteRefUpdate.Status.UP_TO_DATE)));
+ }
+
void addState(String ref, ReplicationState state) {
stateMap.put(ref, state);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
index 92ba4be..5450dd5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
@@ -20,6 +20,8 @@
import com.google.gerrit.server.events.RefEvent;
import com.google.gerrit.server.permissions.PermissionBackendException;
import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
import java.lang.ref.WeakReference;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jgit.transport.RemoteRefUpdate;
@@ -191,7 +193,7 @@
URIish uri,
RefPushResult status,
RemoteRefUpdate.Status refStatus) {
- postEvent(new RefReplicatedEvent(project, ref, resolveNodeName(uri), status, refStatus));
+ postEvent(new RefReplicatedEvent(project, ref, uri, status, refStatus));
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index 8bbb180..ec75450 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -60,6 +60,13 @@
int getMaxRefsToLog();
/**
+ * Returns the maximum number of replicating refs to show in the show-queue output
+ *
+ * @return maximum number of refs to show, 2 by default.
+ */
+ int getMaxRefsToShow();
+
+ /**
* Configured location where the replication events are stored on the filesystem for being resumed
* and kept across restarts.
*
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index 2631cbe..f5c6185 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -34,6 +34,7 @@
private boolean replicateAllOnPluginStart;
private boolean defaultForceUpdate;
private int maxRefsToLog;
+ private final int maxRefsToShow;
private int sshCommandTimeout;
private int sshConnectionTimeout = DEFAULT_SSH_CONNECTION_TIMEOUT_MS;
private final FileBasedConfig config;
@@ -54,6 +55,7 @@
this.replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false);
this.defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false);
this.maxRefsToLog = config.getInt("gerrit", "maxRefsToLog", 0);
+ this.maxRefsToShow = config.getInt("gerrit", "maxRefsToShow", 2);
this.pluginDataDir = pluginDataDir;
}
@@ -96,6 +98,11 @@
}
@Override
+ public int getMaxRefsToShow() {
+ return maxRefsToShow;
+ }
+
+ @Override
public Path getEventsDirectory() {
String eventsDirectory = config.getString("replication", null, "eventsDirectory");
if (!Strings.isNullOrEmpty(eventsDirectory)) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
index c2b96a1..4d2faed 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -32,6 +32,10 @@
import com.google.inject.Scopes;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.internal.UniqueAnnotations;
+import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionState;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
+import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
@@ -77,6 +81,7 @@
.to(StartReplicationCapability.class);
install(new FactoryModuleBuilder().build(PushAll.Factory.class));
+ install(new FactoryModuleBuilder().build(ProjectDeletionState.Factory.class));
bind(EventBus.class).in(Scopes.SINGLETON);
bind(ReplicationDestinations.class).to(DestinationsCollection.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 990e387..ed474ae 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -33,13 +33,17 @@
import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionState;
import java.net.URISyntaxException;
+import java.util.Collection;
import java.util.HashSet;
+import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jgit.transport.URIish;
/** Manages automatic replication to remote repositories. */
@@ -59,8 +63,9 @@
private final DynamicItem<EventDispatcher> dispatcher;
private final Provider<ReplicationDestinations> destinations; // For Guice circular dependency
private final ReplicationTasksStorage replicationTasksStorage;
+ private final ProjectDeletionState.Factory projectDeletionStateFactory;
private volatile boolean running;
- private volatile boolean replaying;
+ private final AtomicBoolean replaying = new AtomicBoolean();
private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
private Distributor distributor;
@@ -71,7 +76,8 @@
Provider<ReplicationDestinations> rd,
DynamicItem<EventDispatcher> dis,
ReplicationStateListeners sl,
- ReplicationTasksStorage rts) {
+ ReplicationTasksStorage rts,
+ ProjectDeletionState.Factory pd) {
replConfig = rc;
workQueue = wq;
dispatcher = dis;
@@ -79,6 +85,7 @@
stateLog = sl;
replicationTasksStorage = rts;
beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue();
+ projectDeletionStateFactory = pd;
}
@Override
@@ -86,10 +93,8 @@
if (!running) {
destinations.get().startup(workQueue);
running = true;
- replicationTasksStorage.resetAll();
- Thread t = new Thread(this::firePendingEvents, "firePendingEvents");
- t.setDaemon(true);
- t.start();
+ replicationTasksStorage.recoverAll();
+ firePendingEvents();
fireBeforeStartupEvents();
distributor = new Distributor(workQueue);
}
@@ -112,7 +117,7 @@
@Override
public boolean isReplaying() {
- return replaying;
+ return replaying.get();
}
public void scheduleFullSync(
@@ -177,7 +182,7 @@
if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
for (URIish uri : cfg.getURIs(project, urlMatch)) {
replicationTasksStorage.create(
- new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
+ ReplicateRefUpdate.create(project.get(), refName, uri, cfg.getRemoteConfigName()));
cfg.schedule(project, refName, uri, state, now);
}
} else {
@@ -189,20 +194,33 @@
}
private void firePendingEvents() {
- replaying = true;
- try {
- replaying = true;
- for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) {
- try {
- fire(new URIish(t.uri), Project.nameKey(t.project), t.ref);
- } catch (URISyntaxException e) {
- repLog.atSevere().withCause(e).log("Encountered malformed URI for persisted event %s", t);
- }
- }
- } catch (Throwable e) {
- repLog.atSevere().withCause(e).log("Unexpected error while firing pending events");
- } finally {
- replaying = false;
+ if (replaying.compareAndSet(false, true)) {
+ new ChainedScheduler.StreamScheduler<>(
+ workQueue.getDefaultQueue(),
+ replicationTasksStorage.streamWaiting(),
+ new ChainedScheduler.Runner<ReplicationTasksStorage.ReplicateRefUpdate>() {
+ @Override
+ public void run(ReplicationTasksStorage.ReplicateRefUpdate u) {
+ try {
+ fire(new URIish(u.uri()), Project.nameKey(u.project()), u.ref());
+ } catch (URISyntaxException e) {
+ repLog.atSevere().withCause(e).log(
+ "Encountered malformed URI for persisted event %s", u);
+ } catch (Throwable e) {
+ repLog.atSevere().withCause(e).log("Unexpected error while firing pending events");
+ }
+ }
+
+ @Override
+ public void onDone() {
+ replaying.set(false);
+ }
+
+ @Override
+ public String toString(ReplicationTasksStorage.ReplicateRefUpdate u) {
+ return "Scheduling push to " + String.format("%s:%s", u.project(), u.ref());
+ }
+ });
}
}
@@ -233,8 +251,12 @@
@Override
public void onProjectDeleted(ProjectDeletedListener.Event event) {
Project.NameKey p = Project.nameKey(event.getProjectName());
- destinations.get().getURIs(Optional.empty(), p, FilterType.PROJECT_DELETION).entries().stream()
- .forEach(e -> e.getKey().scheduleDeleteProject(e.getValue(), p));
+ ProjectDeletionState state = projectDeletionStateFactory.create(p);
+ Collection<Map.Entry<Destination, URIish>> projectsToDelete =
+ destinations.get().getURIs(Optional.empty(), p, FilterType.PROJECT_DELETION).entries();
+
+ projectsToDelete.forEach(e -> state.setToProcess(e.getValue()));
+ projectsToDelete.forEach(e -> e.getKey().scheduleDeleteProject(e.getValue(), p, state));
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
index b948be0..871ed52 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
@@ -79,7 +79,7 @@
pushResultProcessing.onRefReplicatedToOneNode(project, ref, uri, status, refUpdateStatus);
RefReplicationStatus completedRefStatus = null;
- boolean allPushTaksCompleted = false;
+ boolean allPushTasksCompleted = false;
countingLock.lock();
try {
RefReplicationStatus refStatus = getRefStatus(project, ref);
@@ -90,7 +90,7 @@
if (refStatus.allDone()) {
completedRefStatus = statusByProjectRef.remove(project, ref);
}
- allPushTaksCompleted = finishedPushTasksCount.get() == totalPushTasksCount.get();
+ allPushTasksCompleted = finishedPushTasksCount.get() == totalPushTasksCount.get();
}
} finally {
countingLock.unlock();
@@ -100,7 +100,7 @@
doRefPushTasksCompleted(completedRefStatus);
}
- if (allPushTaksCompleted) {
+ if (allPushTasksCompleted) {
doAllPushTasksCompleted();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 3947ebc..4736402 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -16,10 +16,13 @@
import static java.nio.charset.StandardCharsets.UTF_8;
+import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.flogger.FluentLogger;
import com.google.common.hash.Hashing;
import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
import com.google.inject.Inject;
import com.google.inject.ProvisionException;
import com.google.inject.Singleton;
@@ -29,9 +32,9 @@
import java.nio.file.NotDirectoryException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
-import java.util.List;
+import java.util.HashSet;
import java.util.Optional;
-import java.util.stream.Collectors;
+import java.util.Set;
import java.util.stream.Stream;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.transport.URIish;
@@ -61,10 +64,11 @@
public class ReplicationTasksStorage {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
- public static class ReplicateRefUpdate {
- public static Optional<ReplicateRefUpdate> createOptionally(Path file) {
+ @AutoValue
+ public abstract static class ReplicateRefUpdate {
+ public static Optional<ReplicateRefUpdate> createOptionally(Path file, Gson gson) {
try {
- return Optional.ofNullable(create(file));
+ return Optional.ofNullable(create(file, gson));
} catch (NoSuchFileException e) {
logger.atFine().log("File %s not found while reading task", file);
} catch (IOException e) {
@@ -73,30 +77,40 @@
return Optional.empty();
}
- public static ReplicateRefUpdate create(Path file) throws IOException {
+ public static ReplicateRefUpdate create(Path file, Gson gson) throws IOException {
String json = new String(Files.readAllBytes(file), UTF_8);
- return GSON.fromJson(json, ReplicateRefUpdate.class);
+ return gson.fromJson(json, ReplicateRefUpdate.class);
}
- public final String project;
- public final String ref;
- public final String uri;
- public final String remote;
+ public static ReplicateRefUpdate create(String project, String ref, URIish uri, String remote) {
+ return new AutoValue_ReplicationTasksStorage_ReplicateRefUpdate(
+ project, ref, uri.toASCIIString(), remote);
+ }
- public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
- this.project = project;
- this.ref = ref;
- this.uri = uri.toASCIIString();
- this.remote = remote;
+ public abstract String project();
+
+ public abstract String ref();
+
+ public abstract String uri();
+
+ public abstract String remote();
+
+ public String sha1() {
+ return ReplicationTasksStorage.sha1(project() + "\n" + ref() + "\n" + uri() + "\n" + remote())
+ .name();
}
@Override
- public String toString() {
- return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote;
+ public final String toString() {
+ return "ref-update " + project() + ":" + ref() + " uri:" + uri() + " remote:" + remote();
+ }
+
+ public static TypeAdapter<ReplicateRefUpdate> typeAdapter(Gson gson) {
+ return new AutoValue_ReplicationTasksStorage_ReplicateRefUpdate.GsonTypeAdapter(gson);
}
}
- private static final Gson GSON = new Gson();
+ private final Gson gson;
private final Path buildingUpdates;
private final Path runningUpdates;
@@ -112,16 +126,23 @@
buildingUpdates = refUpdates.resolve("building");
runningUpdates = refUpdates.resolve("running");
waitingUpdates = refUpdates.resolve("waiting");
+ gson =
+ new GsonBuilder().registerTypeAdapterFactory(AutoValueTypeAdapterFactory.create()).create();
}
public synchronized String create(ReplicateRefUpdate r) {
return new Task(r).create();
}
- public synchronized void start(UriUpdates uriUpdates) {
+ public synchronized Set<String> start(UriUpdates uriUpdates) {
+ Set<String> startedRefs = new HashSet<>();
for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) {
- new Task(update).start();
+ Task t = new Task(update);
+ if (t.start()) {
+ startedRefs.add(t.update.ref());
+ }
}
+ return startedRefs;
}
public synchronized void reset(UriUpdates uriUpdates) {
@@ -130,10 +151,8 @@
}
}
- public synchronized void resetAll() {
- for (ReplicateRefUpdate r : list(createDir(runningUpdates))) {
- new Task(r).reset();
- }
+ public synchronized void recoverAll() {
+ streamRunning().forEach(r -> new Task(r).recover());
}
public boolean isWaiting(UriUpdates uriUpdates) {
@@ -148,21 +167,17 @@
}
}
- public List<ReplicateRefUpdate> listWaiting() {
- return list(createDir(waitingUpdates));
+ public Stream<ReplicateRefUpdate> streamWaiting() {
+ return streamRecursive(createDir(waitingUpdates));
}
- public List<ReplicateRefUpdate> listRunning() {
- return list(createDir(runningUpdates));
- }
-
- private List<ReplicateRefUpdate> list(Path taskDir) {
- return streamRecursive(taskDir).collect(Collectors.toList());
+ public Stream<ReplicateRefUpdate> streamRunning() {
+ return streamRecursive(createDir(runningUpdates));
}
private Stream<ReplicateRefUpdate> streamRecursive(Path dir) {
return walkNonDirs(dir)
- .map(path -> ReplicateRefUpdate.createOptionally(path))
+ .map(path -> ReplicateRefUpdate.createOptionally(path, gson))
.filter(Optional::isPresent)
.map(Optional::get);
}
@@ -179,7 +194,7 @@
}
@SuppressWarnings("deprecation")
- private ObjectId sha1(String s) {
+ private static ObjectId sha1(String s) {
return ObjectId.fromRaw(Hashing.sha1().hashString(s, UTF_8).asBytes());
}
@@ -200,8 +215,7 @@
public Task(ReplicateRefUpdate update) {
this.update = update;
- String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote;
- taskKey = sha1(key).name();
+ taskKey = update.sha1();
running = createDir(runningUpdates).resolve(taskKey);
waiting = createDir(waitingUpdates).resolve(taskKey);
}
@@ -211,7 +225,7 @@
return taskKey;
}
- String json = GSON.toJson(update) + "\n";
+ String json = gson.toJson(update) + "\n";
try {
Path tmp = Files.createTempFile(createDir(buildingUpdates), taskKey, null);
logger.atFine().log("CREATE %s %s", tmp, updateLog());
@@ -224,14 +238,18 @@
return taskKey;
}
- public void start() {
- rename(waiting, running);
+ public boolean start() {
+ return rename(waiting, running);
}
public void reset() {
rename(running, waiting);
}
+ public void recover() {
+ rename(running, waiting);
+ }
+
public boolean isWaiting() {
return Files.exists(waiting);
}
@@ -245,17 +263,19 @@
}
}
- private void rename(Path from, Path to) {
+ private boolean rename(Path from, Path to) {
try {
logger.atFine().log("RENAME %s to %s %s", from, to, updateLog());
Files.move(from, to, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+ return true;
} catch (IOException e) {
logger.atSevere().withCause(e).log("Error while renaming task %s", taskKey);
+ return false;
}
}
private String updateLog() {
- return String.format("(%s:%s => %s)", update.project, update.ref, update.uri);
+ return String.format("(%s:%s => %s)", update.project(), update.ref(), update.uri());
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java b/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java
index 9c56c8e..a9985d2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java
@@ -34,7 +34,7 @@
return getRefs().stream()
.map(
(ref) ->
- new ReplicationTasksStorage.ReplicateRefUpdate(
+ ReplicationTasksStorage.ReplicateRefUpdate.create(
getProjectNameKey().get(), ref, getURI(), getRemoteName()))
.collect(Collectors.toList());
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationDoneEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationDoneEvent.java
new file mode 100644
index 0000000..4e66743
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationDoneEvent.java
@@ -0,0 +1,34 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.events;
+
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.events.ProjectEvent;
+
+public class ProjectDeletionReplicationDoneEvent extends ProjectEvent {
+ public static final String TYPE = "project-deletion-replication-done";
+
+ private final String project;
+
+ public ProjectDeletionReplicationDoneEvent(String project) {
+ super(TYPE);
+ this.project = project;
+ }
+
+ @Override
+ public Project.NameKey getProjectNameKey() {
+ return Project.nameKey(project);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationFailedEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationFailedEvent.java
new file mode 100644
index 0000000..e350716
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationFailedEvent.java
@@ -0,0 +1,43 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.events;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.events.ProjectEvent;
+import org.eclipse.jgit.transport.URIish;
+
+public class ProjectDeletionReplicationFailedEvent extends ProjectEvent {
+ public static final String TYPE = "project-deletion-replication-failed";
+
+ private final String project;
+ private final String targetUri;
+
+ public ProjectDeletionReplicationFailedEvent(String project, URIish targetUri) {
+ super(TYPE);
+ this.project = project;
+ this.targetUri = targetUri.toASCIIString();
+ }
+
+ @Override
+ public Project.NameKey getProjectNameKey() {
+ return Project.nameKey(project);
+ }
+
+ @VisibleForTesting
+ public String getTargetUri() {
+ return targetUri;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationScheduledEvent.java
new file mode 100644
index 0000000..d179106
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationScheduledEvent.java
@@ -0,0 +1,43 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.events;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.events.ProjectEvent;
+import org.eclipse.jgit.transport.URIish;
+
+public class ProjectDeletionReplicationScheduledEvent extends ProjectEvent {
+ public static final String TYPE = "project-deletion-replication-scheduled";
+
+ private final String project;
+ private final String targetUri;
+
+ public ProjectDeletionReplicationScheduledEvent(String project, URIish targetUri) {
+ super(TYPE);
+ this.project = project;
+ this.targetUri = targetUri.toASCIIString();
+ }
+
+ @Override
+ public Project.NameKey getProjectNameKey() {
+ return Project.nameKey(project);
+ }
+
+ @VisibleForTesting
+ public String getTargetUri() {
+ return targetUri;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationSucceededEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationSucceededEvent.java
new file mode 100644
index 0000000..852c9fe
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationSucceededEvent.java
@@ -0,0 +1,43 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.events;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.events.ProjectEvent;
+import org.eclipse.jgit.transport.URIish;
+
+public class ProjectDeletionReplicationSucceededEvent extends ProjectEvent {
+ public static final String TYPE = "project-deletion-replication-succeeded";
+
+ private final String project;
+ private final String targetUri;
+
+ public ProjectDeletionReplicationSucceededEvent(String project, URIish targetUri) {
+ super(TYPE);
+ this.project = project;
+ this.targetUri = targetUri.toASCIIString();
+ }
+
+ @Override
+ public Project.NameKey getProjectNameKey() {
+ return Project.nameKey(project);
+ }
+
+ @VisibleForTesting
+ public String getTargetUri() {
+ return targetUri;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionState.java
new file mode 100644
index 0000000..e091915
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionState.java
@@ -0,0 +1,96 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.events;
+
+import static com.googlesource.gerrit.plugins.replication.events.ProjectDeletionState.ProjectDeletionStatus.FAILED;
+import static com.googlesource.gerrit.plugins.replication.events.ProjectDeletionState.ProjectDeletionStatus.SCHEDULED;
+import static com.googlesource.gerrit.plugins.replication.events.ProjectDeletionState.ProjectDeletionStatus.SUCCEEDED;
+import static com.googlesource.gerrit.plugins.replication.events.ProjectDeletionState.ProjectDeletionStatus.TO_PROCESS;
+
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.events.ProjectEvent;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.eclipse.jgit.transport.URIish;
+
+public class ProjectDeletionState {
+ public interface Factory {
+ ProjectDeletionState create(Project.NameKey project);
+ }
+
+ private final DynamicItem<EventDispatcher> eventDispatcher;
+ private final Project.NameKey project;
+ private final ConcurrentMap<URIish, ProjectDeletionStatus> statusByURI =
+ new ConcurrentHashMap<>();
+
+ @Inject
+ public ProjectDeletionState(
+ DynamicItem<EventDispatcher> eventDispatcher, @Assisted Project.NameKey project) {
+ this.eventDispatcher = eventDispatcher;
+ this.project = project;
+ }
+
+ public void setToProcess(URIish uri) {
+ statusByURI.put(uri, TO_PROCESS);
+ }
+
+ public void setScheduled(URIish uri) {
+ setStatusAndBroadcastEvent(
+ uri, SCHEDULED, new ProjectDeletionReplicationScheduledEvent(project.get(), uri));
+ }
+
+ public void setSucceeded(URIish uri) {
+ setStatusAndBroadcastEvent(
+ uri, SUCCEEDED, new ProjectDeletionReplicationSucceededEvent(project.get(), uri));
+ notifyIfDeletionDoneOnAllNodes();
+ }
+
+ public void setFailed(URIish uri) {
+ setStatusAndBroadcastEvent(
+ uri, FAILED, new ProjectDeletionReplicationFailedEvent(project.get(), uri));
+ notifyIfDeletionDoneOnAllNodes();
+ }
+
+ private void setStatusAndBroadcastEvent(
+ URIish uri, ProjectDeletionStatus status, ProjectEvent event) {
+ statusByURI.put(uri, status);
+ eventDispatcher.get().postEvent(project, event);
+ }
+
+ public void notifyIfDeletionDoneOnAllNodes() {
+ synchronized (statusByURI) {
+ if (!statusByURI.isEmpty()
+ && statusByURI.values().stream()
+ .noneMatch(s -> s.equals(TO_PROCESS) || s.equals(SCHEDULED))) {
+
+ statusByURI.clear();
+ eventDispatcher
+ .get()
+ .postEvent(project, new ProjectDeletionReplicationDoneEvent(project.get()));
+ }
+ }
+ }
+
+ public enum ProjectDeletionStatus {
+ TO_PROCESS,
+ SCHEDULED,
+ FAILED,
+ SUCCEEDED;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicatedEvent.java
similarity index 82%
rename from src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
rename to src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicatedEvent.java
index d1ab790..b0c554e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicatedEvent.java
@@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.replication;
+package com.googlesource.gerrit.plugins.replication.events;
+
+import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName;
import com.google.gerrit.entities.Project;
import com.google.gerrit.server.events.RefEvent;
@@ -20,28 +22,31 @@
import java.util.Objects;
import org.eclipse.jgit.transport.RemoteRefUpdate;
import org.eclipse.jgit.transport.RemoteRefUpdate.Status;
+import org.eclipse.jgit.transport.URIish;
public class RefReplicatedEvent extends RefEvent {
public static final String TYPE = "ref-replicated";
public final String project;
public final String ref;
- public final String targetNode;
+ @Deprecated public final String targetNode;
+ public final String targetUri;
public final String status;
public final Status refStatus;
public RefReplicatedEvent(
String project,
String ref,
- String targetNode,
+ URIish targetUri,
RefPushResult status,
RemoteRefUpdate.Status refStatus) {
super(TYPE);
this.project = project;
this.ref = ref;
- this.targetNode = targetNode;
+ this.targetNode = resolveNodeName(targetUri);
this.status = status.toString();
this.refStatus = refStatus;
+ this.targetUri = targetUri.toASCIIString();
}
@Override
@@ -69,6 +74,9 @@
if (!Objects.equals(event.targetNode, this.targetNode)) {
return false;
}
+ if (!Objects.equals(event.targetUri, this.targetUri)) {
+ return false;
+ }
if (!Objects.equals(event.status, this.status)) {
return false;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicationDoneEvent.java
similarity index 96%
rename from src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
rename to src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicationDoneEvent.java
index e663194..75b8026 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicationDoneEvent.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.replication;
+package com.googlesource.gerrit.plugins.replication.events;
import com.google.gerrit.entities.Project;
import com.google.gerrit.server.events.RefEvent;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ReplicationScheduledEvent.java
similarity index 70%
rename from src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
rename to src/main/java/com/googlesource/gerrit/plugins/replication/events/ReplicationScheduledEvent.java
index 28f6b6b..4a1ade8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ReplicationScheduledEvent.java
@@ -12,23 +12,28 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.replication;
+package com.googlesource.gerrit.plugins.replication.events;
+
+import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName;
import com.google.gerrit.entities.Project;
import com.google.gerrit.server.events.RefEvent;
+import org.eclipse.jgit.transport.URIish;
public class ReplicationScheduledEvent extends RefEvent {
public static final String TYPE = "ref-replication-scheduled";
public final String project;
public final String ref;
- public final String targetNode;
+ @Deprecated public final String targetNode;
+ public final String targetUri;
- public ReplicationScheduledEvent(String project, String ref, String targetNode) {
+ public ReplicationScheduledEvent(String project, String ref, URIish targetUri) {
super(TYPE);
this.project = project;
this.ref = ref;
- this.targetNode = targetNode;
+ this.targetNode = resolveNodeName(targetUri);
+ this.targetUri = targetUri.toASCIIString();
}
@Override
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index adb8d4c..ded9d84 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -16,6 +16,12 @@
local path as replication target. This makes e.g. sense if a network
share is mounted to which the repositories should be replicated.
+In a multi-primary scenario, any replication work that is already
+in-flight or completed by the other nodes is not performed, to
+avoid extra work. This is possible because the storage for replication
+events is shared between the primaries. (The storage location
+is specified in the config using `replication.eventsDirectory`.)
+
Replication of account data (NoteDb)
------------------------------------
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index d606b7b..f4ea9d6 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -86,6 +86,15 @@
: Number of refs, that are pushed during replication, to be logged.
For printing all refs to the logs, use a value of 0. By default, 0.
+gerrit.maxRefsToShow
+: Number of refs, that are pushed during replication, to be shown
+ in the show-queue output. To show all refs, use a value of 0.
+ By default, 2, because whenever a new patchset is created there
+ are two refs (change ref and meta ref), e.g.:
+
+ `(retry 1) push aaa.com:/git/test.git [refs/heads/b1 refs/heads/b2 (+2)]`
+
+
gerrit.sshCommandTimeout
: Timeout for SSH command execution. If 0, there is no timeout and
the client waits indefinitely. By default, 0.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
new file mode 100644
index 0000000..90191f2
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
@@ -0,0 +1,463 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import com.google.common.collect.ForwardingIterator;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ChainedSchedulerTest {
+ /** A simple {@link Runnable} that waits until the start() method is called. */
+ public class WaitingRunnable implements Runnable {
+ protected final CountDownLatch start;
+
+ public WaitingRunnable() {
+ this(new CountDownLatch(1));
+ }
+
+ public WaitingRunnable(CountDownLatch latch) {
+ this.start = latch;
+ }
+
+ @Override
+ public void run() {
+ try {
+ start.await();
+ } catch (InterruptedException e) {
+ missedAwaits.incrementAndGet();
+ }
+ }
+
+ public void start() {
+ start.countDown();
+ }
+ }
+
+ /** A simple {@link Runnable} that can be awaited to start to run. */
+ public static class WaitableRunnable implements Runnable {
+ protected final CountDownLatch started = new CountDownLatch(1);
+
+ @Override
+ public void run() {
+ started.countDown();
+ }
+
+ public boolean isStarted() {
+ return started.getCount() == 0;
+ }
+
+ public boolean awaitStart(int itemsBefore) {
+ try {
+ return started.await(SECONDS_SYNCHRONIZE * itemsBefore, SECONDS);
+ } catch (InterruptedException e) {
+ return false;
+ }
+ }
+ }
+
+ /** An {@link Iterator} wrapper which keeps track of how many times next() has been called. */
+ public static class CountingIterator extends ForwardingIterator<String> {
+ public volatile int count = 0;
+
+ protected Iterator<String> delegate;
+
+ public CountingIterator(Iterator<String> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public synchronized String next() {
+ count++;
+ return super.next();
+ }
+
+ @Override
+ protected Iterator<String> delegate() {
+ return delegate;
+ }
+ }
+
+ /** A {@link ChainedScheduler.Runner} which keeps track of completion and counts. */
+ public static class TestRunner implements ChainedScheduler.Runner<String> {
+ protected final AtomicInteger runCount = new AtomicInteger(0);
+ protected final CountDownLatch onDone = new CountDownLatch(1);
+
+ @Override
+ public void run(String item) {
+ incrementAndGet();
+ }
+
+ public int runCount() {
+ return runCount.get();
+ }
+
+ @Override
+ public void onDone() {
+ onDone.countDown();
+ }
+
+ public boolean isDone() {
+ return onDone.getCount() <= 0;
+ }
+
+ public boolean awaitDone(int items) {
+ try {
+ return onDone.await(items * SECONDS_SYNCHRONIZE, SECONDS);
+ } catch (InterruptedException e) {
+ return false;
+ }
+ }
+
+ protected int incrementAndGet() {
+ return runCount.incrementAndGet();
+ }
+ }
+
+ /**
+ * A {@link TestRunner} that can be awaited to start to run and additionally will wait until
+ * increment() or runOneRandomStarted() is called to complete.
+ */
+ public class WaitingRunner extends TestRunner {
+ protected class RunContext extends WaitableRunnable {
+ CountDownLatch run = new CountDownLatch(1);
+ CountDownLatch ran = new CountDownLatch(1);
+ int count;
+
+ @Override
+ public void run() {
+ super.run();
+ try {
+ run.await();
+ count = incrementAndGet();
+ ran.countDown();
+ } catch (InterruptedException e) {
+ missedAwaits.incrementAndGet();
+ }
+ }
+
+ public synchronized boolean startIfNotRunning() throws InterruptedException {
+ if (run.getCount() > 0) {
+ increment();
+ return true;
+ }
+ return false;
+ }
+
+ public synchronized int increment() throws InterruptedException {
+ run.countDown();
+ ran.await(); // no timeout needed as RunContext.run() calls countDown unless interrupted
+ return count;
+ }
+ }
+
+ protected final Map<String, RunContext> ctxByItem = new ConcurrentHashMap<>();
+
+ @Override
+ public void run(String item) {
+ context(item).run();
+ }
+
+ public void runOneRandomStarted() throws InterruptedException {
+ while (true) {
+ for (RunContext ctx : ctxByItem.values()) {
+ if (ctx.isStarted()) {
+ if (ctx.startIfNotRunning()) {
+ return;
+ }
+ }
+ }
+ MILLISECONDS.sleep(1);
+ }
+ }
+
+ public boolean awaitStart(String item, int itemsBefore) {
+ return context(item).awaitStart(itemsBefore);
+ }
+
+ public int increment(String item) throws InterruptedException {
+ return increment(item, 1);
+ }
+
+ public int increment(String item, int itemsBefore) throws InterruptedException {
+ awaitStart(item, itemsBefore);
+ return context(item).increment();
+ }
+
+ protected RunContext context(String item) {
+ return ctxByItem.computeIfAbsent(item, k -> new RunContext());
+ }
+ }
+
+ // Time budget, in seconds, for a single synchronization event (such as an await()
+ // or a cross-thread variable increment) to complete
+ public static final int SECONDS_SYNCHRONIZE = 3;
+ public static final int MANY_ITEMS_SIZE = 1000;
+
+ public static String FIRST = item(1);
+ public static String SECOND = item(2);
+ public static String THIRD = item(3);
+
+ public final AtomicInteger missedAwaits = new AtomicInteger(0); // non-zero signals an error
+
+ @Before
+ public void setup() {
+ missedAwaits.set(0);
+ }
+
+ @Test
+ public void emptyCompletesImmediately() {
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+ TestRunner runner = new TestRunner();
+ List<String> items = new ArrayList<>();
+
+ new ChainedScheduler<>(executor, items.iterator(), runner);
+ assertThat(runner.awaitDone(1)).isEqualTo(true);
+ assertThat(runner.runCount()).isEqualTo(0);
+ }
+
+ @Test
+ public void oneItemCompletes() throws Exception {
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+ TestRunner runner = new TestRunner();
+ List<String> items = new ArrayList<>();
+ items.add(FIRST);
+
+ new ChainedScheduler<>(executor, items.iterator(), runner);
+ assertThat(runner.awaitDone(1)).isEqualTo(true);
+ assertThat(runner.runCount()).isEqualTo(1);
+ }
+
+ @Test
+ public void manyItemsAllComplete() throws Exception {
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+ TestRunner runner = new TestRunner();
+ List<String> items = createManyItems();
+
+ new ChainedScheduler<>(executor, items.iterator(), runner);
+ assertThat(runner.awaitDone(items.size())).isEqualTo(true);
+ assertThat(runner.runCount()).isEqualTo(items.size());
+ }
+
+ @Test
+ public void exceptionInTaskDoesNotAbortIteration() throws Exception {
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+ TestRunner runner =
+ new TestRunner() {
+ @Override
+ public void run(String item) {
+ super.run(item);
+ throw new RuntimeException();
+ }
+ };
+ List<String> items = new ArrayList<>();
+ items.add(FIRST);
+ items.add(SECOND);
+ items.add(THIRD);
+
+ new ChainedScheduler<>(executor, items.iterator(), runner);
+ assertThat(runner.awaitDone(items.size())).isEqualTo(true);
+ assertThat(runner.runCount()).isEqualTo(items.size());
+ }
+
+ /** Completion must not be reported while any item is still waiting to be released. */
+ @Test
+ public void onDoneNotCalledBeforeAllCompleted() throws Exception {
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+ try {
+ WaitingRunner runner = new WaitingRunner();
+ List<String> items = createManyItems();
+
+ new ChainedScheduler<>(executor, items.iterator(), runner);
+ for (int i = 1; i <= items.size(); i++) {
+ // Checked before each release: done may only show up after the last item was released
+ assertThat(runner.isDone()).isEqualTo(false);
+ runner.runOneRandomStarted();
+ }
+
+ assertThat(runner.awaitDone(items.size())).isEqualTo(true);
+ assertThat(runner.runCount()).isEqualTo(items.size());
+ assertThat(missedAwaits.get()).isEqualTo(0);
+ } finally {
+ // Stop the pool (interrupting any runner still blocked after a failed assertion) so that
+ // the test does not leak its worker thread. All assertions have already run at this point.
+ executor.shutdownNow();
+ }
+ }
+
+ /**
+ * Checks, for growing item counts on a single threaded executor, that a task submitted by
+ * someone else starts once the running item and the one already queued item have completed,
+ * i.e. that it is not held up by the items which were not queued yet.
+ */
+ @Test
+ public void otherTasksOnlyEverWaitForAtMostOneRunningPlusOneWaiting() throws Exception {
+ for (int threads = 1; threads <= 10; threads++) {
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+ try {
+ WaitingRunner runner = new WaitingRunner();
+ List<String> items = createItems(threads + 1 /* Running */ + 1 /* Waiting */);
+ CountingIterator it = new CountingIterator(items.iterator());
+
+ new ChainedScheduler<>(executor, it, runner);
+ assertThat(runner.awaitStart(FIRST, 1)).isEqualTo(true); // Confirms at least one Running
+ assertThat(it.count).isGreaterThan(1); // Confirms at least one extra Waiting or Running
+ assertThat(it.count).isLessThan(items.size()); // Confirms at least one still not queued
+
+ WaitableRunnable external = new WaitableRunnable();
+ executor.execute(external);
+ assertThat(external.isStarted()).isEqualTo(false);
+
+ // Completes 2, (at most one Running + 1 Waiting)
+ assertThat(runner.increment(FIRST)).isEqualTo(1); // Was Running
+ assertThat(runner.increment(SECOND)).isEqualTo(2); // Was Waiting
+ // Asserts that the one that still needed to be queued is not blocking this external task
+ assertThat(external.awaitStart(1)).isEqualTo(true);
+
+ for (int i = 3; i <= items.size(); i++) {
+ runner.increment(item(i));
+ }
+ assertThat(missedAwaits.get()).isEqualTo(0);
+ } finally {
+ // Every iteration creates its own pool, so release it here instead of leaking ten of them.
+ // Graceful shutdown() rather than shutdownNow(): missedAwaits is shared by all iterations,
+ // so runner threads which may still be finishing should not be interrupted.
+ executor.shutdown();
+ }
+ }
+ }
+
+ /** With ten free threads, ten items at a time must be able to start before any completes. */
+ @Test
+ public void saturatesManyFreeThreads() throws Exception {
+ int threads = 10;
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(threads);
+ try {
+ WaitingRunner runner = new WaitingRunner();
+ List<String> items = createManyItems();
+
+ new ChainedScheduler<>(executor, items.iterator(), runner);
+
+ for (int j = 1; j <= MANY_ITEMS_SIZE; j += threads) {
+ // If #threads items can start before any complete, it proves #threads are
+ // running in parallel and saturating all available threads.
+ for (int i = j; i < j + threads; i++) {
+ assertThat(runner.awaitStart(item(i), threads)).isEqualTo(true);
+ }
+ for (int i = j; i < j + threads; i++) {
+ assertThat(runner.increment(item(i))).isEqualTo(i);
+ }
+ }
+
+ assertThat(runner.awaitDone(threads)).isEqualTo(true);
+ assertThat(runner.runCount()).isEqualTo(items.size());
+ assertThat(missedAwaits.get()).isEqualTo(0);
+ } finally {
+ // Stop the pool (interrupting any runner still blocked after a failed assertion) so that
+ // the test does not leak its worker threads. All assertions have already run at this point.
+ executor.shutdownNow();
+ }
+ }
+
+ /**
+ * Keeps the executor's queue filled with batches of unrelated waiting tasks and asserts that
+ * each scheduled item can nevertheless be run, one after the other.
+ */
+ @Test
+ public void makesProgressEvenWhenSaturatedByOtherTasks() throws Exception {
+ int blockSize = 5; // how many batches to queue at once
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(blockSize);
+ try {
+ List<String> items = createManyItems();
+ WaitingRunner runner = new WaitingRunner();
+
+ int batchSize = 5; // how many tasks are started concurrently
+ Queue<CountDownLatch> batches = new LinkedList<>();
+ for (int b = 0; b < blockSize; b++) {
+ batches.add(executeWaitingRunnableBatch(batchSize, executor));
+ }
+
+ new ChainedScheduler<>(executor, items.iterator(), runner);
+
+ for (int i = 1; i <= items.size(); i++) {
+ for (int b = 0; b < blockSize; b++) {
+ // Ensure saturation by always having at least a full thread count of
+ // other tasks waiting in the queue after the waiting item so that when
+ // one batch is executed, and the item then executes, there will still
+ // be at least a full batch waiting.
+ batches.add(executeWaitingRunnableBatch(batchSize, executor));
+ batches.remove().countDown();
+ }
+ assertThat(runner.increment(item(i), batchSize)).isEqualTo(i); // Assert progress can be made
+ }
+ assertThat(runner.runCount()).isEqualTo(items.size());
+
+ // Release the batches which are still queued so that their runnables can finish
+ while (batches.size() > 0) {
+ batches.remove().countDown();
+ }
+ assertThat(missedAwaits.get()).isEqualTo(0);
+ } finally {
+ // Stop the pool so that neither its threads nor, after a failed assertion, the batches
+ // which were never released stay behind. All assertions have already run at this point.
+ executor.shutdownNow();
+ }
+ }
+
+ /** A runner wrapped in a ForwardingRunner must still see every item and the completion. */
+ @Test
+ public void forwardingRunnerForwards() throws Exception {
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+ try {
+ TestRunner runner = new TestRunner();
+ List<String> items = createManyItems();
+
+ new ChainedScheduler<>(
+ executor, items.iterator(), new ChainedScheduler.ForwardingRunner<>(runner));
+ assertThat(runner.awaitDone(items.size())).isEqualTo(true);
+ assertThat(runner.runCount()).isEqualTo(items.size());
+ } finally {
+ // Core pool threads never time out; stop them so the test does not leak a thread,
+ // even when one of the assertions above fails.
+ executor.shutdownNow();
+ }
+ }
+
+ /**
+ * Hands the StreamScheduler a proxied Stream whose close() call is recorded, and asserts that
+ * the stream is still open right after scheduling and closed once all items have been run.
+ */
+ @Test
+ public void streamSchedulerClosesStream() throws Exception {
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+ try {
+ WaitingRunner runner = new WaitingRunner();
+ List<String> items = new ArrayList<>();
+ items.add(FIRST);
+ items.add(SECOND);
+
+ final AtomicBoolean closed = new AtomicBoolean(false);
+ Object closeRecorder =
+ new Object() {
+ @SuppressWarnings("unused") // Called via reflection
+ public void close() {
+ closed.set(true);
+ }
+ };
+ @SuppressWarnings("unchecked") // Stream.class is converted to Stream<String>.class
+ Stream<String> stream = ForwardingProxy.create(Stream.class, items.stream(), closeRecorder);
+
+ new ChainedScheduler.StreamScheduler<>(executor, stream, runner);
+ assertThat(closed.get()).isEqualTo(false);
+
+ // Since there is only a single thread, the Stream cannot get closed before this runs
+ runner.increment(FIRST);
+ // The Stream should get closed as the last item (SECOND) runs, before its runner is called
+ runner.increment(SECOND); // Ensure the last item's runner has already been called
+ assertThat(runner.awaitDone(items.size())).isEqualTo(true);
+ assertThat(runner.runCount()).isEqualTo(items.size());
+ assertThat(closed.get()).isEqualTo(true);
+ } finally {
+ // Stop the pool (interrupting any runner still blocked after a failed assertion) so that
+ // the test does not leak its worker thread. All assertions have already run at this point.
+ executor.shutdownNow();
+ }
+ }
+
+ /**
+ * Submits {@code batchSize} WaitingRunnables to {@code executor}, all built around one latch.
+ *
+ * @return the latch, created with a count of one, which was handed to every submitted runnable
+ */
+ protected CountDownLatch executeWaitingRunnableBatch(
+ int batchSize, ScheduledThreadPoolExecutor executor) {
+ CountDownLatch sharedLatch = new CountDownLatch(1);
+ int remaining = batchSize;
+ while (remaining-- > 0) {
+ executor.execute(new WaitingRunnable(sharedLatch));
+ }
+ return sharedLatch;
+ }
+
+ /** Returns a fresh list of MANY_ITEMS_SIZE generated items. */
+ protected static List<String> createManyItems() {
+ List<String> manyItems = createItems(MANY_ITEMS_SIZE);
+ return manyItems;
+ }
+
+ /**
+ * Generates the items numbered 1 up to and including {@code count}, in ascending order.
+ *
+ * @return a new mutable list, empty when {@code count} is less than one
+ */
+ protected static List<String> createItems(int count) {
+ List<String> generated = new ArrayList<>();
+ int number = 1;
+ while (number <= count) {
+ generated.add(item(number));
+ number++;
+ }
+ return generated;
+ }
+
+ /** Returns the name of the item with number {@code i}, e.g. {@code "Item #3"} for 3. */
+ protected static String item(int i) {
+ return "Item #".concat(Integer.toString(i));
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
new file mode 100644
index 0000000..a1f61fe
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
@@ -0,0 +1,81 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+/**
+ * A ForwardingProxy creates a Proxy which forwards all method calls to its delegate except for
+ * calls to methods which are implemented by its overrider's class.
+ *
+ * <p>Using this Proxy class makes it possible to use the delegate pattern on any interface without
+ * having to implement any of the interface's methods which directly forward their calls to the
+ * delegate. Using this is intended to make forwarding automated, easy, and less error prone by
+ * making it possible to implement the delegate pattern with an overrider object which only
+ * implements those methods which need overridden functionality and which will not directly forward
+ * their calls to the delegate.
+ *
+ * <p>The overrider object will be assumed to not implement any default java Object methods which
+ * are not overridden, as that would likely not be desirable behavior, and thus the Proxy will not
+ * forward those methods to the overrider unless the overrider overrides them.
+ *
+ * <p>If an overrider needs to make calls to the delegate, this can be achieved by passing the
+ * delegate into the overrider during construction.
+ */
+public class ForwardingProxy {
+ /**
+ * Invocation handler which sends a call to the overrider when the overrider's class offers a
+ * public method with the same name and parameter types, and to the delegate otherwise.
+ */
+ protected static class Handler<T> implements InvocationHandler {
+ protected final T delegate;
+ protected final Object overrider;
+
+ protected Handler(T delegate, Object overrider) {
+ this.delegate = delegate;
+ this.overrider = overrider;
+ }
+
+ /**
+ * Dispatches {@code method} to the overrider if it overrides it, else to the delegate.
+ *
+ * @return whatever the invoked target method returned
+ * @throws Throwable the exception thrown by the invoked target method itself, unwrapped from
+ *     the reflective {@link InvocationTargetException}
+ */
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ Method overriden = getOverriden(method);
+ try {
+ if (overriden != null) {
+ return overriden.invoke(overrider, args);
+ }
+ return method.invoke(delegate, args);
+ } catch (InvocationTargetException e) {
+ // Method.invoke wraps anything the target throws. Without unwrapping, callers of the proxy
+ // would get an UndeclaredThrowableException instead of the target's own exception.
+ Throwable cause = e.getCause();
+ throw cause != null ? cause : e;
+ }
+ }
+
+ /**
+ * Looks up the overrider's public method matching the name and parameter types of {@code
+ * method}.
+ *
+ * @return the matching method, or {@code null} when the overrider has none, when the lookup is
+ *     not permitted, or when the only match is the one declared by {@link Object}
+ */
+ protected Method getOverriden(Method method) {
+ try {
+ Method implementedByOverrider =
+ overrider.getClass().getMethod(method.getName(), method.getParameterTypes());
+
+ // Only allow defined (non java defaulted) methods to actually be overridden
+ if (Object.class != implementedByOverrider.getDeclaringClass()) {
+ return implementedByOverrider;
+ }
+ } catch (NoSuchMethodException | SecurityException ignored) {
+ // Deliberately ignored: no usable override exists, so the call goes to the delegate
+ }
+ return null;
+ }
+ }
+
+ /**
+ * Creates a proxy for the interface {@code toProxy}.
+ *
+ * @param toProxy the interface the returned proxy implements
+ * @param delegate the object receiving every call the overrider does not override
+ * @param overrider the object whose public methods take precedence over the delegate's
+ * @return a proxy instance implementing {@code toProxy}
+ */
+ @SuppressWarnings("unchecked") // newProxyInstance returns Object
+ public static <T> T create(Class<T> toProxy, T delegate, Object overrider) {
+ return (T)
+ Proxy.newProxyInstance(
+ delegate.getClass().getClassLoader(),
+ new Class<?>[] {toProxy},
+ new Handler<>(delegate, overrider));
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
index 2ee9a39..43d97c1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
@@ -23,6 +23,8 @@
import com.google.gerrit.server.permissions.PermissionBackendException;
import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
import java.net.URISyntaxException;
import org.eclipse.jgit.transport.RemoteRefUpdate;
import org.eclipse.jgit.transport.URIish;
@@ -41,18 +43,19 @@
@Test
public void headRefReplicated() throws URISyntaxException, PermissionBackendException {
+ URIish completeUri = new URIish("git://someHost/basePath/someProject.git");
RefReplicatedEvent expectedEvent =
new RefReplicatedEvent(
"someProject",
"refs/heads/master",
- "someHost",
+ completeUri,
RefPushResult.SUCCEEDED,
RemoteRefUpdate.Status.OK);
gitUpdateProcessing.onRefReplicatedToOneNode(
"someProject",
"refs/heads/master",
- new URIish("git://someHost/someProject.git"),
+ completeUri,
RefPushResult.SUCCEEDED,
RemoteRefUpdate.Status.OK);
verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent));
@@ -60,18 +63,19 @@
@Test
public void changeRefReplicated() throws URISyntaxException, PermissionBackendException {
+ URIish completeUri = new URIish("git://someHost/basePath/someProject.git");
RefReplicatedEvent expectedEvent =
new RefReplicatedEvent(
"someProject",
"refs/changes/01/1/1",
- "someHost",
+ completeUri,
RefPushResult.FAILED,
RemoteRefUpdate.Status.REJECTED_NONFASTFORWARD);
gitUpdateProcessing.onRefReplicatedToOneNode(
"someProject",
"refs/changes/01/1/1",
- new URIish("git://someHost/someProject.git"),
+ completeUri,
RefPushResult.FAILED,
RemoteRefUpdate.Status.REJECTED_NONFASTFORWARD);
verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
index afe1d82..202d77e 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
@@ -23,6 +23,8 @@
import com.google.gerrit.acceptance.WaitUtil;
import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.api.changes.NotifyHandling;
+import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.server.config.SitePaths;
import com.google.inject.Inject;
import java.io.IOException;
@@ -51,11 +53,14 @@
protected static final Optional<String> ALL_PROJECTS = Optional.empty();
protected static final int TEST_REPLICATION_DELAY_SECONDS = 1;
+ protected static final int TEST_LONG_REPLICATION_DELAY_SECONDS = 30;
protected static final int TEST_REPLICATION_RETRY_MINUTES = 1;
protected static final int TEST_PUSH_TIME_SECONDS = 1;
protected static final int TEST_PROJECT_CREATION_SECONDS = 10;
protected static final Duration TEST_PUSH_TIMEOUT =
Duration.ofSeconds(TEST_REPLICATION_DELAY_SECONDS + TEST_PUSH_TIME_SECONDS);
+ protected static final Duration TEST_PUSH_TIMEOUT_LONG =
+ Duration.ofSeconds(TEST_LONG_REPLICATION_DELAY_SECONDS + TEST_PUSH_TIME_SECONDS);
protected static final Duration TEST_NEW_PROJECT_TIMEOUT =
Duration.ofSeconds(
(TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60)
@@ -230,4 +235,24 @@
config.save();
}
}
+
+ /**
+ * Builds a project deletion event for the given project name whose notify handling is {@link
+ * NotifyHandling#NONE}.
+ */
+ protected ProjectDeletedListener.Event projectDeletedEvent(String projectNameDeleted) {
+ ProjectDeletedListener.Event deletion =
+ new ProjectDeletedListener.Event() {
+ @Override
+ public NotifyHandling getNotify() {
+ return NotifyHandling.NONE;
+ }
+
+ @Override
+ public String getProjectName() {
+ return projectNameDeleted;
+ }
+ };
+ return deletion;
+ }
+
+ /**
+ * Sets the {@code replicateProjectDeletions} flag of the given remote in the test config and
+ * saves the config.
+ *
+ * @throws IOException if saving the config fails
+ */
+ protected void setProjectDeletionReplication(String remoteName, boolean replicateProjectDeletion)
+ throws IOException {
+ String section = "remote";
+ String flag = "replicateProjectDeletions";
+ config.setBoolean(section, remoteName, flag, replicateProjectDeletion);
+ config.save();
+ }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java
new file mode 100644
index 0000000..13afc0d
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java
@@ -0,0 +1,277 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.Sandboxed;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.WaitUtil;
+import com.google.gerrit.entities.BranchNameKey;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.api.projects.BranchInput;
+import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.events.ProjectEvent;
+import com.google.gerrit.server.events.RefEvent;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationDoneEvent;
+import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationFailedEvent;
+import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationScheduledEvent;
+import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationSucceededEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
+import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Acceptance tests for the events posted while refs are replicated and while project deletions
+ * are replicated. All events are captured by a {@code TestDispatcher} installed in {@link
+ * #setup()}.
+ */
+@UseLocalDisk
+@Sandboxed
+@TestPlugin(
+ name = "replication",
+ sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationEventsIT extends ReplicationDaemon {
+ private static final Duration TEST_POST_EVENT_TIMEOUT = Duration.ofSeconds(1);
+
+ @Inject private DynamicSet<ProjectDeletedListener> deletedListeners;
+ @Inject private DynamicItem<EventDispatcher> eventDispatcher;
+ private TestDispatcher testDispatcher;
+
+ /** Writes an initial replication config, loads the plugin and installs the test dispatcher. */
+ @Before
+ public void setup() throws Exception {
+ initConfig();
+ setReplicationDestination(
+ "remote1",
+ "suffix1",
+ Optional.of("not-used-project")); // Simulates a full replication.config initialization
+ setUpTestPlugin();
+ testDispatcher = new TestDispatcher();
+ eventDispatcher.set(testDispatcher, eventDispatcher.getPluginName());
+ }
+
+ /** A new change yields scheduled, replicated and done events for its patch set and meta ref. */
+ @Test
+ public void replicateNewChangeSendsEvents() throws Exception {
+ Project.NameKey targetProject = createTestProject(project + "replica");
+
+ setReplicationDestination("foo", "replica", ALL_PROJECTS);
+ reloadConfig();
+
+ Result pushResult = createChange();
+ String sourceRef = pushResult.getPatchSet().refName();
+ String metaRef = pushResult.getChange().notes().getRefName();
+ BranchNameKey changeBranch = BranchNameKey.create(project, sourceRef);
+ BranchNameKey metaBranch = BranchNameKey.create(project, metaRef);
+
+ assertThat(testDispatcher.getEvents(changeBranch, ReplicationScheduledEvent.class)).hasSize(1);
+ assertThat(testDispatcher.getEvents(metaBranch, ReplicationScheduledEvent.class)).hasSize(1);
+
+ // NOTE(review): the result of isPushCompleted is not checked here — confirm this is intended
+ isPushCompleted(targetProject, sourceRef, TEST_PUSH_TIMEOUT);
+ isPushCompleted(targetProject, metaRef, TEST_PUSH_TIMEOUT);
+
+ waitForRefEvent(() -> testDispatcher.getEvents(RefReplicatedEvent.class), metaRef);
+ waitForRefEvent(() -> testDispatcher.getEvents(RefReplicatedEvent.class), sourceRef);
+ assertThat(testDispatcher.getEvents(RefReplicatedEvent.class)).hasSize(2);
+
+ waitForRefEvent(() -> testDispatcher.getEvents(RefReplicationDoneEvent.class), metaRef);
+ waitForRefEvent(() -> testDispatcher.getEvents(RefReplicationDoneEvent.class), sourceRef);
+ assertThat(testDispatcher.getEvents(RefReplicationDoneEvent.class)).hasSize(2);
+ }
+
+ /** A new branch yields exactly one scheduled, one replicated and one done event. */
+ @Test
+ public void replicateNewBranchSendsEvents() throws Exception {
+ setReplicationDestination("foo", "replica", ALL_PROJECTS);
+ reloadConfig();
+
+ Project.NameKey targetProject = createTestProject(project + "replica");
+ String newBranch = "refs/heads/mybranch";
+ BranchNameKey branchName = BranchNameKey.create(project, newBranch);
+ String master = "refs/heads/master";
+ BranchInput input = new BranchInput();
+ input.revision = master;
+ gApi.projects().name(project.get()).branch(newBranch).create(input);
+
+ assertThat(testDispatcher.getEvents(branchName, ReplicationScheduledEvent.class)).hasSize(1);
+
+ isPushCompleted(targetProject, newBranch, TEST_PUSH_TIMEOUT);
+
+ waitForRefEvent(() -> testDispatcher.getEvents(RefReplicatedEvent.class), newBranch);
+ assertThat(testDispatcher.getEvents(RefReplicatedEvent.class)).hasSize(1);
+
+ waitForRefEvent(() -> testDispatcher.getEvents(RefReplicationDoneEvent.class), newBranch);
+ assertThat(testDispatcher.getEvents(RefReplicationDoneEvent.class)).hasSize(1);
+ }
+
+ /** Deleting a project with one remote yields one scheduled, succeeded and done event. */
+ @Test
+ public void shouldEmitProjectDeletionEventsForOneRemote() throws Exception {
+ String projectName = project.get();
+ setReplicationTarget("replica", projectName);
+
+ reloadConfig();
+
+ for (ProjectDeletedListener l : deletedListeners) {
+ l.onProjectDeleted(projectDeletedEvent(projectName));
+ }
+
+ List<ProjectDeletionReplicationScheduledEvent> scheduledEvents =
+ testDispatcher.getEvents(project, ProjectDeletionReplicationScheduledEvent.class);
+ assertThat(scheduledEvents).hasSize(1);
+
+ assertThatAnyMatch(
+ scheduledEvents,
+ e -> project.equals(e.getProjectNameKey()) && e.getTargetUri().endsWith("replica.git"));
+
+ waitForProjectEvent(
+ () -> testDispatcher.getEvents(project, ProjectDeletionReplicationSucceededEvent.class), 1);
+
+ assertThatAnyMatch(
+ testDispatcher.getEvents(project, ProjectDeletionReplicationSucceededEvent.class),
+ e -> project.equals(e.getProjectNameKey()) && e.getTargetUri().endsWith("replica.git"));
+
+ waitForProjectEvent(
+ () -> testDispatcher.getEvents(project, ProjectDeletionReplicationDoneEvent.class), 1);
+
+ assertThatAnyMatch(
+ testDispatcher.getEvents(project, ProjectDeletionReplicationDoneEvent.class),
+ e -> project.equals(e.getProjectNameKey()));
+ }
+
+ /** With two working remotes, each gets a scheduled and a succeeded event; done comes once. */
+ @Test
+ public void shouldEmitProjectDeletionEventsForMultipleRemotesWhenSucceeding() throws Exception {
+ String projectName = project.get();
+ setReplicationTarget("replica1", projectName);
+ setReplicationTarget("replica2", projectName);
+
+ reloadConfig();
+
+ for (ProjectDeletedListener l : deletedListeners) {
+ l.onProjectDeleted(projectDeletedEvent(projectName));
+ }
+
+ List<ProjectDeletionReplicationScheduledEvent> scheduledEvents =
+ testDispatcher.getEvents(project, ProjectDeletionReplicationScheduledEvent.class);
+ assertThat(scheduledEvents).hasSize(2);
+
+ assertThatAnyMatch(
+ scheduledEvents,
+ e -> project.equals(e.getProjectNameKey()) && e.getTargetUri().endsWith("replica1.git"));
+ assertThatAnyMatch(
+ scheduledEvents,
+ e -> project.equals(e.getProjectNameKey()) && e.getTargetUri().endsWith("replica2.git"));
+
+ waitForProjectEvent(
+ () -> testDispatcher.getEvents(project, ProjectDeletionReplicationSucceededEvent.class), 2);
+
+ List<ProjectDeletionReplicationSucceededEvent> successEvents =
+ testDispatcher.getEvents(project, ProjectDeletionReplicationSucceededEvent.class);
+
+ assertThatAnyMatch(
+ successEvents,
+ e -> project.equals(e.getProjectNameKey()) && e.getTargetUri().endsWith("replica1.git"));
+ assertThatAnyMatch(
+ successEvents,
+ e -> project.equals(e.getProjectNameKey()) && e.getTargetUri().endsWith("replica2.git"));
+
+ waitForProjectEvent(
+ () -> testDispatcher.getEvents(project, ProjectDeletionReplicationDoneEvent.class), 1);
+
+ assertThatAnyMatch(
+ testDispatcher.getEvents(project, ProjectDeletionReplicationDoneEvent.class),
+ e -> project.equals(e.getProjectNameKey()));
+ }
+
+ /**
+ * With one remote backed by an existing replica project and one remote for which no replica
+ * project was created, the former gets a succeeded event, the latter a failed event, and a
+ * single done event is still posted.
+ */
+ @Test
+ public void shouldEmitProjectDeletionEventsForMultipleRemotesWhenFailing() throws Exception {
+ String projectName = project.get();
+ setReplicationTarget("replica1", projectName);
+
+ // Configured like a replication target, but without creating the replica project
+ setReplicationDestination(
+ "not-existing-replica", "not-existing-replica", Optional.of(projectName));
+ setProjectDeletionReplication("not-existing-replica", true);
+
+ reloadConfig();
+
+ for (ProjectDeletedListener l : deletedListeners) {
+ l.onProjectDeleted(projectDeletedEvent(projectName));
+ }
+
+ List<ProjectDeletionReplicationScheduledEvent> scheduledEvents =
+ testDispatcher.getEvents(project, ProjectDeletionReplicationScheduledEvent.class);
+ assertThat(scheduledEvents).hasSize(2);
+
+ assertThatAnyMatch(
+ scheduledEvents,
+ e -> project.equals(e.getProjectNameKey()) && e.getTargetUri().endsWith("replica1.git"));
+ assertThatAnyMatch(
+ scheduledEvents,
+ e ->
+ project.equals(e.getProjectNameKey())
+ && e.getTargetUri().endsWith("not-existing-replica.git"));
+
+ waitForProjectEvent(
+ () -> testDispatcher.getEvents(project, ProjectDeletionReplicationSucceededEvent.class), 1);
+
+ waitForProjectEvent(
+ () -> testDispatcher.getEvents(project, ProjectDeletionReplicationFailedEvent.class), 1);
+
+ assertThatAnyMatch(
+ testDispatcher.getEvents(project, ProjectDeletionReplicationSucceededEvent.class),
+ e -> project.equals(e.getProjectNameKey()) && e.getTargetUri().endsWith("replica1.git"));
+
+ assertThatAnyMatch(
+ testDispatcher.getEvents(project, ProjectDeletionReplicationFailedEvent.class),
+ e ->
+ project.equals(e.getProjectNameKey())
+ && e.getTargetUri().endsWith("not-existing-replica.git"));
+
+ waitForProjectEvent(
+ () -> testDispatcher.getEvents(project, ProjectDeletionReplicationDoneEvent.class), 1);
+
+ assertThatAnyMatch(
+ testDispatcher.getEvents(project, ProjectDeletionReplicationDoneEvent.class),
+ e -> project.equals(e.getProjectNameKey()));
+ }
+
+ /**
+ * Waits, for at most TEST_POST_EVENT_TIMEOUT, until exactly one of the supplied events refers
+ * to {@code refName}. The supplier is asked again on every check.
+ */
+ private <T extends RefEvent> void waitForRefEvent(Supplier<List<T>> events, String refName)
+ throws InterruptedException {
+ WaitUtil.waitUntil(
+ () -> events.get().stream().filter(e -> refName.equals(e.getRefName())).count() == 1,
+ TEST_POST_EVENT_TIMEOUT);
+ }
+
+ /**
+ * Waits, for at most TEST_POST_EVENT_TIMEOUT, until the supplier returns exactly {@code count}
+ * events.
+ */
+ private <T extends ProjectEvent> void waitForProjectEvent(Supplier<List<T>> events, int count)
+ throws InterruptedException {
+ WaitUtil.waitUntil(() -> events.get().size() == count, TEST_POST_EVENT_TIMEOUT);
+ }
+
+ /**
+ * Creates the replica project named {@code ofProject + replica} and configures a remote called
+ * {@code replica} for {@code ofProject} with project deletion replication switched on.
+ *
+ * @return the name key of the created replica project
+ */
+ private Project.NameKey setReplicationTarget(String replica, String ofProject) throws Exception {
+ Project.NameKey replicaProject = createTestProject(String.format("%s%s", ofProject, replica));
+ setReplicationDestination(replica, replica, Optional.of(ofProject));
+ setProjectDeletionReplication(replica, true);
+ return replicaProject;
+ }
+
+ /** Asserts that at least one of {@code events} satisfies the predicate {@code p}. */
+ private <T extends ProjectEvent> void assertThatAnyMatch(List<T> events, Predicate<T> p) {
+ assertThat(events.stream().anyMatch(p)).isTrue();
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
index 536080e..1c1f983 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
@@ -189,8 +189,9 @@
private List<ReplicateRefUpdate> listWaitingTasks(String refRegex) {
Pattern refmaskPattern = Pattern.compile(refRegex);
- return tasksStorage.listWaiting().stream()
- .filter(task -> refmaskPattern.matcher(task.ref).matches())
+ return tasksStorage
+ .streamWaiting()
+ .filter(task -> refmaskPattern.matcher(task.ref()).matches())
.collect(toList());
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index ba6f94d..a174e91 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -23,13 +23,11 @@
import com.google.gerrit.acceptance.UseLocalDisk;
import com.google.gerrit.acceptance.WaitUtil;
import com.google.gerrit.entities.Project;
-import com.google.gerrit.extensions.api.changes.NotifyHandling;
import com.google.gerrit.extensions.api.projects.BranchInput;
import com.google.gerrit.extensions.common.ProjectInfo;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.inject.Inject;
-import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;
@@ -87,21 +85,8 @@
setProjectDeletionReplication("foo", true);
reloadConfig();
- ProjectDeletedListener.Event event =
- new ProjectDeletedListener.Event() {
- @Override
- public String getProjectName() {
- return projectNameDeleted;
- }
-
- @Override
- public NotifyHandling getNotify() {
- return NotifyHandling.NONE;
- }
- };
-
for (ProjectDeletedListener l : deletedListeners) {
- l.onProjectDeleted(event);
+ l.onProjectDeleted(projectDeletedEvent(projectNameDeleted));
}
waitUntil(() -> !nonEmptyProjectExists(replicaProject));
@@ -363,12 +348,6 @@
}
}
- private void setProjectDeletionReplication(String remoteName, boolean replicateProjectDeletion)
- throws IOException {
- config.setBoolean("remote", remoteName, "replicateProjectDeletions", replicateProjectDeletion);
- config.save();
- }
-
private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java
new file mode 100644
index 0000000..f549f47
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java
@@ -0,0 +1,87 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static java.util.stream.Collectors.toList;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Common base for the ReplicationStorage*IT classes: it performs the shared plugin setup, looks
+ * up the storage related plugin instances and offers helpers to list and delete stored tasks.
+ */
+public class ReplicationStorageDaemon extends ReplicationDaemon {
+ protected static final int TEST_TASK_FINISH_SECONDS = 1;
+ protected static final int TEST_REPLICATION_MAX_RETRIES = 1;
+ protected static final Duration TEST_TASK_FINISH_TIMEOUT =
+ Duration.ofSeconds(TEST_TASK_FINISH_SECONDS);
+ // (replication delay + one retry interval) per allowed retry, plus ten seconds of tolerance
+ protected static final Duration MAX_RETRY_WITH_TOLERANCE_TIMEOUT =
+ Duration.ofSeconds(
+ (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60)
+ * TEST_REPLICATION_MAX_RETRIES
+ + 10);
+ protected ReplicationTasksStorage tasksStorage;
+ protected DestinationsCollection destinationCollection;
+ protected ReplicationConfig replicationConfig;
+
+ /**
+ * Writes an initial replication config, loads the plugin and then fetches the task storage,
+ * the destinations collection and the replication config from the plugin's sys injector.
+ */
+ @Override
+ public void setUpTestPlugin() throws Exception {
+ initConfig();
+ setReplicationDestination(
+ "remote1",
+ "suffix1",
+ Optional.of("not-used-project")); // Simulates a full replication.config initialization
+ super.setUpTestPlugin();
+ replicationConfig = plugin.getSysInjector().getInstance(ReplicationConfig.class);
+ destinationCollection = plugin.getSysInjector().getInstance(DestinationsCollection.class);
+ tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
+ }
+
+ /** Lists the waiting tasks whose ref matches {@code refRegex} as a whole. */
+ protected List<ReplicationTasksStorage.ReplicateRefUpdate> listWaitingReplicationTasks(
+ String refRegex) {
+ Pattern refPattern = Pattern.compile(refRegex);
+ return tasksStorage
+ .streamWaiting()
+ .filter(update -> refPattern.matcher(update.ref()).matches())
+ .collect(toList());
+ }
+
+ /**
+ * Deletes the file, named after the task's sha1, of every waiting task whose ref matches
+ * {@code refRegex} from the {@code ref-updates/waiting} folder of the events directory.
+ *
+ * @throws RuntimeException wrapping the IOException if a file cannot be deleted
+ */
+ protected void deleteWaitingReplicationTasks(String refRegex) {
+ Path waitingUpdates =
+ replicationConfig.getEventsDirectory().resolve("ref-updates").resolve("waiting");
+ listWaitingReplicationTasks(refRegex)
+ .forEach(
+ update -> {
+ try {
+ Files.deleteIfExists(waitingUpdates.resolve(update.sha1()));
+ } catch (IOException e) {
+ throw new RuntimeException("Couldn't delete waiting task", e);
+ }
+ });
+ }
+
+ /** Collects all tasks currently streamed as waiting by the task storage. */
+ protected List<ReplicationTasksStorage.ReplicateRefUpdate> listWaiting() {
+ List<ReplicationTasksStorage.ReplicateRefUpdate> waiting =
+ tasksStorage.streamWaiting().collect(Collectors.toList());
+ return waiting;
+ }
+
+ /** Collects all tasks currently streamed as running by the task storage. */
+ protected List<ReplicationTasksStorage.ReplicateRefUpdate> listRunning() {
+ List<ReplicationTasksStorage.ReplicateRefUpdate> running =
+ tasksStorage.streamRunning().collect(Collectors.toList());
+ return running;
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
index 01f20b4..e2e1e21 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
@@ -16,7 +16,6 @@
import static com.google.common.truth.Truth.assertThat;
import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.NO_OP;
-import static java.util.stream.Collectors.toList;
import com.google.gerrit.acceptance.TestPlugin;
import com.google.gerrit.acceptance.UseLocalDisk;
@@ -30,7 +29,6 @@
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -51,32 +49,7 @@
@TestPlugin(
name = "replication",
sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
-public class ReplicationStorageIT extends ReplicationDaemon {
- protected static final int TEST_TASK_FINISH_SECONDS = 1;
- protected static final int TEST_REPLICATION_MAX_RETRIES = 1;
- protected static final Duration TEST_TASK_FINISH_TIMEOUT =
- Duration.ofSeconds(TEST_TASK_FINISH_SECONDS);
- private static final Duration MAX_RETRY_WITH_TOLERANCE_TIMEOUT =
- Duration.ofSeconds(
- (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60)
- * TEST_REPLICATION_MAX_RETRIES
- + 10);
- protected ReplicationTasksStorage tasksStorage;
- private DestinationsCollection destinationCollection;
- private ReplicationConfig replicationConfig;
-
- @Override
- public void setUpTestPlugin() throws Exception {
- initConfig();
- setReplicationDestination(
- "remote1",
- "suffix1",
- Optional.of("not-used-project")); // Simulates a full replication.config initialization
- super.setUpTestPlugin();
- tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
- destinationCollection = plugin.getSysInjector().getInstance(DestinationsCollection.class);
- replicationConfig = plugin.getSysInjector().getInstance(ReplicationConfig.class);
- }
+public class ReplicationStorageIT extends ReplicationStorageDaemon {
@Test
public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
@@ -119,7 +92,7 @@
reloadConfig();
String changeRef = createChange().getPatchSet().refName();
- changeReplicationTasksForRemote(tasksStorage.listWaiting().stream(), changeRef, remote1)
+ changeReplicationTasksForRemote(tasksStorage.streamWaiting(), changeRef, remote1)
.forEach(
(update) -> {
try {
@@ -148,7 +121,7 @@
reloadConfig();
String changeRef = createChange().getPatchSet().refName();
- changeReplicationTasksForRemote(tasksStorage.listWaiting().stream(), changeRef, remote1)
+ changeReplicationTasksForRemote(tasksStorage.streamWaiting(), changeRef, remote1)
.forEach(
(update) -> {
try {
@@ -232,11 +205,14 @@
.getInstance(ReplicationQueue.class)
.scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), false);
- assertThat(tasksStorage.listWaiting()).hasSize(1);
- for (ReplicationTasksStorage.ReplicateRefUpdate task : tasksStorage.listWaiting()) {
- assertThat(task.uri).isEqualTo(expectedURI);
- assertThat(task.ref).isEqualTo(PushOne.ALL_REFS);
- }
+ assertThat(listWaiting()).hasSize(1);
+ tasksStorage
+ .streamWaiting()
+ .forEach(
+ (task) -> {
+ assertThat(task.uri()).isEqualTo(expectedURI);
+ assertThat(task.ref()).isEqualTo(PushOne.ALL_REFS);
+ });
}
@Test
@@ -254,11 +230,14 @@
.getInstance(ReplicationQueue.class)
.scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), false);
- assertThat(tasksStorage.listWaiting()).hasSize(1);
- for (ReplicationTasksStorage.ReplicateRefUpdate task : tasksStorage.listWaiting()) {
- assertThat(task.uri).isEqualTo(expectedURI);
- assertThat(task.ref).isEqualTo(PushOne.ALL_REFS);
- }
+ assertThat(listWaiting()).hasSize(1);
+ tasksStorage
+ .streamWaiting()
+ .forEach(
+ (task) -> {
+ assertThat(task.uri()).isEqualTo(expectedURI);
+ assertThat(task.ref()).isEqualTo(PushOne.ALL_REFS);
+ });
}
@Test
@@ -277,13 +256,13 @@
config.setInt("remote", "task_cleanup_project", "replicationRetry", 0);
config.save();
reloadConfig();
- assertThat(tasksStorage.listRunning()).hasSize(0);
+ assertThat(listRunning()).hasSize(0);
Project.NameKey sourceProject = createTestProject("task_cleanup_project");
WaitUtil.waitUntil(
() -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")),
TEST_NEW_PROJECT_TIMEOUT);
- WaitUtil.waitUntil(() -> tasksStorage.listRunning().size() == 0, TEST_TASK_FINISH_TIMEOUT);
+ WaitUtil.waitUntil(() -> listRunning().size() == 0, TEST_TASK_FINISH_TIMEOUT);
}
@Test
@@ -292,7 +271,7 @@
config.setInt("remote", "task_cleanup_locks_project", "replicationRetry", 0);
config.save();
reloadConfig();
- assertThat(tasksStorage.listRunning()).hasSize(0);
+ assertThat(listRunning()).hasSize(0);
Project.NameKey sourceProject = createTestProject("task_cleanup_locks_project");
WaitUtil.waitUntil(
@@ -370,22 +349,16 @@
private Stream<ReplicateRefUpdate> waitingChangeReplicationTasksForRemote(
String changeRef, String remote) {
- return tasksStorage.listWaiting().stream()
- .filter(task -> changeRef.equals(task.ref))
- .filter(task -> remote.equals(task.remote));
+ return tasksStorage
+ .streamWaiting()
+ .filter(task -> changeRef.equals(task.ref()))
+ .filter(task -> remote.equals(task.remote()));
}
private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote(
Stream<ReplicateRefUpdate> updates, String changeRef, String remote) {
return updates
- .filter(task -> changeRef.equals(task.ref))
- .filter(task -> remote.equals(task.remote));
- }
-
- private List<ReplicateRefUpdate> listWaitingReplicationTasks(String refRegex) {
- Pattern refmaskPattern = Pattern.compile(refRegex);
- return tasksStorage.listWaiting().stream()
- .filter(task -> refmaskPattern.matcher(task.ref).matches())
- .collect(toList());
+ .filter(task -> changeRef.equals(task.ref()))
+ .filter(task -> remote.equals(task.remote()));
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageMPIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageMPIT.java
new file mode 100644
index 0000000..1001e6c
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageMPIT.java
@@ -0,0 +1,75 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.WaitUtil;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.api.projects.BranchInput;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.junit.Test;
+
+/**
+ * Integration tests ensuring that events in the replication task storage are managed correctly in
+ * multi-primary scenarios, e.g. when another node has already completed a waiting task.
+ *
+ * @see com.googlesource.gerrit.plugins.replication.ReplicationStorageIT
+ */
+@UseLocalDisk
+@TestPlugin(
+ name = "replication",
+ sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationStorageMPIT extends ReplicationStorageDaemon {
+ /** Only the branch whose task is still in the waiting storage may reach the replica. */
+ @Test
+ public void workFromOnlyWaitingIsPerformed() throws Exception {
+ Project.NameKey targetProject = createTestProject(project + "replica");
+ setReplicationDestination("foo", "replica", ALL_PROJECTS, TEST_LONG_REPLICATION_DELAY_SECONDS);
+ reloadConfig();
+ // Branch both refs off master; each is expected to get its own waiting task (checked below).
+ String newBranchA = "refs/heads/foo_branch_a";
+ String newBranchB = "refs/heads/foo_branch_b";
+ String master = "refs/heads/master";
+ BranchInput input = new BranchInput();
+ input.revision = master;
+ gApi.projects().name(project.get()).branch(newBranchA).create(input);
+ gApi.projects().name(project.get()).branch(newBranchB).create(input);
+
+ deleteWaitingReplicationTasks(
+ newBranchA); // Simulates another node having already completed the work for branch A
+ assertThat(listWaitingReplicationTasks("refs/heads/foo_branch_.*")).hasSize(1);
+ // Wait for the push: branch B must show up on the replica while branch A stays absent.
+ try (Repository repo = repoManager.openRepository(targetProject);
+ Repository sourceRepo = repoManager.openRepository(project)) {
+ WaitUtil.waitUntil(
+ () -> checkedGetRef(repo, newBranchA) == null && checkedGetRef(repo, newBranchB) != null,
+ TEST_PUSH_TIMEOUT_LONG);
+ // Branch B on the replica must point at the same object as master in the source repository.
+ Ref masterRef = getRef(sourceRepo, master);
+ Ref targetBranchRefA = getRef(repo, newBranchA);
+ Ref targetBranchRefB = getRef(repo, newBranchB);
+ assertThat(targetBranchRefA).isNull();
+ assertThat(targetBranchRefB).isNotNull();
+ assertThat(targetBranchRefB.getObjectId()).isEqualTo(masterRef.getObjectId());
+ }
+ // Afterwards the storage must report neither running nor waiting tasks.
+ WaitUtil.waitUntil(() -> listRunning().isEmpty(), TEST_TASK_FINISH_TIMEOUT);
+ assertThat(listWaiting()).isEmpty();
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
index 42c0914..5cfb2d0 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
@@ -14,11 +14,13 @@
package com.googlesource.gerrit.plugins.replication;
-import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTest.assertContainsExactly;
+import static com.google.common.truth.Truth.assertThat;
import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTest.assertNoIncompleteTasks;
+import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTest.assertThatStream;
import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
import java.net.URISyntaxException;
import java.nio.file.FileSystem;
import java.nio.file.Path;
@@ -33,8 +35,8 @@
protected static final String REMOTE = "myDest";
protected static final URIish URISH =
ReplicationTasksStorageTest.getUrish("http://example.com/" + PROJECT + ".git");
- protected static final ReplicationTasksStorage.ReplicateRefUpdate REF_UPDATE =
- new ReplicationTasksStorage.ReplicateRefUpdate(PROJECT, REF, URISH, REMOTE);
+ protected static final ReplicateRefUpdate REF_UPDATE =
+ ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE);
protected static final UriUpdates URI_UPDATES = getUriUpdates(REF_UPDATE);
protected ReplicationTasksStorage nodeA;
@@ -64,7 +66,7 @@
nodeA.create(REF_UPDATE);
nodeB.create(REF_UPDATE);
- assertContainsExactly(persistedView.listWaiting(), REF_UPDATE);
+ assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE);
}
@Test
@@ -72,7 +74,7 @@
nodeA.create(REF_UPDATE);
nodeB.start(URI_UPDATES);
- assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
nodeB.finish(URI_UPDATES);
assertNoIncompleteTasks(persistedView);
@@ -84,10 +86,10 @@
nodeA.start(URI_UPDATES);
nodeA.reset(URI_UPDATES);
- assertContainsExactly(persistedView.listWaiting(), REF_UPDATE);
+ assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE);
nodeB.start(URI_UPDATES);
- assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
nodeB.finish(URI_UPDATES);
assertNoIncompleteTasks(persistedView);
@@ -101,10 +103,10 @@
nodeB.start(URI_UPDATES);
nodeB.reset(URI_UPDATES);
- assertContainsExactly(persistedView.listWaiting(), REF_UPDATE);
+ assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE);
nodeB.start(URI_UPDATES);
- assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
nodeB.finish(URI_UPDATES);
assertNoIncompleteTasks(persistedView);
@@ -119,22 +121,22 @@
nodeB.reset(URI_UPDATES);
nodeA.start(URI_UPDATES);
- assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
nodeA.finish(URI_UPDATES);
assertNoIncompleteTasks(persistedView);
}
@Test
- public void canBeResetAllAndCompletedByOtherNode() {
+ public void canBeRecoveredAndCompletedByOtherNode() {
nodeA.create(REF_UPDATE);
nodeA.start(URI_UPDATES);
- nodeB.resetAll();
- assertContainsExactly(persistedView.listWaiting(), REF_UPDATE);
+ nodeB.recoverAll();
+ assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE);
nodeB.start(URI_UPDATES);
- assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
nodeA.finish(URI_UPDATES);
// Bug: https://crbug.com/gerrit/12973
@@ -145,29 +147,29 @@
}
@Test
- public void canBeResetAllAndCompletedByOtherNodeFastOriginalNode() {
+ public void canBeRecoveredAndCompletedByOtherNodeFastOriginalNode() {
nodeA.create(REF_UPDATE);
nodeA.start(URI_UPDATES);
- nodeB.resetAll();
+ nodeB.recoverAll();
nodeA.finish(URI_UPDATES);
- assertContainsExactly(persistedView.listWaiting(), REF_UPDATE);
+ assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE);
nodeB.start(URI_UPDATES);
- assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
nodeB.finish(URI_UPDATES);
assertNoIncompleteTasks(persistedView);
}
@Test
- public void canBeResetAllAndCompletedByOtherNodeSlowOriginalNode() {
+ public void canBeRecoveredAndCompletedByOtherNodeSlowOriginalNode() {
nodeA.create(REF_UPDATE);
nodeA.start(URI_UPDATES);
- nodeB.resetAll();
+ nodeB.recoverAll();
nodeB.start(URI_UPDATES);
- assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
nodeB.finish(URI_UPDATES);
ReplicationTasksStorageTest.assertNoIncompleteTasks(persistedView);
@@ -180,19 +182,31 @@
public void multipleNodesCanReplicateSameRef() {
nodeA.create(REF_UPDATE);
nodeA.start(URI_UPDATES);
- assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
nodeA.finish(URI_UPDATES);
assertNoIncompleteTasks(persistedView);
nodeB.create(REF_UPDATE);
nodeB.start(URI_UPDATES);
- assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
nodeB.finish(URI_UPDATES);
assertNoIncompleteTasks(persistedView);
}
+ @Test
+ public void duplicateWorkIsNotPerformed() {
+ nodeA.create(REF_UPDATE);
+ nodeB.create(REF_UPDATE);
+ // The first start() returns the update's ref, and the update is listed as running.
+ assertThat(nodeA.start(URI_UPDATES)).containsExactly(REF_UPDATE.ref());
+ assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
+ // A second start() of the same update returns nothing; the running set is unchanged.
+ assertThat(nodeB.start(URI_UPDATES)).isEmpty();
+ assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
+ }
+
public static UriUpdates getUriUpdates(ReplicationTasksStorage.ReplicateRefUpdate refUpdate) {
try {
return TestUriUpdates.create(refUpdate);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java
index 23d6759..202cac9 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java
@@ -21,6 +21,7 @@
import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
import java.nio.file.FileSystem;
import java.nio.file.Path;
import org.eclipse.jgit.transport.URIish;
@@ -34,8 +35,8 @@
protected static final String REMOTE = "myDest";
protected static final URIish URISH =
ReplicationTasksStorageTest.getUrish("http://example.com/" + PROJECT + ".git");
- protected static final ReplicationTasksStorage.ReplicateRefUpdate REF_UPDATE =
- new ReplicationTasksStorage.ReplicateRefUpdate(PROJECT, REF, URISH, REMOTE);
+ protected static final ReplicateRefUpdate REF_UPDATE =
+ ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE);
protected FileSystem fileSystem;
protected Path storageSite;
@@ -131,11 +132,11 @@
}
@Test
- public void canBeResetAllAndCompletedByOtherNode() {
+ public void canBeRecoveredAndCompletedByOtherNode() {
taskA.create();
taskA.start();
- nodeB.resetAll();
+ nodeB.recoverAll();
assertIsWaiting(taskA);
taskB.create();
@@ -154,10 +155,10 @@
}
@Test
- public void resetAllAndCompletedByOtherNodeWhenTaskAFinishesBeforeTaskB() {
+ public void recoveredAndCompletedByOtherNodeWhenTaskAFinishesBeforeTaskB() {
taskA.create();
taskA.start();
- nodeB.resetAll();
+ nodeB.recoverAll();
taskA.finish();
assertIsWaiting(taskA);
@@ -173,10 +174,10 @@
}
@Test
- public void resetAllAndCompletedByOtherNodeWhenTaskAFinishesAfterTaskB() {
+ public void recoveredAndCompletedByOtherNodeWhenTaskAFinishesAfterTaskB() {
taskA.create();
taskA.start();
- nodeB.resetAll();
+ nodeB.recoverAll();
taskB.start();
assertIsRunning(taskA);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
index d9fbbe5..a2e5e4d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
@@ -36,7 +36,7 @@
protected static final String REMOTE = "myDest";
protected static final URIish URISH = getUrish("http://example.com/" + PROJECT + ".git");
protected static final ReplicateRefUpdate REF_UPDATE =
- new ReplicateRefUpdate(PROJECT, REF, URISH, REMOTE);
+ ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE);
protected ReplicationTasksStorage tasksStorage;
protected FileSystem fileSystem;
@@ -200,8 +200,8 @@
@Test
public void canHaveTwoWaitingTasksForDifferentRefs() throws Exception {
- Task updateA = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, "refA", URISH, REMOTE));
- Task updateB = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, "refB", URISH, REMOTE));
+ Task updateA = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, "refA", URISH, REMOTE));
+ Task updateB = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, "refB", URISH, REMOTE));
updateA.create();
updateB.create();
assertIsWaiting(updateA);
@@ -210,8 +210,8 @@
@Test
public void canHaveTwoRunningTasksForDifferentRefs() throws Exception {
- Task updateA = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, "refA", URISH, REMOTE));
- Task updateB = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, "refB", URISH, REMOTE));
+ Task updateA = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, "refA", URISH, REMOTE));
+ Task updateB = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, "refB", URISH, REMOTE));
updateA.create();
updateB.create();
updateA.start();
@@ -225,12 +225,12 @@
Task updateA =
tasksStorage
.new Task(
- new ReplicateRefUpdate(
+ ReplicateRefUpdate.create(
"projectA", REF, getUrish("http://example.com/projectA.git"), REMOTE));
Task updateB =
tasksStorage
.new Task(
- new ReplicateRefUpdate(
+ ReplicateRefUpdate.create(
"projectB", REF, getUrish("http://example.com/projectB.git"), REMOTE));
updateA.create();
updateB.create();
@@ -243,12 +243,12 @@
Task updateA =
tasksStorage
.new Task(
- new ReplicateRefUpdate(
+ ReplicateRefUpdate.create(
"projectA", REF, getUrish("http://example.com/projectA.git"), REMOTE));
Task updateB =
tasksStorage
.new Task(
- new ReplicateRefUpdate(
+ ReplicateRefUpdate.create(
"projectB", REF, getUrish("http://example.com/projectB.git"), REMOTE));
updateA.create();
updateB.create();
@@ -260,8 +260,8 @@
@Test
public void canHaveTwoWaitingTasksForDifferentRemotes() throws Exception {
- Task updateA = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, REF, URISH, "remoteA"));
- Task updateB = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, REF, URISH, "remoteB"));
+ Task updateA = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, REF, URISH, "remoteA"));
+ Task updateB = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, REF, URISH, "remoteB"));
updateA.create();
updateB.create();
assertIsWaiting(updateA);
@@ -270,8 +270,8 @@
@Test
public void canHaveTwoRunningTasksForDifferentRemotes() throws Exception {
- Task updateA = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, REF, URISH, "remoteA"));
- Task updateB = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, REF, URISH, "remoteB"));
+ Task updateA = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, REF, URISH, "remoteA"));
+ Task updateB = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, REF, URISH, "remoteB"));
updateA.create();
updateB.create();
updateA.start();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
index 141f739..16a0363 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
@@ -22,12 +22,13 @@
import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;
+import com.google.common.truth.IterableSubject;
import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
import java.net.URISyntaxException;
import java.nio.file.FileSystem;
import java.nio.file.Path;
-import java.util.List;
-import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.eclipse.jgit.transport.URIish;
import org.junit.After;
import org.junit.Before;
@@ -39,7 +40,7 @@
protected static final String REMOTE = "myDest";
protected static final URIish URISH = getUrish("http://example.com/" + PROJECT + ".git");
protected static final ReplicateRefUpdate REF_UPDATE =
- new ReplicateRefUpdate(PROJECT, REF, URISH, REMOTE);
+ ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE);
protected ReplicationTasksStorage storage;
protected FileSystem fileSystem;
@@ -61,14 +62,14 @@
@Test
public void canListEmptyStorage() throws Exception {
- assertThat(storage.listWaiting()).isEmpty();
- assertThat(storage.listRunning()).isEmpty();
+ assertThatStream(storage.streamWaiting()).isEmpty();
+ assertThatStream(storage.streamRunning()).isEmpty();
}
@Test
public void canListWaitingUpdate() throws Exception {
storage.create(REF_UPDATE);
- assertContainsExactly(storage.listWaiting(), REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
}
@Test
@@ -83,10 +84,10 @@
@Test
public void canStartWaitingUpdate() throws Exception {
storage.create(REF_UPDATE);
- storage.start(uriUpdates);
- assertThat(storage.listWaiting()).isEmpty();
+ assertThat(storage.start(uriUpdates)).containsExactly(REF_UPDATE.ref());
+ assertThatStream(storage.streamWaiting()).isEmpty();
assertFalse(storage.isWaiting(uriUpdates));
- assertContainsExactly(storage.listRunning(), REF_UPDATE);
+ assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
}
@Test
@@ -101,22 +102,22 @@
public void instancesOfTheSameStorageHaveTheSameElements() throws Exception {
ReplicationTasksStorage persistedView = new ReplicationTasksStorage(storageSite);
- assertThat(storage.listWaiting()).isEmpty();
- assertThat(persistedView.listWaiting()).isEmpty();
+ assertThatStream(storage.streamWaiting()).isEmpty();
+ assertThatStream(persistedView.streamWaiting()).isEmpty();
storage.create(REF_UPDATE);
- assertContainsExactly(storage.listWaiting(), REF_UPDATE);
- assertContainsExactly(persistedView.listWaiting(), REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
+ assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE);
storage.start(uriUpdates);
- assertThat(storage.listWaiting()).isEmpty();
- assertThat(persistedView.listWaiting()).isEmpty();
- assertContainsExactly(storage.listRunning(), REF_UPDATE);
- assertContainsExactly(persistedView.listRunning(), REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).isEmpty();
+ assertThatStream(persistedView.streamWaiting()).isEmpty();
+ assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE);
storage.finish(uriUpdates);
- assertThat(storage.listRunning()).isEmpty();
- assertThat(persistedView.listRunning()).isEmpty();
+ assertThatStream(storage.streamRunning()).isEmpty();
+ assertThatStream(persistedView.streamRunning()).isEmpty();
}
@Test
@@ -124,13 +125,13 @@
String key = storage.create(REF_UPDATE);
String secondKey = storage.create(REF_UPDATE);
assertEquals(key, secondKey);
- assertContainsExactly(storage.listWaiting(), REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
}
@Test
public void canCreateDifferentUris() throws Exception {
ReplicateRefUpdate updateB =
- new ReplicateRefUpdate(
+ ReplicateRefUpdate.create(
PROJECT,
REF,
getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
@@ -138,7 +139,7 @@
String keyA = storage.create(REF_UPDATE);
String keyB = storage.create(updateB);
- assertThat(storage.listWaiting()).hasSize(2);
+ assertThatStream(storage.streamWaiting()).hasSize(2);
assertTrue(storage.isWaiting(uriUpdates));
assertTrue(storage.isWaiting(TestUriUpdates.create(updateB)));
assertNotEquals(keyA, keyB);
@@ -147,7 +148,7 @@
@Test
public void canStartDifferentUris() throws Exception {
ReplicateRefUpdate updateB =
- new ReplicateRefUpdate(
+ ReplicateRefUpdate.create(
PROJECT,
REF,
getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
@@ -157,18 +158,18 @@
storage.create(updateB);
storage.start(uriUpdates);
- assertContainsExactly(storage.listWaiting(), updateB);
- assertContainsExactly(storage.listRunning(), REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).containsExactly(updateB);
+ assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
storage.start(uriUpdatesB);
- assertThat(storage.listWaiting()).isEmpty();
- assertContainsExactly(storage.listRunning(), REF_UPDATE, updateB);
+ assertThatStream(storage.streamWaiting()).isEmpty();
+ assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE, updateB);
}
@Test
public void canFinishDifferentUris() throws Exception {
ReplicateRefUpdate updateB =
- new ReplicateRefUpdate(
+ ReplicateRefUpdate.create(
PROJECT,
REF,
getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
@@ -180,16 +181,16 @@
storage.start(uriUpdatesB);
storage.finish(uriUpdates);
- assertContainsExactly(storage.listRunning(), updateB);
+ assertThatStream(storage.streamRunning()).containsExactly(updateB);
storage.finish(uriUpdatesB);
- assertThat(storage.listRunning()).isEmpty();
+ assertThatStream(storage.streamRunning()).isEmpty();
}
@Test
public void differentUrisCreatedTwiceIsStoredOnce() throws Exception {
ReplicateRefUpdate updateB =
- new ReplicateRefUpdate(
+ ReplicateRefUpdate.create(
PROJECT,
REF,
getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
@@ -199,19 +200,19 @@
storage.create(updateB);
storage.create(REF_UPDATE);
storage.create(updateB);
- assertThat(storage.listWaiting()).hasSize(2);
+ assertThatStream(storage.streamWaiting()).hasSize(2);
assertTrue(storage.isWaiting(uriUpdates));
assertTrue(storage.isWaiting(TestUriUpdates.create(updateB)));
}
@Test
public void canCreateMulipleRefsForSameUri() throws Exception {
- ReplicateRefUpdate refA = new ReplicateRefUpdate(PROJECT, "refA", URISH, REMOTE);
- ReplicateRefUpdate refB = new ReplicateRefUpdate(PROJECT, "refB", URISH, REMOTE);
+ ReplicateRefUpdate refA = ReplicateRefUpdate.create(PROJECT, "refA", URISH, REMOTE);
+ ReplicateRefUpdate refB = ReplicateRefUpdate.create(PROJECT, "refB", URISH, REMOTE);
String keyA = storage.create(refA);
String keyB = storage.create(refB);
- assertThat(storage.listWaiting()).hasSize(2);
+ assertThatStream(storage.streamWaiting()).hasSize(2);
assertNotEquals(keyA, keyB);
assertTrue(storage.isWaiting(TestUriUpdates.create(refA)));
assertTrue(storage.isWaiting(TestUriUpdates.create(refB)));
@@ -219,8 +220,8 @@
@Test
public void canFinishMulipleRefsForSameUri() throws Exception {
- ReplicateRefUpdate refUpdateA = new ReplicateRefUpdate(PROJECT, "refA", URISH, REMOTE);
- ReplicateRefUpdate refUpdateB = new ReplicateRefUpdate(PROJECT, "refB", URISH, REMOTE);
+ ReplicateRefUpdate refUpdateA = ReplicateRefUpdate.create(PROJECT, "refA", URISH, REMOTE);
+ ReplicateRefUpdate refUpdateB = ReplicateRefUpdate.create(PROJECT, "refB", URISH, REMOTE);
UriUpdates uriUpdatesA = TestUriUpdates.create(refUpdateA);
UriUpdates uriUpdatesB = TestUriUpdates.create(refUpdateB);
storage.create(refUpdateA);
@@ -229,10 +230,10 @@
storage.start(uriUpdatesB);
storage.finish(uriUpdatesA);
- assertContainsExactly(storage.listRunning(), refUpdateB);
+ assertThatStream(storage.streamRunning()).containsExactly(refUpdateB);
storage.finish(uriUpdatesB);
- assertThat(storage.listRunning()).isEmpty();
+ assertThatStream(storage.streamRunning()).isEmpty();
}
@Test
@@ -241,8 +242,8 @@
storage.start(uriUpdates);
storage.reset(uriUpdates);
- assertContainsExactly(storage.listWaiting(), REF_UPDATE);
- assertThat(storage.listRunning()).isEmpty();
+ assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamRunning()).isEmpty();
}
@Test
@@ -252,40 +253,40 @@
storage.reset(uriUpdates);
storage.start(uriUpdates);
- assertContainsExactly(storage.listRunning(), REF_UPDATE);
+ assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).isEmpty();
assertFalse(storage.isWaiting(uriUpdates));
- assertThat(storage.listWaiting()).isEmpty();
storage.finish(uriUpdates);
assertNoIncompleteTasks(storage);
}
@Test
- public void canResetAllEmpty() throws Exception {
- storage.resetAll();
+ public void canRecoverEmpty() throws Exception {
+ storage.recoverAll();
assertNoIncompleteTasks(storage);
}
@Test
- public void canResetAllUpdate() throws Exception {
+ public void canRecoverUpdate() throws Exception {
storage.create(REF_UPDATE);
storage.start(uriUpdates);
- storage.resetAll();
- assertContainsExactly(storage.listWaiting(), REF_UPDATE);
+ storage.recoverAll();
+ assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamRunning()).isEmpty();
assertTrue(storage.isWaiting(uriUpdates));
- assertThat(storage.listRunning()).isEmpty();
}
@Test
- public void canCompleteResetAllUpdate() throws Exception {
+ public void canCompleteRecoveredUpdate() throws Exception {
storage.create(REF_UPDATE);
storage.start(uriUpdates);
- storage.resetAll();
+ storage.recoverAll();
storage.start(uriUpdates);
- assertContainsExactly(storage.listRunning(), REF_UPDATE);
- assertThat(storage.listWaiting()).isEmpty();
+ assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).isEmpty();
assertFalse(storage.isWaiting(uriUpdates));
storage.finish(uriUpdates);
@@ -293,9 +294,9 @@
}
@Test
- public void canResetAllMultipleUpdates() throws Exception {
+ public void canRecoverMultipleUpdates() throws Exception {
ReplicateRefUpdate updateB =
- new ReplicateRefUpdate(
+ ReplicateRefUpdate.create(
PROJECT,
REF,
getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
@@ -306,14 +307,14 @@
storage.start(uriUpdates);
storage.start(uriUpdatesB);
- storage.resetAll();
- assertContainsExactly(storage.listWaiting(), REF_UPDATE, updateB);
+ storage.recoverAll();
+ assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE, updateB);
}
@Test
- public void canCompleteMultipleResetAllUpdates() throws Exception {
+ public void canCompleteMultipleRecoveredUpdates() throws Exception {
ReplicateRefUpdate updateB =
- new ReplicateRefUpdate(
+ ReplicateRefUpdate.create(
PROJECT,
REF,
getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
@@ -323,15 +324,15 @@
storage.create(updateB);
storage.start(uriUpdates);
storage.start(uriUpdatesB);
- storage.resetAll();
+ storage.recoverAll();
storage.start(uriUpdates);
- assertContainsExactly(storage.listRunning(), REF_UPDATE);
- assertContainsExactly(storage.listWaiting(), updateB);
+ assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE);
+ assertThatStream(storage.streamWaiting()).containsExactly(updateB);
storage.start(uriUpdatesB);
- assertContainsExactly(storage.listRunning(), REF_UPDATE, updateB);
- assertThat(storage.listWaiting()).isEmpty();
+ assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE, updateB);
+ assertThatStream(storage.streamWaiting()).isEmpty();
storage.finish(uriUpdates);
storage.finish(uriUpdatesB);
@@ -355,7 +356,7 @@
@Test(expected = Test.None.class /* no exception expected */)
public void illegalDoubleFinishDifferentUriIsGraceful() throws Exception {
ReplicateRefUpdate updateB =
- new ReplicateRefUpdate(
+ ReplicateRefUpdate.create(
PROJECT,
REF,
getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http
@@ -370,30 +371,16 @@
storage.finish(uriUpdates);
storage.finish(uriUpdatesB);
- assertThat(storage.listRunning()).isEmpty();
+ assertThatStream(storage.streamRunning()).isEmpty();
}
protected static void assertNoIncompleteTasks(ReplicationTasksStorage storage) {
- assertThat(storage.listWaiting()).isEmpty();
- assertThat(storage.listRunning()).isEmpty();
+ assertThatStream(storage.streamWaiting()).isEmpty();
+ assertThatStream(storage.streamRunning()).isEmpty();
}
- protected static void assertContainsExactly(
- List<ReplicateRefUpdate> all, ReplicateRefUpdate... refUpdates) {
- assertThat(all).hasSize(refUpdates.length);
- for (int i = 0; i < refUpdates.length; i++) {
- assertTrue(equals(all.get(i), refUpdates[i]));
- }
- }
-
- public static boolean equals(ReplicateRefUpdate one, ReplicateRefUpdate two) {
- return (one == null && two == null)
- || (one != null
- && two != null
- && Objects.equals(one.project, two.project)
- && Objects.equals(one.ref, two.ref)
- && Objects.equals(one.remote, two.remote)
- && Objects.equals(one.uri, two.uri));
+ protected static IterableSubject assertThatStream(Stream<?> stream) {
+ return assertThat(stream.collect(Collectors.toList()));
}
public static URIish getUrish(String uri) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java b/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java
new file mode 100644
index 0000000..901200b
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java
@@ -0,0 +1,78 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gerrit.entities.BranchNameKey;
+import com.google.gerrit.entities.Change;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.events.ChangeEvent;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.events.ProjectEvent;
+import com.google.gerrit.server.events.RefEvent;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** {@link EventDispatcher} test double that records posted events for later inspection. */
+public class TestDispatcher implements EventDispatcher {
+  // One list per recording postEvent overload; each getEvents variant reads exactly one list.
+  private final List<Event> events = new LinkedList<>();
+  private final List<RefEvent> refEvents = new LinkedList<>();
+  private final List<ProjectEvent> projectEvents = new LinkedList<>();
+
+  @Override
+  public void postEvent(Change change, ChangeEvent event) {} // Not used in replication
+
+  @Override
+  public void postEvent(BranchNameKey branchName, RefEvent event) {
+    refEvents.add(event);
+  }
+
+  @Override
+  public void postEvent(Project.NameKey projectName, ProjectEvent event) {
+    projectEvents.add(event);
+  }
+
+  @Override
+  public void postEvent(Event event) {
+    events.add(event);
+  }
+
+  /** Returns the recorded ref events for {@code branch} that are instances of {@code clazz}. */
+  public List<RefEvent> getEvents(BranchNameKey branch, Class<? extends RefEvent> clazz) {
+    return refEvents.stream()
+        .filter(event -> event.getBranchNameKey().equals(branch))
+        .filter(clazz::isInstance)
+        .collect(Collectors.toList());
+  }
+
+  /** Returns the recorded project events for {@code project}, narrowed to {@code clazz}. */
+  public <T extends ProjectEvent> List<T> getEvents(Project.NameKey project, Class<T> clazz) {
+    return projectEvents.stream()
+        .filter(event -> event.getProjectNameKey().equals(project))
+        .filter(clazz::isInstance)
+        .map(clazz::cast)
+        .collect(Collectors.toList());
+  }
+
+  /** Returns events posted via {@link #postEvent(Event)}, narrowed to {@code clazz}. */
+  public <T extends RefEvent> List<T> getEvents(Class<T> clazz) {
+    return events.stream()
+        .filter(clazz::isInstance)
+        .map(clazz::cast)
+        .collect(Collectors.toList());
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
index 2fd3ee3..f61114e 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
@@ -26,10 +26,10 @@
public abstract class TestUriUpdates implements UriUpdates {
public static TestUriUpdates create(ReplicateRefUpdate update) throws URISyntaxException {
return create(
- Project.nameKey(update.project),
- new URIish(update.uri),
- update.remote,
- Collections.singleton(update.ref));
+ Project.nameKey(update.project()),
+ new URIish(update.uri()),
+ update.remote(),
+ Collections.singleton(update.ref()));
}
public static TestUriUpdates create(
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/events/EventFieldsTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/events/EventFieldsTest.java
new file mode 100644
index 0000000..9f0fb9f
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/events/EventFieldsTest.java
@@ -0,0 +1,79 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.events;
+
+import static org.junit.Assert.assertEquals;
+
+import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import java.net.URISyntaxException;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.Test;
+
+/** Verifies the fields and equality of the replication event classes. */
+public class EventFieldsTest {
+  private static final String PROJECT = "someProject";
+  private static final String REF = "refs/heads/master";
+  private static final String TARGET_URI = "git://someHost:9417/basePath/someProject.git";
+  private static final String TARGET_NODE = "someHost:9417";
+
+  @SuppressWarnings("deprecation")
+  @Test
+  public void refReplicatedEventFields() throws URISyntaxException {
+    RefReplicatedEvent event = succeededEvent(new URIish(TARGET_URI));
+
+    assertEquals(PROJECT, event.project);
+    assertEquals(REF, event.ref);
+    assertEquals(TARGET_NODE, event.targetNode);
+    assertEquals(TARGET_URI, event.targetUri);
+    assertEquals("succeeded", event.status);
+    assertEquals(RemoteRefUpdate.Status.OK, event.refStatus);
+  }
+
+  @SuppressWarnings("deprecation")
+  @Test
+  public void scheduledEventFields() throws URISyntaxException {
+    ReplicationScheduledEvent event =
+        new ReplicationScheduledEvent(PROJECT, REF, new URIish(TARGET_URI));
+
+    assertEquals(PROJECT, event.project);
+    assertEquals(TARGET_URI, event.targetUri);
+    assertEquals(TARGET_NODE, event.targetNode);
+  }
+
+  @Test
+  public void refReplicatedEventsEqual() throws URISyntaxException {
+    // Two distinct event objects built from identical arguments (including the same URIish
+    // instance) must compare equal.
+    URIish completeUri = new URIish(TARGET_URI);
+    RefReplicatedEvent first = succeededEvent(completeUri);
+    RefReplicatedEvent second = succeededEvent(completeUri);
+
+    assertEquals(first, second);
+  }
+
+  /**
+   * Builds a new {@link RefReplicatedEvent} for {@link #PROJECT} and {@link #REF} whose push to
+   * {@code targetUri} succeeded with {@link RemoteRefUpdate.Status#OK}.
+   */
+  private static RefReplicatedEvent succeededEvent(URIish targetUri) {
+    return new RefReplicatedEvent(
+        PROJECT,
+        REF,
+        targetUri,
+        RefPushResult.SUCCEEDED,
+        RemoteRefUpdate.Status.OK);
+  }
+}