Merge branch 'stable-3.4' * stable-3.4: Ensure states are updated for canceled replication tasks Change-Id: Ifc514a151819f5d809a1834d382857baf334a8a2
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java index d2f7213..a6a162b 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -460,6 +460,7 @@ synchronized (stateLock) { URIish uri = pushOp.getURI(); pending.remove(uri); + pushOp.notifyNotAttempted(pushOp.getRefs()); } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java index 33bd91d..eb2b999 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -27,10 +27,19 @@ import com.google.gerrit.extensions.common.ProjectInfo; import com.google.gerrit.extensions.events.ProjectDeletedListener; import com.google.gerrit.extensions.registration.DynamicSet; +import com.google.gerrit.server.git.WorkQueue; import com.google.inject.Inject; import java.time.Duration; +import java.util.Arrays; +import java.util.List; import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.eclipse.jgit.lib.Constants; import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.lib.Ref; @@ -217,6 +226,75 @@ } @Test + public void pushAllWait() throws Exception { + createTestProject(project + "replica"); + + setReplicationDestination("foo", "replica", ALL_PROJECTS); + reloadConfig(); + + ReplicationState state = new ReplicationState(NO_OP); + + Future<?> future = + plugin + .getSysInjector() + .getInstance(PushAll.Factory.class) + .create(null, new ReplicationFilter(Arrays.asList(project.get())), state, false) + .schedule(0, TimeUnit.SECONDS); + + future.get(); + state.waitForReplication(); + } + + @Test + public void pushAllWaitCancelNotRunningTask() throws Exception { + createTestProject(project + "replica"); + + setReplicationDestination("foo", "replica", ALL_PROJECTS); + reloadConfig(); + + ReplicationState state = new ReplicationState(NO_OP); + + Future<?> future = + plugin + .getSysInjector() + .getInstance(PushAll.Factory.class) + .create(null, new ReplicationFilter(Arrays.asList(project.get())), state, false) + .schedule(0, TimeUnit.SECONDS); + + CountDownLatch latch = new CountDownLatch(1); + Executor service = Executors.newSingleThreadExecutor(); + service.execute( + new Runnable() { + @Override + public void run() { + try { + future.get(); + state.waitForReplication(); + latch.countDown(); + } catch (Exception e) { + // fails the test because we don't countDown + } + } + }); + + // Cancel the replication task + waitUntil(() -> getProjectTasks().size() != 0); + WorkQueue.Task<?> task = getProjectTasks().get(0); + assertThat(task.getState()).isAnyOf(WorkQueue.Task.State.READY, WorkQueue.Task.State.SLEEPING); + task.cancel(false); + + // Confirm our waiting thread completed + boolean receivedSignal = latch.await(5, TimeUnit.SECONDS); // FIXME Choose a good timeout + assertThat(receivedSignal).isTrue(); + } + + private List<WorkQueue.Task<?>> getProjectTasks() { + return getInstance(WorkQueue.class).getTasks().stream() + .filter(t -> t instanceof WorkQueue.ProjectTask) + .collect(Collectors.toList()); + } + + @Test public void shouldReplicateHeadUpdate() throws Exception { setReplicationDestination("foo", "replica", ALL_PROJECTS); reloadConfig();