Move broker publish metrics to the BrokerApiWrapper
Decouple the BrokerMetrics from Kafka for the publishing
of messages to topics.
Change-Id: Ib288d838a99ba2e2860874cab0afe9dd954b8d10
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
index 9a41cc5..6dc9577 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
@@ -21,15 +21,27 @@
public class BrokerApiWrapper implements BrokerApi {
private final DynamicItem<BrokerApi> apiDelegate;
+ private final BrokerMetrics metrics;
@Inject
- public BrokerApiWrapper(DynamicItem<BrokerApi> apiDelegate) {
+ public BrokerApiWrapper(DynamicItem<BrokerApi> apiDelegate, BrokerMetrics metrics) {
this.apiDelegate = apiDelegate;
+ this.metrics = metrics;
}
@Override
public boolean send(String topic, Event event) {
- return apiDelegate.get().send(topic, event);
+ boolean succeeded = false;
+ try {
+ succeeded = apiDelegate.get().send(topic, event);
+ } finally {
+ if (succeeded) {
+ metrics.incrementBrokerPublishedMessage();
+ } else {
+ metrics.incrementBrokerFailedToPublishMessage();
+ }
+ }
+ return succeeded;
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java
index d9069b4..729ac3f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java
@@ -25,7 +25,6 @@
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerMetrics;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
import com.googlesource.gerrit.plugins.multisite.kafka.consumer.SourceAwareEventWrapper;
@@ -41,20 +40,17 @@
private final Gson gson;
private final UUID instanceId;
private final MessageLogger msgLog;
- private final BrokerMetrics brokerMetrics;
@Inject
public BrokerPublisher(
BrokerSession session,
@BrokerGson Gson gson,
@InstanceId UUID instanceId,
- MessageLogger msgLog,
- BrokerMetrics brokerMetrics) {
+ MessageLogger msgLog) {
this.session = session;
this.gson = gson;
this.instanceId = instanceId;
this.msgLog = msgLog;
- this.brokerMetrics = brokerMetrics;
}
@Override
@@ -80,9 +76,6 @@
Boolean eventPublished = session.publish(topic, getPayload(brokerEvent));
if (eventPublished) {
msgLog.log(Direction.PUBLISH, brokerEvent);
- brokerMetrics.incrementBrokerPublishedMessage();
- } else {
- brokerMetrics.incrementBrokerFailedToPublishMessage();
}
return eventPublished;
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
new file mode 100644
index 0000000..2abc7b0
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
@@ -0,0 +1,56 @@
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.server.events.Event;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BrokerApiWrapperTest {
+ @Mock private BrokerMetrics brokerMetrics;
+ @Mock private BrokerApi brokerApi;
+ @Mock Event event;
+
+ private BrokerApiWrapper objectUnderTest;
+
+ @Before
+ public void setUp() {
+ objectUnderTest =
+ new BrokerApiWrapper(DynamicItem.itemOf(BrokerApi.class, brokerApi), brokerMetrics);
+ }
+
+ @Test
+ public void shouldIncrementBrokerMetricCounterWhenMessagePublished() {
+ when(brokerApi.send(any(), any())).thenReturn(true);
+ objectUnderTest.send(EventTopic.INDEX_TOPIC.topic(), event);
+ verify(brokerMetrics, only()).incrementBrokerPublishedMessage();
+ }
+
+ @Test
+ public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailed() {
+ when(brokerApi.send(any(), any())).thenReturn(false);
+ objectUnderTest.send(EventTopic.INDEX_TOPIC.topic(), event);
+ verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
+ }
+
+ @Test
+ public void shouldIncrementBrokerFailedMetricCounterWhenUnexpectedException() {
+ when(brokerApi.send(any(), any()))
+ .thenThrow(new RuntimeException("Unexpected runtime exception"));
+ try {
+ objectUnderTest.send(EventTopic.INDEX_TOPIC.topic(), event);
+ } catch (RuntimeException e) {
+ // expected
+ }
+ verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
index ee07f8f..c751a84 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
@@ -72,7 +72,7 @@
@Before
public void setUp() {
- publisher = new BrokerPublisher(session, gson, UUID.randomUUID(), msgLog, brokerMetrics);
+ publisher = new BrokerPublisher(session, gson, UUID.randomUUID(), msgLog);
}
@Test
@@ -125,23 +125,6 @@
}
@Test
- public void shouldIncrementBrokerMetricCounterWhenMessagePublished() {
- Event event = createSampleEvent();
- when(session.publish(any(), any())).thenReturn(true);
- publisher.publish(EventTopic.INDEX_TOPIC.topic(), event);
- verify(brokerMetrics, only()).incrementBrokerPublishedMessage();
- }
-
- @Test
- public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublished() {
- Event event = createSampleEvent();
- when(session.publish(any(), any())).thenReturn(false);
-
- publisher.publish(EventTopic.INDEX_TOPIC.topic(), event);
- verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
- }
-
- @Test
public void shouldLogEventPublishedMessageWhenPublishingSucceed() {
Event event = createSampleEvent();
when(session.publish(any(), any())).thenReturn(true);