Merge branch 'stable-3.0' into stable-3.1

* stable-3.0:
  Add publisher metrics

Change-Id: I213a2fe90144459f9a4f9c19d394da3f7eb7c4ae
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
index c9a28ba..1be52cb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -22,8 +22,11 @@
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import com.google.inject.TypeLiteral;
import com.googlesource.gerrit.plugins.kafka.api.KafkaApiModule;
import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
+import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
+import org.apache.kafka.clients.producer.KafkaProducer;
class Module extends AbstractModule {
@@ -40,6 +43,9 @@
DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
DynamicSet.bind(binder(), EventListener.class).to(KafkaPublisher.class);
+ bind(new TypeLiteral<KafkaProducer<String, String>>() {})
+ .toProvider(KafkaProducerProvider.class);
+
install(kafkaBrokerModule);
}
}
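
For context, binding the parameterized producer type through a Guice TypeLiteral lets downstream classes inject either the KafkaProducer itself or a lazy Provider of it. A minimal sketch of such a consumer, assuming only Guice and kafka-clients on the classpath (ExamplePublisher is an illustrative name, not part of the plugin):

import com.google.inject.Inject;
import com.google.inject.Provider;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

class ExamplePublisher {
  private final Provider<KafkaProducer<String, String>> producerProvider;

  @Inject
  ExamplePublisher(Provider<KafkaProducer<String, String>> producerProvider) {
    // Injecting a Provider defers producer creation until first use,
    // the same pattern KafkaSession adopts further down in this change.
    this.producerProvider = producerProvider;
  }

  void publish(String topic, String message) {
    producerProvider.get().send(new ProducerRecord<>(topic, message));
  }
}
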
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaEventsPublisherMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaEventsPublisherMetrics.java
new file mode 100644
index 0000000..083d032
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaEventsPublisherMetrics.java
@@ -0,0 +1,58 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.publish;
+
+import com.google.gerrit.metrics.Counter1;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Field;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+public class KafkaEventsPublisherMetrics {
+ private static final String PUBLISHER_SUCCESS_COUNTER = "broker_msg_publisher_success_counter";
+ private static final String PUBLISHER_FAILURE_COUNTER = "broker_msg_publisher_failure_counter";
+
+ private final Counter1<String> brokerPublisherSuccessCounter;
+ private final Counter1<String> brokerPublisherFailureCounter;
+
+ @Inject
+ public KafkaEventsPublisherMetrics(MetricMaker metricMaker) {
+
+ this.brokerPublisherSuccessCounter =
+ metricMaker.newCounter(
+ "kafka/broker/broker_message_publisher_counter",
+ new Description("Number of successfully published messages by the broker publisher")
+ .setRate()
+ .setUnit("messages"),
+ Field.ofString(PUBLISHER_SUCCESS_COUNTER, "Broker message published count"));
+ this.brokerPublisherFailureCounter =
+ metricMaker.newCounter(
+ "kafka/broker/broker_message_publisher_failure_counter",
+            new Description("Number of messages that failed to be published by the broker publisher")
+ .setRate()
+ .setUnit("errors"),
+ Field.ofString(PUBLISHER_FAILURE_COUNTER, "Broker failed to publish message count"));
+ }
+
+ public void incrementBrokerPublishedMessage() {
+ brokerPublisherSuccessCounter.increment(PUBLISHER_SUCCESS_COUNTER);
+ }
+
+ public void incrementBrokerFailedToPublishMessage() {
+ brokerPublisherFailureCounter.increment(PUBLISHER_FAILURE_COUNTER);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaProducerProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaProducerProvider.java
new file mode 100644
index 0000000..b1f11f7
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaProducerProvider.java
@@ -0,0 +1,34 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.session;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+
+public class KafkaProducerProvider implements Provider<KafkaProducer<String, String>> {
+ private final KafkaProperties properties;
+
+ @Inject
+ public KafkaProducerProvider(KafkaProperties properties) {
+ this.properties = properties;
+ }
+
+ @Override
+ public KafkaProducer<String, String> get() {
+ return new KafkaProducer<>(properties);
+ }
+}
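
The point of routing construction through a Provider is that tests can swap the producer without touching KafkaSession. A sketch of one way to do that in a Guice test module, assuming Mockito is available (TestProducerModule and mockedProducer are illustrative names, not part of this change):

import static org.mockito.Mockito.mock;

import com.google.inject.AbstractModule;
import com.google.inject.TypeLiteral;
import org.apache.kafka.clients.producer.KafkaProducer;

class TestProducerModule extends AbstractModule {
  @SuppressWarnings("unchecked")
  private final KafkaProducer<String, String> mockedProducer = mock(KafkaProducer.class);

  @Override
  protected void configure() {
    // Override the production binding so anything injecting the producer,
    // directly or via Provider, receives the Mockito mock instead.
    bind(new TypeLiteral<KafkaProducer<String, String>>() {}).toInstance(mockedProducer);
  }
}
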
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index a50ae0a..bb79cb5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -15,8 +15,9 @@
package com.googlesource.gerrit.plugins.kafka.session;
import com.google.inject.Inject;
+import com.google.inject.Provider;
import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
-import java.util.concurrent.ExecutionException;
+import com.googlesource.gerrit.plugins.kafka.publish.KafkaEventsPublisherMetrics;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -29,11 +30,18 @@
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSession.class);
private final KafkaProperties properties;
+ private final Provider<KafkaProducer<String, String>> producerProvider;
+ private final KafkaEventsPublisherMetrics publisherMetrics;
private volatile Producer<String, String> producer;
@Inject
- public KafkaSession(KafkaProperties properties) {
+ public KafkaSession(
+ Provider<KafkaProducer<String, String>> producerProvider,
+ KafkaProperties properties,
+ KafkaEventsPublisherMetrics publisherMetrics) {
+ this.producerProvider = producerProvider;
this.properties = properties;
+ this.publisherMetrics = publisherMetrics;
}
public boolean isOpen() {
@@ -55,7 +63,7 @@
* ClassNotFoundExceptions
*/
setConnectionClassLoader();
- producer = new KafkaProducer<>(properties);
+ producer = producerProvider.get();
LOGGER.info("Connection established.");
}
@@ -84,29 +92,40 @@
}
private boolean publishSync(String topic, String messageBody) {
- Future<RecordMetadata> future =
- producer.send(new ProducerRecord<>(topic, "" + System.nanoTime(), messageBody));
+
try {
+ Future<RecordMetadata> future =
+ producer.send(new ProducerRecord<>(topic, "" + System.nanoTime(), messageBody));
RecordMetadata metadata = future.get();
LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
+ publisherMetrics.incrementBrokerPublishedMessage();
return true;
- } catch (InterruptedException | ExecutionException e) {
+ } catch (Throwable e) {
LOGGER.error("Cannot send the message", e);
+ publisherMetrics.incrementBrokerFailedToPublishMessage();
return false;
}
}
private boolean publishAsync(String topic, String messageBody) {
- Future<RecordMetadata> future =
- producer.send(
- new ProducerRecord<>(topic, Long.toString(System.nanoTime()), messageBody),
- (metadata, e) -> {
- if (metadata != null && e == null) {
- LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
- } else {
- LOGGER.error("Cannot send the message", e);
- }
- });
- return future != null;
+ try {
+ Future<RecordMetadata> future =
+ producer.send(
+ new ProducerRecord<>(topic, Long.toString(System.nanoTime()), messageBody),
+ (metadata, e) -> {
+ if (metadata != null && e == null) {
+ LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
+ publisherMetrics.incrementBrokerPublishedMessage();
+ } else {
+ LOGGER.error("Cannot send the message", e);
+ publisherMetrics.incrementBrokerFailedToPublishMessage();
+ }
+ });
+ return future != null;
+ } catch (Throwable e) {
+ LOGGER.error("Cannot send the message", e);
+ publisherMetrics.incrementBrokerFailedToPublishMessage();
+ return false;
+ }
}
}
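
A note on why producer.send() moved inside the try blocks: send() can fail synchronously on the calling thread, for example with an IllegalStateException once the producer has been closed, before any Future is returned. With the widened catch, those failures now increment the failure counter instead of propagating out of KafkaSession. A small illustration of that synchronous failure mode, assuming placeholder connection settings (not plugin configuration):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

class SendAfterCloseExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    producer.close();

    try {
      // Throws IllegalStateException from the calling thread, before any Future exists:
      // exactly the case the broader try/catch in publishSync/publishAsync now records.
      producer.send(new ProducerRecord<>("index", "key", "value"));
    } catch (RuntimeException e) {
      System.err.println("send() failed synchronously: " + e.getMessage());
    }
  }
}
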
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index fab4d3b..48350f9 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -32,8 +32,10 @@
import com.google.inject.Injector;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
+import com.google.inject.TypeLiteral;
import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
+import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
import java.util.ArrayList;
import java.util.List;
@@ -41,6 +43,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.After;
import org.junit.AfterClass;
@@ -95,6 +98,9 @@
new KafkaSubscriberProperties(
TEST_POLLING_INTERVAL_MSEC, TEST_GROUP_ID, TEST_NUM_SUBSCRIBERS);
bind(KafkaSubscriberProperties.class).toInstance(kafkaSubscriberProperties);
+ bind(new TypeLiteral<KafkaProducer<String, String>>() {})
+ .toProvider(KafkaProducerProvider.class);
+
bind(WorkQueue.class).to(TestWorkQueue.class);
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
new file mode 100644
index 0000000..5aa9ca8
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
@@ -0,0 +1,127 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.publish;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.util.concurrent.Futures;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
+import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaSessionTest {
+ KafkaSession objectUnderTest;
+ @Mock KafkaProducer<String, String> kafkaProducer;
+ @Mock KafkaProducerProvider producerProvider;
+ @Mock KafkaProperties properties;
+ @Mock KafkaEventsPublisherMetrics publisherMetrics;
+ @Captor ArgumentCaptor<Callback> callbackCaptor;
+
+ RecordMetadata recordMetadata;
+ String message = "sample_message";
+ private String topic = "index";
+
+ @Before
+ public void setUp() {
+ when(producerProvider.get()).thenReturn(kafkaProducer);
+ when(properties.getTopic()).thenReturn(topic);
+
+ recordMetadata = new RecordMetadata(new TopicPartition(topic, 0), 0L, 0L, 0L, 0L, 0, 0);
+
+ objectUnderTest = new KafkaSession(producerProvider, properties, publisherMetrics);
+ objectUnderTest.connect();
+ }
+
+ @Test
+ public void shouldIncrementBrokerMetricCounterWhenMessagePublishedInSyncMode() {
+ when(properties.isSendAsync()).thenReturn(false);
+ when(kafkaProducer.send(any())).thenReturn(Futures.immediateFuture(recordMetadata));
+ objectUnderTest.publish(message);
+ verify(publisherMetrics, only()).incrementBrokerPublishedMessage();
+ }
+
+ @Test
+ public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailedInSyncMode() {
+ when(properties.isSendAsync()).thenReturn(false);
+ when(kafkaProducer.send(any())).thenReturn(Futures.immediateFailedFuture(new Exception()));
+ objectUnderTest.publish(message);
+ verify(publisherMetrics, only()).incrementBrokerFailedToPublishMessage();
+ }
+
+ @Test
+ public void shouldIncrementBrokerFailedMetricCounterWhenUnexpectedExceptionInSyncMode() {
+ when(properties.isSendAsync()).thenReturn(false);
+ when(kafkaProducer.send(any())).thenThrow(new RuntimeException("Unexpected runtime exception"));
+ try {
+ objectUnderTest.publish(message);
+ } catch (RuntimeException e) {
+ // expected
+ }
+ verify(publisherMetrics, only()).incrementBrokerFailedToPublishMessage();
+ }
+
+ @Test
+ public void shouldIncrementBrokerMetricCounterWhenMessagePublishedInAsyncMode() {
+ when(properties.isSendAsync()).thenReturn(true);
+ when(kafkaProducer.send(any(), any())).thenReturn(Futures.immediateFuture(recordMetadata));
+
+ objectUnderTest.publish(message);
+
+ verify(kafkaProducer).send(any(), callbackCaptor.capture());
+ callbackCaptor.getValue().onCompletion(recordMetadata, null);
+ verify(publisherMetrics, only()).incrementBrokerPublishedMessage();
+ }
+
+ @Test
+ public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailedInAsyncMode() {
+ when(properties.isSendAsync()).thenReturn(true);
+ when(kafkaProducer.send(any(), any()))
+ .thenReturn(Futures.immediateFailedFuture(new Exception()));
+
+ objectUnderTest.publish(message);
+
+ verify(kafkaProducer).send(any(), callbackCaptor.capture());
+ callbackCaptor.getValue().onCompletion(null, new Exception());
+ verify(publisherMetrics, only()).incrementBrokerFailedToPublishMessage();
+ }
+
+ @Test
+ public void shouldIncrementBrokerFailedMetricCounterWhenUnexpectedExceptionInAsyncMode() {
+ when(properties.isSendAsync()).thenReturn(true);
+ when(kafkaProducer.send(any(), any()))
+ .thenThrow(new RuntimeException("Unexpected runtime exception"));
+ try {
+ objectUnderTest.publish(message);
+ } catch (RuntimeException e) {
+ // expected
+ }
+ verify(publisherMetrics, only()).incrementBrokerFailedToPublishMessage();
+ }
+}