Publish metrics when producing and consuming messages

Expose the count of successes and failures when consuming and producing
messages on gcloud-pubsub.

Bug: Issue 14408
Change-Id: I1878f82fdf8a578660b480ba6433aed18438bb48
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
index b8d8c49..a6058ae 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
@@ -18,6 +18,7 @@
 import com.google.cloud.pubsub.v1.AckReplyConsumer;
 import com.google.cloud.pubsub.v1.MessageReceiver;
 import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.flogger.FluentLogger;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
@@ -37,6 +38,7 @@
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
   private final Gson gson;
+  private final PubSubSubscriberMetrics subscriberMetrics;
   private final String topic;
   private final Consumer<EventMessage> messageProcessor;
   private final SubscriberProvider subscriberProvider;
@@ -48,9 +50,11 @@
       Gson gson,
       SubscriberProvider subscriberProvider,
       PubSubConfiguration config,
+      PubSubSubscriberMetrics subscriberMetrics,
       @Assisted String topic,
       @Assisted Consumer<EventMessage> messageProcessor) {
     this.gson = gson;
+    this.subscriberMetrics = subscriberMetrics;
     this.topic = topic;
     this.messageProcessor = messageProcessor;
     this.subscriberProvider = subscriberProvider;
@@ -58,15 +62,8 @@
   }
 
   public void subscribe() {
-    MessageReceiver receiver =
-        (PubsubMessage message, AckReplyConsumer consumer) -> {
-          EventMessage event = gson.fromJson(message.getData().toStringUtf8(), EventMessage.class);
-          messageProcessor.accept(event);
-          consumer.ack();
-        };
-
     try {
-      subscriber = subscriberProvider.get(topic, receiver);
+      subscriber = subscriberProvider.get(topic, getMessageReceiver());
       subscriber
           .startAsync()
           .awaitRunning(config.getSubscribtionTimeoutInSeconds(), TimeUnit.SECONDS);
@@ -98,4 +95,21 @@
       logger.atSevere().withCause(e).log("Timeout during subscriber shutdown");
     }
   }
+
+  @VisibleForTesting
+  MessageReceiver getMessageReceiver() {
+    return (PubsubMessage message, AckReplyConsumer consumer) -> {
+      try {
+        EventMessage event = gson.fromJson(message.getData().toStringUtf8(), EventMessage.class);
+        messageProcessor.accept(event);
+        consumer.ack();
+        subscriberMetrics.incrementSucceedToConsumeMessage();
+      } catch (Exception e) {
+        logger.atSevere().withCause(e).log(
+            "Exception when consuming message %s from topic %s [message: %s]",
+            message.getMessageId(), topic, message.getData().toStringUtf8());
+        subscriberMetrics.incrementFailedToConsumeMessage();
+      }
+    };
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventsMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventsMetrics.java
new file mode 100644
index 0000000..44968f4
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventsMetrics.java
@@ -0,0 +1,35 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Field;
+import com.google.gerrit.server.logging.PluginMetadata;
+
+public abstract class PubSubEventsMetrics {
+
+  public Field<String> stringField(String metadataKey, String description) {
+    return Field.ofString(
+            metadataKey,
+            (metadataBuilder, fieldValue) ->
+                metadataBuilder.addPluginMetadata(PluginMetadata.create(metadataKey, fieldValue)))
+        .description(description)
+        .build();
+  }
+
+  public Description rateDescription(String unit, String description) {
+    return new Description(description).setRate().setUnit(unit);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
index 24ef602..a802d71 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
@@ -16,8 +16,11 @@
 
 import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
 import com.google.cloud.pubsub.v1.Publisher;
 import com.google.common.flogger.FluentLogger;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.gerrit.server.events.Event;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
@@ -37,6 +40,8 @@
   }
 
   private final Gson gson;
+  private final PubSubPublisherMetrics publisherMetrics;
+  private final String topic;
   private final Publisher publisher;
   private final PubSubConfiguration pubSubProperties;
 
@@ -45,9 +50,12 @@
       PubSubConfiguration pubSubProperties,
       PublisherProvider publisherProvider,
       Gson gson,
+      PubSubPublisherMetrics publisherMetrics,
       @Assisted String topic)
       throws IOException {
     this.gson = gson;
+    this.publisherMetrics = publisherMetrics;
+    this.topic = topic;
     this.publisher = publisherProvider.get(topic);
     this.pubSubProperties = pubSubProperties;
   }
@@ -72,7 +80,29 @@
   }
 
   private ApiFuture<String> publishAsync(PubsubMessage pubsubMessage) {
-    return publisher.publish(pubsubMessage);
+    ApiFuture<String> publish = publisher.publish(pubsubMessage);
+    ApiFutures.addCallback(
+        publish,
+        new ApiFutureCallback<String>() {
+          @Override
+          public void onFailure(Throwable t) {
+            logger.atSevere().withCause(t).log(
+                "Exception when publishing message (id:%s) to topic '%s' [message: %s]",
+                pubsubMessage.getMessageId(), topic, pubsubMessage.getData().toStringUtf8());
+            publisherMetrics.incrementFailedToPublishMessage();
+          }
+
+          @Override
+          public void onSuccess(String messageId) {
+            logger.atFine().log(
+                "Successfully published message (id:%s) to topic '%s' [message: %s]",
+                messageId, topic, pubsubMessage.getData().toStringUtf8());
+
+            publisherMetrics.incrementSucceedToPublishMessage();
+          }
+        },
+        MoreExecutors.directExecutor());
+    return publish;
   }
 
   private void publishSync(PubsubMessage pubsubMessage) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherMetrics.java
new file mode 100644
index 0000000..a205238
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherMetrics.java
@@ -0,0 +1,52 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.metrics.Counter1;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+public class PubSubPublisherMetrics extends PubSubEventsMetrics {
+  private static final String PUBLISHER_SUCCESS_COUNTER = "publisher_success_counter";
+  private static final String PUBLISHER_FAILURE_COUNTER = "publisher_failure_counter";
+
+  private final Counter1<String> publisherSuccessCounter;
+  private final Counter1<String> publisherFailureCounter;
+
+  @Inject
+  public PubSubPublisherMetrics(MetricMaker metricMaker, @PluginName String pluginName) {
+
+    this.publisherSuccessCounter =
+        metricMaker.newCounter(
+            String.join("/", pluginName, PUBLISHER_SUCCESS_COUNTER),
+            rateDescription("messages", "Number of successfully published messages"),
+            stringField(PUBLISHER_SUCCESS_COUNTER, "Count of published messages"));
+    this.publisherFailureCounter =
+        metricMaker.newCounter(
+            String.join("/", pluginName, PUBLISHER_FAILURE_COUNTER),
+            rateDescription("errors", "Number of messages failed to publish"),
+            stringField(PUBLISHER_FAILURE_COUNTER, "Count of messages failed to publish"));
+  }
+
+  public void incrementSucceedToPublishMessage() {
+    publisherSuccessCounter.increment(PUBLISHER_SUCCESS_COUNTER);
+  }
+
+  public void incrementFailedToPublishMessage() {
+    publisherFailureCounter.increment(PUBLISHER_FAILURE_COUNTER);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubSubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubSubscriberMetrics.java
new file mode 100644
index 0000000..bbde683
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubSubscriberMetrics.java
@@ -0,0 +1,53 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.metrics.Counter1;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+class PubSubSubscriberMetrics extends PubSubEventsMetrics {
+
+  private static final String SUBSCRIBER_SUCCESS_COUNTER = "subscriber_success_counter";
+  private static final String SUBSCRIBER_FAILURE_COUNTER = "subscriber_failure_counter";
+
+  private final Counter1<String> subscriberSuccessCounter;
+  private final Counter1<String> subscriberFailureCounter;
+
+  @Inject
+  public PubSubSubscriberMetrics(MetricMaker metricMaker, @PluginName String pluginName) {
+    this.subscriberSuccessCounter =
+        metricMaker.newCounter(
+            String.join("/", pluginName, SUBSCRIBER_SUCCESS_COUNTER),
+            rateDescription(
+                "messages", "Number of messages successfully consumed by the subscriber"),
+            stringField(SUBSCRIBER_SUCCESS_COUNTER, "Count of successfully consumed messages"));
+    this.subscriberFailureCounter =
+        metricMaker.newCounter(
+            String.join("/", pluginName, SUBSCRIBER_FAILURE_COUNTER),
+            rateDescription("errors", "Number of messages failed to consume by the subscriber"),
+            stringField(SUBSCRIBER_FAILURE_COUNTER, "Count of messages failed to consume"));
+  }
+
+  public void incrementFailedToConsumeMessage() {
+    subscriberFailureCounter.increment(SUBSCRIBER_FAILURE_COUNTER);
+  }
+
+  public void incrementSucceedToConsumeMessage() {
+    subscriberSuccessCounter.increment(SUBSCRIBER_SUCCESS_COUNTER);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
new file mode 100644
index 0000000..ec2dd69
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
@@ -0,0 +1,74 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.verify;
+
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.gerrit.json.OutputFormat;
+import com.google.pubsub.v1.PubsubMessage;
+import java.util.function.Consumer;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PubSubEventSubscriberTest {
+
+  @Mock PubSubConfiguration confMock;
+  @Mock SubscriberProvider subscriberProviderMock;
+  @Mock PubSubSubscriberMetrics pubSubSubscriberMetricsMock;
+  @Mock AckReplyConsumer ackReplyConsumerMock;
+
+  private static final String TOPIC = "foo";
+
+  @Test
+  public void shouldIncrementFailedToConsumeMessageWhenReceivingFails() {
+    Consumer<EventMessage> failingConsumer =
+        (message) -> {
+          throw new RuntimeException("Error receiving message");
+        };
+
+    messageReceiver(failingConsumer)
+        .receiveMessage(PubsubMessage.getDefaultInstance(), ackReplyConsumerMock);
+
+    verify(pubSubSubscriberMetricsMock, only()).incrementFailedToConsumeMessage();
+  }
+
+  @Test
+  public void shouldIncrementSucceedToConsumeMessageWhenReceivingSucceeds() {
+    Consumer<EventMessage> succeedingConsumer = (message) -> {};
+
+    messageReceiver(succeedingConsumer)
+        .receiveMessage(PubsubMessage.getDefaultInstance(), ackReplyConsumerMock);
+
+    verify(pubSubSubscriberMetricsMock, only()).incrementSucceedToConsumeMessage();
+  }
+
+  private MessageReceiver messageReceiver(Consumer<EventMessage> consumer) {
+    return new PubSubEventSubscriber(
+            OutputFormat.JSON_COMPACT.newGson(),
+            subscriberProviderMock,
+            confMock,
+            pubSubSubscriberMetricsMock,
+            TOPIC,
+            consumer)
+        .getMessageReceiver();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java
new file mode 100644
index 0000000..c009973
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java
@@ -0,0 +1,102 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.gerrit.json.OutputFormat;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import java.io.IOException;
+import java.util.UUID;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PubSubPublisherTest {
+  PubSubPublisher objectUnderTest;
+
+  @Mock PubSubConfiguration confMock;
+  @Mock PublisherProvider publisherProviderMock;
+  @Mock Publisher publisherMock;
+  @Mock PubSubPublisherMetrics pubSubPublisherMetricsMock;
+
+  private static final String TOPIC = "foo";
+  private static final EventMessage eventMessage =
+      new EventMessage(
+          new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID()), new ProjectCreatedEvent());
+
+  @Before
+  public void setUp() throws IOException {
+    when(publisherProviderMock.get(TOPIC)).thenReturn(publisherMock);
+    objectUnderTest =
+        new PubSubPublisher(
+            confMock,
+            publisherProviderMock,
+            OutputFormat.JSON_COMPACT.newGson(),
+            pubSubPublisherMetricsMock,
+            TOPIC);
+  }
+
+  @Test
+  public void shouldIncrementFailedToPublishMessageWhenAsyncPublishFails() {
+    when(confMock.isSendAsync()).thenReturn(true);
+    when(publisherMock.publish(any()))
+        .thenReturn(ApiFutures.immediateFailedFuture(new Exception("Something went wrong")));
+
+    objectUnderTest.publish(eventMessage);
+
+    verify(pubSubPublisherMetricsMock, only()).incrementFailedToPublishMessage();
+  }
+
+  @Test
+  public void shouldIncrementFailedToPublishMessageWhenSyncPublishFails() {
+    when(confMock.isSendAsync()).thenReturn(false);
+    when(publisherMock.publish(any()))
+        .thenReturn(ApiFutures.immediateFailedFuture(new Exception("Something went wrong")));
+
+    objectUnderTest.publish(eventMessage);
+
+    verify(pubSubPublisherMetricsMock, only()).incrementFailedToPublishMessage();
+  }
+
+  @Test
+  public void shouldIncrementSuccessToPublishMessageWhenAsyncPublishSucceeds() {
+    when(confMock.isSendAsync()).thenReturn(true);
+    when(publisherMock.publish(any())).thenReturn(ApiFutures.immediateFuture("some-message-id"));
+
+    objectUnderTest.publish(eventMessage);
+
+    verify(pubSubPublisherMetricsMock, only()).incrementSucceedToPublishMessage();
+  }
+
+  @Test
+  public void shouldIncrementSuccessToPublishMessageWhenSyncPublishSucceeds() {
+    when(confMock.isSendAsync()).thenReturn(false);
+    when(publisherMock.publish(any())).thenReturn(ApiFutures.immediateFuture("some-message-id"));
+
+    objectUnderTest.publish(eventMessage);
+
+    verify(pubSubPublisherMetricsMock, only()).incrementSucceedToPublishMessage();
+  }
+}