Add method onRefReplicatedToAllNodes to PushResultProcessing

PushResultProcessing had 2 methods:

  onOneNodeReplicated->called when one ref is done replicating to a specific node
  onAllNodesReplicated->called when all refs are done replicating to all the nodes

It was missing a method for when replication is completed to all the nodes for one
ref. Add this method and renamed the 2 other methods for naming consistency:

  onRefReplicatedToOneNode->called when one ref is done replicating to a specific node
  onRefReplicatedToAllNodes->called when one ref is done replicating to all the nodes
  onAllRefsReplicatedToAllNodes->called when all the refs are done replicating to all the nodes

Sending stream event for ref replicated requires this change to prevent
implementing similar logic in all the implementations of PushResultProcessing.
Will simplify code in change 50920 and allow sending ref replicated stream
events for replication started by the command.

Change-Id: I89d162ea286c3ad20166843f5c3253662d9ae893
diff --git a/BUCK b/BUCK
index d8a9358..cd61988 100644
--- a/BUCK
+++ b/BUCK
@@ -17,6 +17,7 @@
   srcs = glob(['src/test/java/**/*.java']),
   deps = [
     ':replication__plugin__compile',
+    '//lib:easymock',
     '//lib:junit',
     '//lib/jgit:jgit',
   ],
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 19a9359..9edf501 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -256,7 +256,7 @@
         pending.put(uri, e);
       }
       e.addRef(ref);
-      state.increasePushTaskCount();
+      state.increasePushTaskCount(project.get(), ref);
       e.addState(ref, state);
     }
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
index 0452b69..7bfc28a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
@@ -22,9 +22,12 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public abstract class PushResultProcessing {
-  abstract void onOneNodeReplicated(String project, String ref, URIish uri, RefPushResult status);
 
-  abstract void onAllNodesReplicated(int totalPushTasksCount);
+  abstract void onRefReplicatedToOneNode(String project, String ref, URIish uri, RefPushResult status);
+
+  abstract void onRefReplicatedToAllNodes(String project, String ref, int nodesCount);
+
+  abstract void onAllRefsReplicatedToAllNodes(int totalPushTasksCount);
 
   void writeStdOut(final String message) {
     // Default doing nothing
@@ -57,11 +60,13 @@
     }
 
     @Override
-    void onOneNodeReplicated(String project, String ref, URIish uri,
+    void onRefReplicatedToOneNode(String project, String ref, URIish uri,
         RefPushResult status) {
       StringBuilder sb = new StringBuilder();
       sb.append("Replicate ");
       sb.append(project);
+      sb.append(" ref ");
+      sb.append(ref);
       sb.append(" to ");
       sb.append(resolveNodeName(uri));
       sb.append(", ");
@@ -84,7 +89,20 @@
     }
 
     @Override
-    void onAllNodesReplicated(int totalPushTasksCount) {
+    void onRefReplicatedToAllNodes(String project, String ref, int nodesCount) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Replication of ");
+      sb.append(project);
+      sb.append(" ref ");
+      sb.append(ref);
+      sb.append(" completed to ");
+      sb.append(nodesCount);
+      sb.append(" nodes, ");
+      writeStdOut(sb.toString());
+    }
+
+    @Override
+    void onAllRefsReplicatedToAllNodes(int totalPushTasksCount) {
       if (totalPushTasksCount == 0) {
         return;
       }
@@ -115,25 +133,34 @@
 
   public static class GitUpdateProcessing extends PushResultProcessing {
     @Override
-    void onOneNodeReplicated(String project, String ref, URIish uri,
+    void onRefReplicatedToOneNode(String project, String ref, URIish uri,
         RefPushResult status) {
       //TODO: send stream events
     }
 
     @Override
-    void onAllNodesReplicated(int totalPushTasksCount) {
+    void onRefReplicatedToAllNodes(String project, String ref, int nodesCount) {
+      //TODO: send stream events
+    }
+
+    @Override
+    void onAllRefsReplicatedToAllNodes(int totalPushTasksCount) {
       //TODO: send stream events
     }
   }
 
   public static class NoopProcessing extends PushResultProcessing {
     @Override
-    void onOneNodeReplicated(String project, String ref, URIish uri,
+    void onRefReplicatedToOneNode(String project, String ref, URIish uri,
         RefPushResult status) {
     }
 
     @Override
-    void onAllNodesReplicated(int totalPushTasksCount) {
+    void onRefReplicatedToAllNodes(String project, String ref, int nodesCount) {
+    }
+
+    @Override
+    void onAllRefsReplicatedToAllNodes(int totalPushTasksCount) {
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
index 2258534..9632d5f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
@@ -14,14 +14,17 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+
+import com.googlesource.gerrit.plugins.replication.PushResultProcessing.NoopProcessing;
+
 import org.eclipse.jgit.transport.URIish;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import com.googlesource.gerrit.plugins.replication.PushResultProcessing.NoopProcessing;
-
 public class ReplicationState {
   private boolean allScheduled;
   private final PushResultProcessing pushResultProcessing;
@@ -29,20 +32,38 @@
   private final Lock countingLock = new ReentrantLock();
   private final CountDownLatch allPushTasksFinished = new CountDownLatch(1);
 
+  private static class RefReplicationStatus {
+    private final String project;
+    private final String ref;
+    private int nodesToReplicateCount;
+    private int replicatedNodesCount;
+
+    public RefReplicationStatus(String project, String ref) {
+      this.project = project;
+      this.ref = ref;
+    }
+
+    public boolean allDone() {
+      return replicatedNodesCount == nodesToReplicateCount;
+    }
+  }
+  private final Table<String, String, RefReplicationStatus> statusByProjectRef;
   private int totalPushTasksCount;
   private int finishedPushTasksCount;
 
   public ReplicationState() {
-    pushResultProcessing = new NoopProcessing();
+    this(new NoopProcessing());
   }
 
   public ReplicationState(PushResultProcessing processing) {
     pushResultProcessing = processing;
+    statusByProjectRef = HashBasedTable.create();
   }
 
-  public void increasePushTaskCount() {
+  public void increasePushTaskCount(String project, String ref) {
     countingLock.lock();
     try {
+      getRefStatus(project, ref).nodesToReplicateCount++;
       totalPushTasksCount++;
     } finally {
       countingLock.unlock();
@@ -55,22 +76,33 @@
 
   public void notifyRefReplicated(String project, String ref, URIish uri,
       RefPushResult status) {
-    pushResultProcessing.onOneNodeReplicated(project, ref, uri, status);
+    pushResultProcessing.onRefReplicatedToOneNode(project, ref, uri, status);
 
+    RefReplicationStatus completedRefStatus = null;
+    boolean allPushTaksCompleted = false;
     countingLock.lock();
     try {
+      RefReplicationStatus refStatus = getRefStatus(project, ref);
+      refStatus.replicatedNodesCount++;
       finishedPushTasksCount++;
-      if (!allScheduled) {
-        return;
-      }
-      if (finishedPushTasksCount < totalPushTasksCount) {
-        return;
+
+      if (allScheduled) {
+        if (refStatus.allDone()) {
+          completedRefStatus = statusByProjectRef.remove(project, ref);
+        }
+        allPushTaksCompleted = finishedPushTasksCount == totalPushTasksCount;
       }
     } finally {
       countingLock.unlock();
     }
 
-    doAllPushTasksCompleted();
+    if (completedRefStatus != null) {
+      doRefPushTasksCompleted(completedRefStatus);
+    }
+
+    if (allPushTaksCompleted) {
+      doAllPushTasksCompleted();
+    }
   }
 
   public void markAllPushTasksScheduled() {
@@ -88,10 +120,35 @@
   }
 
   private void doAllPushTasksCompleted() {
-    pushResultProcessing.onAllNodesReplicated(totalPushTasksCount);
+    fireRemainingOnRefReplicatedToAllNodes();
+    pushResultProcessing.onAllRefsReplicatedToAllNodes(totalPushTasksCount);
     allPushTasksFinished.countDown();
   }
 
+  /**
+   * Some could be remaining if replication of a ref is completed before all
+   * tasks are scheduled.
+   */
+  private void fireRemainingOnRefReplicatedToAllNodes() {
+    for (RefReplicationStatus refStatus : statusByProjectRef.values()) {
+      doRefPushTasksCompleted(refStatus);
+    }
+  }
+
+  private void doRefPushTasksCompleted(RefReplicationStatus refStatus) {
+    pushResultProcessing.onRefReplicatedToAllNodes(refStatus.project,
+        refStatus.ref, refStatus.nodesToReplicateCount);
+  }
+
+  private RefReplicationStatus getRefStatus(String project, String ref) {
+    if (!statusByProjectRef.contains(project, ref)) {
+      RefReplicationStatus refStatus = new RefReplicationStatus(project, ref);
+      statusByProjectRef.put(project, ref, refStatus);
+      return refStatus;
+    }
+    return statusByProjectRef.get(project, ref);
+  }
+
   public void waitForReplication() throws InterruptedException {
     allPushTasksFinished.await();
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
new file mode 100644
index 0000000..a0bf576
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
@@ -0,0 +1,219 @@
+// Copyright (C) 2013 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.resetToDefault;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+
+import org.eclipse.jgit.transport.URIish;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.URISyntaxException;
+
+public class ReplicationStateTest {
+
+  private ReplicationState replicationState;
+  private PushResultProcessing pushResultProcessingMock;
+
+  @Before
+  public void setUp() throws Exception {
+    pushResultProcessingMock = createNiceMock(PushResultProcessing.class);
+    replay(pushResultProcessingMock);
+    replicationState = new ReplicationState(pushResultProcessingMock);
+  }
+
+  @Test
+  public void shouldNotHavePushTask() {
+    assertFalse(replicationState.hasPushTask());
+  }
+
+  @Test
+  public void shouldHavePushTask() {
+    replicationState.increasePushTaskCount("someProject", "someRef");
+    assertTrue(replicationState.hasPushTask());
+  }
+
+  @Test
+  public void shouldFireOneReplicationEventWhenNothingToReplicate() {
+    resetToDefault(pushResultProcessingMock);
+
+    //expected event
+    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(0);
+    replay(pushResultProcessingMock);
+
+    //actual test
+    replicationState.markAllPushTasksScheduled();
+    verify(pushResultProcessingMock);
+  }
+
+  @Test
+  public void shouldFireEventsForReplicationOfOneRefToOneNode()
+      throws URISyntaxException {
+    resetToDefault(pushResultProcessingMock);
+    URIish uri = new URIish("git://someHost/someRepo.git");
+
+    //expected events
+    pushResultProcessingMock.onRefReplicatedToOneNode("someProject", "someRef",
+        uri, RefPushResult.SUCCEEDED);
+    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject",
+        "someRef", 1);
+    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(1);
+    replay(pushResultProcessingMock);
+
+    //actual test
+    replicationState.increasePushTaskCount("someProject", "someRef");
+    replicationState.markAllPushTasksScheduled();
+    replicationState.notifyRefReplicated("someProject", "someRef", uri,
+        RefPushResult.SUCCEEDED);
+    verify(pushResultProcessingMock);
+  }
+
+  @Test
+  public void shouldFireEventsForReplicationOfOneRefToMultipleNodes()
+      throws URISyntaxException {
+    resetToDefault(pushResultProcessingMock);
+    URIish uri1 = new URIish("git://someHost1/someRepo.git");
+    URIish uri2 = new URIish("git://someHost2/someRepo.git");
+
+    //expected events
+    pushResultProcessingMock.onRefReplicatedToOneNode("someProject", "someRef",
+        uri1, RefPushResult.SUCCEEDED);
+    pushResultProcessingMock.onRefReplicatedToOneNode("someProject", "someRef",
+        uri2, RefPushResult.FAILED);
+    pushResultProcessingMock.onRefReplicatedToAllNodes("someProject",
+        "someRef", 2);
+    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(2);
+    replay(pushResultProcessingMock);
+
+    //actual test
+    replicationState.increasePushTaskCount("someProject", "someRef");
+    replicationState.increasePushTaskCount("someProject", "someRef");
+    replicationState.markAllPushTasksScheduled();
+    replicationState.notifyRefReplicated("someProject", "someRef", uri1,
+        RefPushResult.SUCCEEDED);
+    replicationState.notifyRefReplicated("someProject", "someRef", uri2,
+        RefPushResult.FAILED);
+    verify(pushResultProcessingMock);
+  }
+
+  @Test
+  public void shouldFireEventsForReplicationOfMultipleRefsToMultipleNodes()
+      throws URISyntaxException {
+    resetToDefault(pushResultProcessingMock);
+    URIish uri1 = new URIish("git://host1/someRepo.git");
+    URIish uri2 = new URIish("git://host2/someRepo.git");
+    URIish uri3 = new URIish("git://host3/someRepo.git");
+
+    //expected events
+    pushResultProcessingMock.onRefReplicatedToOneNode("someProject", "ref1",
+        uri1, RefPushResult.SUCCEEDED);
+    pushResultProcessingMock.onRefReplicatedToOneNode("someProject", "ref1",
+        uri2, RefPushResult.SUCCEEDED);
+    pushResultProcessingMock.onRefReplicatedToOneNode("someProject", "ref1",
+        uri3, RefPushResult.SUCCEEDED);
+    pushResultProcessingMock.onRefReplicatedToOneNode("someProject", "ref2",
+        uri1, RefPushResult.SUCCEEDED);
+    pushResultProcessingMock.onRefReplicatedToOneNode("someProject", "ref2",
+        uri2, RefPushResult.SUCCEEDED);
+    pushResultProcessingMock
+        .onRefReplicatedToAllNodes("someProject", "ref1", 3);
+    pushResultProcessingMock
+        .onRefReplicatedToAllNodes("someProject", "ref2", 2);
+    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(5);
+    replay(pushResultProcessingMock);
+
+    //actual test
+    replicationState.increasePushTaskCount("someProject", "ref1");
+    replicationState.increasePushTaskCount("someProject", "ref1");
+    replicationState.increasePushTaskCount("someProject", "ref1");
+    replicationState.increasePushTaskCount("someProject", "ref2");
+    replicationState.increasePushTaskCount("someProject", "ref2");
+    replicationState.markAllPushTasksScheduled();
+    replicationState.notifyRefReplicated("someProject", "ref1", uri1,
+        RefPushResult.SUCCEEDED);
+    replicationState.notifyRefReplicated("someProject", "ref1", uri2,
+        RefPushResult.SUCCEEDED);
+    replicationState.notifyRefReplicated("someProject", "ref1", uri3,
+        RefPushResult.SUCCEEDED);
+    replicationState.notifyRefReplicated("someProject", "ref2", uri1,
+        RefPushResult.SUCCEEDED);
+    replicationState.notifyRefReplicated("someProject", "ref2", uri2,
+        RefPushResult.SUCCEEDED);
+    verify(pushResultProcessingMock);
+  }
+
+  @Test
+  public void shouldFireEventsForReplicationSameRefDifferentProjects()
+      throws URISyntaxException {
+    resetToDefault(pushResultProcessingMock);
+    URIish uri = new URIish("git://host1/someRepo.git");
+
+    //expected events
+    pushResultProcessingMock.onRefReplicatedToOneNode("project1", "ref1", uri,
+        RefPushResult.SUCCEEDED);
+    pushResultProcessingMock.onRefReplicatedToOneNode("project2", "ref2", uri,
+        RefPushResult.SUCCEEDED);
+    pushResultProcessingMock.onRefReplicatedToAllNodes("project1", "ref1", 1);
+    pushResultProcessingMock.onRefReplicatedToAllNodes("project2", "ref2", 1);
+    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(2);
+    replay(pushResultProcessingMock);
+
+    //actual test
+    replicationState.increasePushTaskCount("project1", "ref1");
+    replicationState.increasePushTaskCount("project2", "ref2");
+    replicationState.markAllPushTasksScheduled();
+    replicationState.notifyRefReplicated("project1", "ref1", uri,
+        RefPushResult.SUCCEEDED);
+    replicationState.notifyRefReplicated("project2", "ref2", uri,
+        RefPushResult.SUCCEEDED);
+    verify(pushResultProcessingMock);
+  }
+
+  @Test
+  public void shouldFireEventsWhenSomeReplicationCompleteBeforeAllTasksAreScheduled()
+      throws URISyntaxException {
+    resetToDefault(pushResultProcessingMock);
+    URIish uri1 = new URIish("git://host1/someRepo.git");
+
+   //expected events
+    pushResultProcessingMock.onRefReplicatedToOneNode("someProject", "ref1",
+        uri1, RefPushResult.SUCCEEDED);
+    pushResultProcessingMock.onRefReplicatedToOneNode("someProject", "ref2",
+        uri1, RefPushResult.SUCCEEDED);
+    pushResultProcessingMock
+        .onRefReplicatedToAllNodes("someProject", "ref1", 1);
+    pushResultProcessingMock
+        .onRefReplicatedToAllNodes("someProject", "ref2", 1);
+    pushResultProcessingMock.onAllRefsReplicatedToAllNodes(2);
+    replay(pushResultProcessingMock);
+
+    //actual test
+    replicationState.increasePushTaskCount("someProject", "ref1");
+    replicationState.increasePushTaskCount("someProject", "ref2");
+    replicationState.notifyRefReplicated("someProject", "ref1", uri1,
+        RefPushResult.SUCCEEDED);
+    replicationState.notifyRefReplicated("someProject", "ref2", uri1,
+        RefPushResult.SUCCEEDED);
+    replicationState.markAllPushTasksScheduled();
+    verify(pushResultProcessingMock);
+  }
+}