Prevent concurrent event replaying

If the events distributor is enabled and startup events replaying takes
longer than the distributor delay, the two threads will be replaying
events concurrently. Use an AtomicBoolean to ensure only one runs at a
time.

Change-Id: Idce111dfb54a88ab6ace0f52e70b61a711395855
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 66be6dd..65e4a72 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -40,6 +40,7 @@
 import java.util.Set;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.eclipse.jgit.transport.URIish;
 
 /** Manages automatic replication to remote repositories. */
@@ -60,7 +61,7 @@
   private final Provider<ReplicationDestinations> destinations; // For Guice circular dependency
   private final ReplicationTasksStorage replicationTasksStorage;
   private volatile boolean running;
-  private volatile boolean replaying;
+  private final AtomicBoolean replaying = new AtomicBoolean();
   private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
   private Distributor distributor;
 
@@ -110,7 +111,7 @@
 
   @Override
   public boolean isReplaying() {
-    return replaying;
+    return replaying.get();
   }
 
   public void scheduleFullSync(
@@ -187,33 +188,34 @@
   }
 
   private void firePendingEvents() {
-    replaying = true;
-    new ChainedScheduler.StreamScheduler<>(
-        workQueue.getDefaultQueue(),
-        replicationTasksStorage.streamWaiting(),
-        new ChainedScheduler.Runner<ReplicationTasksStorage.ReplicateRefUpdate>() {
-          @Override
-          public void run(ReplicationTasksStorage.ReplicateRefUpdate u) {
-            try {
-              fire(new URIish(u.uri()), Project.nameKey(u.project()), u.ref());
-            } catch (URISyntaxException e) {
-              repLog.atSevere().withCause(e).log(
-                  "Encountered malformed URI for persisted event %s", u);
-            } catch (Throwable e) {
-              repLog.atSevere().withCause(e).log("Unexpected error while firing pending events");
+    if (replaying.compareAndSet(false, true)) {
+      new ChainedScheduler.StreamScheduler<>(
+          workQueue.getDefaultQueue(),
+          replicationTasksStorage.streamWaiting(),
+          new ChainedScheduler.Runner<ReplicationTasksStorage.ReplicateRefUpdate>() {
+            @Override
+            public void run(ReplicationTasksStorage.ReplicateRefUpdate u) {
+              try {
+                fire(new URIish(u.uri()), Project.nameKey(u.project()), u.ref());
+              } catch (URISyntaxException e) {
+                repLog.atSevere().withCause(e).log(
+                    "Encountered malformed URI for persisted event %s", u);
+              } catch (Throwable e) {
+                repLog.atSevere().withCause(e).log("Unexpected error while firing pending events");
+              }
             }
-          }
 
-          @Override
-          public void onDone() {
-            replaying = false;
-          }
+            @Override
+            public void onDone() {
+              replaying.set(false);
+            }
 
-          @Override
-          public String toString(ReplicationTasksStorage.ReplicateRefUpdate u) {
-            return "Scheduling push to " + String.format("%s:%s", u.project(), u.ref());
-          }
-        });
+            @Override
+            public String toString(ReplicationTasksStorage.ReplicateRefUpdate u) {
+              return "Scheduling push to " + String.format("%s:%s", u.project(), u.ref());
+            }
+          });
+    }
   }
 
   private void pruneCompleted() {