Prevent concurrent event replaying If the events distributor is enabled and startup events replaying takes longer than the distributor delay, the two threads will be replaying events concurrently. Use an AtomicBoolean to ensure only one runs at a time. Change-Id: Idce111dfb54a88ab6ace0f52e70b61a711395855
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java index 66be6dd..65e4a72 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -40,6 +40,7 @@ import java.util.Set; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jgit.transport.URIish; /** Manages automatic replication to remote repositories. */ @@ -60,7 +61,7 @@ private final Provider<ReplicationDestinations> destinations; // For Guice circular dependency private final ReplicationTasksStorage replicationTasksStorage; private volatile boolean running; - private volatile boolean replaying; + private final AtomicBoolean replaying = new AtomicBoolean(); private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue; private Distributor distributor; @@ -110,7 +111,7 @@ @Override public boolean isReplaying() { - return replaying; + return replaying.get(); } public void scheduleFullSync( @@ -187,33 +188,34 @@ } private void firePendingEvents() { - replaying = true; - new ChainedScheduler.StreamScheduler<>( - workQueue.getDefaultQueue(), - replicationTasksStorage.streamWaiting(), - new ChainedScheduler.Runner<ReplicationTasksStorage.ReplicateRefUpdate>() { - @Override - public void run(ReplicationTasksStorage.ReplicateRefUpdate u) { - try { - fire(new URIish(u.uri()), Project.nameKey(u.project()), u.ref()); - } catch (URISyntaxException e) { - repLog.atSevere().withCause(e).log( - "Encountered malformed URI for persisted event %s", u); - } catch (Throwable e) { - repLog.atSevere().withCause(e).log("Unexpected error while firing pending events"); + if (replaying.compareAndSet(false, true)) { + new ChainedScheduler.StreamScheduler<>( + workQueue.getDefaultQueue(), + replicationTasksStorage.streamWaiting(), + new ChainedScheduler.Runner<ReplicationTasksStorage.ReplicateRefUpdate>() { + @Override + public void run(ReplicationTasksStorage.ReplicateRefUpdate u) { + try { + fire(new URIish(u.uri()), Project.nameKey(u.project()), u.ref()); + } catch (URISyntaxException e) { + repLog.atSevere().withCause(e).log( + "Encountered malformed URI for persisted event %s", u); + } catch (Throwable e) { + repLog.atSevere().withCause(e).log("Unexpected error while firing pending events"); + } } - } - @Override - public void onDone() { - replaying = false; - } + @Override + public void onDone() { + replaying.set(false); + } - @Override - public String toString(ReplicationTasksStorage.ReplicateRefUpdate u) { - return "Scheduling push to " + String.format("%s:%s", u.project(), u.ref()); - } - }); + @Override + public String toString(ReplicationTasksStorage.ReplicateRefUpdate u) { + return "Scheduling push to " + String.format("%s:%s", u.project(), u.ref()); + } + }); + } } private void pruneCompleted() {