Consume and expose pull-replication status

Consume all RemoteRefReplicationEvent events in order to consume both
push or pull-replication events.

Expose the type of the replication (PUSH or FETCH) to the REST-API
response payload.

Bug: Issue 14804
Change-Id: I3672c1ee49c676ae518e46b85e2472362b3b1970
diff --git a/README.md b/README.md
index a5c70e0..97a8736 100644
--- a/README.md
+++ b/README.md
@@ -32,28 +32,44 @@
     "https://github.com/some/project.git": {
       "status": {
         "refs/changes/01/1/meta": {
+          "type": "PUSH",
           "status": "SUCCEEDED",
           "when": 1626688830
         },
         "refs/changes/03/3/meta": {
+          "type": "PUSH",
           "status": "SUCCEEDED",
           "when": 1626688854
         },
         "refs/changes/03/3/1": {
+          "type": "PUSH",
           "status": "SUCCEEDED",
           "when": 1626688854
         },
         "refs/changes/02/2/1": {
+          "type": "PUSH",
           "status": "SUCCEEDED",
           "when": 1626688844
         },
         "refs/changes/02/2/meta": {
+          "type": "PUSH",
           "status": "SUCCEEDED",
           "when": 1626688844
         },
         "refs/changes/01/1/1": {
+          "type": "PUSH",
           "status": "SUCCEEDED",
           "when": 1626688830
+        },
+        "refs/changes/04/4/meta": {
+          "type": "PULL",
+          "status": "SUCCEEDED",
+          "when": 1628000641
+        },
+        "refs/changes/04/4/1": {
+          "type": "PULL",
+          "status": "SUCCEEDED",
+          "when": 1628000641
         }
       }
     }
diff --git a/proto/cache.proto b/proto/cache.proto
index 6206ac0..b35d60d 100644
--- a/proto/cache.proto
+++ b/proto/cache.proto
@@ -27,10 +27,11 @@
 }
 
 // Serialized form of com.googlesource.gerrit.plugins.replicationstatus.ReplicationStatus.
-// Next ID: 3
+// Next ID: 4
 message ReplicationStatusProto {
   int64 when = 1;
   string status = 2;
+  string type = 3;
 }
 
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replicationstatus/EventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/replicationstatus/EventHandler.java
index a2e708f..1648bd8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replicationstatus/EventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replicationstatus/EventHandler.java
@@ -14,19 +14,24 @@
 
 package com.googlesource.gerrit.plugins.replicationstatus;
 
+import static com.googlesource.gerrit.plugins.replicationstatus.ReplicationStatus.Key;
 import static com.googlesource.gerrit.plugins.replicationstatus.ReplicationStatus.ReplicationStatusResult;
 import static com.googlesource.gerrit.plugins.replicationstatus.ReplicationStatus.ReplicationStatusResult.SCHEDULED;
+import static com.googlesource.gerrit.plugins.replicationstatus.ReplicationStatus.ReplicationType;
+import static com.googlesource.gerrit.plugins.replicationstatus.ReplicationStatus.ReplicationType.PULL;
+import static com.googlesource.gerrit.plugins.replicationstatus.ReplicationStatus.ReplicationType.PUSH;
 
 import com.google.common.cache.Cache;
 import com.google.gerrit.common.Nullable;
 import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventListener;
-import com.google.gerrit.server.events.RefEvent;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
 import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.events.RemoteRefReplicationEvent;
 import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
+import java.util.Optional;
 
 class EventHandler implements EventListener {
   private final Cache<ReplicationStatus.Key, ReplicationStatus> replicationStatusCache;
@@ -43,30 +48,36 @@
 
   @Override
   public void onEvent(Event event) {
-    if (shouldConsume(event)) {
-      if (event instanceof RefReplicatedEvent) {
-        RefReplicatedEvent replEvent = (RefReplicatedEvent) event;
-        putCacheEntry(replEvent, replEvent.targetUri, replEvent.status);
-      } else if (event instanceof ReplicationScheduledEvent) {
-        ReplicationScheduledEvent replEvent = (ReplicationScheduledEvent) event;
-        putCacheEntry(replEvent, replEvent.targetUri, SCHEDULED.name());
-      }
+    if (shouldConsume(event) && (event instanceof RemoteRefReplicationEvent)) {
+      RemoteRefReplicationEvent replicationEvent = (RemoteRefReplicationEvent) event;
+      putCacheEntry(
+          replicationType(event),
+          replicationEvent,
+          replicationEvent.targetUri,
+          Optional.ofNullable(replicationEvent.status).orElse(SCHEDULED.name()));
     }
   }
 
-  private <T extends RefEvent> void putCacheEntry(T refEvent, String targetNode, String status) {
-    ReplicationStatus.Key cacheKey =
-        ReplicationStatus.Key.create(
-            refEvent.getProjectNameKey(), targetNode, refEvent.getRefName());
+  private <T extends RemoteRefReplicationEvent> void putCacheEntry(
+      ReplicationType type, T replicationEvent, String remote, String status) {
+    Key cacheKey =
+        Key.create(replicationEvent.getProjectNameKey(), remote, replicationEvent.getRefName());
 
     replicationStatusCache.put(
         cacheKey,
         ReplicationStatus.create(
-            ReplicationStatusResult.fromString(status), refEvent.eventCreatedOn));
+            type, ReplicationStatusResult.fromString(status), replicationEvent.eventCreatedOn));
   }
 
   private boolean shouldConsume(Event event) {
     return (nodeInstanceId == null && event.instanceId == null)
         || (nodeInstanceId != null && nodeInstanceId.equals(event.instanceId));
   }
+
+  private static ReplicationType replicationType(Event event) {
+    if (event instanceof ReplicationScheduledEvent || event instanceof RefReplicatedEvent) {
+      return PUSH;
+    }
+    return PULL;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replicationstatus/ReplicationStatus.java b/src/main/java/com/googlesource/gerrit/plugins/replicationstatus/ReplicationStatus.java
index 9287773..994e5ce 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replicationstatus/ReplicationStatus.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replicationstatus/ReplicationStatus.java
@@ -26,10 +26,12 @@
   static final String CACHE_NAME = "replication_status";
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
-  static ReplicationStatus create(ReplicationStatusResult status, long when) {
-    return new AutoValue_ReplicationStatus(status, when);
+  static ReplicationStatus create(ReplicationType type, ReplicationStatusResult status, long when) {
+    return new AutoValue_ReplicationStatus(type, status, when);
   }
 
+  public abstract ReplicationType type();
+
   public abstract ReplicationStatusResult status();
 
   public abstract long when();
@@ -82,6 +84,7 @@
           Cache.ReplicationStatusProto.newBuilder()
               .setWhen(object.when())
               .setStatus(object.status().name())
+              .setType(object.type().name())
               .build());
     }
 
@@ -91,7 +94,9 @@
           Protos.parseUnchecked(Cache.ReplicationStatusProto.parser(), in);
 
       return ReplicationStatus.create(
-          ReplicationStatus.ReplicationStatusResult.valueOf(proto.getStatus()), proto.getWhen());
+          ReplicationStatus.ReplicationType.valueOf(proto.getType()),
+          ReplicationStatus.ReplicationStatusResult.valueOf(proto.getStatus()),
+          proto.getWhen());
     }
   }
 
@@ -123,4 +128,9 @@
       return this == FAILED || this == UNKNOWN;
     }
   }
+
+  enum ReplicationType {
+    PUSH,
+    PULL;
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replicationstatus/ReplicationStatusIT.java b/src/test/java/com/googlesource/gerrit/plugins/replicationstatus/ReplicationStatusIT.java
index 2dc81f1..1e35d58 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replicationstatus/ReplicationStatusIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replicationstatus/ReplicationStatusIT.java
@@ -18,6 +18,8 @@
 import static com.google.gerrit.acceptance.testsuite.project.TestProjectUpdate.allow;
 import static com.google.gerrit.extensions.restapi.Url.encode;
 import static com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import static com.googlesource.gerrit.plugins.replicationstatus.ReplicationStatus.ReplicationType.PULL;
+import static com.googlesource.gerrit.plugins.replicationstatus.ReplicationStatus.ReplicationType.PUSH;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
@@ -38,6 +40,7 @@
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.events.RemoteRefReplicationEvent;
 import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
 import java.io.IOException;
 import java.net.URISyntaxException;
@@ -126,7 +129,7 @@
 
     result.assertOK();
     assertThat(contentWithoutMagicJson(result))
-        .isEqualTo(successReplicationStatus(REMOTE, project, eventCreatedOn));
+        .isEqualTo(successReplicationStatus(PUSH, REMOTE, project, eventCreatedOn));
   }
 
   @Test
@@ -138,7 +141,7 @@
 
     result.assertOK();
     assertThat(contentWithoutMagicJson(result))
-        .isEqualTo(scheduledReplicationStatus(REMOTE, project, eventCreatedOn));
+        .isEqualTo(scheduledReplicationStatus(PUSH, REMOTE, project, eventCreatedOn));
   }
 
   @Test
@@ -150,7 +153,7 @@
 
     result.assertOK();
     assertThat(contentWithoutMagicJson(result))
-        .isEqualTo(successReplicationStatus(REMOTE, project, eventCreatedOn));
+        .isEqualTo(successReplicationStatus(PUSH, REMOTE, project, eventCreatedOn));
   }
 
   @Test
@@ -173,7 +176,7 @@
 
     result.assertOK();
     assertThat(contentWithoutMagicJson(result))
-        .isEqualTo(successReplicationStatus(REMOTE, project, eventCreatedOn));
+        .isEqualTo(successReplicationStatus(PUSH, REMOTE, project, eventCreatedOn));
   }
 
   @Test
@@ -186,7 +189,19 @@
 
     result.assertOK();
     assertThat(contentWithoutMagicJson(result))
-        .isEqualTo(failedReplicationStatus(REMOTE, project, eventCreatedOn));
+        .isEqualTo(failedReplicationStatus(PUSH, REMOTE, project, eventCreatedOn));
+  }
+
+  @Test
+  public void shouldReturnScheduledProjectFetchReplicationStatus() throws Exception {
+    long eventCreatedOn = System.currentTimeMillis();
+
+    eventHandler.onEvent(fetchScheduledEvent(null, eventCreatedOn, REF_MASTER, REMOTE));
+    RestResponse result = adminRestSession.get(endpoint(project, REMOTE));
+
+    result.assertOK();
+    assertThat(contentWithoutMagicJson(result))
+        .isEqualTo(scheduledReplicationStatus(PULL, REMOTE, project, eventCreatedOn));
   }
 
   private String contentWithoutMagicJson(RestResponse response) throws IOException {
@@ -237,6 +252,17 @@
     return scheduledEvent;
   }
 
+  private RemoteRefReplicationEvent fetchScheduledEvent(
+      @Nullable String instanceId, long when, String ref, String remote) throws URISyntaxException {
+    RemoteRefReplicationEvent scheduledFetchEvent =
+        new RemoteRefReplicationEvent(
+            "fetch-ref-replication-scheduled", project.get(), ref, new URIish(remote), null);
+    scheduledFetchEvent.instanceId = instanceId;
+    scheduledFetchEvent.eventCreatedOn = when;
+
+    return scheduledFetchEvent;
+  }
+
   private RefReplicatedEvent successReplicatedEvent(
       @Nullable String instanceId, long when, String remoteUrl) throws URISyntaxException {
 
@@ -275,25 +301,29 @@
             project.get()));
   }
 
-  private String successReplicationStatus(String remote, Project.NameKey project, long when)
+  private String successReplicationStatus(
+      ReplicationStatus.ReplicationType type, String remote, Project.NameKey project, long when)
       throws URISyntaxException {
     return successReplicationStatus(
-        remote, project, when, ReplicationStatus.ReplicationStatusResult.SUCCEEDED);
+        type, remote, project, when, ReplicationStatus.ReplicationStatusResult.SUCCEEDED);
   }
 
-  private String scheduledReplicationStatus(String remote, Project.NameKey project, long when)
+  private String scheduledReplicationStatus(
+      ReplicationStatus.ReplicationType type, String remote, Project.NameKey project, long when)
       throws URISyntaxException {
     return successReplicationStatus(
-        remote, project, when, ReplicationStatus.ReplicationStatusResult.SCHEDULED);
+        type, remote, project, when, ReplicationStatus.ReplicationStatusResult.SCHEDULED);
   }
 
   private String successReplicationStatus(
+      ReplicationStatus.ReplicationType type,
       String remote,
       Project.NameKey project,
       long when,
       ReplicationStatus.ReplicationStatusResult replicationStatusResult)
       throws URISyntaxException {
     return projectReplicationStatus(
+        type,
         remote,
         project,
         when,
@@ -301,9 +331,11 @@
         replicationStatusResult);
   }
 
-  private String failedReplicationStatus(String remote, Project.NameKey project, long when)
+  private String failedReplicationStatus(
+      ReplicationStatus.ReplicationType type, String remote, Project.NameKey project, long when)
       throws URISyntaxException {
     return projectReplicationStatus(
+        type,
         remote,
         project,
         when,
@@ -312,19 +344,20 @@
   }
 
   private String projectReplicationStatus(
+      ReplicationStatus.ReplicationType type,
       String remoteUrl,
       Project.NameKey project,
       long when,
       ProjectReplicationStatus.ProjectReplicationStatusResult projectReplicationStatusResult,
-      ReplicationStatus.ReplicationStatusResult replicationStatusResult)
-      throws URISyntaxException {
+      ReplicationStatus.ReplicationStatusResult replicationStatusResult) {
     return gson.toJson(
         ProjectReplicationStatus.create(
             ImmutableMap.of(
                 remoteUrl,
                 RemoteReplicationStatus.create(
                     ImmutableMap.of(
-                        REF_MASTER, ReplicationStatus.create(replicationStatusResult, when)))),
+                        REF_MASTER,
+                        ReplicationStatus.create(type, replicationStatusResult, when)))),
             projectReplicationStatusResult,
             project.get()));
   }