Merge branch 'stable-2.16' into stable-3.0

* stable-2.16:
  Broker publisher metric for number of messages published
  Use KafkaConfiguration for Kafka-related properties and functionality

Change-Id: Idd9b90cbc446a1e8be4cd0f35878035bf32aa1fc
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index 3226a42..7348137 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -18,28 +18,20 @@
 import static com.google.common.base.Suppliers.ofInstance;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.MultimapBuilder;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.google.inject.spi.Message;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefEnforcement.EnforcePolicy;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
@@ -72,7 +64,6 @@
   private final Supplier<SharedRefDatabase> sharedRefDb;
   private final Supplier<Collection<Message>> replicationConfigValidation;
   private final Config multiSiteConfig;
-  private final KafkaConfiguration kafkaConfig;
 
   @Inject
   Configuration(SitePaths sitePaths) {
@@ -83,7 +74,6 @@
   public Configuration(Config multiSiteConfig, Config replicationConfig) {
     Supplier<Config> lazyMultiSiteCfg = lazyLoad(multiSiteConfig);
     this.multiSiteConfig = multiSiteConfig;
-    this.kafkaConfig = new KafkaConfiguration(multiSiteConfig);
     replicationConfigValidation = lazyValidateReplicatioConfig(replicationConfig);
     cache = memoize(() -> new Cache(lazyMultiSiteCfg));
     event = memoize(() -> new Event(lazyMultiSiteCfg));
@@ -99,14 +89,6 @@
     return sharedRefDb.get();
   }
 
-  public Kafka getKafka() {
-    return kafkaConfig.getKafka();
-  }
-
-  public KafkaPublisher kafkaPublisher() {
-    return kafkaConfig.kafkaPublisher();
-  }
-
   public Cache cache() {
     return cache.get();
   }
@@ -119,10 +101,6 @@
     return index.get();
   }
 
-  public KafkaSubscriber kafkaSubscriber() {
-    return kafkaConfig.kafkaSubscriber();
-  }
-
   public Collection<Message> validate() {
     return replicationConfigValidation.get();
   }
@@ -182,179 +160,6 @@
     }
   }
 
-  public static class Kafka {
-    private final Map<EventFamily, String> eventTopics;
-    private final String bootstrapServers;
-
-    private static final ImmutableMap<EventFamily, String> EVENT_TOPICS =
-        ImmutableMap.of(
-            EventFamily.INDEX_EVENT,
-            "GERRIT.EVENT.INDEX",
-            EventFamily.STREAM_EVENT,
-            "GERRIT.EVENT.STREAM",
-            EventFamily.CACHE_EVENT,
-            "GERRIT.EVENT.CACHE",
-            EventFamily.PROJECT_LIST_EVENT,
-            "GERRIT.EVENT.PROJECT.LIST");
-
-    Kafka(Supplier<Config> config) {
-      this.bootstrapServers =
-          getString(
-              config,
-              KafkaConfiguration.KAFKA_SECTION,
-              null,
-              "bootstrapServers",
-              KafkaConfiguration.DEFAULT_KAFKA_BOOTSTRAP_SERVERS);
-
-      this.eventTopics = new HashMap<>();
-      for (Map.Entry<EventFamily, String> topicDefault : EVENT_TOPICS.entrySet()) {
-        String topicConfigKey = topicDefault.getKey().lowerCamelName() + "Topic";
-        eventTopics.put(
-            topicDefault.getKey(),
-            getString(
-                config,
-                KafkaConfiguration.KAFKA_SECTION,
-                null,
-                topicConfigKey,
-                topicDefault.getValue()));
-      }
-    }
-
-    public String getTopic(EventFamily eventType) {
-      return eventTopics.get(eventType);
-    }
-
-    public String getBootstrapServers() {
-      return bootstrapServers;
-    }
-
-    private static String getString(
-        Supplier<Config> cfg, String section, String subsection, String name, String defaultValue) {
-      String value = cfg.get().getString(section, subsection, name);
-      if (!Strings.isNullOrEmpty(value)) {
-        return value;
-      }
-      return defaultValue;
-    }
-  }
-
-  public static class KafkaPublisher extends Properties {
-    private static final long serialVersionUID = 0L;
-
-    public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
-
-    public static final String KAFKA_PUBLISHER_SUBSECTION = "publisher";
-    public static final boolean DEFAULT_BROKER_ENABLED = false;
-
-    private final boolean enabled;
-    private final Map<EventFamily, Boolean> eventsEnabled;
-
-    KafkaPublisher(Supplier<Config> cfg) {
-      enabled =
-          cfg.get()
-              .getBoolean(
-                  KafkaConfiguration.KAFKA_SECTION,
-                  KAFKA_PUBLISHER_SUBSECTION,
-                  KafkaConfiguration.ENABLE_KEY,
-                  DEFAULT_BROKER_ENABLED);
-
-      eventsEnabled = KafkaConfiguration.eventsEnabled(cfg, KAFKA_PUBLISHER_SUBSECTION);
-
-      if (enabled) {
-        setDefaults();
-        KafkaConfiguration.applyKafkaConfig(cfg, KAFKA_PUBLISHER_SUBSECTION, this);
-      }
-    }
-
-    private void setDefaults() {
-      put("acks", "all");
-      put("retries", 0);
-      put("batch.size", 16384);
-      put("linger.ms", 1);
-      put("buffer.memory", 33554432);
-      put("key.serializer", KAFKA_STRING_SERIALIZER);
-      put("value.serializer", KAFKA_STRING_SERIALIZER);
-      put("reconnect.backoff.ms", 5000L);
-    }
-
-    public boolean enabled() {
-      return enabled;
-    }
-
-    public boolean enabledEvent(EventFamily eventType) {
-      return eventsEnabled.get(eventType);
-    }
-  }
-
-  public static class KafkaSubscriber extends Properties {
-    private static final long serialVersionUID = 1L;
-
-    static final String KAFKA_SUBSCRIBER_SUBSECTION = "subscriber";
-
-    private final boolean enabled;
-    private final Integer pollingInterval;
-    private Map<EventFamily, Boolean> eventsEnabled;
-    private final Config cfg;
-
-    public KafkaSubscriber(Supplier<Config> configSupplier) {
-      this.cfg = configSupplier.get();
-
-      this.pollingInterval =
-          cfg.getInt(
-              KafkaConfiguration.KAFKA_SECTION,
-              KAFKA_SUBSCRIBER_SUBSECTION,
-              "pollingIntervalMs",
-              KafkaConfiguration.DEFAULT_POLLING_INTERVAL_MS);
-
-      enabled =
-          cfg.getBoolean(
-              KafkaConfiguration.KAFKA_SECTION,
-              KAFKA_SUBSCRIBER_SUBSECTION,
-              KafkaConfiguration.ENABLE_KEY,
-              false);
-
-      eventsEnabled = KafkaConfiguration.eventsEnabled(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION);
-
-      if (enabled) {
-        KafkaConfiguration.applyKafkaConfig(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION, this);
-      }
-    }
-
-    public boolean enabled() {
-      return enabled;
-    }
-
-    public boolean enabledEvent(EventFamily eventFamily) {
-      return eventsEnabled.get(eventFamily);
-    }
-
-    public Properties initPropsWith(UUID instanceId) {
-      String groupId =
-          getString(
-              cfg,
-              KafkaConfiguration.KAFKA_SECTION,
-              KAFKA_SUBSCRIBER_SUBSECTION,
-              "groupId",
-              instanceId.toString());
-      this.put("group.id", groupId);
-
-      return this;
-    }
-
-    public Integer getPollingInterval() {
-      return pollingInterval;
-    }
-
-    private String getString(
-        Config cfg, String section, String subsection, String name, String defaultValue) {
-      String value = cfg.getString(section, subsection, name);
-      if (!Strings.isNullOrEmpty(value)) {
-        return value;
-      }
-      return defaultValue;
-    }
-  }
-
   public static class SharedRefDatabase {
     public static final String SECTION = "ref-database";
     public static final String SUBSECTION_ENFORCEMENT_RULES = "enforcementRules";
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java
index 3376b49..b88699a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java
@@ -21,34 +21,44 @@
 import com.google.common.base.CaseFormat;
 import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
-import com.googlesource.gerrit.plugins.multisite.Configuration.Kafka;
-import com.googlesource.gerrit.plugins.multisite.Configuration.KafkaPublisher;
-import com.googlesource.gerrit.plugins.multisite.Configuration.KafkaSubscriber;
+import com.google.common.collect.ImmutableMap;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.UUID;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@Singleton
 public class KafkaConfiguration {
-  private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);
 
-  static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
+  private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);
+  public static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
   static final String KAFKA_SECTION = "kafka";
   static final String ENABLE_KEY = "enabled";
   static final String DEFAULT_KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
   static final boolean DEFAULT_ENABLE_PROCESSING = true;
-  static final int DEFAULT_POLLING_INTERVAL_MS = 1000;
+  private static final int DEFAULT_POLLING_INTERVAL_MS = 1000;
 
   private final Supplier<KafkaSubscriber> subscriber;
   private final Supplier<Kafka> kafka;
   private final Supplier<KafkaPublisher> publisher;
 
+  @Inject
+  KafkaConfiguration(SitePaths sitePaths) {
+    this(getConfigFile(sitePaths, Configuration.MULTI_SITE_CONFIG));
+  }
+
   @VisibleForTesting
   public KafkaConfiguration(Config kafkaConfig) {
     Supplier<Config> lazyCfg = lazyLoad(kafkaConfig);
@@ -57,6 +67,10 @@
     subscriber = memoize(() -> new KafkaSubscriber(lazyCfg));
   }
 
+  private static FileBasedConfig getConfigFile(SitePaths sitePaths, String configFileName) {
+    return new FileBasedConfig(sitePaths.etc_dir.resolve(configFileName).toFile(), FS.DETECTED);
+  }
+
   public Kafka getKafka() {
     return kafka.get();
   }
@@ -65,7 +79,7 @@
     return subscriber.get();
   }
 
-  static void applyKafkaConfig(
+  private static void applyKafkaConfig(
       Supplier<Config> configSupplier, String subsectionName, Properties target) {
     Config config = configSupplier.get();
     for (String section : config.getSubsections(KAFKA_SECTION)) {
@@ -103,7 +117,8 @@
     return defaultValue;
   }
 
-  static Map<EventFamily, Boolean> eventsEnabled(Supplier<Config> config, String subsection) {
+  private static Map<EventFamily, Boolean> eventsEnabled(
+      Supplier<Config> config, String subsection) {
     Map<EventFamily, Boolean> eventsEnabled = new HashMap<>();
     for (EventFamily eventFamily : EventFamily.values()) {
       String enabledConfigKey = eventFamily.lowerCamelName() + "Enabled";
@@ -112,11 +127,7 @@
           eventFamily,
           config
               .get()
-              .getBoolean(
-                  KafkaConfiguration.KAFKA_SECTION,
-                  subsection,
-                  enabledConfigKey,
-                  KafkaConfiguration.DEFAULT_ENABLE_PROCESSING));
+              .getBoolean(KAFKA_SECTION, subsection, enabledConfigKey, DEFAULT_ENABLE_PROCESSING));
     }
     return eventsEnabled;
   }
@@ -142,4 +153,156 @@
     }
     return ofInstance(config);
   }
+
+  public static class Kafka {
+    private final Map<EventFamily, String> eventTopics;
+    private final String bootstrapServers;
+
+    private static final ImmutableMap<EventFamily, String> EVENT_TOPICS =
+        ImmutableMap.of(
+            EventFamily.INDEX_EVENT,
+            "GERRIT.EVENT.INDEX",
+            EventFamily.STREAM_EVENT,
+            "GERRIT.EVENT.STREAM",
+            EventFamily.CACHE_EVENT,
+            "GERRIT.EVENT.CACHE",
+            EventFamily.PROJECT_LIST_EVENT,
+            "GERRIT.EVENT.PROJECT.LIST");
+
+    Kafka(Supplier<Config> config) {
+      this.bootstrapServers =
+          getString(
+              config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS);
+
+      this.eventTopics = new HashMap<>();
+      for (Map.Entry<EventFamily, String> topicDefault : EVENT_TOPICS.entrySet()) {
+        String topicConfigKey = topicDefault.getKey().lowerCamelName() + "Topic";
+        eventTopics.put(
+            topicDefault.getKey(),
+            getString(config, KAFKA_SECTION, null, topicConfigKey, topicDefault.getValue()));
+      }
+    }
+
+    public String getTopic(EventFamily eventType) {
+      return eventTopics.get(eventType);
+    }
+
+    public String getBootstrapServers() {
+      return bootstrapServers;
+    }
+
+    private static String getString(
+        Supplier<Config> cfg, String section, String subsection, String name, String defaultValue) {
+      String value = cfg.get().getString(section, subsection, name);
+      if (!Strings.isNullOrEmpty(value)) {
+        return value;
+      }
+      return defaultValue;
+    }
+  }
+
+  public static class KafkaPublisher extends Properties {
+    private static final long serialVersionUID = 0L;
+
+    public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
+
+    public static final String KAFKA_PUBLISHER_SUBSECTION = "publisher";
+    public static final boolean DEFAULT_BROKER_ENABLED = false;
+
+    private final boolean enabled;
+    private final Map<EventFamily, Boolean> eventsEnabled;
+
+    private KafkaPublisher(Supplier<Config> cfg) {
+      enabled =
+          cfg.get()
+              .getBoolean(
+                  KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, DEFAULT_BROKER_ENABLED);
+
+      eventsEnabled = eventsEnabled(cfg, KAFKA_PUBLISHER_SUBSECTION);
+
+      if (enabled) {
+        setDefaults();
+        applyKafkaConfig(cfg, KAFKA_PUBLISHER_SUBSECTION, this);
+      }
+    }
+
+    private void setDefaults() {
+      put("acks", "all");
+      put("retries", 0);
+      put("batch.size", 16384);
+      put("linger.ms", 1);
+      put("buffer.memory", 33554432);
+      put("key.serializer", KAFKA_STRING_SERIALIZER);
+      put("value.serializer", KAFKA_STRING_SERIALIZER);
+      put("reconnect.backoff.ms", 5000L);
+    }
+
+    public boolean enabled() {
+      return enabled;
+    }
+
+    public boolean enabledEvent(EventFamily eventType) {
+      return eventsEnabled.get(eventType);
+    }
+  }
+
+  public static class KafkaSubscriber extends Properties {
+    private static final long serialVersionUID = 1L;
+
+    static final String KAFKA_SUBSCRIBER_SUBSECTION = "subscriber";
+
+    private final boolean enabled;
+    private final Integer pollingInterval;
+    private Map<EventFamily, Boolean> eventsEnabled;
+    private final Config cfg;
+
+    public KafkaSubscriber(Supplier<Config> configSupplier) {
+      this.cfg = configSupplier.get();
+
+      this.pollingInterval =
+          cfg.getInt(
+              KAFKA_SECTION,
+              KAFKA_SUBSCRIBER_SUBSECTION,
+              "pollingIntervalMs",
+              DEFAULT_POLLING_INTERVAL_MS);
+
+      enabled = cfg.getBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, false);
+
+      eventsEnabled = eventsEnabled(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION);
+
+      if (enabled) {
+        applyKafkaConfig(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION, this);
+      }
+    }
+
+    public boolean enabled() {
+      return enabled;
+    }
+
+    public boolean enabledEvent(EventFamily eventFamily) {
+      return eventsEnabled.get(eventFamily);
+    }
+
+    public Properties initPropsWith(UUID instanceId) {
+      String groupId =
+          getString(
+              cfg, KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, "groupId", instanceId.toString());
+      this.put("group.id", groupId);
+
+      return this;
+    }
+
+    public Integer getPollingInterval() {
+      return pollingInterval;
+    }
+
+    private String getString(
+        Config cfg, String section, String subsection, String name, String defaultValue) {
+      String value = cfg.getString(section, subsection, name);
+      if (!Strings.isNullOrEmpty(value)) {
+        return value;
+      }
+      return defaultValue;
+    }
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index 620d658..c9d7c4b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
@@ -47,11 +48,31 @@
 
 public class Module extends LifecycleModule {
   private static final Logger log = LoggerFactory.getLogger(Module.class);
-  private final Configuration config;
+  private Configuration config;
+  private final boolean disableGitRepositoryValidation;
+  private KafkaConfiguration kafkaConfig;
 
   @Inject
-  public Module(Configuration config) {
+  public Module(Configuration config, KafkaConfiguration kafkaConfig) {
+    this(config, kafkaConfig, false);
+  }
+
+  // TODO: It is not possible to properly test the libModules in Gerrit.
+  // Disable the Git repository validation during integration tests and then build
+  // the necessary support in Gerrit for it.
+  @VisibleForTesting
+  public Module(
+      Configuration config,
+      KafkaConfiguration kafkaConfig,
+      boolean disableGitRepositoryValidation) {
+    init(config, kafkaConfig);
+    this.disableGitRepositoryValidation = disableGitRepositoryValidation;
+  }
+
+  private void init(Configuration config, KafkaConfiguration kafkaConfig) {
     this.config = config;
+    this.kafkaConfig = kafkaConfig;
   }
 
   @Override
@@ -77,12 +98,12 @@
       install(new IndexModule());
     }
 
-    if (config.kafkaSubscriber().enabled()) {
-      install(new KafkaConsumerModule(config.kafkaSubscriber()));
+    if (kafkaConfig.kafkaSubscriber().enabled()) {
+      install(new KafkaConsumerModule(kafkaConfig.kafkaSubscriber()));
       install(new ForwardedEventRouterModule());
     }
-    if (config.kafkaPublisher().enabled()) {
-      install(new BrokerForwarderModule(config.kafkaPublisher()));
+    if (kafkaConfig.kafkaPublisher().enabled()) {
+      install(new BrokerForwarderModule(kafkaConfig.kafkaPublisher()));
     }
 
     if (config.getSharedRefDb().isEnabled()) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerMetrics.java
new file mode 100644
index 0000000..f6be65a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerMetrics.java
@@ -0,0 +1,58 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import com.google.gerrit.metrics.Counter1;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Field;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+public class BrokerMetrics {
+  private static final String PUBLISHER_SUCCESS_COUNTER = "broker_msg_publisher_counter";
+  private static final String PUBLISHER_FAILURE_COUNTER = "broker_msg_publisher_failure_counter";
+
+  private final Counter1<String> brokerPublisherSuccessCounter;
+  private final Counter1<String> brokerPublisherFailureCounter;
+
+  @Inject
+  public BrokerMetrics(MetricMaker metricMaker) {
+
+    this.brokerPublisherSuccessCounter =
+        metricMaker.newCounter(
+            "multi_site/broker/broker_message_publisher_counter",
+            new Description("Number of messages published by the broker publisher")
+                .setRate()
+                .setUnit("messages"),
+            Field.ofString(PUBLISHER_SUCCESS_COUNTER, "Broker message published count"));
+    this.brokerPublisherFailureCounter =
+        metricMaker.newCounter(
+            "multi_site/broker/broker_message_publisher_failure_counter",
+            new Description("Number of messages failed to publish by the broker publisher")
+                .setRate()
+                .setUnit("errors"),
+            Field.ofString(PUBLISHER_FAILURE_COUNTER, "Broker failed to publish message count"));
+  }
+
+  public void incrementBrokerPublishedMessage() {
+    brokerPublisherSuccessCounter.increment(PUBLISHER_SUCCESS_COUNTER);
+  }
+
+  public void incrementBrokerFailedToPublishMessage() {
+    brokerPublisherFailureCounter.increment(PUBLISHER_FAILURE_COUNTER);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java
index 4dca9e9..62716e6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java
@@ -39,17 +39,20 @@
   private final Gson gson;
   private final UUID instanceId;
   private final MessageLogger msgLog;
+  private final BrokerMetrics brokerMetrics;
 
   @Inject
   public BrokerPublisher(
       BrokerSession session,
       @BrokerGson Gson gson,
       @InstanceId UUID instanceId,
-      MessageLogger msgLog) {
+      MessageLogger msgLog,
+      BrokerMetrics brokerMetrics) {
     this.session = session;
     this.gson = gson;
     this.instanceId = instanceId;
     this.msgLog = msgLog;
+    this.brokerMetrics = brokerMetrics;
   }
 
   @Override
@@ -73,7 +76,13 @@
 
     SourceAwareEventWrapper brokerEvent = toBrokerEvent(event);
     msgLog.log(Direction.PUBLISH, brokerEvent);
-    return session.publishEvent(eventType, getPayload(brokerEvent));
+    boolean eventPublished = session.publishEvent(eventType, getPayload(brokerEvent));
+    if (eventPublished) {
+      brokerMetrics.incrementBrokerPublishedMessage();
+    } else {
+      brokerMetrics.incrementBrokerFailedToPublishMessage();
+    }
+    return eventPublished;
   }
 
   private String getPayload(SourceAwareEventWrapper event) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
index 6c3c3cf..7c83346 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
@@ -15,8 +15,8 @@
 package com.googlesource.gerrit.plugins.multisite.broker.kafka;
 
 import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import java.util.UUID;
@@ -31,13 +31,13 @@
 
 public class KafkaSession implements BrokerSession {
   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSession.class);
-  private final Configuration properties;
+  private final KafkaConfiguration properties;
   private final UUID instanceId;
   private volatile Producer<String, String> producer;
 
   @Inject
-  public KafkaSession(Configuration configuration, @InstanceId UUID instanceId) {
-    this.properties = configuration;
+  public KafkaSession(KafkaConfiguration kafkaConfig, @InstanceId UUID instanceId) {
+    this.properties = kafkaConfig;
     this.instanceId = instanceId;
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
index 7c6bef3..b7ee20b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
@@ -16,7 +16,7 @@
 
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
-import com.googlesource.gerrit.plugins.multisite.Configuration.KafkaPublisher;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaPublisher;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
 import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaSession;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
index e6c73f1..4116c06 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
@@ -23,8 +23,8 @@
 import com.google.gerrit.server.util.ManualRequestContext;
 import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
@@ -52,12 +52,12 @@
   private final UUID instanceId;
   private final AtomicBoolean closed = new AtomicBoolean(false);
   private final Deserializer<SourceAwareEventWrapper> valueDeserializer;
-  private final Configuration configuration;
+  private final KafkaConfiguration configuration;
   private final OneOffRequestContext oneOffCtx;
   private final MessageLogger msgLog;
 
   public AbstractKafkaSubcriber(
-      Configuration configuration,
+      KafkaConfiguration configuration,
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       ForwardedEventRouter eventRouter,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
index 8e724be..12b2790 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
@@ -19,8 +19,8 @@
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
@@ -32,7 +32,7 @@
 public class CacheEvictionEventSubscriber extends AbstractKafkaSubcriber {
   @Inject
   public CacheEvictionEventSubscriber(
-      Configuration configuration,
+      KafkaConfiguration configuration,
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       StreamEventRouter eventRouter,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
index e52b624..9020486 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
@@ -19,8 +19,8 @@
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
@@ -32,7 +32,7 @@
 public class IndexEventSubscriber extends AbstractKafkaSubcriber {
   @Inject
   public IndexEventSubscriber(
-      Configuration configuration,
+      KafkaConfiguration configuration,
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       IndexEventRouter eventRouter,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
index 5bc7669..3e0e5d7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
@@ -17,7 +17,7 @@
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.inject.TypeLiteral;
-import com.googlesource.gerrit.plugins.multisite.Configuration.KafkaSubscriber;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaSubscriber;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.MultiSiteEvent;
 import java.util.concurrent.Executor;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
index a949cd2..9e5db3a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
@@ -19,8 +19,8 @@
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
@@ -32,7 +32,7 @@
 public class ProjectUpdateEventSubscriber extends AbstractKafkaSubcriber {
   @Inject
   public ProjectUpdateEventSubscriber(
-      Configuration configuration,
+      KafkaConfiguration configuration,
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       ProjectListUpdateRouter eventRouter,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
index d43a3c3..66070b6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
@@ -19,8 +19,8 @@
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
@@ -32,7 +32,7 @@
 public class StreamEventSubscriber extends AbstractKafkaSubcriber {
   @Inject
   public StreamEventSubscriber(
-      Configuration configuration,
+      KafkaConfiguration configuration,
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       StreamEventRouter eventRouter,
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index 5c5d75a..83082b2 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -86,3 +86,16 @@
 
 For further information and supported options, refer to [config](config.md)
 documentation.
+
+## Metrics
+
+The @PLUGIN@ plugin exposes the following metrics:
+
+### Broker message publisher
+* Broker message published count
+
+`metric=multi_site/broker/broker_message_publisher_counter/broker_msg_publisher_counter, type=com.codahale.metrics.Meter`
+
+* Broker failed to publish message count
+
+`metric=multi_site/broker/broker_message_publisher_failure_counter/broker_msg_publisher_failure_counter, type=com.codahale.metrics.Meter`
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
index ec3a431..0863f55 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
@@ -18,16 +18,11 @@
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Cache.CACHE_SECTION;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Cache.PATTERN_KEY;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.DEFAULT_THREAD_POOL_SIZE;
-import static com.googlesource.gerrit.plugins.multisite.Configuration.ENABLE_KEY;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Event.EVENT_SECTION;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Forwarding.DEFAULT_SYNCHRONIZE;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Forwarding.SYNCHRONIZE_KEY;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Index.INDEX_SECTION;
-import static com.googlesource.gerrit.plugins.multisite.Configuration.KafkaPublisher.KAFKA_PUBLISHER_SUBSECTION;
-import static com.googlesource.gerrit.plugins.multisite.Configuration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.THREAD_POOL_SIZE_KEY;
-import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KAFKA_PROPERTY_PREFIX;
-import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KAFKA_SECTION;
 
 import com.google.common.collect.ImmutableList;
 import org.eclipse.jgit.lib.Config;
@@ -129,84 +124,6 @@
   }
 
   @Test
-  public void kafkaSubscriberPropertiesAreSetWhenSectionIsEnabled() {
-    final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
-    final String kafkaPropertyValue = "aValue";
-    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, true);
-    globalPluginConfig.setString(
-        KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
-    final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
-
-    assertThat(property.equals(kafkaPropertyValue)).isTrue();
-  }
-
-  @Test
-  public void kafkaSubscriberPropertiesAreNotSetWhenSectionIsDisabled() {
-    final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
-    final String kafkaPropertyValue = "aValue";
-    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, false);
-    globalPluginConfig.setString(
-        KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
-    final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
-
-    assertThat(property).isNull();
-  }
-
-  @Test
-  public void kafkaSubscriberPropertiesAreIgnoredWhenPrefixIsNotSet() {
-    final String kafkaPropertyName = "fooBarBaz";
-    final String kafkaPropertyValue = "aValue";
-    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, true);
-    globalPluginConfig.setString(
-        KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
-    final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
-
-    assertThat(property).isNull();
-  }
-
-  @Test
-  public void kafkaPublisherPropertiesAreSetWhenSectionIsEnabled() {
-    final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
-    final String kafkaPropertyValue = "aValue";
-    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, true);
-    globalPluginConfig.setString(
-        KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
-    final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
-
-    assertThat(property.equals(kafkaPropertyValue)).isTrue();
-  }
-
-  @Test
-  public void kafkaPublisherPropertiesAreIgnoredWhenPrefixIsNotSet() {
-    final String kafkaPropertyName = "fooBarBaz";
-    final String kafkaPropertyValue = "aValue";
-    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, true);
-    globalPluginConfig.setString(
-        KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
-    final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
-
-    assertThat(property).isNull();
-  }
-
-  @Test
-  public void kafkaPublisherPropertiesAreNotSetWhenSectionIsDisabled() {
-    final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
-    final String kafkaPropertyValue = "aValue";
-    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, false);
-    globalPluginConfig.setString(
-        KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
-    final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
-
-    assertThat(property).isNull();
-  }
-
-  @Test
   public void shouldReturnValidationErrorsWhenReplicationOnStartupIsEnabled() throws Exception {
     Config replicationConfig = new Config();
     replicationConfig.setBoolean("gerrit", null, "replicateOnStartup", true);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/KafkaConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/KafkaConfigurationTest.java
new file mode 100644
index 0000000..ab0d3bc
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/KafkaConfigurationTest.java
@@ -0,0 +1,121 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.googlesource.gerrit.plugins.multisite.Configuration.ENABLE_KEY;
+import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KAFKA_PROPERTY_PREFIX;
+import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KAFKA_SECTION;
+import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaPublisher.KAFKA_PUBLISHER_SUBSECTION;
+import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
+
+import org.eclipse.jgit.lib.Config;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaConfigurationTest {
+
+  private Config globalPluginConfig;
+
+  @Before
+  public void setUp() {
+    globalPluginConfig = new Config();
+  }
+
+  private KafkaConfiguration getConfiguration() {
+    return new KafkaConfiguration(globalPluginConfig);
+  }
+
+  @Test
+  public void kafkaSubscriberPropertiesAreSetWhenSectionIsEnabled() {
+    final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
+    final String kafkaPropertyValue = "aValue";
+    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, true);
+    globalPluginConfig.setString(
+        KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
+
+    final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
+
+    assertThat(property).isEqualTo(kafkaPropertyValue);
+  }
+
+  @Test
+  public void kafkaSubscriberPropertiesAreNotSetWhenSectionIsDisabled() {
+    final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
+    final String kafkaPropertyValue = "aValue";
+    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, false);
+    globalPluginConfig.setString(
+        KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
+
+    final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
+
+    assertThat(property).isNull();
+  }
+
+  @Test
+  public void kafkaSubscriberPropertiesAreIgnoredWhenPrefixIsNotSet() {
+    final String kafkaPropertyName = "fooBarBaz";
+    final String kafkaPropertyValue = "aValue";
+    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, true);
+    globalPluginConfig.setString(
+        KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
+
+    final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
+
+    assertThat(property).isNull();
+  }
+
+  @Test
+  public void kafkaPublisherPropertiesAreSetWhenSectionIsEnabled() {
+    final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
+    final String kafkaPropertyValue = "aValue";
+    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, true);
+    globalPluginConfig.setString(
+        KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
+
+    final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
+
+    assertThat(property).isEqualTo(kafkaPropertyValue);
+  }
+
+  @Test
+  public void kafkaPublisherPropertiesAreIgnoredWhenPrefixIsNotSet() {
+    final String kafkaPropertyName = "fooBarBaz";
+    final String kafkaPropertyValue = "aValue";
+    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, true);
+    globalPluginConfig.setString(
+        KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
+
+    final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
+
+    assertThat(property).isNull();
+  }
+
+  @Test
+  public void kafkaPublisherPropertiesAreNotSetWhenSectionIsDisabled() {
+    final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
+    final String kafkaPropertyValue = "aValue";
+    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, false);
+    globalPluginConfig.setString(
+        KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
+
+    final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
+
+    assertThat(property).isNull();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
index 5281371..556917b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
@@ -36,13 +36,16 @@
   @Mock(answer = Answers.RETURNS_DEEP_STUBS)
   private Configuration configMock;
 
+  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+  private KafkaConfiguration kafkaConfig;
+
   @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
 
   private Module module;
 
   @Before
   public void setUp() {
-    module = new Module(configMock);
+    module = new Module(configMock, kafkaConfig);
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
index 8131a3f..36408ba 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
@@ -15,6 +15,10 @@
 package com.googlesource.gerrit.plugins.multisite.broker.kafka;
 
 import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import com.google.gerrit.extensions.client.ChangeKind;
 import com.google.gerrit.reviewdb.client.Account;
@@ -23,12 +27,14 @@
 import com.google.gerrit.server.data.AccountAttribute;
 import com.google.gerrit.server.data.ApprovalAttribute;
 import com.google.gerrit.server.events.CommentAddedEvent;
+import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.util.time.TimeUtil;
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.googlesource.gerrit.plugins.multisite.DisabledMessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerMetrics;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
 import com.googlesource.gerrit.plugins.multisite.broker.GsonProvider;
@@ -36,65 +42,44 @@
 import java.util.UUID;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
+@RunWith(MockitoJUnitRunner.class)
 public class BrokerPublisherTest {
+
+  @Mock private BrokerMetrics brokerMetrics;
+  @Mock private BrokerSession session;
   private BrokerPublisher publisher;
+
   private MessageLogger NO_MSG_LOG = new DisabledMessageLogger();
   private Gson gson = new GsonProvider().get();
 
+  private String accountName = "Foo Bar";
+  private String accountEmail = "foo@bar.com";
+  private String accountUsername = "foobar";
+  private String approvalType = ChangeKind.REWORK.toString();
+
+  private String approvalDescription = "ApprovalDescription";
+  private String approvalValue = "+2";
+  private String oldApprovalValue = "+1";
+  private Long approvalGrantedOn = 123L;
+  private String commentDescription = "Patch Set 1: Code-Review+2";
+  private String projectName = "project";
+  private String refName = "refs/heads/master";
+  private String changeId = "Iabcd1234abcd1234abcd1234abcd1234abcd1234";
+  private Long eventCreatedOn = 123L;
+
   @Before
   public void setUp() {
-    publisher = new BrokerPublisher(new TestBrokerSession(), gson, UUID.randomUUID(), NO_MSG_LOG);
+    publisher = new BrokerPublisher(session, gson, UUID.randomUUID(), NO_MSG_LOG, brokerMetrics);
   }
 
   @Test
   public void shouldSerializeCommentAddedEvent() {
 
-    final String accountName = "Foo Bar";
-    final String accountEmail = "foo@bar.com";
-    final String accountUsername = "foobar";
-    final String approvalType = ChangeKind.REWORK.toString();
-
-    final String approvalDescription = "ApprovalDescription";
-    final String approvalValue = "+2";
-    final String oldApprovalValue = "+1";
-    final Long approvalGrantedOn = 123L;
-    final String commentDescription = "Patch Set 1: Code-Review+2";
-    final String projectName = "project";
-    final String refName = "refs/heads/master";
-    final String changeId = "Iabcd1234abcd1234abcd1234abcd1234abcd1234";
-    final Long eventCreatedOn = 123L;
-
-    final Change change =
-        new Change(
-            new Change.Key(changeId),
-            new Change.Id(1),
-            new Account.Id(1),
-            new Branch.NameKey(projectName, refName),
-            TimeUtil.nowTs());
-
-    CommentAddedEvent event = new CommentAddedEvent(change);
-    AccountAttribute accountAttribute = new AccountAttribute();
-    accountAttribute.email = accountEmail;
-    accountAttribute.name = accountName;
-    accountAttribute.username = accountUsername;
-
-    event.eventCreatedOn = eventCreatedOn;
-    event.approvals =
-        () -> {
-          ApprovalAttribute approvalAttribute = new ApprovalAttribute();
-          approvalAttribute.value = approvalValue;
-          approvalAttribute.oldValue = oldApprovalValue;
-          approvalAttribute.description = approvalDescription;
-          approvalAttribute.by = accountAttribute;
-          approvalAttribute.type = ChangeKind.REWORK.toString();
-          approvalAttribute.grantedOn = approvalGrantedOn;
-
-          return new ApprovalAttribute[] {approvalAttribute};
-        };
-
-    event.author = () -> accountAttribute;
-    event.comment = commentDescription;
+    Event event = createSampleEvent();
 
     String expectedSerializedCommentEvent =
         "{\"author\": {\"name\": \""
@@ -140,22 +125,55 @@
     assertThat(publisher.eventToJson(event).equals(expectedCommentEventJsonObject)).isTrue();
   }
 
-  private static class TestBrokerSession implements BrokerSession {
+  @Test
+  public void shouldIncrementBrokerMetricCounterWhenMessagePublished() {
+    Event event = createSampleEvent();
+    when(session.publishEvent(any(), any())).thenReturn(true);
+    publisher.publishEvent(EventFamily.INDEX_EVENT, event);
+    verify(brokerMetrics, only()).incrementBrokerPublishedMessage();
+  }
 
-    @Override
-    public boolean isOpen() {
-      return false;
-    }
+  @Test
+  public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishFails() {
+    Event event = createSampleEvent();
+    when(session.publishEvent(any(), any())).thenReturn(false);
 
-    @Override
-    public void connect() {}
+    publisher.publishEvent(EventFamily.INDEX_EVENT, event);
+    verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
+  }
 
-    @Override
-    public void disconnect() {}
+  private Event createSampleEvent() {
+    final Change change =
+        new Change(
+            new Change.Key(changeId),
+            new Change.Id(1),
+            new Account.Id(1),
+            new Branch.NameKey(projectName, refName),
+            TimeUtil.nowTs());
 
-    @Override
-    public boolean publishEvent(EventFamily eventFamily, String payload) {
-      return false;
-    }
+    CommentAddedEvent event = new CommentAddedEvent(change);
+    AccountAttribute accountAttribute = new AccountAttribute();
+    accountAttribute.email = accountEmail;
+    accountAttribute.name = accountName;
+    accountAttribute.username = accountUsername;
+
+    event.eventCreatedOn = eventCreatedOn;
+    event.approvals =
+        () -> {
+          ApprovalAttribute approvalAttribute = new ApprovalAttribute();
+          approvalAttribute.value = approvalValue;
+          approvalAttribute.oldValue = oldApprovalValue;
+          approvalAttribute.description = approvalDescription;
+          approvalAttribute.by = accountAttribute;
+          approvalAttribute.type = ChangeKind.REWORK.toString();
+          approvalAttribute.grantedOn = approvalGrantedOn;
+
+          return new ApprovalAttribute[] {approvalAttribute};
+        };
+
+    event.author = () -> accountAttribute;
+    event.comment = commentDescription;
+
+    return event;
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index 6f7aa74..a65a455 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -38,6 +38,7 @@
 import com.google.inject.Key;
 import com.google.inject.TypeLiteral;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.Module;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
@@ -96,7 +97,8 @@
       this.config =
           new FileBasedConfig(
               sitePaths.etc_dir.resolve(Configuration.MULTI_SITE_CONFIG).toFile(), FS.DETECTED);
-      this.multiSiteModule = new Module(new Configuration(config, new Config()));
+      this.multiSiteModule =
+          new Module(new Configuration(config, new Config()), new KafkaConfiguration(config), true);
     }
 
     @Override