Merge branch 'stable-3.0' into stable-3.1

* stable-3.0:
  Add publisher metrics

Change-Id: I213a2fe90144459f9a4f9c19d394da3f7eb7c4ae
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 4f599f7..0945768 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -9,12 +9,12 @@
 
     maven_jar(
         name = "testcontainers-kafka",
-        artifact = "org.testcontainers:kafka:1.10.6",
-        sha1 = "5984e31306bd6c84a36092cdd19e0ef7e2268d98",
+        artifact = "org.testcontainers:kafka:1.13.0",
+        sha1 = "ab6b31212f6a0ce341aa907fceffb722d9b9f3f7",
     )
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.0.4",
-        sha1 = "350b438f532678b1f9a277b7e7b6fa9da4b725b3",
+        artifact = "com.gerritforge:events-broker:3.1.3",
+        sha1 = "a12ef44f9b75a5dbecac9f1f0acf0f236b220252",
     )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java
index 81174f9..7a7055e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriberMetrics.java
@@ -4,6 +4,7 @@
 import com.google.gerrit.metrics.Description;
 import com.google.gerrit.metrics.Field;
 import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.server.logging.PluginMetadata;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
@@ -26,7 +27,7 @@
             new Description("Number of failed attempts to poll messages by the subscriber")
                 .setRate()
                 .setUnit("errors"),
-            Field.ofString(
+            stringField(
                 SUBSCRIBER_POLL_FAILURE_COUNTER, "Subscriber failed to poll messages count"));
     this.subscriberFailureCounter =
         metricMaker.newCounter(
@@ -34,8 +35,7 @@
             new Description("Number of messages failed to consume by the subscriber consumer")
                 .setRate()
                 .setUnit("errors"),
-            Field.ofString(
-                SUBSCRIBER_FAILURE_COUNTER, "Subscriber failed to consume messages count"));
+            stringField(SUBSCRIBER_FAILURE_COUNTER, "Subscriber failed to consume messages count"));
   }
 
   public void incrementSubscriberFailedToPollMessages() {
@@ -45,4 +45,17 @@
   public void incrementSubscriberFailedToConsumeMessage() {
     subscriberFailureCounter.increment(SUBSCRIBER_FAILURE_COUNTER);
   }
+
+  public Field<String> stringField(String metadataKey, String description) {
+    return Field.ofString(
+            metadataKey,
+            (metadataBuilder, fieldValue) ->
+                metadataBuilder.addPluginMetadata(PluginMetadata.create(metadataKey, fieldValue)))
+        .description(description)
+        .build();
+  }
+
+  public Description rateDescription(String unit, String description) {
+    return new Description(description).setRate().setUnit(unit);
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index 95d2603..06261ed 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -15,6 +15,7 @@
 package com.googlesource.gerrit.plugins.kafka;
 
 import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.fail;
 
 import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
 import com.google.common.collect.Iterables;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index eabc833..48350f9 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -70,18 +70,6 @@
   private KafkaSession session;
   private Gson gson;
 
-  public static class TestKafkaContainer extends KafkaContainer {
-    public TestKafkaContainer() {
-      addFixedExposedPort(KAFKA_PORT, KAFKA_PORT);
-      addFixedExposedPort(ZOOKEEPER_PORT, ZOOKEEPER_PORT);
-    }
-
-    @Override
-    public String getBootstrapServers() {
-      return String.format("PLAINTEXT://%s:%s", getContainerIpAddress(), KAFKA_PORT);
-    }
-  }
-
   public static class TestWorkQueue extends WorkQueue {
 
     @Inject
@@ -149,7 +137,7 @@
 
   @BeforeClass
   public static void beforeClass() throws Exception {
-    kafka = new TestKafkaContainer();
+    kafka = new KafkaContainer();
     kafka.start();
     System.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
   }