Merge branch 'stable-3.4'
* stable-3.4:
Ensure states are updated for canceled replication tasks
Change-Id: Ifc514a151819f5d809a1834d382857baf334a8a2
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index d2f7213..a6a162b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -460,6 +460,7 @@
synchronized (stateLock) {
URIish uri = pushOp.getURI();
pending.remove(uri);
+ pushOp.notifyNotAttempted(pushOp.getRefs());
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 33bd91d..eb2b999 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -27,10 +27,19 @@
import com.google.gerrit.extensions.common.ProjectInfo;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Ref;
@@ -217,6 +226,75 @@
}
@Test
+ public void pushAllWait() throws Exception { // schedules a PushAll for one project, then blocks until state.waitForReplication() returns
+ createTestProject(project + "replica");
+
+ setReplicationDestination("foo", "replica", ALL_PROJECTS);
+ reloadConfig();
+
+ ReplicationState state = new ReplicationState(NO_OP); // this same state object is passed to PushAll and waited on below
+
+ Future<?> future =
+ plugin
+ .getSysInjector()
+ .getInstance(PushAll.Factory.class)
+ .create(null, new ReplicationFilter(Arrays.asList(project.get())), state, false) // filter built from this test's project only
+ .schedule(0, TimeUnit.SECONDS); // zero delay
+
+ future.get(); // wait for the scheduled PushAll task itself to finish
+ state.waitForReplication(); // NOTE(review): no assertion follows; presumably the test passes simply by this call returning - confirm
+ }
+
+ @Test
+ public void pushAllWaitCancelNotRunningTask() throws Exception { // a thread blocked in waitForReplication() must be released when the queued task is cancelled
+ createTestProject(project + "replica");
+
+ setReplicationDestination("foo", "replica", ALL_PROJECTS);
+ reloadConfig();
+
+ ReplicationState state = new ReplicationState(NO_OP); // shared between PushAll and the waiter thread below
+
+ Future<?> future =
+ plugin
+ .getSysInjector()
+ .getInstance(PushAll.Factory.class)
+ .create(null, new ReplicationFilter(Arrays.asList(project.get())), state, false) // filter built from this test's project only
+ .schedule(0, TimeUnit.SECONDS); // zero delay
+
+ CountDownLatch latch = new CountDownLatch(1); // counted down only if the waiter thread gets past waitForReplication()
+ Executor service = Executors.newSingleThreadExecutor(); // NOTE(review): never shut down; the Executor type exposes no shutdown()
+ service.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ future.get(); // wait for the PushAll task itself
+ state.waitForReplication(); // the call under test: must return even though the task is cancelled below
+ latch.countDown();
+ } catch (Exception e) {
+ // Swallowed on purpose: without countDown() the latch.await below times out and the final assertion fails
+ }
+ }
+ });
+
+ // Cancel the replication task while it has not started running (asserted READY or SLEEPING)
+ waitUntil(() -> getProjectTasks().size() != 0); // wait until at least one ProjectTask is visible in the WorkQueue
+ WorkQueue.Task<?> task = getProjectTasks().get(0); // NOTE(review): assumes the first ProjectTask is the one scheduled by this test - confirm
+ assertThat(task.getState()).isAnyOf(WorkQueue.Task.State.READY, WorkQueue.Task.State.SLEEPING);
+ task.cancel(false);
+
+ // Confirm our waiting thread completed, i.e. it reached latch.countDown()
+ boolean receivedSignal = latch.await(5, TimeUnit.SECONDS); // FIXME Choose a good timeout
+ assertThat(receivedSignal).isTrue(); // false means the 5 second wait elapsed without countDown()
+ }
+
+ private List<WorkQueue.Task<?>> getProjectTasks() { // returns the WorkQueue tasks that are ProjectTask instances
+ return getInstance(WorkQueue.class).getTasks().stream() // each call re-reads getTasks() from the WorkQueue instance
+ .filter(t -> t instanceof WorkQueue.ProjectTask) // drop every other kind of queued task
+ .collect(Collectors.toList());
+ }
+
+ @Test
public void shouldReplicateHeadUpdate() throws Exception {
setReplicationDestination("foo", "replica", ALL_PROJECTS);
reloadConfig();