Add ReplicationScheduledEvent to indicate ref replication is scheduled.

Assure that event is posted only once each time a ref is scheduled.
Covers the corner case when the ref is added to already scheduled push.

Change-Id: I175940f8d4c6b9164c41bbec8c52d221ec30001b
Signed-off-by: Eryk Szymanski <eryksz@gmail.com>
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index b20f8eb..5e9f01c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -14,15 +14,20 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName;
+
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.collect.Lists;
+import com.google.gerrit.common.EventDispatcher;
 import com.google.gerrit.common.data.GroupReference;
 import com.google.gerrit.extensions.config.FactoryModule;
+import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.reviewdb.client.AccountGroup;
+import com.google.gerrit.reviewdb.client.Branch;
 import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.reviewdb.client.RefNames;
 import com.google.gerrit.reviewdb.server.ReviewDb;
@@ -76,6 +81,7 @@
   private volatile WorkQueue.Executor pool;
   private final PerThreadRequestScope.Scoper threadScoper;
   private final DestinationConfiguration config;
+  private final DynamicItem<EventDispatcher> eventDispatcher;
 
   protected enum RetryReason {
     TRANSPORT_ERROR, COLLISION, REPOSITORY_MISSING;
@@ -99,8 +105,10 @@
       GitRepositoryManager gitRepositoryManager,
       GroupBackend groupBackend,
       ReplicationStateListener stateLog,
-      GroupIncludeCache groupIncludeCache) {
+      GroupIncludeCache groupIncludeCache,
+      DynamicItem<EventDispatcher> eventDispatcher) {
     config = cfg;
+    this.eventDispatcher = eventDispatcher;
     gitManager = gitRepositoryManager;
     this.stateLog = stateLog;
 
@@ -283,10 +291,12 @@
       PushOne e = pending.get(uri);
       if (e == null) {
         e = opFactory.create(project, uri);
+        addRef(e, ref);
         pool.schedule(e, config.getDelay(), TimeUnit.SECONDS);
         pending.put(uri, e);
+      } else if (!e.getRefs().contains(ref)) {
+        addRef(e, ref);
       }
-      e.addRef(ref);
       state.increasePushTaskCount(project.get(), ref);
       e.addState(ref, state);
       repLog.info("scheduled {}:{} => {} to run after {}s", project, ref,
@@ -301,6 +311,11 @@
     }
   }
 
+  private void addRef(PushOne e, String ref) {
+    e.addRef(ref);
+    postEvent(e, ref);
+  }
+
   /**
    * It schedules again a PushOp instance.
    * <p>
@@ -556,4 +571,12 @@
     }
     return uri.toString().contains(urlMatch);
   }
+
+  private void postEvent(PushOne pushOp, String ref) {
+    Project.NameKey project = pushOp.getProjectNameKey();
+    String targetNode = resolveNodeName(pushOp.getURI());
+    ReplicationScheduledEvent event =
+        new ReplicationScheduledEvent(project.get(), ref, targetNode);
+    eventDispatcher.get().postEvent(new Branch.NameKey(project, ref), event);
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationFactory.java
index dd82aa0..970a018 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationFactory.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationFactory.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.replication;
 
+import com.google.gerrit.common.EventDispatcher;
+import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.PluginUser;
 import com.google.gerrit.server.account.GroupBackend;
 import com.google.gerrit.server.account.GroupIncludeCache;
@@ -31,6 +33,7 @@
   private final GroupBackend groupBackend;
   private final ReplicationStateListener stateLog;
   private final GroupIncludeCache groupIncludeCache;
+  private final DynamicItem<EventDispatcher> eventDispatcher;
 
   @Inject
   public DestinationFactory(Injector injector,
@@ -39,7 +42,8 @@
       GitRepositoryManager gitRepositoryManager,
       GroupBackend groupBackend,
       ReplicationStateListener stateLog,
-      GroupIncludeCache groupIncludeCache) {
+      GroupIncludeCache groupIncludeCache,
+      DynamicItem<EventDispatcher> eventDispatcher) {
     this.injector = injector;
     this.replicationUserFactory = replicationUserFactory;
     this.pluginUser = pluginUser;
@@ -47,10 +51,12 @@
     this.groupBackend = groupBackend;
     this.stateLog = stateLog;
     this.groupIncludeCache = groupIncludeCache;
+    this.eventDispatcher = eventDispatcher;
   }
 
   Destination create(DestinationConfiguration config) {
     return new Destination(injector, config, replicationUserFactory, pluginUser,
-        gitRepositoryManager, groupBackend, stateLog, groupIncludeCache);
+        gitRepositoryManager, groupBackend, stateLog, groupIncludeCache,
+        eventDispatcher);
   }
-}
\ No newline at end of file
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
index 6717660..8df1c1c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
@@ -55,7 +55,7 @@
     // Default doing nothing
   }
 
-  private static String resolveNodeName(URIish uri) {
+  static String resolveNodeName(URIish uri) {
     StringBuilder sb = new StringBuilder();
     if (uri.isRemote()) {
       sb.append(uri.getHost());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
index 5434619..c8957e7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -67,6 +67,8 @@
 
     EventTypes.register(RefReplicatedEvent.TYPE, RefReplicatedEvent.class);
     EventTypes.register(RefReplicationDoneEvent.TYPE, RefReplicationDoneEvent.class);
+    EventTypes.register(
+        ReplicationScheduledEvent.TYPE, ReplicationScheduledEvent.class);
     bind(SshSessionFactory.class).toProvider(
         ReplicationSshSessionFactoryProvider.class);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
new file mode 100644
index 0000000..d24fbb5
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
@@ -0,0 +1,45 @@
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.reviewdb.client.Project.NameKey;
+import com.google.gerrit.server.events.RefEvent;
+
+public class ReplicationScheduledEvent extends RefEvent {
+  static final String TYPE = "ref-replication-scheduled";
+
+  final String project;
+  final String ref;
+  final String targetNode;
+
+  public ReplicationScheduledEvent(String project, String ref,
+      String targetNode) {
+    super(TYPE);
+    this.project = project;
+    this.ref = ref;
+    this.targetNode = targetNode;
+  }
+
+  @Override
+  public String getRefName() {
+    return ref;
+  }
+
+  @Override
+  public NameKey getProjectNameKey() {
+    return new Project.NameKey(project);
+  }
+}