Always update replication status metrics

In the following scenario the replication status metrics become stale:
* Node1 is serving traffic for projectX,replication lag metrics are empty:
  => projectX_Node1_local_version = t1,
     projectX_remote_version = t1,
     node1_lag=0
  => replication lag metric added for projectX-node1
* Node1 is briefly unhelthy and Node2 serves traffic for projectX
  => projectX_Node1_local_version = t1,
     projectX_remote_version = t2,
     node1_lag=t2-t1
  => replication lag metric updated for projectX-node1
* Node1 is back and serves traffic for projectX:
  => projectX_Node1_local_version = t3,
     projectX_remote_version = t3,
     node1_lag=0
  => replication lag metric is NOT updated for projectX-node1,
     since the event is coming from Node1
* Node1 continue serving traffic
  => replication lag metric for projectX-node1 will never be updated
     and it will always be stale

Always update them regardless the origin of the event.
Bug: Issue 15168

Change-Id: Ibe4ff24583ccad7b14640f7b251094c3fd2d8f95
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index 1f3b092..7107b87 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -84,7 +84,6 @@
         msgLog.log(Direction.CONSUME, topic, event);
         eventRouter.route(event);
         subscriberMetrics.incrementSubscriberConsumedMessage();
-        subscriberMetrics.updateReplicationStatusMetrics(event);
       } catch (IOException e) {
         logger.atSevere().withCause(e).log("Malformed event '%s'", event);
         subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
@@ -93,5 +92,6 @@
         subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
       }
     }
+    subscriberMetrics.updateReplicationStatusMetrics(event);
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java
index 14298ca..1e11bf9 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java
@@ -103,6 +103,34 @@
     }
   }
 
+  @Test
+  public void shouldUpdateReplicationMetricsWithLocalEvents()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    for (Event event : events()) {
+      event.instanceId = NODE_INSTANCE_ID;
+      when(projectsFilter.matches(any(String.class))).thenReturn(true);
+
+      objectUnderTest.getConsumer().accept(event);
+
+      verify(subscriberMetrics, times(1)).updateReplicationStatusMetrics(event);
+      reset(projectsFilter, eventRouter, droppedEventListeners, subscriberMetrics);
+    }
+  }
+
+  @Test
+  public void shouldUpdateReplicationMetricsWithNonLocalEvents()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    for (Event event : events()) {
+      event.instanceId = INSTANCE_ID;
+      when(projectsFilter.matches(any(String.class))).thenReturn(true);
+
+      objectUnderTest.getConsumer().accept(event);
+
+      verify(subscriberMetrics, times(1)).updateReplicationStatusMetrics(event);
+      reset(projectsFilter, eventRouter, droppedEventListeners, subscriberMetrics);
+    }
+  }
+
   protected abstract AbstractSubcriber objectUnderTest();
 
   protected abstract List<Event> events();