Merge branch 'stable-3.3' into stable-3.4 * origin/stable-3.3: Do not retry replication when local repository not found Change-Id: I6a8d0650ca24a4aac86fdafc819b028cf8864332
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java index 782ff4f..a0d9624 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -77,6 +77,11 @@ } @Override + public int getMaxRefsToShow() { + return currentConfig.getMaxRefsToShow(); + } + + @Override public Path getEventsDirectory() { return currentConfig.getEventsDirectory(); }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoValueTypeAdapterFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoValueTypeAdapterFactory.java new file mode 100644 index 0000000..5b27176 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoValueTypeAdapterFactory.java
@@ -0,0 +1,32 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gson.TypeAdapterFactory;
+import com.ryanharter.auto.value.gson.GsonTypeAdapterFactory;
+
+/**
+ * Static factory for the {@code AutoValueGson_AutoValueTypeAdapterFactory} implementation of
+ * {@link TypeAdapterFactory}. That implementation is not declared in this file; presumably it is
+ * generated from the {@link GsonTypeAdapterFactory} annotation.
+ */
+@GsonTypeAdapterFactory
+public abstract class AutoValueTypeAdapterFactory implements TypeAdapterFactory {
+  /** Returns a new {@code AutoValueGson_AutoValueTypeAdapterFactory} instance. */
+  public static TypeAdapterFactory create() {
+    TypeAdapterFactory factory = new AutoValueGson_AutoValueTypeAdapterFactory();
+    return factory;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java new file mode 100644 index 0000000..89b97e9 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
@@ -0,0 +1,188 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.common.flogger.FluentLogger;
+import java.util.Iterator;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Stream;
+
+/**
+ * Non-greedily schedules consecutive tasks in an executor, one for each item returned by an {@link
+ * Iterator}.
+ *
+ * <p>This scheduler is useful when an {@link Iterator} might provide a large number of items to be
+ * worked on in a non-prioritized fashion. This scheduler will scavenge any unused threads in its
+ * executor to do work, however only one item will ever be outstanding and waiting at a time. This
+ * scheduling policy allows large collections to be processed without interfering much with higher
+ * prioritized tasks while still making regular progress on the items provided by the {@link
+ * Iterator}. To keep the level of interference to a minimum, ensure that the amount of work needed
+ * for each item is small and short.
+ */
+public class ChainedScheduler<T> {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  /**
+   * Override to implement a common thread pool task for all the items returned by the {@link
+   * Iterator}
+   */
+  public interface Runner<T> {
+    /** Will get executed in the thread pool task for each item */
+    void run(T item);
+
+    /**
+     * Will get called after the {@code run} of the last item returns, or right away when there are
+     * no items. Items submitted earlier may still be running on other threads of the executor.
+     */
+    default void onDone() {}
+
+    /** Will get called to display {@link Runnable} for item in show-queue output */
+    default String toString(T item) {
+      // Plain concatenation renders a null item as "null" instead of throwing.
+      return "Chained " + item;
+    }
+  }
+
+  /** Override to decorate an existing {@link Runner} */
+  public static class ForwardingRunner<T> implements Runner<T> {
+    protected Runner<T> delegateRunner;
+
+    public ForwardingRunner(Runner<T> delegate) {
+      delegateRunner = delegate;
+    }
+
+    @Override
+    public void run(T item) {
+      delegateRunner.run(item);
+    }
+
+    @Override
+    public void onDone() {
+      delegateRunner.onDone();
+    }
+
+    @Override
+    public String toString(T item) {
+      return delegateRunner.toString(item);
+    }
+  }
+
+  /**
+   * Use when a {@link Stream} is needed instead of an {@link Iterator}, it will close the {@link
+   * Stream}
+   */
+  public static class StreamScheduler<T> extends ChainedScheduler<T> {
+    public StreamScheduler(
+        ScheduledExecutorService threadPool, final Stream<T> stream, Runner<T> runner) {
+      super(
+          threadPool,
+          stream.iterator(),
+          new ForwardingRunner<T>(runner) {
+            @Override
+            public void onDone() {
+              stream.close();
+              super.onDone();
+            }
+          });
+    }
+  }
+
+  /** Internal {@link Runnable} containing one item to run and which schedules the next one. */
+  protected class Chainer implements Runnable {
+    protected T item;
+
+    public Chainer(T item) {
+      this.item = item;
+    }
+
+    @Override
+    public void run() {
+      // Submit the next item before running this one; only the Chainer holding the last item
+      // (no successor was scheduled) reports completion to the runner.
+      boolean scheduledNext = scheduleNext();
+      try {
+        runner.run(item);
+      } catch (RuntimeException e) { // catch to prevent chain from breaking
+        logger.atSevere().withCause(e).log("Error while running: %s", item);
+      }
+      if (!scheduledNext) {
+        runner.onDone();
+      }
+    }
+
+    @Override
+    public String toString() {
+      return runner.toString(item);
+    }
+  }
+
+  protected final ScheduledExecutorService threadPool;
+  protected final Iterator<T> iterator;
+  protected final Runner<T> runner;
+
+  /**
+   * Note: The {@link Iterator} passed in will only ever be accessed from one thread at a time, and
+   * the internal state of the {@link Iterator} will be updated after each next() call before
+   * operating on the iterator again.
+   */
+  public ChainedScheduler(
+      ScheduledExecutorService threadPool, Iterator<T> iterator, Runner<T> runner) {
+    this.threadPool = threadPool;
+    this.iterator = iterator;
+    this.runner = runner;
+
+    if (!scheduleNext()) {
+      runner.onDone();
+    }
+  }
+
+  /**
+   * Submits a task for the next item, if there is one.
+   *
+   * <p>Concurrency note:
+   *
+   * <p>Since there is only one chain of tasks and each task submits the next task to the executor,
+   * the calls from here to the iterator.next() call will never be executed concurrently by more
+   * than one thread.
+   *
+   * <p>Data synchronization note:
+   *
+   * <p>The java.util.concurrent package documentation [1] states that the methods of all classes
+   * in java.util.concurrent and its subpackages extend the guarantees of the Java memory model to
+   * higher-level synchronization. In particular, this guarantee applies here: "Actions in a thread
+   * prior to the submission of a Runnable to an Executor happen-before its execution begins...".
+   * Since the iterator.next() happens before the schedule(), and the new task's call to hasNext()
+   * happens after the submission, the hasNext() call will see the results of the previous next()
+   * call even though it may have happened on a different thread.
+   *
+   * <p>[1] https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html
+   * (section "Memory Consistency Properties")
+   *
+   * @return true if a task was submitted for the next item, false if the iterator is exhausted
+   */
+  protected boolean scheduleNext() {
+    if (!iterator.hasNext()) {
+      return false;
+    }
+
+    schedule(new Chainer(iterator.next()));
+    return true;
+  }
+
+  /** Hands the given task to the thread pool for execution. */
+  protected void schedule(Runnable r) {
+    threadPool.execute(r);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java index 8ea7227..965ca94 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
@@ -22,36 +22,46 @@ import com.google.gerrit.server.util.IdGenerator; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; +import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionState; import java.util.Optional; import org.eclipse.jgit.transport.URIish; public class DeleteProjectTask implements Runnable { interface Factory { - DeleteProjectTask create(URIish replicateURI, Project.NameKey project); + + DeleteProjectTask create( + URIish replicateURI, Project.NameKey project, ProjectDeletionState state); } private final DynamicItem<AdminApiFactory> adminApiFactory; private final int id; private final URIish replicateURI; private final Project.NameKey project; + private final ProjectDeletionState state; @Inject DeleteProjectTask( DynamicItem<AdminApiFactory> adminApiFactory, IdGenerator ig, + @Assisted ProjectDeletionState state, @Assisted URIish replicateURI, @Assisted Project.NameKey project) { this.adminApiFactory = adminApiFactory; this.id = ig.next(); this.replicateURI = replicateURI; this.project = project; + this.state = state; } @Override public void run() { Optional<AdminApi> adminApi = adminApiFactory.get().create(replicateURI); if (adminApi.isPresent()) { - adminApi.get().deleteProject(project); + if (adminApi.get().deleteProject(project)) { + state.setSucceeded(replicateURI); + } else { + state.setFailed(replicateURI); + } return; }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java index bd34676..baf0328 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -15,7 +15,6 @@ package com.googlesource.gerrit.plugins.replication; import static com.google.gerrit.server.project.ProjectCache.noSuchProject; -import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName; import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName; import static org.eclipse.jgit.transport.RemoteRefUpdate.Status.NON_EXISTING; import static org.eclipse.jgit.transport.RemoteRefUpdate.Status.REJECTED_OTHER_REASON; @@ -64,11 +63,14 @@ import com.google.inject.assistedinject.FactoryModuleBuilder; import com.google.inject.servlet.RequestScoped; import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult; +import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate; +import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionState; +import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent; +import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -432,13 +434,16 @@ ScheduledFuture<?> ignored = pool.schedule(task, now ? 0 : config.getDelay(), TimeUnit.SECONDS); pending.put(uri, task); + repLog.atInfo().log( + "scheduled %s:%s => %s to run %s", + project, ref, task, now ? 
"now" : "after " + config.getDelay() + "s"); } else { addRef(task, ref); task.addState(ref, state); + repLog.atInfo().log( + "consolidated %s:%s => %s with an existing pending push", project, ref, task); } state.increasePushTaskCount(project.get(), ref); - repLog.atInfo().log( - "scheduled %s:%s => %s to run after %ds", project, ref, task, config.getDelay()); } } @@ -454,13 +459,16 @@ synchronized (stateLock) { URIish uri = pushOp.getURI(); pending.remove(uri); + pushOp.notifyNotAttempted(pushOp.getRefs()); } } - void scheduleDeleteProject(URIish uri, Project.NameKey project) { + void scheduleDeleteProject(URIish uri, Project.NameKey project, ProjectDeletionState state) { + repLog.atFine().log("scheduling deletion of project %s at %s", project, uri); @SuppressWarnings("unused") ScheduledFuture<?> ignored = - pool.schedule(deleteProjectFactory.create(uri, project), 0, TimeUnit.SECONDS); + pool.schedule(deleteProjectFactory.create(uri, project, state), 0, TimeUnit.SECONDS); + state.setScheduled(uri); } void scheduleUpdateHead(URIish uri, Project.NameKey project, String newHead) { @@ -586,7 +594,7 @@ if (inFlightOp != null) { return RunwayStatus.denied(inFlightOp.getId()); } - replicationTasksStorage.get().start(op); + op.notifyNotAttempted(op.setStartedRefs(replicationTasksStorage.get().start(op))); inFlight.put(op.getURI(), op); } return RunwayStatus.allowed(); @@ -601,15 +609,15 @@ } } - public Set<String> getPrunableTaskNames() { - Set<String> names = new HashSet<>(); + public Map<ReplicateRefUpdate, String> getTaskNamesByReplicateRefUpdate() { + Map<ReplicateRefUpdate, String> taskNameByReplicateRefUpdate = new HashMap<>(); for (PushOne push : pending.values()) { - if (!replicationTasksStorage.get().isWaiting(push)) { - repLog.atFine().log("No longer isWaiting, can prune %s", push.getURI()); - names.add(push.toString()); + String taskName = push.toString(); + for (ReplicateRefUpdate refUpdate : push.getReplicateRefUpdates()) { + 
taskNameByReplicateRefUpdate.put(refUpdate, taskName); } } - return names; + return taskNameByReplicateRefUpdate; } boolean wouldPush(URIish uri, Project.NameKey project, String ref) { @@ -782,10 +790,9 @@ private void postReplicationScheduledEvent(PushOne pushOp, String inputRef) { Set<String> refs = inputRef == null ? pushOp.getRefs() : ImmutableSet.of(inputRef); Project.NameKey project = pushOp.getProjectNameKey(); - String targetNode = resolveNodeName(pushOp.getURI()); for (String ref : refs) { ReplicationScheduledEvent event = - new ReplicationScheduledEvent(project.get(), ref, targetNode); + new ReplicationScheduledEvent(project.get(), ref, pushOp.getURI()); try { eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event); } catch (PermissionBackendException e) { @@ -796,10 +803,9 @@ private void postReplicationFailedEvent(PushOne pushOp, RemoteRefUpdate.Status status) { Project.NameKey project = pushOp.getProjectNameKey(); - String targetNode = resolveNodeName(pushOp.getURI()); for (String ref : pushOp.getRefs()) { RefReplicatedEvent event = - new RefReplicatedEvent(project.get(), ref, targetNode, RefPushResult.FAILED, status); + new RefReplicatedEvent(project.get(), ref, pushOp.getURI(), RefPushResult.FAILED, status); try { eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event); } catch (PermissionBackendException e) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java index 5b24bf5..b915d0d 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
@@ -137,6 +137,11 @@ } @Override + public int getMaxRefsToShow() { + return replicationConfig.getMaxRefsToShow(); + } + + @Override public Path getEventsDirectory() { return replicationConfig.getEventsDirectory(); }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java index d195aa3..0fa02ef 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
@@ -68,7 +68,8 @@ if (exitCode == 1) { logger.atInfo().log( "DeleteProject plugin is not installed on %s;" - + " will not try to forward this operation to that host"); + + " will not try to forward this operation to that host", + uri); withoutDeleteProjectPlugin.add(uri); } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java index 693dd73..5e940f8 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -52,6 +52,7 @@
 import com.jcraft.jsch.JSchException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -61,6 +62,7 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 import org.eclipse.jgit.errors.NoRemoteRepositoryException;
 import org.eclipse.jgit.errors.NotSupportedException;
 import org.eclipse.jgit.errors.RemoteRepositoryException;
@@ -212,7 +214,7 @@
 
   @Override
   public String toString() {
-    String print = "[" + HexFormat.fromInt(id) + "] push " + uri;
+    String print = "[" + HexFormat.fromInt(id) + "] push " + uri + " " + getLimitedRefs();
 
     if (retryCount > 0) {
       print = "(retry " + retryCount + ") " + print;
@@ -220,6 +222,37 @@
     return print;
   }
 
+  /**
+   * Returns a string of refs limited to the maxRefsToShow config with count of total refs hidden
+   * when there are more refs than maxRefsToShow config.
+   *
+   * <ul>
+   *   <li>Refs will not be limited when maxRefsToShow config is set to zero or a negative value.
+   *   <li>By default output will be limited to two refs.
+   * </ul>
+   *
+   * The default value of two is chosen because whenever a new patchset is created there are two
+   * refs to be replicated (change ref and meta ref).
+   *
+   * @return Space separated string of refs (in square bracket) limited to the maxRefsToShow with
+   *     count of total refs hidden (in parentheses) when there are more refs than maxRefsToShow
+   *     config.
+   */
+  protected String getLimitedRefs() {
+    Set<String> refs = getRefs();
+    int maxRefsToShow = replConfig.getMaxRefsToShow();
+    // Stream.limit() rejects negative sizes, so treat any non-positive value as "no limit".
+    if (maxRefsToShow <= 0) {
+      maxRefsToShow = refs.size();
+    }
+    String refsString = refs.stream().limit(maxRefsToShow).collect(Collectors.joining(" "));
+    int hiddenRefs = refs.size() - maxRefsToShow;
+    if (hiddenRefs > 0) {
+      refsString += " (+" + hiddenRefs + ")";
+    }
+    return "[" + refsString + "]";
+  }
+
   boolean isRetrying() {
     return retrying;
   }
@@ -270,6 +303,42 @@
     }
   }
 
+  /**
+   * Replaces the refs of this push with the given started refs.
+   *
+   * @param startedRefs the refs to re-add to this push after it has been cleared
+   * @return the refs that were part of this push but are not in {@code startedRefs}
+   */
+  Set<String> setStartedRefs(Set<String> startedRefs) {
+    // Sets.difference() returns a live view, so snapshot it before delta is mutated below;
+    // otherwise the result would reflect delta after it has been cleared and refilled.
+    Set<String> notAttemptedRefs = Sets.difference(delta, startedRefs).immutableCopy();
+    pushAllRefs = false;
+    delta.clear();
+    addRefs(startedRefs);
+    return notAttemptedRefs;
+  }
+
+  /**
+   * Notifies every replication state registered for the given refs that the push of the ref to
+   * this task's URI was not attempted.
+   *
+   * @param notAttemptedRefs the refs to report as {@link RefPushResult#NOT_ATTEMPTED}
+   */
+  void notifyNotAttempted(Set<String> notAttemptedRefs) {
+    notAttemptedRefs.forEach(
+        ref ->
+            Arrays.asList(getStatesByRef(ref))
+                .forEach(
+                    state ->
+                        state.notifyRefReplicated(
+                            projectName.get(),
+                            ref,
+                            uri,
+                            RefPushResult.NOT_ATTEMPTED,
+                            RemoteRefUpdate.Status.UP_TO_DATE)));
+  }
+
   void addState(String ref, ReplicationState state) {
     stateMap.put(ref, state);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java index 92ba4be..5450dd5 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
@@ -20,6 +20,8 @@ import com.google.gerrit.server.events.RefEvent; import com.google.gerrit.server.permissions.PermissionBackendException; import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult; +import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent; +import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent; import java.lang.ref.WeakReference; import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jgit.transport.RemoteRefUpdate; @@ -191,7 +193,7 @@ URIish uri, RefPushResult status, RemoteRefUpdate.Status refStatus) { - postEvent(new RefReplicatedEvent(project, ref, resolveNodeName(uri), status, refStatus)); + postEvent(new RefReplicatedEvent(project, ref, uri, status, refStatus)); } @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java index 8bbb180..ec75450 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -60,6 +60,13 @@ int getMaxRefsToLog(); /** + * Returns the maximum number of replicating refs to show in the show-queue output + * + * @return maximum number of refs to show, 2 by default. + */ + int getMaxRefsToShow(); + + /** * Configured location where the replication events are stored on the filesystem for being resumed * and kept across restarts. *
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java index 2631cbe..f5c6185 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -34,6 +34,7 @@ private boolean replicateAllOnPluginStart; private boolean defaultForceUpdate; private int maxRefsToLog; + private final int maxRefsToShow; private int sshCommandTimeout; private int sshConnectionTimeout = DEFAULT_SSH_CONNECTION_TIMEOUT_MS; private final FileBasedConfig config; @@ -54,6 +55,7 @@ this.replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false); this.defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false); this.maxRefsToLog = config.getInt("gerrit", "maxRefsToLog", 0); + this.maxRefsToShow = config.getInt("gerrit", "maxRefsToShow", 2); this.pluginDataDir = pluginDataDir; } @@ -96,6 +98,11 @@ } @Override + public int getMaxRefsToShow() { + return maxRefsToShow; + } + + @Override public Path getEventsDirectory() { String eventsDirectory = config.getString("replication", null, "eventsDirectory"); if (!Strings.isNullOrEmpty(eventsDirectory)) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java index c2b96a1..75fa5b3 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -32,6 +32,14 @@ import com.google.inject.Scopes; import com.google.inject.assistedinject.FactoryModuleBuilder; import com.google.inject.internal.UniqueAnnotations; +import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationDoneEvent; +import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationFailedEvent; +import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationScheduledEvent; +import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationSucceededEvent; +import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionState; +import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent; +import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent; +import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent; import java.io.File; import java.io.IOException; import java.nio.file.Files; @@ -77,6 +85,7 @@ .to(StartReplicationCapability.class); install(new FactoryModuleBuilder().build(PushAll.Factory.class)); + install(new FactoryModuleBuilder().build(ProjectDeletionState.Factory.class)); bind(EventBus.class).in(Scopes.SINGLETON); bind(ReplicationDestinations.class).to(DestinationsCollection.class); @@ -100,6 +109,16 @@ EventTypes.register(RefReplicatedEvent.TYPE, RefReplicatedEvent.class); EventTypes.register(RefReplicationDoneEvent.TYPE, RefReplicationDoneEvent.class); EventTypes.register(ReplicationScheduledEvent.TYPE, ReplicationScheduledEvent.class); + EventTypes.register( + ProjectDeletionReplicationScheduledEvent.TYPE, + ProjectDeletionReplicationScheduledEvent.class); + EventTypes.register( + ProjectDeletionReplicationFailedEvent.TYPE, ProjectDeletionReplicationFailedEvent.class); + EventTypes.register( + ProjectDeletionReplicationSucceededEvent.TYPE, + ProjectDeletionReplicationSucceededEvent.class); + EventTypes.register( + ProjectDeletionReplicationDoneEvent.TYPE, 
ProjectDeletionReplicationDoneEvent.class); bind(SshSessionFactory.class).toProvider(ReplicationSshSessionFactoryProvider.class); bind(TransportFactory.class).to(TransportFactoryImpl.class).in(Scopes.SINGLETON);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java index 990e387..4abb295 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -33,13 +33,18 @@ import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing; import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType; import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate; +import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionState; import java.net.URISyntaxException; +import java.util.Collection; import java.util.HashSet; +import java.util.Map; import java.util.Optional; import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jgit.transport.URIish; /** Manages automatic replication to remote repositories. */ @@ -59,11 +64,17 @@ private final DynamicItem<EventDispatcher> dispatcher; private final Provider<ReplicationDestinations> destinations; // For Guice circular dependency private final ReplicationTasksStorage replicationTasksStorage; + private final ProjectDeletionState.Factory projectDeletionStateFactory; private volatile boolean running; - private volatile boolean replaying; + private final AtomicBoolean replaying = new AtomicBoolean(); private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue; private Distributor distributor; + protected enum Prune { + TRUE, + FALSE; + } + @Inject ReplicationQueue( ReplicationConfig rc, @@ -71,7 +82,8 @@ Provider<ReplicationDestinations> rd, DynamicItem<EventDispatcher> dis, ReplicationStateListeners sl, - ReplicationTasksStorage rts) { + ReplicationTasksStorage rts, + ProjectDeletionState.Factory pd) { replConfig = rc; workQueue = wq; dispatcher = dis; @@ -79,6 +91,7 @@ stateLog = sl; replicationTasksStorage = rts; beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue(); + projectDeletionStateFactory = pd; } @Override @@ -86,10 +99,8 @@ if (!running) { 
destinations.get().startup(workQueue); running = true; - replicationTasksStorage.resetAll(); - Thread t = new Thread(this::firePendingEvents, "firePendingEvents"); - t.setDaemon(true); - t.start(); + replicationTasksStorage.recoverAll(); + synchronizePendingEvents(Prune.FALSE); fireBeforeStartupEvents(); distributor = new Distributor(workQueue); } @@ -112,7 +123,7 @@ @Override public boolean isReplaying() { - return replaying; + return replaying.get(); } public void scheduleFullSync( @@ -177,7 +188,7 @@ if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) { for (URIish uri : cfg.getURIs(project, urlMatch)) { replicationTasksStorage.create( - new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName())); + ReplicateRefUpdate.create(project.get(), refName, uri, cfg.getRemoteConfigName())); cfg.schedule(project, refName, uri, state, now); } } else { @@ -188,35 +199,55 @@ } } - private void firePendingEvents() { - replaying = true; - try { - replaying = true; - for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) { - try { - fire(new URIish(t.uri), Project.nameKey(t.project), t.ref); - } catch (URISyntaxException e) { - repLog.atSevere().withCause(e).log("Encountered malformed URI for persisted event %s", t); + private void synchronizePendingEvents(Prune prune) { + if (replaying.compareAndSet(false, true)) { + final Map<ReplicateRefUpdate, String> taskNamesByReplicateRefUpdate = new ConcurrentHashMap<>(); + if (Prune.TRUE.equals(prune)) { + for (Destination destination : destinations.get().getAll(FilterType.ALL)) { + taskNamesByReplicateRefUpdate.putAll(destination.getTaskNamesByReplicateRefUpdate()); } } - } catch (Throwable e) { - repLog.atSevere().withCause(e).log("Unexpected error while firing pending events"); - } finally { - replaying = false; + new ChainedScheduler.StreamScheduler<>( + workQueue.getDefaultQueue(), + replicationTasksStorage.streamWaiting(), + new 
ChainedScheduler.Runner<ReplicationTasksStorage.ReplicateRefUpdate>() { + @Override + public void run(ReplicationTasksStorage.ReplicateRefUpdate u) { + try { + fire(new URIish(u.uri()), Project.nameKey(u.project()), u.ref()); + if (Prune.TRUE.equals(prune)) { + taskNamesByReplicateRefUpdate.remove(u); + } + } catch (URISyntaxException e) { + repLog.atSevere().withCause(e).log( + "Encountered malformed URI for persisted event %s", u); + } catch (Throwable e) { + repLog.atSevere().withCause(e).log("Unexpected error while firing pending events"); + } + } + + @Override + public void onDone() { + if (Prune.TRUE.equals(prune)) { + pruneNoLongerPending(taskNamesByReplicateRefUpdate.values()); + } + replaying.set(false); + } + + @Override + public String toString(ReplicationTasksStorage.ReplicateRefUpdate u) { + return "Scheduling push to " + String.format("%s:%s", u.project(), u.ref()); + } + }); } } - private void pruneCompleted() { + private void pruneNoLongerPending(Collection<String> prunableTaskNames) { // Queue tasks have wrappers around them so workQueue.getTasks() does not return the PushOnes. // We also cannot access them by taskId since PushOnes don't have a taskId, they do have - // and Id, but it not the id assigned to the task in the queues. The tasks in the queue - // do use the same name as returned by toString() though, so that be used to correlate + // an Id, but it is not the id assigned to the task in the queues. The tasks in the queue + // do use the same name as returned by toString() though, so that can be used to correlate // PushOnes with queue tasks despite their wrappers. 
- Set<String> prunableTaskNames = new HashSet<>(); - for (Destination destination : destinations.get().getAll(FilterType.ALL)) { - prunableTaskNames.addAll(destination.getPrunableTaskNames()); - } - for (WorkQueue.Task<?> task : workQueue.getTasks()) { WorkQueue.Task.State state = task.getState(); if (state == WorkQueue.Task.State.SLEEPING || state == WorkQueue.Task.State.READY) { @@ -233,8 +264,12 @@ @Override public void onProjectDeleted(ProjectDeletedListener.Event event) { Project.NameKey p = Project.nameKey(event.getProjectName()); - destinations.get().getURIs(Optional.empty(), p, FilterType.PROJECT_DELETION).entries().stream() - .forEach(e -> e.getKey().scheduleDeleteProject(e.getValue(), p)); + ProjectDeletionState state = projectDeletionStateFactory.create(p); + Collection<Map.Entry<Destination, URIish>> projectsToDelete = + destinations.get().getURIs(Optional.empty(), p, FilterType.PROJECT_DELETION).entries(); + + projectsToDelete.forEach(e -> state.setToProcess(e.getValue())); + projectsToDelete.forEach(e -> e.getKey().scheduleDeleteProject(e.getValue(), p, state)); } @Override @@ -288,8 +323,7 @@ return; } try { - firePendingEvents(); - pruneCompleted(); + synchronizePendingEvents(Prune.TRUE); } catch (Exception e) { repLog.atSevere().withCause(e).log("error distributing tasks"); }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java index b948be0..871ed52 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
@@ -79,7 +79,7 @@ pushResultProcessing.onRefReplicatedToOneNode(project, ref, uri, status, refUpdateStatus); RefReplicationStatus completedRefStatus = null; - boolean allPushTaksCompleted = false; + boolean allPushTasksCompleted = false; countingLock.lock(); try { RefReplicationStatus refStatus = getRefStatus(project, ref); @@ -90,7 +90,7 @@ if (refStatus.allDone()) { completedRefStatus = statusByProjectRef.remove(project, ref); } - allPushTaksCompleted = finishedPushTasksCount.get() == totalPushTasksCount.get(); + allPushTasksCompleted = finishedPushTasksCount.get() == totalPushTasksCount.get(); } } finally { countingLock.unlock(); @@ -100,7 +100,7 @@ doRefPushTasksCompleted(completedRefStatus); } - if (allPushTaksCompleted) { + if (allPushTasksCompleted) { doAllPushTasksCompleted(); } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java index 3947ebc..4736402 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -16,10 +16,13 @@ import static java.nio.charset.StandardCharsets.UTF_8; +import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.flogger.FluentLogger; import com.google.common.hash.Hashing; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.TypeAdapter; import com.google.inject.Inject; import com.google.inject.ProvisionException; import com.google.inject.Singleton; @@ -29,9 +32,9 @@ import java.nio.file.NotDirectoryException; import java.nio.file.Path; import java.nio.file.StandardCopyOption; -import java.util.List; +import java.util.HashSet; import java.util.Optional; -import java.util.stream.Collectors; +import java.util.Set; import java.util.stream.Stream; import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.transport.URIish; @@ -61,10 +64,11 @@ public class ReplicationTasksStorage { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); - public static class ReplicateRefUpdate { - public static Optional<ReplicateRefUpdate> createOptionally(Path file) { + @AutoValue + public abstract static class ReplicateRefUpdate { + public static Optional<ReplicateRefUpdate> createOptionally(Path file, Gson gson) { try { - return Optional.ofNullable(create(file)); + return Optional.ofNullable(create(file, gson)); } catch (NoSuchFileException e) { logger.atFine().log("File %s not found while reading task", file); } catch (IOException e) { @@ -73,30 +77,40 @@ return Optional.empty(); } - public static ReplicateRefUpdate create(Path file) throws IOException { + public static ReplicateRefUpdate create(Path file, Gson gson) throws IOException { String json = new String(Files.readAllBytes(file), UTF_8); - return GSON.fromJson(json, ReplicateRefUpdate.class); + return gson.fromJson(json, ReplicateRefUpdate.class); } - public final String project; - public final String ref; - public final String uri; - public final String remote; + public static 
ReplicateRefUpdate create(String project, String ref, URIish uri, String remote) { + return new AutoValue_ReplicationTasksStorage_ReplicateRefUpdate( + project, ref, uri.toASCIIString(), remote); + } - public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) { - this.project = project; - this.ref = ref; - this.uri = uri.toASCIIString(); - this.remote = remote; + public abstract String project(); + + public abstract String ref(); + + public abstract String uri(); + + public abstract String remote(); + + public String sha1() { + return ReplicationTasksStorage.sha1(project() + "\n" + ref() + "\n" + uri() + "\n" + remote()) + .name(); } @Override - public String toString() { - return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote; + public final String toString() { + return "ref-update " + project() + ":" + ref() + " uri:" + uri() + " remote:" + remote(); + } + + public static TypeAdapter<ReplicateRefUpdate> typeAdapter(Gson gson) { + return new AutoValue_ReplicationTasksStorage_ReplicateRefUpdate.GsonTypeAdapter(gson); } } - private static final Gson GSON = new Gson(); + private final Gson gson; private final Path buildingUpdates; private final Path runningUpdates; @@ -112,16 +126,23 @@ buildingUpdates = refUpdates.resolve("building"); runningUpdates = refUpdates.resolve("running"); waitingUpdates = refUpdates.resolve("waiting"); + gson = + new GsonBuilder().registerTypeAdapterFactory(AutoValueTypeAdapterFactory.create()).create(); } public synchronized String create(ReplicateRefUpdate r) { return new Task(r).create(); } - public synchronized void start(UriUpdates uriUpdates) { + public synchronized Set<String> start(UriUpdates uriUpdates) { + Set<String> startedRefs = new HashSet<>(); for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) { - new Task(update).start(); + Task t = new Task(update); + if (t.start()) { + startedRefs.add(t.update.ref()); + } } + return startedRefs; } public synchronized 
void reset(UriUpdates uriUpdates) { @@ -130,10 +151,8 @@ } } - public synchronized void resetAll() { - for (ReplicateRefUpdate r : list(createDir(runningUpdates))) { - new Task(r).reset(); - } + public synchronized void recoverAll() { + streamRunning().forEach(r -> new Task(r).recover()); } public boolean isWaiting(UriUpdates uriUpdates) { @@ -148,21 +167,17 @@ } } - public List<ReplicateRefUpdate> listWaiting() { - return list(createDir(waitingUpdates)); + public Stream<ReplicateRefUpdate> streamWaiting() { + return streamRecursive(createDir(waitingUpdates)); } - public List<ReplicateRefUpdate> listRunning() { - return list(createDir(runningUpdates)); - } - - private List<ReplicateRefUpdate> list(Path taskDir) { - return streamRecursive(taskDir).collect(Collectors.toList()); + public Stream<ReplicateRefUpdate> streamRunning() { + return streamRecursive(createDir(runningUpdates)); } private Stream<ReplicateRefUpdate> streamRecursive(Path dir) { return walkNonDirs(dir) - .map(path -> ReplicateRefUpdate.createOptionally(path)) + .map(path -> ReplicateRefUpdate.createOptionally(path, gson)) .filter(Optional::isPresent) .map(Optional::get); } @@ -179,7 +194,7 @@ } @SuppressWarnings("deprecation") - private ObjectId sha1(String s) { + private static ObjectId sha1(String s) { return ObjectId.fromRaw(Hashing.sha1().hashString(s, UTF_8).asBytes()); } @@ -200,8 +215,7 @@ public Task(ReplicateRefUpdate update) { this.update = update; - String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote; - taskKey = sha1(key).name(); + taskKey = update.sha1(); running = createDir(runningUpdates).resolve(taskKey); waiting = createDir(waitingUpdates).resolve(taskKey); } @@ -211,7 +225,7 @@ return taskKey; } - String json = GSON.toJson(update) + "\n"; + String json = gson.toJson(update) + "\n"; try { Path tmp = Files.createTempFile(createDir(buildingUpdates), taskKey, null); logger.atFine().log("CREATE %s %s", tmp, updateLog()); @@ -224,14 +238,18 @@ 
return taskKey; } - public void start() { - rename(waiting, running); + public boolean start() { + return rename(waiting, running); } public void reset() { rename(running, waiting); } + public void recover() { + rename(running, waiting); + } + public boolean isWaiting() { return Files.exists(waiting); } @@ -245,17 +263,19 @@ } } - private void rename(Path from, Path to) { + private boolean rename(Path from, Path to) { try { logger.atFine().log("RENAME %s to %s %s", from, to, updateLog()); Files.move(from, to, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + return true; } catch (IOException e) { logger.atSevere().withCause(e).log("Error while renaming task %s", taskKey); + return false; } } private String updateLog() { - return String.format("(%s:%s => %s)", update.project, update.ref, update.uri); + return String.format("(%s:%s => %s)", update.project(), update.ref(), update.uri()); } } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java b/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java index 9c56c8e..a9985d2 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/UriUpdates.java
@@ -34,7 +34,7 @@ return getRefs().stream() .map( (ref) -> - new ReplicationTasksStorage.ReplicateRefUpdate( + ReplicationTasksStorage.ReplicateRefUpdate.create( getProjectNameKey().get(), ref, getURI(), getRemoteName())) .collect(Collectors.toList()); }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationDoneEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationDoneEvent.java new file mode 100644 index 0000000..ebd7202 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationDoneEvent.java
@@ -0,0 +1,49 @@ +// Copyright (C) 2021 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.events; + +import com.google.common.base.Objects; +import com.google.gerrit.entities.Project; +import com.google.gerrit.server.events.ProjectEvent; + +public class ProjectDeletionReplicationDoneEvent extends ProjectEvent { + public static final String TYPE = "project-deletion-replication-done"; + + private final String project; + + public ProjectDeletionReplicationDoneEvent(String project) { + super(TYPE); + this.project = project; + } + + @Override + public Project.NameKey getProjectNameKey() { + return Project.nameKey(project); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ProjectDeletionReplicationDoneEvent)) { + return false; + } + ProjectDeletionReplicationDoneEvent that = (ProjectDeletionReplicationDoneEvent) o; + return Objects.equal(project, that.project); + } + + @Override + public int hashCode() { + return Objects.hashCode(project); + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationFailedEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationFailedEvent.java new file mode 100644 index 0000000..da75208 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationFailedEvent.java
@@ -0,0 +1,58 @@ +// Copyright (C) 2021 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.events; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; +import com.google.gerrit.entities.Project; +import com.google.gerrit.server.events.ProjectEvent; +import org.eclipse.jgit.transport.URIish; + +public class ProjectDeletionReplicationFailedEvent extends ProjectEvent { + public static final String TYPE = "project-deletion-replication-failed"; + + private final String project; + private final String targetUri; + + public ProjectDeletionReplicationFailedEvent(String project, URIish targetUri) { + super(TYPE); + this.project = project; + this.targetUri = targetUri.toASCIIString(); + } + + @Override + public Project.NameKey getProjectNameKey() { + return Project.nameKey(project); + } + + @VisibleForTesting + public String getTargetUri() { + return targetUri; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ProjectDeletionReplicationFailedEvent)) { + return false; + } + ProjectDeletionReplicationFailedEvent that = (ProjectDeletionReplicationFailedEvent) o; + return Objects.equal(project, that.project) && Objects.equal(targetUri, that.targetUri); + } + + @Override + public int hashCode() { + return Objects.hashCode(project, targetUri); + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationScheduledEvent.java new file mode 100644 index 0000000..48e621b --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationScheduledEvent.java
@@ -0,0 +1,58 @@ +// Copyright (C) 2021 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.events; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; +import com.google.gerrit.entities.Project; +import com.google.gerrit.server.events.ProjectEvent; +import org.eclipse.jgit.transport.URIish; + +public class ProjectDeletionReplicationScheduledEvent extends ProjectEvent { + public static final String TYPE = "project-deletion-replication-scheduled"; + + private final String project; + private final String targetUri; + + public ProjectDeletionReplicationScheduledEvent(String project, URIish targetUri) { + super(TYPE); + this.project = project; + this.targetUri = targetUri.toASCIIString(); + } + + @Override + public Project.NameKey getProjectNameKey() { + return Project.nameKey(project); + } + + @VisibleForTesting + public String getTargetUri() { + return targetUri; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ProjectDeletionReplicationScheduledEvent)) { + return false; + } + ProjectDeletionReplicationScheduledEvent that = (ProjectDeletionReplicationScheduledEvent) o; + return Objects.equal(project, that.project) && Objects.equal(targetUri, that.targetUri); + } + + @Override + public int hashCode() { + return Objects.hashCode(project, targetUri); + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationSucceededEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationSucceededEvent.java new file mode 100644 index 0000000..6f881cf --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionReplicationSucceededEvent.java
@@ -0,0 +1,58 @@ +// Copyright (C) 2021 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.events; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; +import com.google.gerrit.entities.Project; +import com.google.gerrit.server.events.ProjectEvent; +import org.eclipse.jgit.transport.URIish; + +public class ProjectDeletionReplicationSucceededEvent extends ProjectEvent { + public static final String TYPE = "project-deletion-replication-succeeded"; + + private final String project; + private final String targetUri; + + public ProjectDeletionReplicationSucceededEvent(String project, URIish targetUri) { + super(TYPE); + this.project = project; + this.targetUri = targetUri.toASCIIString(); + } + + @Override + public Project.NameKey getProjectNameKey() { + return Project.nameKey(project); + } + + @VisibleForTesting + public String getTargetUri() { + return targetUri; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ProjectDeletionReplicationSucceededEvent)) { + return false; + } + ProjectDeletionReplicationSucceededEvent that = (ProjectDeletionReplicationSucceededEvent) o; + return Objects.equal(project, that.project) && Objects.equal(targetUri, that.targetUri); + } + + @Override + public int hashCode() { + return Objects.hashCode(project, targetUri); + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionState.java new file mode 100644 index 0000000..e091915 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ProjectDeletionState.java
@@ -0,0 +1,96 @@ +// Copyright (C) 2021 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.events; + +import static com.googlesource.gerrit.plugins.replication.events.ProjectDeletionState.ProjectDeletionStatus.FAILED; +import static com.googlesource.gerrit.plugins.replication.events.ProjectDeletionState.ProjectDeletionStatus.SCHEDULED; +import static com.googlesource.gerrit.plugins.replication.events.ProjectDeletionState.ProjectDeletionStatus.SUCCEEDED; +import static com.googlesource.gerrit.plugins.replication.events.ProjectDeletionState.ProjectDeletionStatus.TO_PROCESS; + +import com.google.gerrit.entities.Project; +import com.google.gerrit.extensions.registration.DynamicItem; +import com.google.gerrit.server.events.EventDispatcher; +import com.google.gerrit.server.events.ProjectEvent; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.eclipse.jgit.transport.URIish; + +public class ProjectDeletionState { + public interface Factory { + ProjectDeletionState create(Project.NameKey project); + } + + private final DynamicItem<EventDispatcher> eventDispatcher; + private final Project.NameKey project; + private final ConcurrentMap<URIish, ProjectDeletionStatus> statusByURI = + new ConcurrentHashMap<>(); + + @Inject + public 
ProjectDeletionState( + DynamicItem<EventDispatcher> eventDispatcher, @Assisted Project.NameKey project) { + this.eventDispatcher = eventDispatcher; + this.project = project; + } + + public void setToProcess(URIish uri) { + statusByURI.put(uri, TO_PROCESS); + } + + public void setScheduled(URIish uri) { + setStatusAndBroadcastEvent( + uri, SCHEDULED, new ProjectDeletionReplicationScheduledEvent(project.get(), uri)); + } + + public void setSucceeded(URIish uri) { + setStatusAndBroadcastEvent( + uri, SUCCEEDED, new ProjectDeletionReplicationSucceededEvent(project.get(), uri)); + notifyIfDeletionDoneOnAllNodes(); + } + + public void setFailed(URIish uri) { + setStatusAndBroadcastEvent( + uri, FAILED, new ProjectDeletionReplicationFailedEvent(project.get(), uri)); + notifyIfDeletionDoneOnAllNodes(); + } + + private void setStatusAndBroadcastEvent( + URIish uri, ProjectDeletionStatus status, ProjectEvent event) { + statusByURI.put(uri, status); + eventDispatcher.get().postEvent(project, event); + } + + public void notifyIfDeletionDoneOnAllNodes() { + synchronized (statusByURI) { + if (!statusByURI.isEmpty() + && statusByURI.values().stream() + .noneMatch(s -> s.equals(TO_PROCESS) || s.equals(SCHEDULED))) { + + statusByURI.clear(); + eventDispatcher + .get() + .postEvent(project, new ProjectDeletionReplicationDoneEvent(project.get())); + } + } + } + + public enum ProjectDeletionStatus { + TO_PROCESS, + SCHEDULED, + FAILED, + SUCCEEDED; + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicatedEvent.java similarity index 82% rename from src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java rename to src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicatedEvent.java index d1ab790..b0c554e 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicatedEvent.java
@@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -package com.googlesource.gerrit.plugins.replication; +package com.googlesource.gerrit.plugins.replication.events; + +import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName; import com.google.gerrit.entities.Project; import com.google.gerrit.server.events.RefEvent; @@ -20,28 +22,31 @@ import java.util.Objects; import org.eclipse.jgit.transport.RemoteRefUpdate; import org.eclipse.jgit.transport.RemoteRefUpdate.Status; +import org.eclipse.jgit.transport.URIish; public class RefReplicatedEvent extends RefEvent { public static final String TYPE = "ref-replicated"; public final String project; public final String ref; - public final String targetNode; + @Deprecated public final String targetNode; + public final String targetUri; public final String status; public final Status refStatus; public RefReplicatedEvent( String project, String ref, - String targetNode, + URIish targetUri, RefPushResult status, RemoteRefUpdate.Status refStatus) { super(TYPE); this.project = project; this.ref = ref; - this.targetNode = targetNode; + this.targetNode = resolveNodeName(targetUri); this.status = status.toString(); this.refStatus = refStatus; + this.targetUri = targetUri.toASCIIString(); } @Override @@ -69,6 +74,9 @@ if (!Objects.equals(event.targetNode, this.targetNode)) { return false; } + if (!Objects.equals(event.targetUri, this.targetUri)) { + return false; + } if (!Objects.equals(event.status, this.status)) { return false; }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicationDoneEvent.java similarity index 96% rename from src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java rename to src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicationDoneEvent.java index e663194..75b8026 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/RefReplicationDoneEvent.java
@@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package com.googlesource.gerrit.plugins.replication; +package com.googlesource.gerrit.plugins.replication.events; import com.google.gerrit.entities.Project; import com.google.gerrit.server.events.RefEvent;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ReplicationScheduledEvent.java similarity index 70% rename from src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java rename to src/main/java/com/googlesource/gerrit/plugins/replication/events/ReplicationScheduledEvent.java index 28f6b6b..4a1ade8 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/events/ReplicationScheduledEvent.java
@@ -12,23 +12,28 @@ // See the License for the specific language governing permissions and // limitations under the License. -package com.googlesource.gerrit.plugins.replication; +package com.googlesource.gerrit.plugins.replication.events; + +import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName; import com.google.gerrit.entities.Project; import com.google.gerrit.server.events.RefEvent; +import org.eclipse.jgit.transport.URIish; public class ReplicationScheduledEvent extends RefEvent { public static final String TYPE = "ref-replication-scheduled"; public final String project; public final String ref; - public final String targetNode; + @Deprecated public final String targetNode; + public final String targetUri; - public ReplicationScheduledEvent(String project, String ref, String targetNode) { + public ReplicationScheduledEvent(String project, String ref, URIish targetUri) { super(TYPE); this.project = project; this.ref = ref; - this.targetNode = targetNode; + this.targetNode = resolveNodeName(targetUri); + this.targetUri = targetUri.toASCIIString(); } @Override
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md index adb8d4c..ded9d84 100644 --- a/src/main/resources/Documentation/about.md +++ b/src/main/resources/Documentation/about.md
@@ -16,6 +16,12 @@ local path as replication target. This makes e.g. sense if a network share is mounted to which the repositories should be replicated. +In a multi-primary scenario, any replication work which is already +in-flight or completed by the other nodes is not performed to +avoid extra work. This is because the storage for replication +events is shared between multiple primaries. (The storage location +is specified in the config using: `replication.eventsDirectory`). + Replication of account data (NoteDb) ------------------------------------
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md index 13e10ea..af91032 100644 --- a/src/main/resources/Documentation/config.md +++ b/src/main/resources/Documentation/config.md
@@ -86,6 +86,15 @@ : Number of refs, that are pushed during replication, to be logged. For printing all refs to the logs, use a value of 0. By default, 0. +gerrit.maxRefsToShow +: Number of refs, that are pushed during replication, to be shown + in the show-queue output. To show all refs, use a value of 0. + By default, 2, because whenever a new patchset is created there + are two refs (change ref and meta ref), e.g.: + + `(retry 1) push aaa.com:/git/test.git [refs/heads/b1 refs/heads/b2 (+2)]` + + gerrit.sshCommandTimeout : Timeout for SSH command execution. If 0, there is no timeout and the client waits indefinitely. By default, 0.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java new file mode 100644 index 0000000..90191f2 --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
@@ -0,0 +1,463 @@ +// Copyright (C) 2020 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import static com.google.common.truth.Truth.assertThat; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +import com.google.common.collect.ForwardingIterator; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; +import org.junit.Before; +import org.junit.Test; + +public class ChainedSchedulerTest { + /** A simple {@link Runnable} that waits until the start() method is called.
*/ + public class WaitingRunnable implements Runnable { + protected final CountDownLatch start; + + public WaitingRunnable() { + this(new CountDownLatch(1)); + } + + public WaitingRunnable(CountDownLatch latch) { + this.start = latch; + } + + @Override + public void run() { + try { + start.await(); + } catch (InterruptedException e) { + missedAwaits.incrementAndGet(); + } + } + + public void start() { + start.countDown(); + } + } + + /** A simple {@link Runnable} that can be awaited to start to run. */ + public static class WaitableRunnable implements Runnable { + protected final CountDownLatch started = new CountDownLatch(1); + + @Override + public void run() { + started.countDown(); + } + + public boolean isStarted() { + return started.getCount() == 0; + } + + public boolean awaitStart(int itemsBefore) { + try { + return started.await(SECONDS_SYNCHRONIZE * itemsBefore, SECONDS); + } catch (InterruptedException e) { + return false; + } + } + } + + /** An {@link Iterator} wrapper which keeps track of how many times next() has been called. */ + public static class CountingIterator extends ForwardingIterator<String> { + public volatile int count = 0; + + protected Iterator<String> delegate; + + public CountingIterator(Iterator<String> delegate) { + this.delegate = delegate; + } + + @Override + public synchronized String next() { + count++; + return super.next(); + } + + @Override + protected Iterator<String> delegate() { + return delegate; + } + } + + /** A {@link ChainedScheduler.Runner} which keeps track of completion and counts.
*/ + public static class TestRunner implements ChainedScheduler.Runner<String> { + protected final AtomicInteger runCount = new AtomicInteger(0); + protected final CountDownLatch onDone = new CountDownLatch(1); + + @Override + public void run(String item) { + incrementAndGet(); + } + + public int runCount() { + return runCount.get(); + } + + @Override + public void onDone() { + onDone.countDown(); + } + + public boolean isDone() { + return onDone.getCount() <= 0; + } + + public boolean awaitDone(int items) { + try { + return onDone.await(items * SECONDS_SYNCHRONIZE, SECONDS); + } catch (InterruptedException e) { + return false; + } + } + + protected int incrementAndGet() { + return runCount.incrementAndGet(); + } + } + + /** + * A {@link TestRunner} that can be awaited to start to run and additionally will wait until + * increment() or runOneRandomStarted() is called to complete. + */ + public class WaitingRunner extends TestRunner { + protected class RunContext extends WaitableRunnable { + CountDownLatch run = new CountDownLatch(1); + CountDownLatch ran = new CountDownLatch(1); + int count; + + @Override + public void run() { + super.run(); + try { + run.await(); + count = incrementAndGet(); + ran.countDown(); + } catch (InterruptedException e) { + missedAwaits.incrementAndGet(); + } + } + + public synchronized boolean startIfNotRunning() throws InterruptedException { + if (run.getCount() > 0) { + increment(); + return true; + } + return false; + } + + public synchronized int increment() throws InterruptedException { + run.countDown(); + ran.await(); // no timeout needed as RunContext.run() calls countDown unless interrupted + return count; + } + } + + protected final Map<String, RunContext> ctxByItem = new ConcurrentHashMap<>(); + + @Override + public void run(String item) { + context(item).run(); + } + + public void runOneRandomStarted() throws InterruptedException { + while (true) { + for (RunContext ctx : ctxByItem.values()) { + if (ctx.isStarted()) { + if
(ctx.startIfNotRunning()) { + return; + } + } + } + MILLISECONDS.sleep(1); + } + } + + public boolean awaitStart(String item, int itemsBefore) { + return context(item).awaitStart(itemsBefore); + } + + public int increment(String item) throws InterruptedException { + return increment(item, 1); + } + + public int increment(String item, int itemsBefore) throws InterruptedException { + awaitStart(item, itemsBefore); + return context(item).increment(); + } + + protected RunContext context(String item) { + return ctxByItem.computeIfAbsent(item, k -> new RunContext()); + } + } + + // Time, in seconds, for one synchronization event such as await(), or variable +// incrementing across threads, to take + public static final int SECONDS_SYNCHRONIZE = 3; + public static final int MANY_ITEMS_SIZE = 1000; + + public static String FIRST = item(1); + public static String SECOND = item(2); + public static String THIRD = item(3); + + public final AtomicInteger missedAwaits = new AtomicInteger(0); // non-zero signals an error + + @Before + public void setup() { + missedAwaits.set(0); + } + + @Test + public void emptyCompletesImmediately() { + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + TestRunner runner = new TestRunner(); + List<String> items = new ArrayList<>(); + + new ChainedScheduler<>(executor, items.iterator(), runner); + assertThat(runner.awaitDone(1)).isEqualTo(true); + assertThat(runner.runCount()).isEqualTo(0); + } + + @Test + public void oneItemCompletes() throws Exception { + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + TestRunner runner = new TestRunner(); + List<String> items = new ArrayList<>(); + items.add(FIRST); + + new ChainedScheduler<>(executor, items.iterator(), runner); + assertThat(runner.awaitDone(1)).isEqualTo(true); + assertThat(runner.runCount()).isEqualTo(1); + } + + @Test + public void manyItemsAllComplete() throws Exception { + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+ TestRunner runner = new TestRunner(); + List<String> items = createManyItems(); + + new ChainedScheduler<>(executor, items.iterator(), runner); + assertThat(runner.awaitDone(items.size())).isEqualTo(true); + assertThat(runner.runCount()).isEqualTo(items.size()); + } + + @Test + public void exceptionInTaskDoesNotAbortIteration() throws Exception { + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + TestRunner runner = + new TestRunner() { + @Override + public void run(String item) { + super.run(item); + throw new RuntimeException(); + } + }; + List<String> items = new ArrayList<>(); + items.add(FIRST); + items.add(SECOND); + items.add(THIRD); + + new ChainedScheduler<>(executor, items.iterator(), runner); + assertThat(runner.awaitDone(items.size())).isEqualTo(true); + assertThat(runner.runCount()).isEqualTo(items.size()); + } + + @Test + public void onDoneNotCalledBeforeAllCompleted() throws Exception { + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + WaitingRunner runner = new WaitingRunner(); + List<String> items = createManyItems(); + + new ChainedScheduler<>(executor, items.iterator(), runner); + for (int i = 1; i <= items.size(); i++) { + assertThat(runner.isDone()).isEqualTo(false); + runner.runOneRandomStarted(); + } + + assertThat(runner.awaitDone(items.size())).isEqualTo(true); + assertThat(runner.runCount()).isEqualTo(items.size()); + assertThat(missedAwaits.get()).isEqualTo(0); + } + + @Test + public void otherTasksOnlyEverWaitForAtMostOneRunningPlusOneWaiting() throws Exception { + for (int threads = 1; threads <= 10; threads++) { + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + WaitingRunner runner = new WaitingRunner(); + List<String> items = createItems(threads + 1 /* Running */ + 1 /* Waiting */); + CountingIterator it = new CountingIterator(items.iterator()); + + new ChainedScheduler<>(executor, it, runner); + assertThat(runner.awaitStart(FIRST,
1)).isEqualTo(true); // Confirms at least one Running + assertThat(it.count).isGreaterThan(1); // Confirms at least one extra Waiting or Running + assertThat(it.count).isLessThan(items.size()); // Confirms at least one still not queued + + WaitableRunnable external = new WaitableRunnable(); + executor.execute(external); + assertThat(external.isStarted()).isEqualTo(false); + + // Completes 2, (at most one Running + 1 Waiting) + assertThat(runner.increment(FIRST)).isEqualTo(1); // Was Running + assertThat(runner.increment(SECOND)).isEqualTo(2); // Was Waiting + // Asserts that the one that still needed to be queued is not blocking this external task + assertThat(external.awaitStart(1)).isEqualTo(true); + + for (int i = 3; i <= items.size(); i++) { + runner.increment(item(i)); + } + assertThat(missedAwaits.get()).isEqualTo(0); + } + } + + @Test + public void saturatesManyFreeThreads() throws Exception { + int threads = 10; + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(threads); + WaitingRunner runner = new WaitingRunner(); + List<String> items = createManyItems(); + + new ChainedScheduler<>(executor, items.iterator(), runner); + + for (int j = 1; j <= MANY_ITEMS_SIZE; j += threads) { + // If #threads items can start before any complete, it proves #threads are + // running in parallel and saturating all available threads.
+ for (int i = j; i < j + threads; i++) { + assertThat(runner.awaitStart(item(i), threads)).isEqualTo(true); + } + for (int i = j; i < j + threads; i++) { + assertThat(runner.increment(item(i))).isEqualTo(i); + } + } + + assertThat(runner.awaitDone(threads)).isEqualTo(true); + assertThat(runner.runCount()).isEqualTo(items.size()); + assertThat(missedAwaits.get()).isEqualTo(0); + } + + @Test + public void makesProgressEvenWhenSaturatedByOtherTasks() throws Exception { + int blockSize = 5; // how many batches to queue at once + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(blockSize); + List<String> items = createManyItems(); + WaitingRunner runner = new WaitingRunner(); + + int batchSize = 5; // how many tasks are started concurrently + Queue<CountDownLatch> batches = new LinkedList<>(); + for (int b = 0; b < blockSize; b++) { + batches.add(executeWaitingRunnableBatch(batchSize, executor)); + } + + new ChainedScheduler<>(executor, items.iterator(), runner); + + for (int i = 1; i <= items.size(); i++) { + for (int b = 0; b < blockSize; b++) { + // Ensure saturation by always having at least a full thread count of + // other tasks waiting in the queue after the waiting item so that when + // one batch is executed, and the item then executes, there will still + // be at least a full batch waiting.
+ batches.add(executeWaitingRunnableBatch(batchSize, executor)); + batches.remove().countDown(); + } + assertThat(runner.increment(item(i), batchSize)).isEqualTo(i); // Assert progress can be made + } + assertThat(runner.runCount()).isEqualTo(items.size()); + + while (batches.size() > 0) { + batches.remove().countDown(); + } + assertThat(missedAwaits.get()).isEqualTo(0); + } + + @Test + public void forwardingRunnerForwards() throws Exception { + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + TestRunner runner = new TestRunner(); + List<String> items = createManyItems(); + + new ChainedScheduler<>( + executor, items.iterator(), new ChainedScheduler.ForwardingRunner<>(runner)); + assertThat(runner.awaitDone(items.size())).isEqualTo(true); + assertThat(runner.runCount()).isEqualTo(items.size()); + } + + @Test + public void streamSchedulerClosesStream() throws Exception { + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + WaitingRunner runner = new WaitingRunner(); + List<String> items = new ArrayList<>(); + items.add(FIRST); + items.add(SECOND); + + final AtomicBoolean closed = new AtomicBoolean(false); + Object closeRecorder = + new Object() { + @SuppressWarnings("unused") // Called via reflection + public void close() { + closed.set(true); + } + }; + @SuppressWarnings("unchecked") // Stream.class is converted to Stream<String>.class + Stream<String> stream = ForwardingProxy.create(Stream.class, items.stream(), closeRecorder); + + new ChainedScheduler.StreamScheduler<>(executor, stream, runner); + assertThat(closed.get()).isEqualTo(false); + + // Since there is only a single thread, the Stream cannot get closed before this runs + runner.increment(FIRST); + // The Stream should get closed as the last item (SECOND) runs, before its runner is called + runner.increment(SECOND); // Ensure the last item's runner has already been called + assertThat(runner.awaitDone(items.size())).isEqualTo(true); +
assertThat(runner.runCount()).isEqualTo(items.size()); + assertThat(closed.get()).isEqualTo(true); + } + + protected CountDownLatch executeWaitingRunnableBatch( + int batchSize, ScheduledThreadPoolExecutor executor) { + CountDownLatch latch = new CountDownLatch(1); + for (int e = 0; e < batchSize; e++) { + executor.execute(new WaitingRunnable(latch)); + } + return latch; + } + + protected static List<String> createManyItems() { + return createItems(MANY_ITEMS_SIZE); + } + + protected static List<String> createItems(int count) { + List<String> items = new ArrayList<>(); + for (int i = 1; i <= count; i++) { + items.add(item(i)); + } + return items; + } + + protected static String item(int i) { + return "Item #" + i; + } +}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java new file mode 100644 index 0000000..a1f61fe --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
@@ -0,0 +1,109 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+/**
+ * A ForwardingProxy creates a Proxy which forwards all method calls to its delegate except for
+ * calls to methods which are implemented by its overrider's class.
+ *
+ * <p>Using this Proxy class makes it possible to use the delegate pattern on any interface without
+ * having to implement any of the interface's methods which directly forward their calls to the
+ * delegate. Using this is intended to make forwarding automated, easy, and less error prone by
+ * making it possible to implement the delegate pattern with an overrider object which only
+ * implements those methods which need overridden functionality and which will not directly forward
+ * their calls to the delegate.
+ *
+ * <p>The overrider object will be assumed to not implement any default java Object methods which
+ * are not overridden, as that would likely not be desirable behavior, and thus the Proxy will not
+ * forward those methods to the overrider unless the overrider overrides them.
+ *
+ * <p>If an overrider needs to make calls to the delegate, this can be achieved by passing the
+ * delegate into the overrider during construction.
+ *
+ * <p>Exceptions thrown by the delegate or the overrider are rethrown to the caller of the Proxy
+ * unchanged, rather than wrapped in the {@link InvocationTargetException} produced by reflection.
+ */
+public class ForwardingProxy {
+  /** Sends each call to the overrider if it implements the method, otherwise to the delegate. */
+  protected static class Handler<T> implements InvocationHandler {
+    protected T delegate;
+    protected Object overrider;
+
+    protected Handler(T delegate, Object overrider) {
+      this.delegate = delegate;
+      this.overrider = overrider;
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+      Method overriden = getOverriden(method);
+      try {
+        if (overriden != null) {
+          return overriden.invoke(overrider, args);
+        }
+        return method.invoke(delegate, args);
+      } catch (InvocationTargetException e) {
+        // Reflection wraps whatever the target threw; unwrap it so callers of the Proxy see the
+        // original exception instead of an UndeclaredThrowableException.
+        Throwable cause = e.getCause();
+        throw cause != null ? cause : e;
+      }
+    }
+
+    /**
+     * Looks up the overrider's public method matching {@code method} by name and parameter types.
+     *
+     * @param method the interface method being invoked on the Proxy
+     * @return the overrider's method, or {@code null} if the overrider has no such public method
+     *     or only inherits it from {@link Object}
+     */
+    protected Method getOverriden(Method method) {
+      try {
+        Method implementedByOverrider =
+            overrider.getClass().getMethod(method.getName(), method.getParameterTypes());
+
+        // Only allow defined (non java defaulted) methods to actually be overridden
+        if (Object.class != implementedByOverrider.getDeclaringClass()) {
+          return implementedByOverrider;
+        }
+      } catch (NoSuchMethodException | SecurityException ignored) {
+        // No usable override could be looked up, so the call belongs to the delegate.
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Creates a Proxy implementing {@code toProxy}.
+   *
+   * @param toProxy the interface the returned Proxy implements
+   * @param delegate receives every call the overrider does not implement
+   * @param overrider object whose public methods take precedence over the delegate's
+   * @return a Proxy of type {@code toProxy}
+   */
+  @SuppressWarnings("unchecked") // newProxyInstance returns Object
+  public static <T> T create(Class<T> toProxy, T delegate, Object overrider) {
+    return (T)
+        Proxy.newProxyInstance(
+            delegate.getClass().getClassLoader(),
+            new Class[] {toProxy},
+            new Handler<>(delegate, overrider));
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java index 2ee9a39..43d97c1 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
@@ -23,6 +23,8 @@ import com.google.gerrit.server.permissions.PermissionBackendException; import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing; import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult; +import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent; +import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent; import java.net.URISyntaxException; import org.eclipse.jgit.transport.RemoteRefUpdate; import org.eclipse.jgit.transport.URIish; @@ -41,18 +43,19 @@ @Test public void headRefReplicated() throws URISyntaxException, PermissionBackendException { + URIish completeUri = new URIish("git://someHost/basePath/someProject.git"); RefReplicatedEvent expectedEvent = new RefReplicatedEvent( "someProject", "refs/heads/master", - "someHost", + completeUri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); gitUpdateProcessing.onRefReplicatedToOneNode( "someProject", "refs/heads/master", - new URIish("git://someHost/someProject.git"), + completeUri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK); verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent)); @@ -60,18 +63,19 @@ @Test public void changeRefReplicated() throws URISyntaxException, PermissionBackendException { + URIish completeUri = new URIish("git://someHost/basePath/someProject.git"); RefReplicatedEvent expectedEvent = new RefReplicatedEvent( "someProject", "refs/changes/01/1/1", - "someHost", + completeUri, RefPushResult.FAILED, RemoteRefUpdate.Status.REJECTED_NONFASTFORWARD); gitUpdateProcessing.onRefReplicatedToOneNode( "someProject", "refs/changes/01/1/1", - new URIish("git://someHost/someProject.git"), + completeUri, RefPushResult.FAILED, RemoteRefUpdate.Status.REJECTED_NONFASTFORWARD); verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java index afe1d82..9606371 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
@@ -23,7 +23,10 @@ import com.google.gerrit.acceptance.WaitUtil; import com.google.gerrit.acceptance.testsuite.project.ProjectOperations; import com.google.gerrit.entities.Project; +import com.google.gerrit.extensions.api.changes.NotifyHandling; +import com.google.gerrit.extensions.events.ProjectDeletedListener; import com.google.gerrit.server.config.SitePaths; +import com.google.gerrit.server.git.LocalDiskRepositoryManager; import com.google.inject.Inject; import java.io.IOException; import java.nio.file.Path; @@ -51,11 +54,14 @@ protected static final Optional<String> ALL_PROJECTS = Optional.empty(); protected static final int TEST_REPLICATION_DELAY_SECONDS = 1; + protected static final int TEST_LONG_REPLICATION_DELAY_SECONDS = 30; protected static final int TEST_REPLICATION_RETRY_MINUTES = 1; protected static final int TEST_PUSH_TIME_SECONDS = 1; protected static final int TEST_PROJECT_CREATION_SECONDS = 10; protected static final Duration TEST_PUSH_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY_SECONDS + TEST_PUSH_TIME_SECONDS); + protected static final Duration TEST_PUSH_TIMEOUT_LONG = + Duration.ofSeconds(TEST_LONG_REPLICATION_DELAY_SECONDS + TEST_PUSH_TIME_SECONDS); protected static final Duration TEST_NEW_PROJECT_TIMEOUT = Duration.ofSeconds( (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60) @@ -66,6 +72,18 @@ protected Path gitPath; protected FileBasedConfig config; + protected void setDistributionInterval(int interval) throws IOException { + config.setInt("replication", null, "distributionInterval", interval); + config.save(); + } + + protected String getProjectUri(Project.NameKey project) throws Exception { + return ((LocalDiskRepositoryManager) repoManager) + .getBasePath(project) + .resolve(project.get() + ".git") + .toString(); + } + protected void setReplicationDestination( String remoteName, String replicaSuffix, Optional<String> project) throws IOException { setReplicationDestination( @@ -230,4 +248,24 @@ config.save(); } 
} + + protected ProjectDeletedListener.Event projectDeletedEvent(String projectNameDeleted) { + return new ProjectDeletedListener.Event() { + @Override + public String getProjectName() { + return projectNameDeleted; + } + + @Override + public NotifyHandling getNotify() { + return NotifyHandling.NONE; + } + }; + } + + protected void setProjectDeletionReplication(String remoteName, boolean replicateProjectDeletion) + throws IOException { + config.setBoolean("remote", remoteName, "replicateProjectDeletions", replicateProjectDeletion); + config.save(); + } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java new file mode 100644 index 0000000..5ade68d --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDistributorIT.java
@@ -0,0 +1,143 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.WaitUtil;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.git.WorkQueue;
+import java.time.Duration;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.Test;
+
+/**
+ * Integration tests for {@link
+ * com.googlesource.gerrit.plugins.replication.ReplicationQueue.Distributor}: tasks that appear in
+ * the task storage get queued, and waiting tasks that disappear from it leave the queue.
+ */
+@UseLocalDisk
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationDistributorIT extends ReplicationStorageDaemon {
+  private static final int TEST_DISTRIBUTION_INTERVAL_SECONDS = 3;
+  private static final int TEST_DISTRIBUTION_DURATION_SECONDS = 1;
+  private static final int TEST_DISTRIBUTION_CYCLE_SECONDS =
+      TEST_DISTRIBUTION_INTERVAL_SECONDS + TEST_DISTRIBUTION_DURATION_SECONDS;
+
+  /** Sets the distribution interval in the config before the plugin under test is set up. */
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    initConfig();
+    setDistributionInterval(TEST_DISTRIBUTION_INTERVAL_SECONDS);
+    super.setUpTestPlugin();
+  }
+
+  /** A task written straight into storage must be queued, run, and replicate the new branch. */
+  @Test
+  public void distributorAddingTaskFromStorage() throws Exception {
+    String remoteName = "foo";
+    String replicaSuffix = "replica";
+    String masterRefName = "refs/heads/master";
+    String branchRefName = "refs/heads/foo_branch";
+    Duration distributionCycle = Duration.ofSeconds(TEST_DISTRIBUTION_CYCLE_SECONDS);
+
+    Project.NameKey replicaProject = createTestProject(project + replicaSuffix);
+    URIish replicaUri = new URIish(getProjectUri(replicaProject));
+    ReplicationTasksStorage.ReplicateRefUpdate storedUpdate =
+        ReplicationTasksStorage.ReplicateRefUpdate.create(
+            project.get(), branchRefName, replicaUri, remoteName);
+    createBranch(project, masterRefName, branchRefName);
+    setReplicationDestination(remoteName, replicaSuffix, ALL_PROJECTS);
+    reloadConfig();
+
+    // Mimics RefUpdate inserted into storage by other Primary
+    tasksStorage.create(storedUpdate);
+    WaitUtil.waitUntil(() -> !getProjectTasks().isEmpty(), distributionCycle);
+
+    // ReplicationTask for the created ref in queue
+    assertThat(getProjectTasks()).hasSize(1);
+    assertThat(waitForProjectTaskCount(0, TEST_PUSH_TIMEOUT)).isTrue();
+
+    try (Repository replicaRepo = repoManager.openRepository(replicaProject);
+        Repository originRepo = repoManager.openRepository(project)) {
+      Ref expected = getRef(originRepo, masterRefName);
+      Ref replicated = getRef(replicaRepo, branchRefName);
+      assertThat(replicated).isNotNull();
+      assertThat(replicated.getObjectId()).isEqualTo(expected.getObjectId());
+    }
+  }
+
+  /** Once its waiting task is deleted from storage, the queued task must leave the work queue. */
+  @Test
+  public void distributorPrunesTaskFromWorkQueue() throws Exception {
+    String branchRefName = "refs/heads/foo_branch";
+    Duration distributionCycle = Duration.ofSeconds(TEST_DISTRIBUTION_CYCLE_SECONDS);
+
+    createTestProject(project + "replica");
+    setReplicationDestination("foo", "replica", ALL_PROJECTS, Integer.MAX_VALUE);
+    reloadConfig();
+
+    createBranch(project, "refs/heads/master", branchRefName);
+
+    // This simulates the work being started by other node
+    deleteWaitingReplicationTasks(branchRefName);
+
+    boolean queueDrained = waitForProjectTaskCount(0, distributionCycle);
+    assertThat(queueDrained).isTrue();
+  }
+
+  /** Returns the {@link WorkQueue} tasks that are {@link WorkQueue.ProjectTask}s. */
+  private List<WorkQueue.Task<?>> getProjectTasks() {
+    return getInstance(WorkQueue.class).getTasks().stream()
+        .filter(WorkQueue.ProjectTask.class::isInstance)
+        .collect(Collectors.toList());
+  }
+
+  /** Creates {@code refToCreate} in {@code project} pointing at the object of {@code fromRef}. */
+  private void createBranch(Project.NameKey project, String fromRef, String refToCreate)
+      throws Exception {
+    try (Repository repo = repoManager.openRepository(project)) {
+      Ref base = repo.exactRef(fromRef);
+      RefUpdate branchUpdate = repo.updateRef(refToCreate);
+      branchUpdate.setNewObjectId(base.getObjectId());
+      branchUpdate.update();
+    }
+  }
+
+  /**
+   * Waits until exactly {@code count} project tasks are queued.
+   *
+   * @return {@code true} if the count was reached, {@code false} if {@link WaitUtil} threw
+   *     {@link InterruptedException} first
+   */
+  private boolean waitForProjectTaskCount(int count, Duration duration) {
+    boolean reached = true;
+    try {
+      WaitUtil.waitUntil(() -> getProjectTasks().size() == count, duration);
+    } catch (InterruptedException e) {
+      reached = false;
+    }
+    return reached;
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java new file mode 100644 index 0000000..2d69a47 --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationEventsIT.java
@@ -0,0 +1,355 @@ +// Copyright (C) 2021 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.common.base.Objects; +import com.google.gerrit.acceptance.PushOneCommit.Result; +import com.google.gerrit.acceptance.Sandboxed; +import com.google.gerrit.acceptance.TestPlugin; +import com.google.gerrit.acceptance.UseLocalDisk; +import com.google.gerrit.acceptance.WaitUtil; +import com.google.gerrit.entities.BranchNameKey; +import com.google.gerrit.entities.Project; +import com.google.gerrit.extensions.api.projects.BranchInput; +import com.google.gerrit.extensions.events.ProjectDeletedListener; +import com.google.gerrit.extensions.registration.DynamicItem; +import com.google.gerrit.extensions.registration.DynamicSet; +import com.google.gerrit.server.events.Event; +import com.google.gerrit.server.events.EventDispatcher; +import com.google.gerrit.server.events.EventGsonProvider; +import com.google.gerrit.server.events.ProjectEvent; +import com.google.gerrit.server.events.RefEvent; +import com.google.gson.Gson; +import com.google.inject.Inject; +import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationDoneEvent; +import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationFailedEvent; +import 
com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationScheduledEvent; +import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationSucceededEvent; +import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent; +import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent; +import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.function.Predicate; +import java.util.function.Supplier; +import org.eclipse.jgit.transport.URIish; +import org.junit.Before; +import org.junit.Test; + +@UseLocalDisk +@Sandboxed +@TestPlugin( + name = "replication", + sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule") +public class ReplicationEventsIT extends ReplicationDaemon { + private static final Duration TEST_POST_EVENT_TIMEOUT = Duration.ofSeconds(1); + + @Inject private DynamicSet<ProjectDeletedListener> deletedListeners; + @Inject private DynamicItem<EventDispatcher> eventDispatcher; + private TestDispatcher testDispatcher; + private Gson eventGson; + + @Before + public void setup() throws Exception { + initConfig(); + setReplicationDestination( + "remote1", + "suffix1", + Optional.of("not-used-project")); // Simulates a full replication.config initialization + setUpTestPlugin(); + testDispatcher = new TestDispatcher(); + eventDispatcher.set(testDispatcher, eventDispatcher.getPluginName()); + eventGson = new EventGsonProvider().get(); + } + + @Test + public void replicateNewChangeSendsEvents() throws Exception { + Project.NameKey targetProject = createTestProject(project + "replica"); + + setReplicationDestination("foo", "replica", ALL_PROJECTS); + reloadConfig(); + + Result pushResult = createChange(); + String sourceRef = pushResult.getPatchSet().refName(); + String metaRef = pushResult.getChange().notes().getRefName(); + BranchNameKey 
changeBranch = BranchNameKey.create(project, sourceRef); + BranchNameKey metaBranch = BranchNameKey.create(project, metaRef); + + assertThat(testDispatcher.getEvents(changeBranch, ReplicationScheduledEvent.class)).hasSize(1); + assertThat(testDispatcher.getEvents(metaBranch, ReplicationScheduledEvent.class)).hasSize(1); + + isPushCompleted(targetProject, sourceRef, TEST_PUSH_TIMEOUT); + isPushCompleted(targetProject, metaRef, TEST_PUSH_TIMEOUT); + + waitForRefEvent(() -> testDispatcher.getEvents(RefReplicatedEvent.class), metaRef); + waitForRefEvent(() -> testDispatcher.getEvents(RefReplicatedEvent.class), sourceRef); + assertThat(testDispatcher.getEvents(RefReplicatedEvent.class).size()).isEqualTo(2); + + waitForRefEvent(() -> testDispatcher.getEvents(RefReplicationDoneEvent.class), metaRef); + waitForRefEvent(() -> testDispatcher.getEvents(RefReplicationDoneEvent.class), sourceRef); + assertThat(testDispatcher.getEvents(RefReplicationDoneEvent.class).size()).isEqualTo(2); + } + + @Test + public void replicateNewBranchSendsEvents() throws Exception { + setReplicationDestination("foo", "replica", ALL_PROJECTS); + reloadConfig(); + + Project.NameKey targetProject = createTestProject(project + "replica"); + String newBranch = "refs/heads/mybranch"; + BranchNameKey branchName = BranchNameKey.create(project, newBranch); + String master = "refs/heads/master"; + BranchInput input = new BranchInput(); + input.revision = master; + gApi.projects().name(project.get()).branch(newBranch).create(input); + + assertThat(testDispatcher.getEvents(branchName, ReplicationScheduledEvent.class)).hasSize(1); + + isPushCompleted(targetProject, newBranch, TEST_PUSH_TIMEOUT); + + waitForRefEvent(() -> testDispatcher.getEvents(RefReplicatedEvent.class), newBranch); + assertThat(testDispatcher.getEvents(RefReplicatedEvent.class).size()).isEqualTo(1); + + waitForRefEvent(() -> testDispatcher.getEvents(RefReplicationDoneEvent.class), newBranch); + 
assertThat(testDispatcher.getEvents(RefReplicationDoneEvent.class).size()).isEqualTo(1); + } + + @Test + public void shouldEmitProjectDeletionEventsForOneRemote() throws Exception { + String projectName = project.get(); + setReplicationTarget("replica", project.get()); + + reloadConfig(); + + for (ProjectDeletedListener l : deletedListeners) { + l.onProjectDeleted(projectDeletedEvent(projectName)); + } + + List<ProjectDeletionReplicationScheduledEvent> scheduledEvents = + testDispatcher.getEvents(project, ProjectDeletionReplicationScheduledEvent.class); + assertThat(scheduledEvents).hasSize(1); + + assertThatAnyMatch( + scheduledEvents, + e -> project.equals(e.getProjectNameKey()) && e.getTargetUri().endsWith("replica.git")); + + waitForProjectEvent( + () -> testDispatcher.getEvents(project, ProjectDeletionReplicationSucceededEvent.class), 1); + + assertThatAnyMatch( + testDispatcher.getEvents(project, ProjectDeletionReplicationSucceededEvent.class), + e -> project.equals(e.getProjectNameKey()) && e.getTargetUri().endsWith("replica.git")); + + waitForProjectEvent( + () -> testDispatcher.getEvents(project, ProjectDeletionReplicationDoneEvent.class), 1); + + assertThatAnyMatch( + testDispatcher.getEvents(project, ProjectDeletionReplicationDoneEvent.class), + e -> project.equals(e.getProjectNameKey())); + } + + @Test + public void shouldEmitProjectDeletionEventsForMultipleRemotesWhenSucceeding() throws Exception { + String projectName = project.get(); + setReplicationTarget("replica1", projectName); + setReplicationTarget("replica2", projectName); + + reloadConfig(); + + for (ProjectDeletedListener l : deletedListeners) { + l.onProjectDeleted(projectDeletedEvent(projectName)); + } + + List<ProjectDeletionReplicationScheduledEvent> scheduledEvents = + testDispatcher.getEvents(project, ProjectDeletionReplicationScheduledEvent.class); + assertThat(scheduledEvents).hasSize(2); + + assertThatAnyMatch( + scheduledEvents, + e -> project.equals(e.getProjectNameKey()) && 
e.getTargetUri().endsWith("replica1.git")); + assertThatAnyMatch( + scheduledEvents, + e -> project.equals(e.getProjectNameKey()) && e.getTargetUri().endsWith("replica2.git")); + + waitForProjectEvent( + () -> testDispatcher.getEvents(project, ProjectDeletionReplicationSucceededEvent.class), 2); + + List<ProjectDeletionReplicationSucceededEvent> successEvents = + testDispatcher.getEvents(project, ProjectDeletionReplicationSucceededEvent.class); + + assertThatAnyMatch( + successEvents, + e -> project.equals(e.getProjectNameKey()) && e.getTargetUri().endsWith("replica1.git")); + assertThatAnyMatch( + successEvents, + e -> project.equals(e.getProjectNameKey()) && e.getTargetUri().endsWith("replica2.git")); + + waitForProjectEvent( + () -> testDispatcher.getEvents(project, ProjectDeletionReplicationDoneEvent.class), 1); + + assertThatAnyMatch( + testDispatcher.getEvents(project, ProjectDeletionReplicationDoneEvent.class), + e -> project.equals(e.getProjectNameKey())); + } + + @Test + public void shouldEmitProjectDeletionEventsForMultipleRemotesWhenFailing() throws Exception { + String projectName = project.get(); + setReplicationTarget("replica1", projectName); + + setReplicationDestination( + "not-existing-replica", "not-existing-replica", Optional.of(projectName)); + setProjectDeletionReplication("not-existing-replica", true); + + reloadConfig(); + + for (ProjectDeletedListener l : deletedListeners) { + l.onProjectDeleted(projectDeletedEvent(projectName)); + } + + List<ProjectDeletionReplicationScheduledEvent> scheduledEvents = + testDispatcher.getEvents(project, ProjectDeletionReplicationScheduledEvent.class); + assertThat(scheduledEvents).hasSize(2); + + assertThatAnyMatch( + scheduledEvents, + e -> project.equals(e.getProjectNameKey()) && e.getTargetUri().endsWith("replica1.git")); + assertThatAnyMatch( + scheduledEvents, + e -> + project.equals(e.getProjectNameKey()) + && e.getTargetUri().endsWith("not-existing-replica.git")); + + waitForProjectEvent( + () -> 
testDispatcher.getEvents(project, ProjectDeletionReplicationSucceededEvent.class), 1); + + waitForProjectEvent( + () -> testDispatcher.getEvents(project, ProjectDeletionReplicationFailedEvent.class), 1); + + assertThatAnyMatch( + testDispatcher.getEvents(project, ProjectDeletionReplicationSucceededEvent.class), + e -> project.equals(e.getProjectNameKey()) && e.getTargetUri().endsWith("replica1.git")); + + assertThatAnyMatch( + testDispatcher.getEvents(project, ProjectDeletionReplicationFailedEvent.class), + e -> + project.equals(e.getProjectNameKey()) + && e.getTargetUri().endsWith("not-existing-replica.git")); + + waitForProjectEvent( + () -> testDispatcher.getEvents(project, ProjectDeletionReplicationDoneEvent.class), 1); + + assertThatAnyMatch( + testDispatcher.getEvents(project, ProjectDeletionReplicationDoneEvent.class), + e -> project.equals(e.getProjectNameKey())); + } + + @Test + public void shouldSerializeObjectsHavingProjectDeletionReplicationScheduledEventAsField() + throws Exception { + EventWrapper origEvent = + new EventWrapper( + new ProjectDeletionReplicationScheduledEvent( + project.get(), new URIish(String.format("git://someHost/%s.git", project.get())))); + + EventWrapper gotEvent = eventGson.fromJson(eventGson.toJson(origEvent), origEvent.getClass()); + + assertThat(origEvent).isEqualTo(gotEvent); + } + + @Test + public void shouldSerializeObjectsHavingProjectDeletionReplicationSucceededEventAsField() + throws Exception { + EventWrapper origEvent = + new EventWrapper( + new ProjectDeletionReplicationSucceededEvent( + project.get(), new URIish(String.format("git://someHost/%s.git", project.get())))); + + EventWrapper gotEvent = eventGson.fromJson(eventGson.toJson(origEvent), origEvent.getClass()); + + assertThat(origEvent).isEqualTo(gotEvent); + } + + @Test + public void shouldSerializeObjectsHavingProjectDeletionReplicationFailedEventAsField() + throws Exception { + EventWrapper origEvent = + new EventWrapper( + new 
ProjectDeletionReplicationFailedEvent( + project.get(), new URIish(String.format("git://someHost/%s.git", project.get())))); + + EventWrapper gotEvent = eventGson.fromJson(eventGson.toJson(origEvent), origEvent.getClass()); + + assertThat(origEvent).isEqualTo(gotEvent); + } + + @Test + public void shouldSerializeObjectsHavingProjectDeletionReplicationDoneEventAsField() { + EventWrapper origEvent = + new EventWrapper(new ProjectDeletionReplicationDoneEvent(project.get())); + + EventWrapper gotEvent = eventGson.fromJson(eventGson.toJson(origEvent), origEvent.getClass()); + + assertThat(origEvent).isEqualTo(gotEvent); + } + + private <T extends RefEvent> void waitForRefEvent(Supplier<List<T>> events, String refName) + throws InterruptedException { + WaitUtil.waitUntil( + () -> events.get().stream().filter(e -> refName.equals(e.getRefName())).count() == 1, + TEST_POST_EVENT_TIMEOUT); + } + + private <T extends ProjectEvent> void waitForProjectEvent(Supplier<List<T>> events, int count) + throws InterruptedException { + WaitUtil.waitUntil(() -> events.get().size() == count, TEST_POST_EVENT_TIMEOUT); + } + + private Project.NameKey setReplicationTarget(String replica, String ofProject) throws Exception { + Project.NameKey replicaProject = createTestProject(String.format("%s%s", ofProject, replica)); + setReplicationDestination(replica, replica, Optional.of(ofProject)); + setProjectDeletionReplication(replica, true); + return replicaProject; + } + + private <T extends ProjectEvent> void assertThatAnyMatch(List<T> events, Predicate<T> p) { + assertThat(events.stream().anyMatch(p)).isTrue(); + } + + private static class EventWrapper { + private final Event event; + + public EventWrapper(Event event) { + this.event = event; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof EventWrapper)) { + return false; + } + EventWrapper eventWrapper = (EventWrapper) o; + return Objects.equal(event, eventWrapper.event); + } + + @Override + public int hashCode() { + 
return Objects.hashCode(event); + } + } +}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java index 536080e..1c1f983 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFanoutIT.java
@@ -189,8 +189,9 @@ private List<ReplicateRefUpdate> listWaitingTasks(String refRegex) { Pattern refmaskPattern = Pattern.compile(refRegex); - return tasksStorage.listWaiting().stream() - .filter(task -> refmaskPattern.matcher(task.ref).matches()) + return tasksStorage + .streamWaiting() + .filter(task -> refmaskPattern.matcher(task.ref()).matches()) .collect(toList()); } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java index ba6f94d..17c8933 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -23,16 +23,23 @@ import com.google.gerrit.acceptance.UseLocalDisk; import com.google.gerrit.acceptance.WaitUtil; import com.google.gerrit.entities.Project; -import com.google.gerrit.extensions.api.changes.NotifyHandling; import com.google.gerrit.extensions.api.projects.BranchInput; import com.google.gerrit.extensions.common.ProjectInfo; import com.google.gerrit.extensions.events.ProjectDeletedListener; import com.google.gerrit.extensions.registration.DynamicSet; +import com.google.gerrit.server.git.WorkQueue; import com.google.inject.Inject; -import java.io.IOException; import java.time.Duration; +import java.util.Arrays; +import java.util.List; import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.eclipse.jgit.lib.Constants; import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.lib.Ref; @@ -87,21 +94,8 @@ setProjectDeletionReplication("foo", true); reloadConfig(); - ProjectDeletedListener.Event event = - new ProjectDeletedListener.Event() { - @Override - public String getProjectName() { - return projectNameDeleted; - } - - @Override - public NotifyHandling getNotify() { - return NotifyHandling.NONE; - } - }; - for (ProjectDeletedListener l : deletedListeners) { - l.onProjectDeleted(event); + l.onProjectDeleted(projectDeletedEvent(projectNameDeleted)); } waitUntil(() -> !nonEmptyProjectExists(replicaProject)); @@ -232,6 +226,75 @@ } @Test + public void pushAllWait() throws Exception { + createTestProject(project + "replica"); + + setReplicationDestination("foo", "replica", ALL_PROJECTS); + reloadConfig(); + + ReplicationState state = new ReplicationState(NO_OP); + + Future<?> future = + plugin + .getSysInjector() + .getInstance(PushAll.Factory.class) + .create(null, new 
ReplicationFilter(Arrays.asList(project.get())), state, false) + .schedule(0, TimeUnit.SECONDS); + + future.get(); + state.waitForReplication(); + } + + @Test + public void pushAllWaitCancelNotRunningTask() throws Exception { + createTestProject(project + "replica"); + + setReplicationDestination("foo", "replica", ALL_PROJECTS); + reloadConfig(); + + ReplicationState state = new ReplicationState(NO_OP); + + Future<?> future = + plugin + .getSysInjector() + .getInstance(PushAll.Factory.class) + .create(null, new ReplicationFilter(Arrays.asList(project.get())), state, false) + .schedule(0, TimeUnit.SECONDS); + + CountDownLatch latch = new CountDownLatch(1); + Executor service = Executors.newSingleThreadExecutor(); + service.execute( + new Runnable() { + @Override + public void run() { + try { + future.get(); + state.waitForReplication(); + latch.countDown(); + } catch (Exception e) { + // fails the test because we don't countDown + } + } + }); + + // Cancel the replication task + waitUntil(() -> getProjectTasks().size() != 0); + WorkQueue.Task<?> task = getProjectTasks().get(0); + assertThat(task.getState()).isAnyOf(WorkQueue.Task.State.READY, WorkQueue.Task.State.SLEEPING); + task.cancel(false); + + // Confirm our waiting thread completed + boolean receivedSignal = latch.await(5, TimeUnit.SECONDS); // FIXME Choose a good timeout + assertThat(receivedSignal).isTrue(); + } + + private List<WorkQueue.Task<?>> getProjectTasks() { + return getInstance(WorkQueue.class).getTasks().stream() + .filter(t -> t instanceof WorkQueue.ProjectTask) + .collect(Collectors.toList()); + } + + @Test public void shouldReplicateHeadUpdate() throws Exception { setReplicationDestination("foo", "replica", ALL_PROJECTS); reloadConfig(); @@ -363,12 +426,6 @@ } } - private void setProjectDeletionReplication(String remoteName, boolean replicateProjectDeletion) - throws IOException { - config.setBoolean("remote", remoteName, "replicateProjectDeletions", replicateProjectDeletion); - config.save(); 
- } - private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException { WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT); }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java new file mode 100644 index 0000000..f549f47 --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageDaemon.java
@@ -0,0 +1,87 @@ +// Copyright (C) 2020 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import static java.util.stream.Collectors.toList; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * This class can be extended by any ReplicationStorage*IT class and provides common setup and + * helper methods. 
+ */ +public class ReplicationStorageDaemon extends ReplicationDaemon { + protected static final int TEST_TASK_FINISH_SECONDS = 1; + protected static final int TEST_REPLICATION_MAX_RETRIES = 1; + protected static final Duration TEST_TASK_FINISH_TIMEOUT = + Duration.ofSeconds(TEST_TASK_FINISH_SECONDS); + protected static final Duration MAX_RETRY_WITH_TOLERANCE_TIMEOUT = + Duration.ofSeconds( + (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60) + * TEST_REPLICATION_MAX_RETRIES + + 10); + protected ReplicationTasksStorage tasksStorage; + protected DestinationsCollection destinationCollection; + protected ReplicationConfig replicationConfig; + + @Override + public void setUpTestPlugin() throws Exception { + initConfig(); + setReplicationDestination( + "remote1", + "suffix1", + Optional.of("not-used-project")); // Simulates a full replication.config initialization + super.setUpTestPlugin(); + tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class); + destinationCollection = plugin.getSysInjector().getInstance(DestinationsCollection.class); + replicationConfig = plugin.getSysInjector().getInstance(ReplicationConfig.class); + } + + protected List<ReplicationTasksStorage.ReplicateRefUpdate> listWaitingReplicationTasks( + String refRegex) { + Pattern refmaskPattern = Pattern.compile(refRegex); + return tasksStorage + .streamWaiting() + .filter(task -> refmaskPattern.matcher(task.ref()).matches()) + .collect(toList()); + } + + protected void deleteWaitingReplicationTasks(String refRegex) { + Path refUpdates = replicationConfig.getEventsDirectory().resolve("ref-updates"); + Path waitingUpdates = refUpdates.resolve("waiting"); + for (ReplicationTasksStorage.ReplicateRefUpdate r : listWaitingReplicationTasks(refRegex)) { + try { + Files.deleteIfExists(waitingUpdates.resolve(r.sha1())); + } catch (IOException e) { + throw new RuntimeException("Couldn't delete waiting task", e); + } + } + } + + protected 
List<ReplicationTasksStorage.ReplicateRefUpdate> listWaiting() { + return tasksStorage.streamWaiting().collect(Collectors.toList()); + } + + protected List<ReplicationTasksStorage.ReplicateRefUpdate> listRunning() { + return tasksStorage.streamRunning().collect(Collectors.toList()); + } +}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java index 01f20b4..e2e1e21 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
@@ -16,7 +16,6 @@ import static com.google.common.truth.Truth.assertThat; import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.NO_OP; -import static java.util.stream.Collectors.toList; import com.google.gerrit.acceptance.TestPlugin; import com.google.gerrit.acceptance.UseLocalDisk; @@ -30,7 +29,6 @@ import java.net.URISyntaxException; import java.nio.file.Files; import java.nio.file.Path; -import java.time.Duration; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -51,32 +49,7 @@ @TestPlugin( name = "replication", sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule") -public class ReplicationStorageIT extends ReplicationDaemon { - protected static final int TEST_TASK_FINISH_SECONDS = 1; - protected static final int TEST_REPLICATION_MAX_RETRIES = 1; - protected static final Duration TEST_TASK_FINISH_TIMEOUT = - Duration.ofSeconds(TEST_TASK_FINISH_SECONDS); - private static final Duration MAX_RETRY_WITH_TOLERANCE_TIMEOUT = - Duration.ofSeconds( - (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60) - * TEST_REPLICATION_MAX_RETRIES - + 10); - protected ReplicationTasksStorage tasksStorage; - private DestinationsCollection destinationCollection; - private ReplicationConfig replicationConfig; - - @Override - public void setUpTestPlugin() throws Exception { - initConfig(); - setReplicationDestination( - "remote1", - "suffix1", - Optional.of("not-used-project")); // Simulates a full replication.config initialization - super.setUpTestPlugin(); - tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class); - destinationCollection = plugin.getSysInjector().getInstance(DestinationsCollection.class); - replicationConfig = plugin.getSysInjector().getInstance(ReplicationConfig.class); - } +public class ReplicationStorageIT extends ReplicationStorageDaemon { @Test public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception { @@ -119,7 
+92,7 @@ reloadConfig(); String changeRef = createChange().getPatchSet().refName(); - changeReplicationTasksForRemote(tasksStorage.listWaiting().stream(), changeRef, remote1) + changeReplicationTasksForRemote(tasksStorage.streamWaiting(), changeRef, remote1) .forEach( (update) -> { try { @@ -148,7 +121,7 @@ reloadConfig(); String changeRef = createChange().getPatchSet().refName(); - changeReplicationTasksForRemote(tasksStorage.listWaiting().stream(), changeRef, remote1) + changeReplicationTasksForRemote(tasksStorage.streamWaiting(), changeRef, remote1) .forEach( (update) -> { try { @@ -232,11 +205,14 @@ .getInstance(ReplicationQueue.class) .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), false); - assertThat(tasksStorage.listWaiting()).hasSize(1); - for (ReplicationTasksStorage.ReplicateRefUpdate task : tasksStorage.listWaiting()) { - assertThat(task.uri).isEqualTo(expectedURI); - assertThat(task.ref).isEqualTo(PushOne.ALL_REFS); - } + assertThat(listWaiting()).hasSize(1); + tasksStorage + .streamWaiting() + .forEach( + (task) -> { + assertThat(task.uri()).isEqualTo(expectedURI); + assertThat(task.ref()).isEqualTo(PushOne.ALL_REFS); + }); } @Test @@ -254,11 +230,14 @@ .getInstance(ReplicationQueue.class) .scheduleFullSync(project, urlMatch, new ReplicationState(NO_OP), false); - assertThat(tasksStorage.listWaiting()).hasSize(1); - for (ReplicationTasksStorage.ReplicateRefUpdate task : tasksStorage.listWaiting()) { - assertThat(task.uri).isEqualTo(expectedURI); - assertThat(task.ref).isEqualTo(PushOne.ALL_REFS); - } + assertThat(listWaiting()).hasSize(1); + tasksStorage + .streamWaiting() + .forEach( + (task) -> { + assertThat(task.uri()).isEqualTo(expectedURI); + assertThat(task.ref()).isEqualTo(PushOne.ALL_REFS); + }); } @Test @@ -277,13 +256,13 @@ config.setInt("remote", "task_cleanup_project", "replicationRetry", 0); config.save(); reloadConfig(); - assertThat(tasksStorage.listRunning()).hasSize(0); + assertThat(listRunning()).hasSize(0); 
Project.NameKey sourceProject = createTestProject("task_cleanup_project"); WaitUtil.waitUntil( () -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")), TEST_NEW_PROJECT_TIMEOUT); - WaitUtil.waitUntil(() -> tasksStorage.listRunning().size() == 0, TEST_TASK_FINISH_TIMEOUT); + WaitUtil.waitUntil(() -> listRunning().size() == 0, TEST_TASK_FINISH_TIMEOUT); } @Test @@ -292,7 +271,7 @@ config.setInt("remote", "task_cleanup_locks_project", "replicationRetry", 0); config.save(); reloadConfig(); - assertThat(tasksStorage.listRunning()).hasSize(0); + assertThat(listRunning()).hasSize(0); Project.NameKey sourceProject = createTestProject("task_cleanup_locks_project"); WaitUtil.waitUntil( @@ -370,22 +349,16 @@ private Stream<ReplicateRefUpdate> waitingChangeReplicationTasksForRemote( String changeRef, String remote) { - return tasksStorage.listWaiting().stream() - .filter(task -> changeRef.equals(task.ref)) - .filter(task -> remote.equals(task.remote)); + return tasksStorage + .streamWaiting() + .filter(task -> changeRef.equals(task.ref())) + .filter(task -> remote.equals(task.remote())); } private Stream<ReplicateRefUpdate> changeReplicationTasksForRemote( Stream<ReplicateRefUpdate> updates, String changeRef, String remote) { return updates - .filter(task -> changeRef.equals(task.ref)) - .filter(task -> remote.equals(task.remote)); - } - - private List<ReplicateRefUpdate> listWaitingReplicationTasks(String refRegex) { - Pattern refmaskPattern = Pattern.compile(refRegex); - return tasksStorage.listWaiting().stream() - .filter(task -> refmaskPattern.matcher(task.ref).matches()) - .collect(toList()); + .filter(task -> changeRef.equals(task.ref())) + .filter(task -> remote.equals(task.remote())); } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageMPIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageMPIT.java new file mode 100644 index 0000000..1001e6c --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageMPIT.java
@@ -0,0 +1,75 @@ +// Copyright (C) 2020 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.gerrit.acceptance.TestPlugin; +import com.google.gerrit.acceptance.UseLocalDisk; +import com.google.gerrit.acceptance.WaitUtil; +import com.google.gerrit.entities.Project; +import com.google.gerrit.extensions.api.projects.BranchInput; +import org.eclipse.jgit.lib.Ref; +import org.eclipse.jgit.lib.Repository; +import org.junit.Test; + +/** + * The tests in this class ensure that events in the storage are correctly managed under multi- + * primary scenarios. 
+ * + * @see com.googlesource.gerrit.plugins.replication.ReplicationStorageIT + */ +@UseLocalDisk +@TestPlugin( + name = "replication", + sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule") +public class ReplicationStorageMPIT extends ReplicationStorageDaemon { + + @Test + public void workFromOnlyWaitingIsPerformed() throws Exception { + Project.NameKey targetProject = createTestProject(project + "replica"); + setReplicationDestination("foo", "replica", ALL_PROJECTS, TEST_LONG_REPLICATION_DELAY_SECONDS); + reloadConfig(); + + String newBranchA = "refs/heads/foo_branch_a"; + String newBranchB = "refs/heads/foo_branch_b"; + String master = "refs/heads/master"; + BranchInput input = new BranchInput(); + input.revision = master; + gApi.projects().name(project.get()).branch(newBranchA).create(input); + gApi.projects().name(project.get()).branch(newBranchB).create(input); + + deleteWaitingReplicationTasks( + newBranchA); // This simulates the work being completed by other node + assertThat(listWaitingReplicationTasks("refs/heads/foo_branch_.*")).hasSize(1); + + try (Repository repo = repoManager.openRepository(targetProject); + Repository sourceRepo = repoManager.openRepository(project)) { + WaitUtil.waitUntil( + () -> checkedGetRef(repo, newBranchA) == null && checkedGetRef(repo, newBranchB) != null, + TEST_PUSH_TIMEOUT_LONG); + + Ref masterRef = getRef(sourceRepo, master); + Ref targetBranchRefA = getRef(repo, newBranchA); + Ref targetBranchRefB = getRef(repo, newBranchB); + assertThat(targetBranchRefA).isNull(); + assertThat(targetBranchRefB).isNotNull(); + assertThat(targetBranchRefB.getObjectId()).isEqualTo(masterRef.getObjectId()); + } + + WaitUtil.waitUntil(() -> listRunning().isEmpty(), TEST_TASK_FINISH_TIMEOUT); + assertThat(listWaiting()).isEmpty(); + } +}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java index 42c0914..5cfb2d0 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageMPTest.java
@@ -14,11 +14,13 @@ package com.googlesource.gerrit.plugins.replication; -import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTest.assertContainsExactly; +import static com.google.common.truth.Truth.assertThat; import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTest.assertNoIncompleteTasks; +import static com.googlesource.gerrit.plugins.replication.ReplicationTasksStorageTest.assertThatStream; import com.google.common.jimfs.Configuration; import com.google.common.jimfs.Jimfs; +import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate; import java.net.URISyntaxException; import java.nio.file.FileSystem; import java.nio.file.Path; @@ -33,8 +35,8 @@ protected static final String REMOTE = "myDest"; protected static final URIish URISH = ReplicationTasksStorageTest.getUrish("http://example.com/" + PROJECT + ".git"); - protected static final ReplicationTasksStorage.ReplicateRefUpdate REF_UPDATE = - new ReplicationTasksStorage.ReplicateRefUpdate(PROJECT, REF, URISH, REMOTE); + protected static final ReplicateRefUpdate REF_UPDATE = + ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE); protected static final UriUpdates URI_UPDATES = getUriUpdates(REF_UPDATE); protected ReplicationTasksStorage nodeA; @@ -64,7 +66,7 @@ nodeA.create(REF_UPDATE); nodeB.create(REF_UPDATE); - assertContainsExactly(persistedView.listWaiting(), REF_UPDATE); + assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE); } @Test @@ -72,7 +74,7 @@ nodeA.create(REF_UPDATE); nodeB.start(URI_UPDATES); - assertContainsExactly(persistedView.listRunning(), REF_UPDATE); + assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE); nodeB.finish(URI_UPDATES); assertNoIncompleteTasks(persistedView); @@ -84,10 +86,10 @@ nodeA.start(URI_UPDATES); nodeA.reset(URI_UPDATES); - assertContainsExactly(persistedView.listWaiting(), REF_UPDATE); + 
assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE); nodeB.start(URI_UPDATES); - assertContainsExactly(persistedView.listRunning(), REF_UPDATE); + assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE); nodeB.finish(URI_UPDATES); assertNoIncompleteTasks(persistedView); @@ -101,10 +103,10 @@ nodeB.start(URI_UPDATES); nodeB.reset(URI_UPDATES); - assertContainsExactly(persistedView.listWaiting(), REF_UPDATE); + assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE); nodeB.start(URI_UPDATES); - assertContainsExactly(persistedView.listRunning(), REF_UPDATE); + assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE); nodeB.finish(URI_UPDATES); assertNoIncompleteTasks(persistedView); @@ -119,22 +121,22 @@ nodeB.reset(URI_UPDATES); nodeA.start(URI_UPDATES); - assertContainsExactly(persistedView.listRunning(), REF_UPDATE); + assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE); nodeA.finish(URI_UPDATES); assertNoIncompleteTasks(persistedView); } @Test - public void canBeResetAllAndCompletedByOtherNode() { + public void canBeRecoveredAndCompletedByOtherNode() { nodeA.create(REF_UPDATE); nodeA.start(URI_UPDATES); - nodeB.resetAll(); - assertContainsExactly(persistedView.listWaiting(), REF_UPDATE); + nodeB.recoverAll(); + assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE); nodeB.start(URI_UPDATES); - assertContainsExactly(persistedView.listRunning(), REF_UPDATE); + assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE); nodeA.finish(URI_UPDATES); // Bug: https://crbug.com/gerrit/12973 @@ -145,29 +147,29 @@ } @Test - public void canBeResetAllAndCompletedByOtherNodeFastOriginalNode() { + public void canBeRecoveredAndCompletedByOtherNodeFastOriginalNode() { nodeA.create(REF_UPDATE); nodeA.start(URI_UPDATES); - nodeB.resetAll(); + nodeB.recoverAll(); nodeA.finish(URI_UPDATES); - assertContainsExactly(persistedView.listWaiting(), 
REF_UPDATE); + assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE); nodeB.start(URI_UPDATES); - assertContainsExactly(persistedView.listRunning(), REF_UPDATE); + assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE); nodeB.finish(URI_UPDATES); assertNoIncompleteTasks(persistedView); } @Test - public void canBeResetAllAndCompletedByOtherNodeSlowOriginalNode() { + public void canBeRecoveredAndCompletedByOtherNodeSlowOriginalNode() { nodeA.create(REF_UPDATE); nodeA.start(URI_UPDATES); - nodeB.resetAll(); + nodeB.recoverAll(); nodeB.start(URI_UPDATES); - assertContainsExactly(persistedView.listRunning(), REF_UPDATE); + assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE); nodeB.finish(URI_UPDATES); ReplicationTasksStorageTest.assertNoIncompleteTasks(persistedView); @@ -180,19 +182,31 @@ public void multipleNodesCanReplicateSameRef() { nodeA.create(REF_UPDATE); nodeA.start(URI_UPDATES); - assertContainsExactly(persistedView.listRunning(), REF_UPDATE); + assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE); nodeA.finish(URI_UPDATES); assertNoIncompleteTasks(persistedView); nodeB.create(REF_UPDATE); nodeB.start(URI_UPDATES); - assertContainsExactly(persistedView.listRunning(), REF_UPDATE); + assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE); nodeB.finish(URI_UPDATES); assertNoIncompleteTasks(persistedView); } + @Test + public void duplicateWorkIsNotPerformed() { + nodeA.create(REF_UPDATE); + nodeB.create(REF_UPDATE); + + assertThat(nodeA.start(URI_UPDATES)).containsExactly(REF_UPDATE.ref()); + assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE); + + assertThat(nodeB.start(URI_UPDATES)).isEmpty(); + assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE); + } + public static UriUpdates getUriUpdates(ReplicationTasksStorage.ReplicateRefUpdate refUpdate) { try { return TestUriUpdates.create(refUpdate);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java index 23d6759..202cac9 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskMPTest.java
@@ -21,6 +21,7 @@ import com.google.common.jimfs.Configuration; import com.google.common.jimfs.Jimfs; +import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate; import java.nio.file.FileSystem; import java.nio.file.Path; import org.eclipse.jgit.transport.URIish; @@ -34,8 +35,8 @@ protected static final String REMOTE = "myDest"; protected static final URIish URISH = ReplicationTasksStorageTest.getUrish("http://example.com/" + PROJECT + ".git"); - protected static final ReplicationTasksStorage.ReplicateRefUpdate REF_UPDATE = - new ReplicationTasksStorage.ReplicateRefUpdate(PROJECT, REF, URISH, REMOTE); + protected static final ReplicateRefUpdate REF_UPDATE = + ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE); protected FileSystem fileSystem; protected Path storageSite; @@ -131,11 +132,11 @@ } @Test - public void canBeResetAllAndCompletedByOtherNode() { + public void canBeRecoveredAndCompletedByOtherNode() { taskA.create(); taskA.start(); - nodeB.resetAll(); + nodeB.recoverAll(); assertIsWaiting(taskA); taskB.create(); @@ -154,10 +155,10 @@ } @Test - public void resetAllAndCompletedByOtherNodeWhenTaskAFinishesBeforeTaskB() { + public void recoveredAndCompletedByOtherNodeWhenTaskAFinishesBeforeTaskB() { taskA.create(); taskA.start(); - nodeB.resetAll(); + nodeB.recoverAll(); taskA.finish(); assertIsWaiting(taskA); @@ -173,10 +174,10 @@ } @Test - public void resetAllAndCompletedByOtherNodeWhenTaskAFinishesAfterTaskB() { + public void recoveredAndCompletedByOtherNodeWhenTaskAFinishesAfterTaskB() { taskA.create(); taskA.start(); - nodeB.resetAll(); + nodeB.recoverAll(); taskB.start(); assertIsRunning(taskA);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java index d9fbbe5..a2e5e4d 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
@@ -36,7 +36,7 @@ protected static final String REMOTE = "myDest"; protected static final URIish URISH = getUrish("http://example.com/" + PROJECT + ".git"); protected static final ReplicateRefUpdate REF_UPDATE = - new ReplicateRefUpdate(PROJECT, REF, URISH, REMOTE); + ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE); protected ReplicationTasksStorage tasksStorage; protected FileSystem fileSystem; @@ -200,8 +200,8 @@ @Test public void canHaveTwoWaitingTasksForDifferentRefs() throws Exception { - Task updateA = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, "refA", URISH, REMOTE)); - Task updateB = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, "refB", URISH, REMOTE)); + Task updateA = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, "refA", URISH, REMOTE)); + Task updateB = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, "refB", URISH, REMOTE)); updateA.create(); updateB.create(); assertIsWaiting(updateA); @@ -210,8 +210,8 @@ @Test public void canHaveTwoRunningTasksForDifferentRefs() throws Exception { - Task updateA = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, "refA", URISH, REMOTE)); - Task updateB = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, "refB", URISH, REMOTE)); + Task updateA = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, "refA", URISH, REMOTE)); + Task updateB = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, "refB", URISH, REMOTE)); updateA.create(); updateB.create(); updateA.start(); @@ -225,12 +225,12 @@ Task updateA = tasksStorage .new Task( - new ReplicateRefUpdate( + ReplicateRefUpdate.create( "projectA", REF, getUrish("http://example.com/projectA.git"), REMOTE)); Task updateB = tasksStorage .new Task( - new ReplicateRefUpdate( + ReplicateRefUpdate.create( "projectB", REF, getUrish("http://example.com/projectB.git"), REMOTE)); updateA.create(); updateB.create(); @@ -243,12 +243,12 @@ Task updateA = tasksStorage .new Task( - new ReplicateRefUpdate( + 
ReplicateRefUpdate.create( "projectA", REF, getUrish("http://example.com/projectA.git"), REMOTE)); Task updateB = tasksStorage .new Task( - new ReplicateRefUpdate( + ReplicateRefUpdate.create( "projectB", REF, getUrish("http://example.com/projectB.git"), REMOTE)); updateA.create(); updateB.create(); @@ -260,8 +260,8 @@ @Test public void canHaveTwoWaitingTasksForDifferentRemotes() throws Exception { - Task updateA = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, REF, URISH, "remoteA")); - Task updateB = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, REF, URISH, "remoteB")); + Task updateA = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, REF, URISH, "remoteA")); + Task updateB = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, REF, URISH, "remoteB")); updateA.create(); updateB.create(); assertIsWaiting(updateA); @@ -270,8 +270,8 @@ @Test public void canHaveTwoRunningTasksForDifferentRemotes() throws Exception { - Task updateA = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, REF, URISH, "remoteA")); - Task updateB = tasksStorage.new Task(new ReplicateRefUpdate(PROJECT, REF, URISH, "remoteB")); + Task updateA = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, REF, URISH, "remoteA")); + Task updateB = tasksStorage.new Task(ReplicateRefUpdate.create(PROJECT, REF, URISH, "remoteB")); updateA.create(); updateB.create(); updateA.start();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java index 141f739..16a0363 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
@@ -22,12 +22,13 @@ import com.google.common.jimfs.Configuration; import com.google.common.jimfs.Jimfs; +import com.google.common.truth.IterableSubject; import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate; import java.net.URISyntaxException; import java.nio.file.FileSystem; import java.nio.file.Path; -import java.util.List; -import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.eclipse.jgit.transport.URIish; import org.junit.After; import org.junit.Before; @@ -39,7 +40,7 @@ protected static final String REMOTE = "myDest"; protected static final URIish URISH = getUrish("http://example.com/" + PROJECT + ".git"); protected static final ReplicateRefUpdate REF_UPDATE = - new ReplicateRefUpdate(PROJECT, REF, URISH, REMOTE); + ReplicateRefUpdate.create(PROJECT, REF, URISH, REMOTE); protected ReplicationTasksStorage storage; protected FileSystem fileSystem; @@ -61,14 +62,14 @@ @Test public void canListEmptyStorage() throws Exception { - assertThat(storage.listWaiting()).isEmpty(); - assertThat(storage.listRunning()).isEmpty(); + assertThatStream(storage.streamWaiting()).isEmpty(); + assertThatStream(storage.streamRunning()).isEmpty(); } @Test public void canListWaitingUpdate() throws Exception { storage.create(REF_UPDATE); - assertContainsExactly(storage.listWaiting(), REF_UPDATE); + assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE); } @Test @@ -83,10 +84,10 @@ @Test public void canStartWaitingUpdate() throws Exception { storage.create(REF_UPDATE); - storage.start(uriUpdates); - assertThat(storage.listWaiting()).isEmpty(); + assertThat(storage.start(uriUpdates)).containsExactly(REF_UPDATE.ref()); + assertThatStream(storage.streamWaiting()).isEmpty(); assertFalse(storage.isWaiting(uriUpdates)); - assertContainsExactly(storage.listRunning(), REF_UPDATE); + assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE); } @Test @@ -101,22 +102,22 @@ public 
void instancesOfTheSameStorageHaveTheSameElements() throws Exception { ReplicationTasksStorage persistedView = new ReplicationTasksStorage(storageSite); - assertThat(storage.listWaiting()).isEmpty(); - assertThat(persistedView.listWaiting()).isEmpty(); + assertThatStream(storage.streamWaiting()).isEmpty(); + assertThatStream(persistedView.streamWaiting()).isEmpty(); storage.create(REF_UPDATE); - assertContainsExactly(storage.listWaiting(), REF_UPDATE); - assertContainsExactly(persistedView.listWaiting(), REF_UPDATE); + assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE); + assertThatStream(persistedView.streamWaiting()).containsExactly(REF_UPDATE); storage.start(uriUpdates); - assertThat(storage.listWaiting()).isEmpty(); - assertThat(persistedView.listWaiting()).isEmpty(); - assertContainsExactly(storage.listRunning(), REF_UPDATE); - assertContainsExactly(persistedView.listRunning(), REF_UPDATE); + assertThatStream(storage.streamWaiting()).isEmpty(); + assertThatStream(persistedView.streamWaiting()).isEmpty(); + assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE); + assertThatStream(persistedView.streamRunning()).containsExactly(REF_UPDATE); storage.finish(uriUpdates); - assertThat(storage.listRunning()).isEmpty(); - assertThat(persistedView.listRunning()).isEmpty(); + assertThatStream(storage.streamRunning()).isEmpty(); + assertThatStream(persistedView.streamRunning()).isEmpty(); } @Test @@ -124,13 +125,13 @@ String key = storage.create(REF_UPDATE); String secondKey = storage.create(REF_UPDATE); assertEquals(key, secondKey); - assertContainsExactly(storage.listWaiting(), REF_UPDATE); + assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE); } @Test public void canCreateDifferentUris() throws Exception { ReplicateRefUpdate updateB = - new ReplicateRefUpdate( + ReplicateRefUpdate.create( PROJECT, REF, getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http @@ -138,7 +139,7 @@ String keyA = 
storage.create(REF_UPDATE); String keyB = storage.create(updateB); - assertThat(storage.listWaiting()).hasSize(2); + assertThatStream(storage.streamWaiting()).hasSize(2); assertTrue(storage.isWaiting(uriUpdates)); assertTrue(storage.isWaiting(TestUriUpdates.create(updateB))); assertNotEquals(keyA, keyB); @@ -147,7 +148,7 @@ @Test public void canStartDifferentUris() throws Exception { ReplicateRefUpdate updateB = - new ReplicateRefUpdate( + ReplicateRefUpdate.create( PROJECT, REF, getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http @@ -157,18 +158,18 @@ storage.create(updateB); storage.start(uriUpdates); - assertContainsExactly(storage.listWaiting(), updateB); - assertContainsExactly(storage.listRunning(), REF_UPDATE); + assertThatStream(storage.streamWaiting()).containsExactly(updateB); + assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE); storage.start(uriUpdatesB); - assertThat(storage.listWaiting()).isEmpty(); - assertContainsExactly(storage.listRunning(), REF_UPDATE, updateB); + assertThatStream(storage.streamWaiting()).isEmpty(); + assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE, updateB); } @Test public void canFinishDifferentUris() throws Exception { ReplicateRefUpdate updateB = - new ReplicateRefUpdate( + ReplicateRefUpdate.create( PROJECT, REF, getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http @@ -180,16 +181,16 @@ storage.start(uriUpdatesB); storage.finish(uriUpdates); - assertContainsExactly(storage.listRunning(), updateB); + assertThatStream(storage.streamRunning()).containsExactly(updateB); storage.finish(uriUpdatesB); - assertThat(storage.listRunning()).isEmpty(); + assertThatStream(storage.streamRunning()).isEmpty(); } @Test public void differentUrisCreatedTwiceIsStoredOnce() throws Exception { ReplicateRefUpdate updateB = - new ReplicateRefUpdate( + ReplicateRefUpdate.create( PROJECT, REF, getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http @@ 
-199,19 +200,19 @@ storage.create(updateB); storage.create(REF_UPDATE); storage.create(updateB); - assertThat(storage.listWaiting()).hasSize(2); + assertThatStream(storage.streamWaiting()).hasSize(2); assertTrue(storage.isWaiting(uriUpdates)); assertTrue(storage.isWaiting(TestUriUpdates.create(updateB))); } @Test public void canCreateMulipleRefsForSameUri() throws Exception { - ReplicateRefUpdate refA = new ReplicateRefUpdate(PROJECT, "refA", URISH, REMOTE); - ReplicateRefUpdate refB = new ReplicateRefUpdate(PROJECT, "refB", URISH, REMOTE); + ReplicateRefUpdate refA = ReplicateRefUpdate.create(PROJECT, "refA", URISH, REMOTE); + ReplicateRefUpdate refB = ReplicateRefUpdate.create(PROJECT, "refB", URISH, REMOTE); String keyA = storage.create(refA); String keyB = storage.create(refB); - assertThat(storage.listWaiting()).hasSize(2); + assertThatStream(storage.streamWaiting()).hasSize(2); assertNotEquals(keyA, keyB); assertTrue(storage.isWaiting(TestUriUpdates.create(refA))); assertTrue(storage.isWaiting(TestUriUpdates.create(refB))); @@ -219,8 +220,8 @@ @Test public void canFinishMulipleRefsForSameUri() throws Exception { - ReplicateRefUpdate refUpdateA = new ReplicateRefUpdate(PROJECT, "refA", URISH, REMOTE); - ReplicateRefUpdate refUpdateB = new ReplicateRefUpdate(PROJECT, "refB", URISH, REMOTE); + ReplicateRefUpdate refUpdateA = ReplicateRefUpdate.create(PROJECT, "refA", URISH, REMOTE); + ReplicateRefUpdate refUpdateB = ReplicateRefUpdate.create(PROJECT, "refB", URISH, REMOTE); UriUpdates uriUpdatesA = TestUriUpdates.create(refUpdateA); UriUpdates uriUpdatesB = TestUriUpdates.create(refUpdateB); storage.create(refUpdateA); @@ -229,10 +230,10 @@ storage.start(uriUpdatesB); storage.finish(uriUpdatesA); - assertContainsExactly(storage.listRunning(), refUpdateB); + assertThatStream(storage.streamRunning()).containsExactly(refUpdateB); storage.finish(uriUpdatesB); - assertThat(storage.listRunning()).isEmpty(); + assertThatStream(storage.streamRunning()).isEmpty(); } 
@Test @@ -241,8 +242,8 @@ storage.start(uriUpdates); storage.reset(uriUpdates); - assertContainsExactly(storage.listWaiting(), REF_UPDATE); - assertThat(storage.listRunning()).isEmpty(); + assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE); + assertThatStream(storage.streamRunning()).isEmpty(); } @Test @@ -252,40 +253,40 @@ storage.reset(uriUpdates); storage.start(uriUpdates); - assertContainsExactly(storage.listRunning(), REF_UPDATE); + assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE); + assertThatStream(storage.streamWaiting()).isEmpty(); assertFalse(storage.isWaiting(uriUpdates)); - assertThat(storage.listWaiting()).isEmpty(); storage.finish(uriUpdates); assertNoIncompleteTasks(storage); } @Test - public void canResetAllEmpty() throws Exception { - storage.resetAll(); + public void canRecoverEmpty() throws Exception { + storage.recoverAll(); assertNoIncompleteTasks(storage); } @Test - public void canResetAllUpdate() throws Exception { + public void canRecoverUpdate() throws Exception { storage.create(REF_UPDATE); storage.start(uriUpdates); - storage.resetAll(); - assertContainsExactly(storage.listWaiting(), REF_UPDATE); + storage.recoverAll(); + assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE); + assertThatStream(storage.streamRunning()).isEmpty(); assertTrue(storage.isWaiting(uriUpdates)); - assertThat(storage.listRunning()).isEmpty(); } @Test - public void canCompleteResetAllUpdate() throws Exception { + public void canCompleteRecoveredUpdate() throws Exception { storage.create(REF_UPDATE); storage.start(uriUpdates); - storage.resetAll(); + storage.recoverAll(); storage.start(uriUpdates); - assertContainsExactly(storage.listRunning(), REF_UPDATE); - assertThat(storage.listWaiting()).isEmpty(); + assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE); + assertThatStream(storage.streamWaiting()).isEmpty(); assertFalse(storage.isWaiting(uriUpdates)); storage.finish(uriUpdates); @@ -293,9 
+294,9 @@ } @Test - public void canResetAllMultipleUpdates() throws Exception { + public void canRecoverMultipleUpdates() throws Exception { ReplicateRefUpdate updateB = - new ReplicateRefUpdate( + ReplicateRefUpdate.create( PROJECT, REF, getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http @@ -306,14 +307,14 @@ storage.start(uriUpdates); storage.start(uriUpdatesB); - storage.resetAll(); - assertContainsExactly(storage.listWaiting(), REF_UPDATE, updateB); + storage.recoverAll(); + assertThatStream(storage.streamWaiting()).containsExactly(REF_UPDATE, updateB); } @Test - public void canCompleteMultipleResetAllUpdates() throws Exception { + public void canCompleteMultipleRecoveredUpdates() throws Exception { ReplicateRefUpdate updateB = - new ReplicateRefUpdate( + ReplicateRefUpdate.create( PROJECT, REF, getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not http @@ -323,15 +324,15 @@ storage.create(updateB); storage.start(uriUpdates); storage.start(uriUpdatesB); - storage.resetAll(); + storage.recoverAll(); storage.start(uriUpdates); - assertContainsExactly(storage.listRunning(), REF_UPDATE); - assertContainsExactly(storage.listWaiting(), updateB); + assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE); + assertThatStream(storage.streamWaiting()).containsExactly(updateB); storage.start(uriUpdatesB); - assertContainsExactly(storage.listRunning(), REF_UPDATE, updateB); - assertThat(storage.listWaiting()).isEmpty(); + assertThatStream(storage.streamRunning()).containsExactly(REF_UPDATE, updateB); + assertThatStream(storage.streamWaiting()).isEmpty(); storage.finish(uriUpdates); storage.finish(uriUpdatesB); @@ -355,7 +356,7 @@ @Test(expected = Test.None.class /* no exception expected */) public void illegalDoubleFinishDifferentUriIsGraceful() throws Exception { ReplicateRefUpdate updateB = - new ReplicateRefUpdate( + ReplicateRefUpdate.create( PROJECT, REF, getUrish("ssh://example.com/" + PROJECT + ".git"), // uses ssh not 
http @@ -370,30 +371,16 @@ storage.finish(uriUpdates); storage.finish(uriUpdatesB); - assertThat(storage.listRunning()).isEmpty(); + assertThatStream(storage.streamRunning()).isEmpty(); } protected static void assertNoIncompleteTasks(ReplicationTasksStorage storage) { - assertThat(storage.listWaiting()).isEmpty(); - assertThat(storage.listRunning()).isEmpty(); + assertThatStream(storage.streamWaiting()).isEmpty(); + assertThatStream(storage.streamRunning()).isEmpty(); } - protected static void assertContainsExactly( - List<ReplicateRefUpdate> all, ReplicateRefUpdate... refUpdates) { - assertThat(all).hasSize(refUpdates.length); - for (int i = 0; i < refUpdates.length; i++) { - assertTrue(equals(all.get(i), refUpdates[i])); - } - } - - public static boolean equals(ReplicateRefUpdate one, ReplicateRefUpdate two) { - return (one == null && two == null) - || (one != null - && two != null - && Objects.equals(one.project, two.project) - && Objects.equals(one.ref, two.ref) - && Objects.equals(one.remote, two.remote) - && Objects.equals(one.uri, two.uri)); + protected static IterableSubject assertThatStream(Stream<?> stream) { + return assertThat(stream.collect(Collectors.toList())); } public static URIish getUrish(String uri) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java b/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java new file mode 100644 index 0000000..901200b --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/TestDispatcher.java
@@ -0,0 +1,102 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gerrit.entities.BranchNameKey;
+import com.google.gerrit.entities.Change;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.events.ChangeEvent;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.events.ProjectEvent;
+import com.google.gerrit.server.events.RefEvent;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * In-memory {@link EventDispatcher} for tests that records posted events for later inspection.
+ *
+ * <p>Ref, project and unscoped events are kept in separate lists; change events are discarded.
+ * The lists are plain {@link ArrayList}s with no synchronization.
+ */
+public class TestDispatcher implements EventDispatcher {
+  private final List<ProjectEvent> projectEvents = new ArrayList<>();
+  private final List<RefEvent> refEvents = new ArrayList<>();
+  private final List<Event> events = new ArrayList<>();
+
+  @Override
+  public void postEvent(Change change, ChangeEvent event) {} // Not used in replication
+
+  /** Records {@code event}; {@code branchName} itself is not stored. */
+  @Override
+  public void postEvent(BranchNameKey branchName, RefEvent event) {
+    refEvents.add(event);
+  }
+
+  /** Records {@code event}; {@code projectName} itself is not stored. */
+  @Override
+  public void postEvent(Project.NameKey projectName, ProjectEvent event) {
+    projectEvents.add(event);
+  }
+
+  /** Records an event posted without a branch or project argument. */
+  @Override
+  public void postEvent(Event event) {
+    events.add(event);
+  }
+
+  /**
+   * Returns the recorded ref events whose branch key equals {@code branch} and that are instances
+   * of {@code clazz}. The elements are filtered but not cast.
+   */
+  public List<RefEvent> getEvents(BranchNameKey branch, Class<? extends RefEvent> clazz) {
+    return getEvents(branch).stream().filter(clazz::isInstance).collect(Collectors.toList());
+  }
+
+  /**
+   * Returns the recorded project events whose project key equals {@code project} and that are
+   * instances of {@code clazz}, cast to that type.
+   */
+  public <T extends ProjectEvent> List<T> getEvents(Project.NameKey project, Class<T> clazz) {
+    return getEvents(project).stream()
+        .filter(clazz::isInstance)
+        .map(clazz::cast)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Returns the events recorded by {@link #postEvent(Event)} that are instances of {@code clazz}.
+   *
+   * <p>Any {@link Event} subtype may be requested, because that overload stores arbitrary events.
+   */
+  public <T extends Event> List<T> getEvents(Class<T> clazz) {
+    return events.stream().filter(clazz::isInstance).map(clazz::cast).collect(Collectors.toList());
+  }
+
+  /** Returns the recorded ref events whose own branch key equals {@code branch}. */
+  private List<RefEvent> getEvents(BranchNameKey branch) {
+    return refEvents.stream()
+        .filter(e -> e.getBranchNameKey().equals(branch))
+        .collect(Collectors.toList());
+  }
+
+  /** Returns the recorded project events whose own project key equals {@code project}. */
+  private List<ProjectEvent> getEvents(Project.NameKey project) {
+    return projectEvents.stream()
+        .filter(e -> e.getProjectNameKey().equals(project))
+        .collect(Collectors.toList());
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java index 2fd3ee3..f61114e 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/TestUriUpdates.java
@@ -26,10 +26,10 @@ public abstract class TestUriUpdates implements UriUpdates { public static TestUriUpdates create(ReplicateRefUpdate update) throws URISyntaxException { return create( - Project.nameKey(update.project), - new URIish(update.uri), - update.remote, - Collections.singleton(update.ref)); + Project.nameKey(update.project()), + new URIish(update.uri()), + update.remote(), + Collections.singleton(update.ref())); } public static TestUriUpdates create(
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/events/EventFieldsTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/events/EventFieldsTest.java new file mode 100644 index 0000000..9f0fb9f --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/events/EventFieldsTest.java
@@ -0,0 +1,71 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.events;
+
+import static org.junit.Assert.assertEquals;
+
+import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import java.net.URISyntaxException;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.Test;
+
+/** Verifies the public fields and equality of the replication event classes. */
+public class EventFieldsTest {
+  private static final String PROJECT = "someProject";
+  private static final String REF = "refs/heads/master";
+  private static final String TARGET_NODE = "someHost:9417";
+  private static final String TARGET_URI = "git://someHost:9417/basePath/someProject.git";
+
+  /** Checks each public field of a {@link RefReplicatedEvent} built for a succeeded push. */
+  @SuppressWarnings("deprecation")
+  @Test
+  public void refReplicatedEventFields() throws URISyntaxException {
+    RefReplicatedEvent replicated = newSucceededEvent(new URIish(TARGET_URI));
+
+    assertEquals(PROJECT, replicated.project);
+    assertEquals(REF, replicated.ref);
+    assertEquals(TARGET_NODE, replicated.targetNode);
+    assertEquals(TARGET_URI, replicated.targetUri);
+    assertEquals("succeeded", replicated.status);
+    assertEquals(RemoteRefUpdate.Status.OK, replicated.refStatus);
+  }
+
+  /** Checks the project, target URI and target node of a {@link ReplicationScheduledEvent}. */
+  @SuppressWarnings("deprecation")
+  @Test
+  public void scheduledEventFields() throws URISyntaxException {
+    ReplicationScheduledEvent scheduled =
+        new ReplicationScheduledEvent(PROJECT, REF, new URIish(TARGET_URI));
+
+    assertEquals(PROJECT, scheduled.project);
+    assertEquals(TARGET_URI, scheduled.targetUri);
+    assertEquals(TARGET_NODE, scheduled.targetNode);
+  }
+
+  /** Two {@link RefReplicatedEvent}s built from identical arguments compare equal. */
+  @Test
+  public void refReplicatedEventsEqual() throws URISyntaxException {
+    URIish sharedUri = new URIish(TARGET_URI);
+
+    assertEquals(newSucceededEvent(sharedUri), newSucceededEvent(sharedUri));
+  }
+
+  /** Creates the event used by the tests: a succeeded push of {@link #REF} with status OK. */
+  private static RefReplicatedEvent newSucceededEvent(URIish target) {
+    return new RefReplicatedEvent(
+        PROJECT, REF, target, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+  }
+}