Merge "Publish metrics when producing and consuming messages" into stable-3.3
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
index b8d8c49..a6058ae 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
@@ -18,6 +18,7 @@
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.flogger.FluentLogger;
import com.google.gson.Gson;
import com.google.inject.Inject;
@@ -37,6 +38,7 @@
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final Gson gson;
+ private final PubSubSubscriberMetrics subscriberMetrics;
private final String topic;
private final Consumer<EventMessage> messageProcessor;
private final SubscriberProvider subscriberProvider;
@@ -48,9 +50,11 @@
Gson gson,
SubscriberProvider subscriberProvider,
PubSubConfiguration config,
+ PubSubSubscriberMetrics subscriberMetrics,
@Assisted String topic,
@Assisted Consumer<EventMessage> messageProcessor) {
this.gson = gson;
+ this.subscriberMetrics = subscriberMetrics;
this.topic = topic;
this.messageProcessor = messageProcessor;
this.subscriberProvider = subscriberProvider;
@@ -58,15 +62,8 @@
}
public void subscribe() {
- MessageReceiver receiver =
- (PubsubMessage message, AckReplyConsumer consumer) -> {
- EventMessage event = gson.fromJson(message.getData().toStringUtf8(), EventMessage.class);
- messageProcessor.accept(event);
- consumer.ack();
- };
-
try {
- subscriber = subscriberProvider.get(topic, receiver);
+ subscriber = subscriberProvider.get(topic, getMessageReceiver());
subscriber
.startAsync()
.awaitRunning(config.getSubscribtionTimeoutInSeconds(), TimeUnit.SECONDS);
@@ -98,4 +95,21 @@
logger.atSevere().withCause(e).log("Timeout during subscriber shutdown");
}
}
+
+  /**
+   * Builds the callback that the Pub/Sub subscriber invokes for every message received on
+   * {@code topic}.
+   *
+   * <p>The message payload is parsed as a JSON {@link EventMessage} and handed to the message
+   * processor. On success the message is acked and the success counter is incremented. If
+   * anything throws, the failure is logged, the message is nacked and the failure counter is
+   * incremented.
+   *
+   * @return the receiver callback; package-visible so tests can invoke it directly.
+   */
+  @VisibleForTesting
+  MessageReceiver getMessageReceiver() {
+    return (PubsubMessage message, AckReplyConsumer consumer) -> {
+      try {
+        EventMessage event = gson.fromJson(message.getData().toStringUtf8(), EventMessage.class);
+        messageProcessor.accept(event);
+        consumer.ack();
+        subscriberMetrics.incrementSucceedToConsumeMessage();
+      } catch (Exception e) {
+        logger.atSevere().withCause(e).log(
+            "Exception when consuming message %s from topic %s [message: %s]",
+            message.getMessageId(), topic, message.getData().toStringUtf8());
+        // The exception is swallowed here, so the Pub/Sub client never learns that processing
+        // failed. Reply explicitly: every received message must be either acked or nacked,
+        // otherwise it stays outstanding instead of being redelivered.
+        consumer.nack();
+        subscriberMetrics.incrementFailedToConsumeMessage();
+      }
+    };
+  }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventsMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventsMetrics.java
new file mode 100644
index 0000000..44968f4
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventsMetrics.java
@@ -0,0 +1,35 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Field;
+import com.google.gerrit.server.logging.PluginMetadata;
+
+/** Base class offering the factory helpers shared by the plugin's metric holders. */
+public abstract class PubSubEventsMetrics {
+
+  /**
+   * Creates a string-valued metric field whose value is recorded as plugin metadata.
+   *
+   * @param metadataKey name of the field, also used as the plugin metadata key.
+   * @param description human-readable description of the field.
+   * @return the built field.
+   */
+  public Field<String> stringField(String metadataKey, String description) {
+    Field.Builder<String> field =
+        Field.ofString(
+            metadataKey,
+            (builder, value) -> {
+              builder.addPluginMetadata(PluginMetadata.create(metadataKey, value));
+            });
+    return field.description(description).build();
+  }
+
+  /**
+   * Creates a metric description flagged as a rate and measured in the given unit.
+   *
+   * @param unit name of the unit being counted.
+   * @param description human-readable description of the metric.
+   * @return the configured description.
+   */
+  public Description rateDescription(String unit, String description) {
+    Description rate = new Description(description).setRate();
+    return rate.setUnit(unit);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
index 24ef602..a802d71 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
@@ -16,8 +16,11 @@
import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.flogger.FluentLogger;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.server.events.Event;
import com.google.gson.Gson;
import com.google.inject.Inject;
@@ -37,6 +40,8 @@
}
private final Gson gson;
+ private final PubSubPublisherMetrics publisherMetrics;
+ private final String topic;
private final Publisher publisher;
private final PubSubConfiguration pubSubProperties;
@@ -45,9 +50,12 @@
PubSubConfiguration pubSubProperties,
PublisherProvider publisherProvider,
Gson gson,
+ PubSubPublisherMetrics publisherMetrics,
@Assisted String topic)
throws IOException {
this.gson = gson;
+ this.publisherMetrics = publisherMetrics;
+ this.topic = topic;
this.publisher = publisherProvider.get(topic);
this.pubSubProperties = pubSubProperties;
}
@@ -72,7 +80,29 @@
}
+  /**
+   * Publishes the message without blocking and records the outcome once the future completes.
+   *
+   * <p>The callback logs the result and increments the publisher success or failure counter. It
+   * is registered with a direct executor, so it runs inline on the thread that completes the
+   * future (or on the caller's thread if the future is already complete).
+   *
+   * @param pubsubMessage the message to publish.
+   * @return the future returned by the publisher, completing with the published message id.
+   */
private ApiFuture<String> publishAsync(PubsubMessage pubsubMessage) {
- return publisher.publish(pubsubMessage);
+    ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
+    ApiFutures.addCallback(
+        messageIdFuture,
+        new ApiFutureCallback<String>() {
+          @Override
+          public void onFailure(Throwable t) {
+            // No message id is logged here: Pub/Sub assigns the id only when a publish
+            // succeeds, so an outgoing message that failed to publish does not carry one.
+            logger.atSevere().withCause(t).log(
+                "Exception when publishing message to topic '%s' [message: %s]",
+                topic, pubsubMessage.getData().toStringUtf8());
+            publisherMetrics.incrementFailedToPublishMessage();
+          }
+
+          @Override
+          public void onSuccess(String messageId) {
+            logger.atFine().log(
+                "Successfully published message (id:%s) to topic '%s' [message: %s]",
+                messageId, topic, pubsubMessage.getData().toStringUtf8());
+
+            publisherMetrics.incrementSucceedToPublishMessage();
+          }
+        },
+        MoreExecutors.directExecutor());
+    return messageIdFuture;
}
private void publishSync(PubsubMessage pubsubMessage) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherMetrics.java
new file mode 100644
index 0000000..a205238
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherMetrics.java
@@ -0,0 +1,52 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.metrics.Counter1;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * Counters recording how many messages were published successfully and how many failed.
+ *
+ * <p>Each counter is registered as {@code pluginName/counterName}.
+ */
+@Singleton
+public class PubSubPublisherMetrics extends PubSubEventsMetrics {
+  private static final String PUBLISHER_SUCCESS_COUNTER = "publisher_success_counter";
+  private static final String PUBLISHER_FAILURE_COUNTER = "publisher_failure_counter";
+
+  private final Counter1<String> publisherSuccessCounter;
+  private final Counter1<String> publisherFailureCounter;
+
+  /**
+   * Registers the publisher counters with the metric maker.
+   *
+   * @param metricMaker factory used to create the counters.
+   * @param pluginName plugin name used as the prefix of each metric name.
+   */
+  @Inject
+  public PubSubPublisherMetrics(MetricMaker metricMaker, @PluginName String pluginName) {
+    String successMetricName = pluginName + "/" + PUBLISHER_SUCCESS_COUNTER;
+    publisherSuccessCounter =
+        metricMaker.newCounter(
+            successMetricName,
+            rateDescription("messages", "Number of successfully published messages"),
+            stringField(PUBLISHER_SUCCESS_COUNTER, "Count of published messages"));
+
+    String failureMetricName = pluginName + "/" + PUBLISHER_FAILURE_COUNTER;
+    publisherFailureCounter =
+        metricMaker.newCounter(
+            failureMetricName,
+            rateDescription("errors", "Number of messages failed to publish"),
+            stringField(PUBLISHER_FAILURE_COUNTER, "Count of messages failed to publish"));
+  }
+
+  /** Adds one to the count of successfully published messages. */
+  public void incrementSucceedToPublishMessage() {
+    publisherSuccessCounter.increment(PUBLISHER_SUCCESS_COUNTER);
+  }
+
+  /** Adds one to the count of messages that failed to publish. */
+  public void incrementFailedToPublishMessage() {
+    publisherFailureCounter.increment(PUBLISHER_FAILURE_COUNTER);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubSubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubSubscriberMetrics.java
new file mode 100644
index 0000000..bbde683
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubSubscriberMetrics.java
@@ -0,0 +1,53 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.metrics.Counter1;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * Counters recording how many messages the subscriber consumed successfully and how many failed.
+ *
+ * <p>Each counter is registered as {@code pluginName/counterName}.
+ */
+@Singleton
+class PubSubSubscriberMetrics extends PubSubEventsMetrics {
+
+  private static final String SUBSCRIBER_SUCCESS_COUNTER = "subscriber_success_counter";
+  private static final String SUBSCRIBER_FAILURE_COUNTER = "subscriber_failure_counter";
+
+  private final Counter1<String> subscriberSuccessCounter;
+  private final Counter1<String> subscriberFailureCounter;
+
+  /**
+   * Registers the subscriber counters with the metric maker.
+   *
+   * @param metricMaker factory used to create the counters.
+   * @param pluginName plugin name used as the prefix of each metric name.
+   */
+  @Inject
+  public PubSubSubscriberMetrics(MetricMaker metricMaker, @PluginName String pluginName) {
+    String successMetricName = pluginName + "/" + SUBSCRIBER_SUCCESS_COUNTER;
+    subscriberSuccessCounter =
+        metricMaker.newCounter(
+            successMetricName,
+            rateDescription(
+                "messages", "Number of messages successfully consumed by the subscriber"),
+            stringField(SUBSCRIBER_SUCCESS_COUNTER, "Count of successfully consumed messages"));
+
+    String failureMetricName = pluginName + "/" + SUBSCRIBER_FAILURE_COUNTER;
+    subscriberFailureCounter =
+        metricMaker.newCounter(
+            failureMetricName,
+            rateDescription("errors", "Number of messages failed to consume by the subscriber"),
+            stringField(SUBSCRIBER_FAILURE_COUNTER, "Count of messages failed to consume"));
+  }
+
+  /** Adds one to the count of successfully consumed messages. */
+  public void incrementSucceedToConsumeMessage() {
+    subscriberSuccessCounter.increment(SUBSCRIBER_SUCCESS_COUNTER);
+  }
+
+  /** Adds one to the count of messages that failed to be consumed. */
+  public void incrementFailedToConsumeMessage() {
+    subscriberFailureCounter.increment(SUBSCRIBER_FAILURE_COUNTER);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
new file mode 100644
index 0000000..ec2dd69
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriberTest.java
@@ -0,0 +1,74 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.verify;
+
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.gerrit.json.OutputFormat;
+import com.google.pubsub.v1.PubsubMessage;
+import java.util.function.Consumer;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/** Checks that the subscriber's message receiver reports consumption outcomes as metrics. */
+@RunWith(MockitoJUnitRunner.class)
+public class PubSubEventSubscriberTest {
+
+  private static final String TOPIC = "foo";
+
+  @Mock PubSubConfiguration confMock;
+  @Mock SubscriberProvider subscriberProviderMock;
+  @Mock PubSubSubscriberMetrics pubSubSubscriberMetricsMock;
+  @Mock AckReplyConsumer ackReplyConsumerMock;
+
+  @Test
+  public void shouldIncrementFailedToConsumeMessageWhenReceivingFails() {
+    MessageReceiver receiver =
+        messageReceiver(
+            event -> {
+              throw new RuntimeException("Error receiving message");
+            });
+
+    receiver.receiveMessage(PubsubMessage.getDefaultInstance(), ackReplyConsumerMock);
+
+    verify(pubSubSubscriberMetricsMock, only()).incrementFailedToConsumeMessage();
+  }
+
+  @Test
+  public void shouldIncrementSucceedToConsumeMessageWhenReceivingSucceeds() {
+    MessageReceiver receiver = messageReceiver(event -> {});
+
+    receiver.receiveMessage(PubsubMessage.getDefaultInstance(), ackReplyConsumerMock);
+
+    verify(pubSubSubscriberMetricsMock, only()).incrementSucceedToConsumeMessage();
+  }
+
+  /** Builds a subscriber around the mocks and returns its receiver for the given processor. */
+  private MessageReceiver messageReceiver(Consumer<EventMessage> consumer) {
+    PubSubEventSubscriber subscriberUnderTest =
+        new PubSubEventSubscriber(
+            OutputFormat.JSON_COMPACT.newGson(),
+            subscriberProviderMock,
+            confMock,
+            pubSubSubscriberMetricsMock,
+            TOPIC,
+            consumer);
+    return subscriberUnderTest.getMessageReceiver();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java
new file mode 100644
index 0000000..c009973
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java
@@ -0,0 +1,102 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.gerrit.json.OutputFormat;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import java.io.IOException;
+import java.util.UUID;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/** Checks that the publisher reports publish outcomes as metrics in sync and async mode. */
+@RunWith(MockitoJUnitRunner.class)
+public class PubSubPublisherTest {
+  private static final String TOPIC = "foo";
+  private static final EventMessage eventMessage =
+      new EventMessage(
+          new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID()), new ProjectCreatedEvent());
+
+  @Mock PubSubConfiguration confMock;
+  @Mock PublisherProvider publisherProviderMock;
+  @Mock Publisher publisherMock;
+  @Mock PubSubPublisherMetrics pubSubPublisherMetricsMock;
+
+  PubSubPublisher objectUnderTest;
+
+  @Before
+  public void setUp() throws IOException {
+    when(publisherProviderMock.get(TOPIC)).thenReturn(publisherMock);
+    objectUnderTest =
+        new PubSubPublisher(
+            confMock,
+            publisherProviderMock,
+            OutputFormat.JSON_COMPACT.newGson(),
+            pubSubPublisherMetricsMock,
+            TOPIC);
+  }
+
+  @Test
+  public void shouldIncrementFailedToPublishMessageWhenAsyncPublishFails() {
+    givenSendAsync(true);
+    givenPublishFails();
+
+    objectUnderTest.publish(eventMessage);
+
+    verify(pubSubPublisherMetricsMock, only()).incrementFailedToPublishMessage();
+  }
+
+  @Test
+  public void shouldIncrementFailedToPublishMessageWhenSyncPublishFails() {
+    givenSendAsync(false);
+    givenPublishFails();
+
+    objectUnderTest.publish(eventMessage);
+
+    verify(pubSubPublisherMetricsMock, only()).incrementFailedToPublishMessage();
+  }
+
+  @Test
+  public void shouldIncrementSuccessToPublishMessageWhenAsyncPublishSucceeds() {
+    givenSendAsync(true);
+    givenPublishSucceeds();
+
+    objectUnderTest.publish(eventMessage);
+
+    verify(pubSubPublisherMetricsMock, only()).incrementSucceedToPublishMessage();
+  }
+
+  @Test
+  public void shouldIncrementSuccessToPublishMessageWhenSyncPublishSucceeds() {
+    givenSendAsync(false);
+    givenPublishSucceeds();
+
+    objectUnderTest.publish(eventMessage);
+
+    verify(pubSubPublisherMetricsMock, only()).incrementSucceedToPublishMessage();
+  }
+
+  /** Stubs the configuration to select asynchronous or synchronous publishing. */
+  private void givenSendAsync(boolean sendAsync) {
+    when(confMock.isSendAsync()).thenReturn(sendAsync);
+  }
+
+  /** Stubs the publisher to return an already-failed future for any message. */
+  private void givenPublishFails() {
+    when(publisherMock.publish(any()))
+        .thenReturn(ApiFutures.immediateFailedFuture(new Exception("Something went wrong")));
+  }
+
+  /** Stubs the publisher to return an already-completed future for any message. */
+  private void givenPublishSucceeds() {
+    when(publisherMock.publish(any())).thenReturn(ApiFutures.immediateFuture("some-message-id"));
+  }
+}