Merge branch 'stable-3.0' into stable-3.1
* stable-3.0:
Allow to send message to Kafka asynchronously
Fix Kafka container-test when running on CI
Increase default number of subscribers
Re-run consumer when exited because of an error
Trace errors that cause a consumer thread to exit
Add singleton scope to KafkaSubscriberProperties
Change-Id: I0bb5e8c36d17d57a86806132acddf2d3039a1894
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 4f599f7..bbc1da5 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -15,6 +15,6 @@
maven_jar(
name = "events-broker",
- artifact = "com.gerritforge:events-broker:3.0.4",
- sha1 = "350b438f532678b1f9a277b7e7b6fa9da4b725b3",
+ artifact = "com.gerritforge:events-broker:3.1.3",
+ sha1 = "a12ef44f9b75a5dbecac9f1f0acf0f236b220252",
)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java
index 81174f9..7a7055e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java
@@ -4,6 +4,7 @@
import com.google.gerrit.metrics.Description;
import com.google.gerrit.metrics.Field;
import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.server.logging.PluginMetadata;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -26,7 +27,7 @@
new Description("Number of failed attempts to poll messages by the subscriber")
.setRate()
.setUnit("errors"),
- Field.ofString(
+ stringField(
SUBSCRIBER_POLL_FAILURE_COUNTER, "Subscriber failed to poll messages count"));
this.subscriberFailureCounter =
metricMaker.newCounter(
@@ -34,8 +35,7 @@
new Description("Number of messages failed to consume by the subscriber consumer")
.setRate()
.setUnit("errors"),
- Field.ofString(
- SUBSCRIBER_FAILURE_COUNTER, "Subscriber failed to consume messages count"));
+ stringField(SUBSCRIBER_FAILURE_COUNTER, "Subscriber failed to consume messages count"));
}
public void incrementSubscriberFailedToPollMessages() {
@@ -45,4 +45,17 @@
public void incrementSubscriberFailedToConsumeMessage() {
subscriberFailureCounter.increment(SUBSCRIBER_FAILURE_COUNTER);
}
+
+ public Field<String> stringField(String metadataKey, String description) {
+ return Field.ofString(
+ metadataKey,
+ (metadataBuilder, fieldValue) ->
+ metadataBuilder.addPluginMetadata(PluginMetadata.create(metadataKey, fieldValue)))
+ .description(description)
+ .build();
+ }
+
+ public Description rateDescription(String unit, String description) {
+ return new Description(description).setRate().setUnit(unit);
+ }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index 95d2603..06261ed 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -15,6 +15,7 @@
package com.googlesource.gerrit.plugins.kafka;
import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.fail;
import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
import com.google.common.collect.Iterables;