Move broker publish metrics to the BrokerApiWrapper

Decouple the BrokerMetrics from Kafka for the publishing
of messages to topics.

Change-Id: Ib288d838a99ba2e2860874cab0afe9dd954b8d10
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
index 9a41cc5..6dc9577 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
@@ -21,15 +21,27 @@
 
 public class BrokerApiWrapper implements BrokerApi {
   private final DynamicItem<BrokerApi> apiDelegate;
+  private final BrokerMetrics metrics;
 
   @Inject
-  public BrokerApiWrapper(DynamicItem<BrokerApi> apiDelegate) {
+  public BrokerApiWrapper(DynamicItem<BrokerApi> apiDelegate, BrokerMetrics metrics) {
     this.apiDelegate = apiDelegate;
+    this.metrics = metrics;
   }
 
   @Override
   public boolean send(String topic, Event event) {
-    return apiDelegate.get().send(topic, event);
+    boolean succeeded = false;
+    try {
+      succeeded = apiDelegate.get().send(topic, event);
+    } finally {
+      if (succeeded) {
+        metrics.incrementBrokerPublishedMessage();
+      } else {
+        metrics.incrementBrokerFailedToPublishMessage();
+      }
+    }
+    return succeeded;
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java
index d9069b4..729ac3f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java
@@ -25,7 +25,6 @@
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerMetrics;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
 import com.googlesource.gerrit.plugins.multisite.kafka.consumer.SourceAwareEventWrapper;
@@ -41,20 +40,17 @@
   private final Gson gson;
   private final UUID instanceId;
   private final MessageLogger msgLog;
-  private final BrokerMetrics brokerMetrics;
 
   @Inject
   public BrokerPublisher(
       BrokerSession session,
       @BrokerGson Gson gson,
       @InstanceId UUID instanceId,
-      MessageLogger msgLog,
-      BrokerMetrics brokerMetrics) {
+      MessageLogger msgLog) {
     this.session = session;
     this.gson = gson;
     this.instanceId = instanceId;
     this.msgLog = msgLog;
-    this.brokerMetrics = brokerMetrics;
   }
 
   @Override
@@ -80,9 +76,6 @@
     Boolean eventPublished = session.publish(topic, getPayload(brokerEvent));
     if (eventPublished) {
       msgLog.log(Direction.PUBLISH, brokerEvent);
-      brokerMetrics.incrementBrokerPublishedMessage();
-    } else {
-      brokerMetrics.incrementBrokerFailedToPublishMessage();
     }
     return eventPublished;
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
new file mode 100644
index 0000000..2abc7b0
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
@@ -0,0 +1,56 @@
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.server.events.Event;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BrokerApiWrapperTest {
+  @Mock private BrokerMetrics brokerMetrics;
+  @Mock private BrokerApi brokerApi;
+  @Mock Event event;
+
+  private BrokerApiWrapper objectUnderTest;
+
+  @Before
+  public void setUp() {
+    objectUnderTest =
+        new BrokerApiWrapper(DynamicItem.itemOf(BrokerApi.class, brokerApi), brokerMetrics);
+  }
+
+  @Test
+  public void shouldIncrementBrokerMetricCounterWhenMessagePublished() {
+    when(brokerApi.send(any(), any())).thenReturn(true);
+    objectUnderTest.send(EventTopic.INDEX_TOPIC.topic(), event);
+    verify(brokerMetrics, only()).incrementBrokerPublishedMessage();
+  }
+
+  @Test
+  public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailed() {
+    when(brokerApi.send(any(), any())).thenReturn(false);
+    objectUnderTest.send(EventTopic.INDEX_TOPIC.topic(), event);
+    verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
+  }
+
+  @Test
+  public void shouldIncrementBrokerFailedMetricCounterWhenUnexpectedException() {
+    when(brokerApi.send(any(), any()))
+        .thenThrow(new RuntimeException("Unexpected runtime exception"));
+    try {
+      objectUnderTest.send(EventTopic.INDEX_TOPIC.topic(), event);
+    } catch (RuntimeException e) {
+      // expected
+    }
+    verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
index ee07f8f..c751a84 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
@@ -72,7 +72,7 @@
 
   @Before
   public void setUp() {
-    publisher = new BrokerPublisher(session, gson, UUID.randomUUID(), msgLog, brokerMetrics);
+    publisher = new BrokerPublisher(session, gson, UUID.randomUUID(), msgLog);
   }
 
   @Test
@@ -125,23 +125,6 @@
   }
 
   @Test
-  public void shouldIncrementBrokerMetricCounterWhenMessagePublished() {
-    Event event = createSampleEvent();
-    when(session.publish(any(), any())).thenReturn(true);
-    publisher.publish(EventTopic.INDEX_TOPIC.topic(), event);
-    verify(brokerMetrics, only()).incrementBrokerPublishedMessage();
-  }
-
-  @Test
-  public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublished() {
-    Event event = createSampleEvent();
-    when(session.publish(any(), any())).thenReturn(false);
-
-    publisher.publish(EventTopic.INDEX_TOPIC.topic(), event);
-    verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
-  }
-
-  @Test
   public void shouldLogEventPublishedMessageWhenPublishingSucceed() {
     Event event = createSampleEvent();
     when(session.publish(any(), any())).thenReturn(true);