Use volatile and AtomicIntegers to be thread safe

Modify the fields in the ReplicationState class to be volatile or
AtomicIntegers so that changes to them are visible to other
threads. Without this, modifications made by one thread to
these fields may not be immediately visible to other threads,
depending on CPU caching, resulting in incorrect state.

Change-Id: I76512b17c19cc68e4f1e6a5223899f9a184bb549
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
index df8f3f4..b948be0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
@@ -17,6 +17,7 @@
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Table;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
@@ -24,7 +25,7 @@
 
 public class ReplicationState {
 
-  private boolean allScheduled;
+  private volatile boolean allScheduled;
   private final PushResultProcessing pushResultProcessing;
 
   private final Lock countingLock = new ReentrantLock();
@@ -33,8 +34,8 @@
   private static class RefReplicationStatus {
     private final String project;
     private final String ref;
-    private int nodesToReplicateCount;
-    private int replicatedNodesCount;
+    private final AtomicInteger nodesToReplicateCount = new AtomicInteger();
+    private final AtomicInteger replicatedNodesCount = new AtomicInteger();
 
     RefReplicationStatus(String project, String ref) {
       this.project = project;
@@ -42,13 +43,13 @@
     }
 
     public boolean allDone() {
-      return replicatedNodesCount == nodesToReplicateCount;
+      return replicatedNodesCount.get() == nodesToReplicateCount.get();
     }
   }
 
   private final Table<String, String, RefReplicationStatus> statusByProjectRef;
-  private int totalPushTasksCount;
-  private int finishedPushTasksCount;
+  private final AtomicInteger totalPushTasksCount = new AtomicInteger();
+  private final AtomicInteger finishedPushTasksCount = new AtomicInteger();
 
   ReplicationState(PushResultProcessing processing) {
     pushResultProcessing = processing;
@@ -58,15 +59,15 @@
   public void increasePushTaskCount(String project, String ref) {
     countingLock.lock();
     try {
-      getRefStatus(project, ref).nodesToReplicateCount++;
-      totalPushTasksCount++;
+      getRefStatus(project, ref).nodesToReplicateCount.getAndIncrement();
+      totalPushTasksCount.getAndIncrement();
     } finally {
       countingLock.unlock();
     }
   }
 
   public boolean hasPushTask() {
-    return totalPushTasksCount != 0;
+    return totalPushTasksCount.get() != 0;
   }
 
   public void notifyRefReplicated(
@@ -82,14 +83,14 @@
     countingLock.lock();
     try {
       RefReplicationStatus refStatus = getRefStatus(project, ref);
-      refStatus.replicatedNodesCount++;
-      finishedPushTasksCount++;
+      refStatus.replicatedNodesCount.getAndIncrement();
+      finishedPushTasksCount.getAndIncrement();
 
       if (allScheduled) {
         if (refStatus.allDone()) {
           completedRefStatus = statusByProjectRef.remove(project, ref);
         }
-        allPushTaksCompleted = finishedPushTasksCount == totalPushTasksCount;
+        allPushTaksCompleted = finishedPushTasksCount.get() == totalPushTasksCount.get();
       }
     } finally {
       countingLock.unlock();
@@ -108,7 +109,7 @@
     countingLock.lock();
     try {
       allScheduled = true;
-      if (finishedPushTasksCount < totalPushTasksCount) {
+      if (finishedPushTasksCount.get() < totalPushTasksCount.get()) {
         return;
       }
     } finally {
@@ -120,7 +121,7 @@
 
   private void doAllPushTasksCompleted() {
     fireRemainingOnRefReplicatedToAllNodes();
-    pushResultProcessing.onAllRefsReplicatedToAllNodes(totalPushTasksCount);
+    pushResultProcessing.onAllRefsReplicatedToAllNodes(totalPushTasksCount.get());
     allPushTasksFinished.countDown();
   }
 
@@ -135,7 +136,7 @@
 
   private void doRefPushTasksCompleted(RefReplicationStatus refStatus) {
     pushResultProcessing.onRefReplicatedToAllNodes(
-        refStatus.project, refStatus.ref, refStatus.nodesToReplicateCount);
+        refStatus.project, refStatus.ref, refStatus.nodesToReplicateCount.get());
   }
 
   private RefReplicationStatus getRefStatus(String project, String ref) {