Merge branch 'stable-3.2'

* stable-3.2:
  Adapt kafka publisher metrics to the metrics framework
  Add publisher metrics

Change-Id: I09cac819532dbdb72656ca788745a304d71eed75
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/KafkaEventsMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/KafkaEventsMetrics.java
new file mode 100644
index 0000000..2226ca2
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/KafkaEventsMetrics.java
@@ -0,0 +1,57 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka;
+
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Field;
+import com.google.gerrit.server.logging.PluginMetadata;
+
+/**
+ * Base class for the Kafka plugin's metrics holders, offering factories for the fields and
+ * descriptions used when registering metrics.
+ */
+public abstract class KafkaEventsMetrics {
+
+  /**
+   * Builds a string-valued metric field.
+   *
+   * @param metadataKey field name, also used as the key of the plugin metadata entry that
+   *     carries the field value
+   * @param description description attached to the field
+   * @return the built field
+   */
+  public Field<String> stringField(String metadataKey, String description) {
+    Field.Builder<String> builder =
+        Field.ofString(
+            metadataKey,
+            (metadata, value) ->
+                metadata.addPluginMetadata(PluginMetadata.create(metadataKey, value)));
+    return builder.description(description).build();
+  }
+
+  /**
+   * Builds the description of a rate metric.
+   *
+   * @param unit unit the metric is expressed in
+   * @param description human-readable text describing the metric
+   * @return a description flagged as a rate and carrying the given unit
+   */
+  public Description rateDescription(String unit, String description) {
+    Description rate = new Description(description);
+    rate.setRate();
+    rate.setUnit(unit);
+    return rate;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
index c9a28ba..1be52cb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -22,8 +22,11 @@
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.google.inject.TypeLiteral;
 import com.googlesource.gerrit.plugins.kafka.api.KafkaApiModule;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
+import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
+import org.apache.kafka.clients.producer.KafkaProducer;
 
 class Module extends AbstractModule {
 
@@ -40,6 +43,9 @@
     DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
     DynamicSet.bind(binder(), EventListener.class).to(KafkaPublisher.class);
 
+    bind(new TypeLiteral<KafkaProducer<String, String>>() {})
+        .toProvider(KafkaProducerProvider.class);
+
     install(kafkaBrokerModule);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaEventsPublisherMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaEventsPublisherMetrics.java
new file mode 100644
index 0000000..fbfd1fc
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaEventsPublisherMetrics.java
@@ -0,0 +1,67 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.publish;
+
+import com.google.gerrit.metrics.Counter1;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.kafka.KafkaEventsMetrics;
+
+/** Counters for messages the broker publisher published successfully or failed to publish. */
+@Singleton
+public class KafkaEventsPublisherMetrics extends KafkaEventsMetrics {
+  private static final String PUBLISHER_SUCCESS_COUNTER = "broker_msg_publisher_success_counter";
+  private static final String PUBLISHER_FAILURE_COUNTER = "broker_msg_publisher_failure_counter";
+
+  private final Counter1<String> brokerPublisherSuccessCounter;
+  private final Counter1<String> brokerPublisherFailureCounter;
+
+  /**
+   * Registers the success and failure counters.
+   *
+   * @param metricMaker factory the two counters are created with
+   */
+  @Inject
+  public KafkaEventsPublisherMetrics(MetricMaker metricMaker) {
+    Description publishedDescription =
+        rateDescription(
+            "messages", "Number of successfully published messages by the broker publisher");
+    Description failedDescription =
+        rateDescription("errors", "Number of messages failed to publish by the broker publisher");
+
+    this.brokerPublisherSuccessCounter =
+        metricMaker.newCounter(
+            "kafka/broker/broker_message_publisher_counter",
+            publishedDescription,
+            stringField(PUBLISHER_SUCCESS_COUNTER, "Broker message published count"));
+    this.brokerPublisherFailureCounter =
+        metricMaker.newCounter(
+            "kafka/broker/broker_message_publisher_failure_counter",
+            failedDescription,
+            stringField(PUBLISHER_FAILURE_COUNTER, "Broker failed to publish message count"));
+  }
+
+  /** Counts one successfully published message. */
+  public void incrementBrokerPublishedMessage() {
+    brokerPublisherSuccessCounter.increment(PUBLISHER_SUCCESS_COUNTER);
+  }
+
+  /** Counts one message that could not be published. */
+  public void incrementBrokerFailedToPublishMessage() {
+    brokerPublisherFailureCounter.increment(PUBLISHER_FAILURE_COUNTER);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaProducerProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaProducerProvider.java
new file mode 100644
index 0000000..b1f11f7
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaProducerProvider.java
@@ -0,0 +1,40 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.session;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+
+/** Guice provider creating a new {@link KafkaProducer} from the plugin's Kafka properties. */
+public class KafkaProducerProvider implements Provider<KafkaProducer<String, String>> {
+  private final KafkaProperties properties;
+
+  /**
+   * @param properties configuration handed to every producer this provider creates
+   */
+  @Inject
+  public KafkaProducerProvider(KafkaProperties properties) {
+    this.properties = properties;
+  }
+
+  /** Returns a newly constructed producer; this provider keeps no reference to it. */
+  @Override
+  public KafkaProducer<String, String> get() {
+    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
+    return kafkaProducer;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index a50ae0a..bb79cb5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -15,8 +15,9 @@
 package com.googlesource.gerrit.plugins.kafka.session;
 
 import com.google.inject.Inject;
+import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
-import java.util.concurrent.ExecutionException;
+import com.googlesource.gerrit.plugins.kafka.publish.KafkaEventsPublisherMetrics;
 import java.util.concurrent.Future;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -29,11 +30,18 @@
 
   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSession.class);
   private final KafkaProperties properties;
+  private final Provider<KafkaProducer<String, String>> producerProvider;
+  private final KafkaEventsPublisherMetrics publisherMetrics;
   private volatile Producer<String, String> producer;
 
   @Inject
-  public KafkaSession(KafkaProperties properties) {
+  public KafkaSession(
+      Provider<KafkaProducer<String, String>> producerProvider,
+      KafkaProperties properties,
+      KafkaEventsPublisherMetrics publisherMetrics) {
+    this.producerProvider = producerProvider; // supplies the producer instance for the session
     this.properties = properties;
+    this.publisherMetrics = publisherMetrics; // counts published and failed messages
   }
 
   public boolean isOpen() {
@@ -55,7 +63,7 @@
      * ClassNotFoundExceptions
      */
     setConnectionClassLoader();
-    producer = new KafkaProducer<>(properties);
+    producer = producerProvider.get();
     LOGGER.info("Connection established.");
   }
 
@@ -84,29 +92,60 @@
   }
 
+  /**
+   * Sends the message to the topic and blocks until the send completes, counting the outcome in
+   * the publisher metrics.
+   *
+   * @param topic Kafka topic to publish to
+   * @param messageBody payload of the record
+   * @return {@code true} if the record was sent, {@code false} on any failure
+   */
   private boolean publishSync(String topic, String messageBody) {
-    Future<RecordMetadata> future =
-        producer.send(new ProducerRecord<>(topic, "" + System.nanoTime(), messageBody));
     try {
+      Future<RecordMetadata> future =
+          producer.send(new ProducerRecord<>(topic, Long.toString(System.nanoTime()), messageBody));
       RecordMetadata metadata = future.get();
       LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
+      publisherMetrics.incrementBrokerPublishedMessage();
       return true;
-    } catch (InterruptedException | ExecutionException e) {
+    } catch (Throwable e) {
+      if (e instanceof InterruptedException) {
+        // Restore the interrupt status that this broad catch would otherwise swallow.
+        Thread.currentThread().interrupt();
+      }
       LOGGER.error("Cannot send the message", e);
+      publisherMetrics.incrementBrokerFailedToPublishMessage();
       return false;
     }
   }
 
+  /**
+   * Hands the message to the producer without waiting for the broker; the send callback counts
+   * the eventual outcome in the publisher metrics.
+   *
+   * @param topic Kafka topic to publish to
+   * @param messageBody payload of the record
+   * @return {@code true} if the producer returned a future for the record, {@code false} if it
+   *     returned none or the send call threw
+   */
   private boolean publishAsync(String topic, String messageBody) {
-    Future<RecordMetadata> future =
-        producer.send(
-            new ProducerRecord<>(topic, Long.toString(System.nanoTime()), messageBody),
-            (metadata, e) -> {
-              if (metadata != null && e == null) {
-                LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
-              } else {
-                LOGGER.error("Cannot send the message", e);
-              }
-            });
-    return future != null;
+    try {
+      Future<RecordMetadata> future =
+          producer.send(
+              new ProducerRecord<>(topic, Long.toString(System.nanoTime()), messageBody),
+              (metadata, e) -> {
+                if (metadata != null && e == null) {
+                  LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
+                  publisherMetrics.incrementBrokerPublishedMessage();
+                } else {
+                  LOGGER.error("Cannot send the message", e);
+                  publisherMetrics.incrementBrokerFailedToPublishMessage();
+                }
+              });
+      return future != null;
+    } catch (Throwable e) {
+      LOGGER.error("Cannot send the message", e);
+      publisherMetrics.incrementBrokerFailedToPublishMessage();
+      return false;
+    }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java
index 7a7055e..54a6590 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java
@@ -2,14 +2,13 @@
 
 import com.google.gerrit.metrics.Counter1;
 import com.google.gerrit.metrics.Description;
-import com.google.gerrit.metrics.Field;
 import com.google.gerrit.metrics.MetricMaker;
-import com.google.gerrit.server.logging.PluginMetadata;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.kafka.KafkaEventsMetrics;
 
 @Singleton
-class KafkaEventSubscriberMetrics {
+class KafkaEventSubscriberMetrics extends KafkaEventsMetrics {
 
   private static final String SUBSCRIBER_POLL_FAILURE_COUNTER =
       "subscriber_msg_consumer_poll_failure_counter";
@@ -45,17 +44,4 @@
   public void incrementSubscriberFailedToConsumeMessage() {
     subscriberFailureCounter.increment(SUBSCRIBER_FAILURE_COUNTER);
   }
-
-  public Field<String> stringField(String metadataKey, String description) {
-    return Field.ofString(
-            metadataKey,
-            (metadataBuilder, fieldValue) ->
-                metadataBuilder.addPluginMetadata(PluginMetadata.create(metadataKey, fieldValue)))
-        .description(description)
-        .build();
-  }
-
-  public Description rateDescription(String unit, String description) {
-    return new Description(description).setRate().setUnit(unit);
-  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index fab4d3b..48350f9 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -32,8 +32,10 @@
 import com.google.inject.Injector;
 import com.google.inject.Scopes;
 import com.google.inject.Singleton;
+import com.google.inject.TypeLiteral;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
+import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
 import java.util.ArrayList;
 import java.util.List;
@@ -41,6 +43,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -95,6 +98,9 @@
           new KafkaSubscriberProperties(
               TEST_POLLING_INTERVAL_MSEC, TEST_GROUP_ID, TEST_NUM_SUBSCRIBERS);
       bind(KafkaSubscriberProperties.class).toInstance(kafkaSubscriberProperties);
+      bind(new TypeLiteral<KafkaProducer<String, String>>() {})
+          .toProvider(KafkaProducerProvider.class);
+
       bind(WorkQueue.class).to(TestWorkQueue.class);
     }
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
new file mode 100644
index 0000000..5aa9ca8
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
@@ -0,0 +1,143 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.publish;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.util.concurrent.Futures;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
+import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/** Checks that {@link KafkaSession} records every publish outcome in the publisher metrics. */
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaSessionTest {
+  KafkaSession objectUnderTest;
+  @Mock KafkaProducer<String, String> kafkaProducer;
+  @Mock KafkaProducerProvider producerProvider;
+  @Mock KafkaProperties properties;
+  @Mock KafkaEventsPublisherMetrics publisherMetrics;
+  @Captor ArgumentCaptor<Callback> callbackCaptor;
+
+  RecordMetadata recordMetadata;
+  String message = "sample_message";
+  private String topic = "index";
+
+  /** Connects a session backed by the mocked producer before each test. */
+  @Before
+  public void setUp() {
+    when(producerProvider.get()).thenReturn(kafkaProducer);
+    when(properties.getTopic()).thenReturn(topic);
+
+    TopicPartition partition = new TopicPartition(topic, 0);
+    recordMetadata = new RecordMetadata(partition, 0L, 0L, 0L, 0L, 0, 0);
+
+    objectUnderTest = new KafkaSession(producerProvider, properties, publisherMetrics);
+    objectUnderTest.connect();
+  }
+
+  @Test
+  public void shouldIncrementBrokerMetricCounterWhenMessagePublishedInSyncMode() {
+    when(properties.isSendAsync()).thenReturn(false);
+    when(kafkaProducer.send(any())).thenReturn(Futures.immediateFuture(recordMetadata));
+
+    objectUnderTest.publish(message);
+
+    verify(publisherMetrics, only()).incrementBrokerPublishedMessage();
+  }
+
+  @Test
+  public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailedInSyncMode() {
+    when(properties.isSendAsync()).thenReturn(false);
+    when(kafkaProducer.send(any())).thenReturn(Futures.immediateFailedFuture(new Exception()));
+
+    objectUnderTest.publish(message);
+
+    verify(publisherMetrics, only()).incrementBrokerFailedToPublishMessage();
+  }
+
+  @Test
+  public void shouldIncrementBrokerFailedMetricCounterWhenUnexpectedExceptionInSyncMode() {
+    when(properties.isSendAsync()).thenReturn(false);
+    when(kafkaProducer.send(any())).thenThrow(new RuntimeException("Unexpected runtime exception"));
+
+    publishToleratingRuntimeException();
+
+    verify(publisherMetrics, only()).incrementBrokerFailedToPublishMessage();
+  }
+
+  @Test
+  public void shouldIncrementBrokerMetricCounterWhenMessagePublishedInAsyncMode() {
+    when(properties.isSendAsync()).thenReturn(true);
+    when(kafkaProducer.send(any(), any())).thenReturn(Futures.immediateFuture(recordMetadata));
+
+    objectUnderTest.publish(message);
+    completeSendCallback(recordMetadata, null);
+
+    verify(publisherMetrics, only()).incrementBrokerPublishedMessage();
+  }
+
+  @Test
+  public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailedInAsyncMode() {
+    when(properties.isSendAsync()).thenReturn(true);
+    when(kafkaProducer.send(any(), any()))
+        .thenReturn(Futures.immediateFailedFuture(new Exception()));
+
+    objectUnderTest.publish(message);
+    completeSendCallback(null, new Exception());
+
+    verify(publisherMetrics, only()).incrementBrokerFailedToPublishMessage();
+  }
+
+  @Test
+  public void shouldIncrementBrokerFailedMetricCounterWhenUnexpectedExceptionInAsyncMode() {
+    when(properties.isSendAsync()).thenReturn(true);
+    when(kafkaProducer.send(any(), any()))
+        .thenThrow(new RuntimeException("Unexpected runtime exception"));
+
+    publishToleratingRuntimeException();
+
+    verify(publisherMetrics, only()).incrementBrokerFailedToPublishMessage();
+  }
+
+  /** Publishes the sample message, tolerating a runtime exception escaping the session. */
+  private void publishToleratingRuntimeException() {
+    try {
+      objectUnderTest.publish(message);
+    } catch (RuntimeException e) {
+      // expected
+    }
+  }
+
+  /** Captures the callback handed to the producer and completes it with the given result. */
+  private void completeSendCallback(RecordMetadata metadata, Exception failure) {
+    verify(kafkaProducer).send(any(), callbackCaptor.capture());
+    callbackCaptor.getValue().onCompletion(metadata, failure);
+  }
+}