Merge branch 'stable-2.16' into stable-3.0
* stable-2.16:
Broker publisher metric for number of message published
Use KafkaConfiguration for Kafka related properties and functionality
Change-Id: Idd9b90cbc446a1e8be4cd0f35878035bf32aa1fc
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index 3226a42..7348137 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -18,28 +18,20 @@
import static com.google.common.base.Suppliers.ofInstance;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;
import com.google.gerrit.server.config.SitePaths;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.spi.Message;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefEnforcement.EnforcePolicy;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.storage.file.FileBasedConfig;
@@ -72,7 +64,6 @@
private final Supplier<SharedRefDatabase> sharedRefDb;
private final Supplier<Collection<Message>> replicationConfigValidation;
private final Config multiSiteConfig;
- private final KafkaConfiguration kafkaConfig;
@Inject
Configuration(SitePaths sitePaths) {
@@ -83,7 +74,6 @@
public Configuration(Config multiSiteConfig, Config replicationConfig) {
Supplier<Config> lazyMultiSiteCfg = lazyLoad(multiSiteConfig);
this.multiSiteConfig = multiSiteConfig;
- this.kafkaConfig = new KafkaConfiguration(multiSiteConfig);
replicationConfigValidation = lazyValidateReplicatioConfig(replicationConfig);
cache = memoize(() -> new Cache(lazyMultiSiteCfg));
event = memoize(() -> new Event(lazyMultiSiteCfg));
@@ -99,14 +89,6 @@
return sharedRefDb.get();
}
- public Kafka getKafka() {
- return kafkaConfig.getKafka();
- }
-
- public KafkaPublisher kafkaPublisher() {
- return kafkaConfig.kafkaPublisher();
- }
-
public Cache cache() {
return cache.get();
}
@@ -119,10 +101,6 @@
return index.get();
}
- public KafkaSubscriber kafkaSubscriber() {
- return kafkaConfig.kafkaSubscriber();
- }
-
public Collection<Message> validate() {
return replicationConfigValidation.get();
}
@@ -182,179 +160,6 @@
}
}
- public static class Kafka {
- private final Map<EventFamily, String> eventTopics;
- private final String bootstrapServers;
-
- private static final ImmutableMap<EventFamily, String> EVENT_TOPICS =
- ImmutableMap.of(
- EventFamily.INDEX_EVENT,
- "GERRIT.EVENT.INDEX",
- EventFamily.STREAM_EVENT,
- "GERRIT.EVENT.STREAM",
- EventFamily.CACHE_EVENT,
- "GERRIT.EVENT.CACHE",
- EventFamily.PROJECT_LIST_EVENT,
- "GERRIT.EVENT.PROJECT.LIST");
-
- Kafka(Supplier<Config> config) {
- this.bootstrapServers =
- getString(
- config,
- KafkaConfiguration.KAFKA_SECTION,
- null,
- "bootstrapServers",
- KafkaConfiguration.DEFAULT_KAFKA_BOOTSTRAP_SERVERS);
-
- this.eventTopics = new HashMap<>();
- for (Map.Entry<EventFamily, String> topicDefault : EVENT_TOPICS.entrySet()) {
- String topicConfigKey = topicDefault.getKey().lowerCamelName() + "Topic";
- eventTopics.put(
- topicDefault.getKey(),
- getString(
- config,
- KafkaConfiguration.KAFKA_SECTION,
- null,
- topicConfigKey,
- topicDefault.getValue()));
- }
- }
-
- public String getTopic(EventFamily eventType) {
- return eventTopics.get(eventType);
- }
-
- public String getBootstrapServers() {
- return bootstrapServers;
- }
-
- private static String getString(
- Supplier<Config> cfg, String section, String subsection, String name, String defaultValue) {
- String value = cfg.get().getString(section, subsection, name);
- if (!Strings.isNullOrEmpty(value)) {
- return value;
- }
- return defaultValue;
- }
- }
-
- public static class KafkaPublisher extends Properties {
- private static final long serialVersionUID = 0L;
-
- public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
-
- public static final String KAFKA_PUBLISHER_SUBSECTION = "publisher";
- public static final boolean DEFAULT_BROKER_ENABLED = false;
-
- private final boolean enabled;
- private final Map<EventFamily, Boolean> eventsEnabled;
-
- KafkaPublisher(Supplier<Config> cfg) {
- enabled =
- cfg.get()
- .getBoolean(
- KafkaConfiguration.KAFKA_SECTION,
- KAFKA_PUBLISHER_SUBSECTION,
- KafkaConfiguration.ENABLE_KEY,
- DEFAULT_BROKER_ENABLED);
-
- eventsEnabled = KafkaConfiguration.eventsEnabled(cfg, KAFKA_PUBLISHER_SUBSECTION);
-
- if (enabled) {
- setDefaults();
- KafkaConfiguration.applyKafkaConfig(cfg, KAFKA_PUBLISHER_SUBSECTION, this);
- }
- }
-
- private void setDefaults() {
- put("acks", "all");
- put("retries", 0);
- put("batch.size", 16384);
- put("linger.ms", 1);
- put("buffer.memory", 33554432);
- put("key.serializer", KAFKA_STRING_SERIALIZER);
- put("value.serializer", KAFKA_STRING_SERIALIZER);
- put("reconnect.backoff.ms", 5000L);
- }
-
- public boolean enabled() {
- return enabled;
- }
-
- public boolean enabledEvent(EventFamily eventType) {
- return eventsEnabled.get(eventType);
- }
- }
-
- public static class KafkaSubscriber extends Properties {
- private static final long serialVersionUID = 1L;
-
- static final String KAFKA_SUBSCRIBER_SUBSECTION = "subscriber";
-
- private final boolean enabled;
- private final Integer pollingInterval;
- private Map<EventFamily, Boolean> eventsEnabled;
- private final Config cfg;
-
- public KafkaSubscriber(Supplier<Config> configSupplier) {
- this.cfg = configSupplier.get();
-
- this.pollingInterval =
- cfg.getInt(
- KafkaConfiguration.KAFKA_SECTION,
- KAFKA_SUBSCRIBER_SUBSECTION,
- "pollingIntervalMs",
- KafkaConfiguration.DEFAULT_POLLING_INTERVAL_MS);
-
- enabled =
- cfg.getBoolean(
- KafkaConfiguration.KAFKA_SECTION,
- KAFKA_SUBSCRIBER_SUBSECTION,
- KafkaConfiguration.ENABLE_KEY,
- false);
-
- eventsEnabled = KafkaConfiguration.eventsEnabled(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION);
-
- if (enabled) {
- KafkaConfiguration.applyKafkaConfig(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION, this);
- }
- }
-
- public boolean enabled() {
- return enabled;
- }
-
- public boolean enabledEvent(EventFamily eventFamily) {
- return eventsEnabled.get(eventFamily);
- }
-
- public Properties initPropsWith(UUID instanceId) {
- String groupId =
- getString(
- cfg,
- KafkaConfiguration.KAFKA_SECTION,
- KAFKA_SUBSCRIBER_SUBSECTION,
- "groupId",
- instanceId.toString());
- this.put("group.id", groupId);
-
- return this;
- }
-
- public Integer getPollingInterval() {
- return pollingInterval;
- }
-
- private String getString(
- Config cfg, String section, String subsection, String name, String defaultValue) {
- String value = cfg.getString(section, subsection, name);
- if (!Strings.isNullOrEmpty(value)) {
- return value;
- }
- return defaultValue;
- }
- }
-
public static class SharedRefDatabase {
public static final String SECTION = "ref-database";
public static final String SUBSECTION_ENFORCEMENT_RULES = "enforcementRules";
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java
index 3376b49..b88699a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java
@@ -21,34 +21,44 @@
import com.google.common.base.CaseFormat;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
-import com.googlesource.gerrit.plugins.multisite.Configuration.Kafka;
-import com.googlesource.gerrit.plugins.multisite.Configuration.KafkaPublisher;
-import com.googlesource.gerrit.plugins.multisite.Configuration.KafkaSubscriber;
+import com.google.common.collect.ImmutableMap;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.UUID;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@Singleton
public class KafkaConfiguration {
- private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);
- static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
+ private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);
+ public static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
static final String KAFKA_SECTION = "kafka";
static final String ENABLE_KEY = "enabled";
static final String DEFAULT_KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
static final boolean DEFAULT_ENABLE_PROCESSING = true;
- static final int DEFAULT_POLLING_INTERVAL_MS = 1000;
+ private static final int DEFAULT_POLLING_INTERVAL_MS = 1000;
private final Supplier<KafkaSubscriber> subscriber;
private final Supplier<Kafka> kafka;
private final Supplier<KafkaPublisher> publisher;
+ @Inject
+ KafkaConfiguration(SitePaths sitePaths) {
+ this(getConfigFile(sitePaths, Configuration.MULTI_SITE_CONFIG));
+ }
+
@VisibleForTesting
public KafkaConfiguration(Config kafkaConfig) {
Supplier<Config> lazyCfg = lazyLoad(kafkaConfig);
@@ -57,6 +67,10 @@
subscriber = memoize(() -> new KafkaSubscriber(lazyCfg));
}
+ private static FileBasedConfig getConfigFile(SitePaths sitePaths, String configFileName) {
+ return new FileBasedConfig(sitePaths.etc_dir.resolve(configFileName).toFile(), FS.DETECTED);
+ }
+
public Kafka getKafka() {
return kafka.get();
}
@@ -65,7 +79,7 @@
return subscriber.get();
}
- static void applyKafkaConfig(
+ private static void applyKafkaConfig(
Supplier<Config> configSupplier, String subsectionName, Properties target) {
Config config = configSupplier.get();
for (String section : config.getSubsections(KAFKA_SECTION)) {
@@ -103,7 +117,8 @@
return defaultValue;
}
- static Map<EventFamily, Boolean> eventsEnabled(Supplier<Config> config, String subsection) {
+ private static Map<EventFamily, Boolean> eventsEnabled(
+ Supplier<Config> config, String subsection) {
Map<EventFamily, Boolean> eventsEnabled = new HashMap<>();
for (EventFamily eventFamily : EventFamily.values()) {
String enabledConfigKey = eventFamily.lowerCamelName() + "Enabled";
@@ -112,11 +127,7 @@
eventFamily,
config
.get()
- .getBoolean(
- KafkaConfiguration.KAFKA_SECTION,
- subsection,
- enabledConfigKey,
- KafkaConfiguration.DEFAULT_ENABLE_PROCESSING));
+ .getBoolean(KAFKA_SECTION, subsection, enabledConfigKey, DEFAULT_ENABLE_PROCESSING));
}
return eventsEnabled;
}
@@ -142,4 +153,156 @@
}
return ofInstance(config);
}
+
+ public static class Kafka {
+ private final Map<EventFamily, String> eventTopics;
+ private final String bootstrapServers;
+
+ private static final ImmutableMap<EventFamily, String> EVENT_TOPICS =
+ ImmutableMap.of(
+ EventFamily.INDEX_EVENT,
+ "GERRIT.EVENT.INDEX",
+ EventFamily.STREAM_EVENT,
+ "GERRIT.EVENT.STREAM",
+ EventFamily.CACHE_EVENT,
+ "GERRIT.EVENT.CACHE",
+ EventFamily.PROJECT_LIST_EVENT,
+ "GERRIT.EVENT.PROJECT.LIST");
+
+ Kafka(Supplier<Config> config) {
+ this.bootstrapServers =
+ getString(
+ config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS);
+
+ this.eventTopics = new HashMap<>();
+ for (Map.Entry<EventFamily, String> topicDefault : EVENT_TOPICS.entrySet()) {
+ String topicConfigKey = topicDefault.getKey().lowerCamelName() + "Topic";
+ eventTopics.put(
+ topicDefault.getKey(),
+ getString(config, KAFKA_SECTION, null, topicConfigKey, topicDefault.getValue()));
+ }
+ }
+
+ public String getTopic(EventFamily eventType) {
+ return eventTopics.get(eventType);
+ }
+
+ public String getBootstrapServers() {
+ return bootstrapServers;
+ }
+
+ private static String getString(
+ Supplier<Config> cfg, String section, String subsection, String name, String defaultValue) {
+ String value = cfg.get().getString(section, subsection, name);
+ if (!Strings.isNullOrEmpty(value)) {
+ return value;
+ }
+ return defaultValue;
+ }
+ }
+
+ public static class KafkaPublisher extends Properties {
+ private static final long serialVersionUID = 0L;
+
+ public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
+
+ public static final String KAFKA_PUBLISHER_SUBSECTION = "publisher";
+ public static final boolean DEFAULT_BROKER_ENABLED = false;
+
+ private final boolean enabled;
+ private final Map<EventFamily, Boolean> eventsEnabled;
+
+ private KafkaPublisher(Supplier<Config> cfg) {
+ enabled =
+ cfg.get()
+ .getBoolean(
+ KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, DEFAULT_BROKER_ENABLED);
+
+ eventsEnabled = eventsEnabled(cfg, KAFKA_PUBLISHER_SUBSECTION);
+
+ if (enabled) {
+ setDefaults();
+ applyKafkaConfig(cfg, KAFKA_PUBLISHER_SUBSECTION, this);
+ }
+ }
+
+ private void setDefaults() {
+ put("acks", "all");
+ put("retries", 0);
+ put("batch.size", 16384);
+ put("linger.ms", 1);
+ put("buffer.memory", 33554432);
+ put("key.serializer", KAFKA_STRING_SERIALIZER);
+ put("value.serializer", KAFKA_STRING_SERIALIZER);
+ put("reconnect.backoff.ms", 5000L);
+ }
+
+ public boolean enabled() {
+ return enabled;
+ }
+
+ public boolean enabledEvent(EventFamily eventType) {
+ return eventsEnabled.get(eventType);
+ }
+ }
+
+ public static class KafkaSubscriber extends Properties {
+ private static final long serialVersionUID = 1L;
+
+ static final String KAFKA_SUBSCRIBER_SUBSECTION = "subscriber";
+
+ private final boolean enabled;
+ private final Integer pollingInterval;
+ private Map<EventFamily, Boolean> eventsEnabled;
+ private final Config cfg;
+
+ public KafkaSubscriber(Supplier<Config> configSupplier) {
+ this.cfg = configSupplier.get();
+
+ this.pollingInterval =
+ cfg.getInt(
+ KAFKA_SECTION,
+ KAFKA_SUBSCRIBER_SUBSECTION,
+ "pollingIntervalMs",
+ DEFAULT_POLLING_INTERVAL_MS);
+
+ enabled = cfg.getBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, false);
+
+ eventsEnabled = eventsEnabled(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION);
+
+ if (enabled) {
+ applyKafkaConfig(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION, this);
+ }
+ }
+
+ public boolean enabled() {
+ return enabled;
+ }
+
+ public boolean enabledEvent(EventFamily eventFamily) {
+ return eventsEnabled.get(eventFamily);
+ }
+
+ public Properties initPropsWith(UUID instanceId) {
+ String groupId =
+ getString(
+ cfg, KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, "groupId", instanceId.toString());
+ this.put("group.id", groupId);
+
+ return this;
+ }
+
+ public Integer getPollingInterval() {
+ return pollingInterval;
+ }
+
+ private String getString(
+ Config cfg, String section, String subsection, String name, String defaultValue) {
+ String value = cfg.getString(section, subsection, name);
+ if (!Strings.isNullOrEmpty(value)) {
+ return value;
+ }
+ return defaultValue;
+ }
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index 620d658..c9d7c4b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.multisite;
+import com.google.common.annotations.VisibleForTesting;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
@@ -47,11 +48,31 @@
public class Module extends LifecycleModule {
private static final Logger log = LoggerFactory.getLogger(Module.class);
- private final Configuration config;
+ private Configuration config;
+ private final boolean disableGitRepositoryValidation;
+ private KafkaConfiguration kafkaConfig;
@Inject
- public Module(Configuration config) {
+ public Module(Configuration config, KafkaConfiguration kafkaConfig) {
+ this(config, kafkaConfig, false);
+ }
+
+ // TODO: It is not possible to properly test the libModules in Gerrit.
+ // Disable the Git repository validation during integration test and then build the necessary
+ // support
+ // in Gerrit for it.
+ @VisibleForTesting
+ public Module(
+ Configuration config,
+ KafkaConfiguration kafkaConfig,
+ boolean disableGitRepositoryValidation) {
+ init(config, kafkaConfig);
+ this.disableGitRepositoryValidation = disableGitRepositoryValidation;
+ }
+
+ private void init(Configuration config, KafkaConfiguration kafkaConfig) {
this.config = config;
+ this.kafkaConfig = kafkaConfig;
}
@Override
@@ -77,12 +98,12 @@
install(new IndexModule());
}
- if (config.kafkaSubscriber().enabled()) {
- install(new KafkaConsumerModule(config.kafkaSubscriber()));
+ if (kafkaConfig.kafkaSubscriber().enabled()) {
+ install(new KafkaConsumerModule(kafkaConfig.kafkaSubscriber()));
install(new ForwardedEventRouterModule());
}
- if (config.kafkaPublisher().enabled()) {
- install(new BrokerForwarderModule(config.kafkaPublisher()));
+ if (kafkaConfig.kafkaPublisher().enabled()) {
+ install(new BrokerForwarderModule(kafkaConfig.kafkaPublisher()));
}
if (config.getSharedRefDb().isEnabled()) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerMetrics.java
new file mode 100644
index 0000000..f6be65a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerMetrics.java
@@ -0,0 +1,58 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import com.google.gerrit.metrics.Counter1;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Field;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+public class BrokerMetrics {
+ private static final String PUBLISHER_SUCCESS_COUNTER = "broker_msg_publisher_counter";
+ private static final String PUBLISHER_FAILURE_COUNTER = "broker_msg_publisher_failure_counter";
+
+ private final Counter1<String> brokerPublisherSuccessCounter;
+ private final Counter1<String> brokerPublisherFailureCounter;
+
+ @Inject
+ public BrokerMetrics(MetricMaker metricMaker) {
+
+ this.brokerPublisherSuccessCounter =
+ metricMaker.newCounter(
+ "multi_site/broker/broker_message_publisher_counter",
+ new Description("Number of messages published by the broker publisher")
+ .setRate()
+ .setUnit("messages"),
+ Field.ofString(PUBLISHER_SUCCESS_COUNTER, "Broker message published count"));
+ this.brokerPublisherFailureCounter =
+ metricMaker.newCounter(
+ "multi_site/broker/broker_message_publisher_failure_counter",
+ new Description("Number of messages failed to publish by the broker publisher")
+ .setRate()
+ .setUnit("errors"),
+ Field.ofString(PUBLISHER_FAILURE_COUNTER, "Broker failed to publish message count"));
+ }
+
+ public void incrementBrokerPublishedMessage() {
+ brokerPublisherSuccessCounter.increment(PUBLISHER_SUCCESS_COUNTER);
+ }
+
+ public void incrementBrokerFailedToPublishMessage() {
+ brokerPublisherFailureCounter.increment(PUBLISHER_FAILURE_COUNTER);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java
index 4dca9e9..62716e6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java
@@ -39,17 +39,20 @@
private final Gson gson;
private final UUID instanceId;
private final MessageLogger msgLog;
+ private final BrokerMetrics brokerMetrics;
@Inject
public BrokerPublisher(
BrokerSession session,
@BrokerGson Gson gson,
@InstanceId UUID instanceId,
- MessageLogger msgLog) {
+ MessageLogger msgLog,
+ BrokerMetrics brokerMetrics) {
this.session = session;
this.gson = gson;
this.instanceId = instanceId;
this.msgLog = msgLog;
+ this.brokerMetrics = brokerMetrics;
}
@Override
@@ -73,7 +76,13 @@
SourceAwareEventWrapper brokerEvent = toBrokerEvent(event);
msgLog.log(Direction.PUBLISH, brokerEvent);
- return session.publishEvent(eventType, getPayload(brokerEvent));
+ Boolean eventPublished = session.publishEvent(eventType, getPayload(brokerEvent));
+ if (eventPublished) {
+ brokerMetrics.incrementBrokerPublishedMessage();
+ } else {
+ brokerMetrics.incrementBrokerFailedToPublishMessage();
+ }
+ return eventPublished;
}
private String getPayload(SourceAwareEventWrapper event) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
index 6c3c3cf..7c83346 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
@@ -15,8 +15,8 @@
package com.googlesource.gerrit.plugins.multisite.broker.kafka;
import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
import java.util.UUID;
@@ -31,13 +31,13 @@
public class KafkaSession implements BrokerSession {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSession.class);
- private final Configuration properties;
+ private KafkaConfiguration properties;
private final UUID instanceId;
private volatile Producer<String, String> producer;
@Inject
- public KafkaSession(Configuration configuration, @InstanceId UUID instanceId) {
- this.properties = configuration;
+ public KafkaSession(KafkaConfiguration kafkaConfig, @InstanceId UUID instanceId) {
+ this.properties = kafkaConfig;
this.instanceId = instanceId;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
index 7c6bef3..b7ee20b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
@@ -16,7 +16,7 @@
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
-import com.googlesource.gerrit.plugins.multisite.Configuration.KafkaPublisher;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaPublisher;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaSession;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
index e6c73f1..4116c06 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
@@ -23,8 +23,8 @@
import com.google.gerrit.server.util.ManualRequestContext;
import com.google.gerrit.server.util.OneOffRequestContext;
import com.google.gson.Gson;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
@@ -52,12 +52,12 @@
private final UUID instanceId;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Deserializer<SourceAwareEventWrapper> valueDeserializer;
- private final Configuration configuration;
+ private final KafkaConfiguration configuration;
private final OneOffRequestContext oneOffCtx;
private final MessageLogger msgLog;
public AbstractKafkaSubcriber(
- Configuration configuration,
+ KafkaConfiguration configuration,
Deserializer<byte[]> keyDeserializer,
Deserializer<SourceAwareEventWrapper> valueDeserializer,
ForwardedEventRouter eventRouter,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
index 8e724be..12b2790 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
@@ -19,8 +19,8 @@
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
@@ -32,7 +32,7 @@
public class CacheEvictionEventSubscriber extends AbstractKafkaSubcriber {
@Inject
public CacheEvictionEventSubscriber(
- Configuration configuration,
+ KafkaConfiguration configuration,
Deserializer<byte[]> keyDeserializer,
Deserializer<SourceAwareEventWrapper> valueDeserializer,
StreamEventRouter eventRouter,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
index e52b624..9020486 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
@@ -19,8 +19,8 @@
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
@@ -32,7 +32,7 @@
public class IndexEventSubscriber extends AbstractKafkaSubcriber {
@Inject
public IndexEventSubscriber(
- Configuration configuration,
+ KafkaConfiguration configuration,
Deserializer<byte[]> keyDeserializer,
Deserializer<SourceAwareEventWrapper> valueDeserializer,
IndexEventRouter eventRouter,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
index 5bc7669..3e0e5d7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
@@ -17,7 +17,7 @@
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.inject.TypeLiteral;
-import com.googlesource.gerrit.plugins.multisite.Configuration.KafkaSubscriber;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaSubscriber;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.MultiSiteEvent;
import java.util.concurrent.Executor;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
index a949cd2..9e5db3a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
@@ -19,8 +19,8 @@
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
@@ -32,7 +32,7 @@
public class ProjectUpdateEventSubscriber extends AbstractKafkaSubcriber {
@Inject
public ProjectUpdateEventSubscriber(
- Configuration configuration,
+ KafkaConfiguration configuration,
Deserializer<byte[]> keyDeserializer,
Deserializer<SourceAwareEventWrapper> valueDeserializer,
ProjectListUpdateRouter eventRouter,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
index d43a3c3..66070b6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
@@ -19,8 +19,8 @@
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
@@ -32,7 +32,7 @@
public class StreamEventSubscriber extends AbstractKafkaSubcriber {
@Inject
public StreamEventSubscriber(
- Configuration configuration,
+ KafkaConfiguration configuration,
Deserializer<byte[]> keyDeserializer,
Deserializer<SourceAwareEventWrapper> valueDeserializer,
StreamEventRouter eventRouter,
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index 5c5d75a..83082b2 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -86,3 +86,16 @@
For further information and supported options, refer to [config](config.md)
documentation.
+
+## Metrics
+
+@PLUGIN@ plugin exposes following metrics:
+
+### Broker message publisher
+* Broker message published count
+
+`metric=multi_site/broker/broker_message_publisher_counter/broker_msg_publisher_counter, type=com.codahale.metrics.Meter`
+
+* Broker failed to publish message count
+
+`metric=multi_site/broker/broker_message_publisher_failure_counter/broker_msg_publisher_failure_counter, type=com.codahale.metrics.Meter`
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
index ec3a431..0863f55 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
@@ -18,16 +18,11 @@
import static com.googlesource.gerrit.plugins.multisite.Configuration.Cache.CACHE_SECTION;
import static com.googlesource.gerrit.plugins.multisite.Configuration.Cache.PATTERN_KEY;
import static com.googlesource.gerrit.plugins.multisite.Configuration.DEFAULT_THREAD_POOL_SIZE;
-import static com.googlesource.gerrit.plugins.multisite.Configuration.ENABLE_KEY;
import static com.googlesource.gerrit.plugins.multisite.Configuration.Event.EVENT_SECTION;
import static com.googlesource.gerrit.plugins.multisite.Configuration.Forwarding.DEFAULT_SYNCHRONIZE;
import static com.googlesource.gerrit.plugins.multisite.Configuration.Forwarding.SYNCHRONIZE_KEY;
import static com.googlesource.gerrit.plugins.multisite.Configuration.Index.INDEX_SECTION;
-import static com.googlesource.gerrit.plugins.multisite.Configuration.KafkaPublisher.KAFKA_PUBLISHER_SUBSECTION;
-import static com.googlesource.gerrit.plugins.multisite.Configuration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
import static com.googlesource.gerrit.plugins.multisite.Configuration.THREAD_POOL_SIZE_KEY;
-import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KAFKA_PROPERTY_PREFIX;
-import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KAFKA_SECTION;
import com.google.common.collect.ImmutableList;
import org.eclipse.jgit.lib.Config;
@@ -129,84 +124,6 @@
}
@Test
- public void kafkaSubscriberPropertiesAreSetWhenSectionIsEnabled() {
- final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
- final String kafkaPropertyValue = "aValue";
- globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, true);
- globalPluginConfig.setString(
- KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
- final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
-
- assertThat(property.equals(kafkaPropertyValue)).isTrue();
- }
-
- @Test
- public void kafkaSubscriberPropertiesAreNotSetWhenSectionIsDisabled() {
- final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
- final String kafkaPropertyValue = "aValue";
- globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, false);
- globalPluginConfig.setString(
- KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
- final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
-
- assertThat(property).isNull();
- }
-
- @Test
- public void kafkaSubscriberPropertiesAreIgnoredWhenPrefixIsNotSet() {
- final String kafkaPropertyName = "fooBarBaz";
- final String kafkaPropertyValue = "aValue";
- globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, true);
- globalPluginConfig.setString(
- KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
- final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
-
- assertThat(property).isNull();
- }
-
- @Test
- public void kafkaPublisherPropertiesAreSetWhenSectionIsEnabled() {
- final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
- final String kafkaPropertyValue = "aValue";
- globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, true);
- globalPluginConfig.setString(
- KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
- final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
-
- assertThat(property.equals(kafkaPropertyValue)).isTrue();
- }
-
- @Test
- public void kafkaPublisherPropertiesAreIgnoredWhenPrefixIsNotSet() {
- final String kafkaPropertyName = "fooBarBaz";
- final String kafkaPropertyValue = "aValue";
- globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, true);
- globalPluginConfig.setString(
- KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
- final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
-
- assertThat(property).isNull();
- }
-
- @Test
- public void kafkaPublisherPropertiesAreNotSetWhenSectionIsDisabled() {
- final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
- final String kafkaPropertyValue = "aValue";
- globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, false);
- globalPluginConfig.setString(
- KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
- final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
-
- assertThat(property).isNull();
- }
-
- @Test
public void shouldReturnValidationErrorsWhenReplicationOnStartupIsEnabled() throws Exception {
Config replicationConfig = new Config();
replicationConfig.setBoolean("gerrit", null, "replicateOnStartup", true);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/KafkaConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/KafkaConfigurationTest.java
new file mode 100644
index 0000000..ab0d3bc
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/KafkaConfigurationTest.java
@@ -0,0 +1,121 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.googlesource.gerrit.plugins.multisite.Configuration.ENABLE_KEY;
+import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KAFKA_PROPERTY_PREFIX;
+import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KAFKA_SECTION;
+import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaPublisher.KAFKA_PUBLISHER_SUBSECTION;
+import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
+
+import org.eclipse.jgit.lib.Config;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaConfigurationTest {
+
+ private Config globalPluginConfig;
+
+ @Before
+ public void setUp() {
+ globalPluginConfig = new Config();
+ }
+
+ private KafkaConfiguration getConfiguration() {
+ return new KafkaConfiguration(globalPluginConfig);
+ }
+
+ @Test
+ public void kafkaSubscriberPropertiesAreSetWhenSectionIsEnabled() {
+ final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
+ final String kafkaPropertyValue = "aValue";
+ globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, true);
+ globalPluginConfig.setString(
+ KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
+
+ final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
+
+ assertThat(property.equals(kafkaPropertyValue)).isTrue();
+ }
+
+ @Test
+ public void kafkaSubscriberPropertiesAreNotSetWhenSectionIsDisabled() {
+ final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
+ final String kafkaPropertyValue = "aValue";
+ globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, false);
+ globalPluginConfig.setString(
+ KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
+
+ final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
+
+ assertThat(property).isNull();
+ }
+
+ @Test
+ public void kafkaSubscriberPropertiesAreIgnoredWhenPrefixIsNotSet() {
+ final String kafkaPropertyName = "fooBarBaz";
+ final String kafkaPropertyValue = "aValue";
+ globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, true);
+ globalPluginConfig.setString(
+ KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
+
+ final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
+
+ assertThat(property).isNull();
+ }
+
+ @Test
+ public void kafkaPublisherPropertiesAreSetWhenSectionIsEnabled() {
+ final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
+ final String kafkaPropertyValue = "aValue";
+ globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, true);
+ globalPluginConfig.setString(
+ KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
+
+ final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
+
+ assertThat(property.equals(kafkaPropertyValue)).isTrue();
+ }
+
+ @Test
+ public void kafkaPublisherPropertiesAreIgnoredWhenPrefixIsNotSet() {
+ final String kafkaPropertyName = "fooBarBaz";
+ final String kafkaPropertyValue = "aValue";
+ globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, true);
+ globalPluginConfig.setString(
+ KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
+
+ final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
+
+ assertThat(property).isNull();
+ }
+
+ @Test
+ public void kafkaPublisherPropertiesAreNotSetWhenSectionIsDisabled() {
+ final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
+ final String kafkaPropertyValue = "aValue";
+ globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, false);
+ globalPluginConfig.setString(
+ KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
+
+ final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
+
+ assertThat(property).isNull();
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
index 5281371..556917b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
@@ -36,13 +36,16 @@
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private Configuration configMock;
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ private KafkaConfiguration kafkaConfig;
+
@Rule public TemporaryFolder tempFolder = new TemporaryFolder();
private Module module;
@Before
public void setUp() {
- module = new Module(configMock);
+ module = new Module(configMock, kafkaConfig);
}
@Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
index 8131a3f..36408ba 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
@@ -15,6 +15,10 @@
package com.googlesource.gerrit.plugins.multisite.broker.kafka;
import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import com.google.gerrit.extensions.client.ChangeKind;
import com.google.gerrit.reviewdb.client.Account;
@@ -23,12 +27,14 @@
import com.google.gerrit.server.data.AccountAttribute;
import com.google.gerrit.server.data.ApprovalAttribute;
import com.google.gerrit.server.events.CommentAddedEvent;
+import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.util.time.TimeUtil;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.googlesource.gerrit.plugins.multisite.DisabledMessageLogger;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerMetrics;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
import com.googlesource.gerrit.plugins.multisite.broker.GsonProvider;
@@ -36,65 +42,44 @@
import java.util.UUID;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+@RunWith(MockitoJUnitRunner.class)
public class BrokerPublisherTest {
+
+ @Mock private BrokerMetrics brokerMetrics;
+ @Mock private BrokerSession session;
private BrokerPublisher publisher;
+
private MessageLogger NO_MSG_LOG = new DisabledMessageLogger();
private Gson gson = new GsonProvider().get();
+ private String accountName = "Foo Bar";
+ private String accountEmail = "foo@bar.com";
+ private String accountUsername = "foobar";
+ private String approvalType = ChangeKind.REWORK.toString();
+
+ private String approvalDescription = "ApprovalDescription";
+ private String approvalValue = "+2";
+ private String oldApprovalValue = "+1";
+ private Long approvalGrantedOn = 123L;
+ private String commentDescription = "Patch Set 1: Code-Review+2";
+ private String projectName = "project";
+ private String refName = "refs/heads/master";
+ private String changeId = "Iabcd1234abcd1234abcd1234abcd1234abcd1234";
+ private Long eventCreatedOn = 123L;
+
@Before
public void setUp() {
- publisher = new BrokerPublisher(new TestBrokerSession(), gson, UUID.randomUUID(), NO_MSG_LOG);
+ publisher = new BrokerPublisher(session, gson, UUID.randomUUID(), NO_MSG_LOG, brokerMetrics);
}
@Test
public void shouldSerializeCommentAddedEvent() {
- final String accountName = "Foo Bar";
- final String accountEmail = "foo@bar.com";
- final String accountUsername = "foobar";
- final String approvalType = ChangeKind.REWORK.toString();
-
- final String approvalDescription = "ApprovalDescription";
- final String approvalValue = "+2";
- final String oldApprovalValue = "+1";
- final Long approvalGrantedOn = 123L;
- final String commentDescription = "Patch Set 1: Code-Review+2";
- final String projectName = "project";
- final String refName = "refs/heads/master";
- final String changeId = "Iabcd1234abcd1234abcd1234abcd1234abcd1234";
- final Long eventCreatedOn = 123L;
-
- final Change change =
- new Change(
- new Change.Key(changeId),
- new Change.Id(1),
- new Account.Id(1),
- new Branch.NameKey(projectName, refName),
- TimeUtil.nowTs());
-
- CommentAddedEvent event = new CommentAddedEvent(change);
- AccountAttribute accountAttribute = new AccountAttribute();
- accountAttribute.email = accountEmail;
- accountAttribute.name = accountName;
- accountAttribute.username = accountUsername;
-
- event.eventCreatedOn = eventCreatedOn;
- event.approvals =
- () -> {
- ApprovalAttribute approvalAttribute = new ApprovalAttribute();
- approvalAttribute.value = approvalValue;
- approvalAttribute.oldValue = oldApprovalValue;
- approvalAttribute.description = approvalDescription;
- approvalAttribute.by = accountAttribute;
- approvalAttribute.type = ChangeKind.REWORK.toString();
- approvalAttribute.grantedOn = approvalGrantedOn;
-
- return new ApprovalAttribute[] {approvalAttribute};
- };
-
- event.author = () -> accountAttribute;
- event.comment = commentDescription;
+ Event event = createSampleEvent();
String expectedSerializedCommentEvent =
"{\"author\": {\"name\": \""
@@ -140,22 +125,55 @@
assertThat(publisher.eventToJson(event).equals(expectedCommentEventJsonObject)).isTrue();
}
- private static class TestBrokerSession implements BrokerSession {
+ @Test
+ public void shouldIncrementBrokerMetricCounterWhenMessagePublished() {
+ Event event = createSampleEvent();
+ when(session.publishEvent(any(), any())).thenReturn(true);
+ publisher.publishEvent(EventFamily.INDEX_EVENT, event);
+ verify(brokerMetrics, only()).incrementBrokerPublishedMessage();
+ }
- @Override
- public boolean isOpen() {
- return false;
- }
+ @Test
+ public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublished() {
+ Event event = createSampleEvent();
+ when(session.publishEvent(any(), any())).thenReturn(false);
- @Override
- public void connect() {}
+ publisher.publishEvent(EventFamily.INDEX_EVENT, event);
+ verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
+ }
- @Override
- public void disconnect() {}
+ private Event createSampleEvent() {
+ final Change change =
+ new Change(
+ new Change.Key(changeId),
+ new Change.Id(1),
+ new Account.Id(1),
+ new Branch.NameKey(projectName, refName),
+ TimeUtil.nowTs());
- @Override
- public boolean publishEvent(EventFamily eventFamily, String payload) {
- return false;
- }
+ CommentAddedEvent event = new CommentAddedEvent(change);
+ AccountAttribute accountAttribute = new AccountAttribute();
+ accountAttribute.email = accountEmail;
+ accountAttribute.name = accountName;
+ accountAttribute.username = accountUsername;
+
+ event.eventCreatedOn = eventCreatedOn;
+ event.approvals =
+ () -> {
+ ApprovalAttribute approvalAttribute = new ApprovalAttribute();
+ approvalAttribute.value = approvalValue;
+ approvalAttribute.oldValue = oldApprovalValue;
+ approvalAttribute.description = approvalDescription;
+ approvalAttribute.by = accountAttribute;
+ approvalAttribute.type = ChangeKind.REWORK.toString();
+ approvalAttribute.grantedOn = approvalGrantedOn;
+
+ return new ApprovalAttribute[] {approvalAttribute};
+ };
+
+ event.author = () -> accountAttribute;
+ event.comment = commentDescription;
+
+ return event;
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index 6f7aa74..a65a455 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -38,6 +38,7 @@
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
import com.googlesource.gerrit.plugins.multisite.Module;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
@@ -96,7 +97,8 @@
this.config =
new FileBasedConfig(
sitePaths.etc_dir.resolve(Configuration.MULTI_SITE_CONFIG).toFile(), FS.DETECTED);
- this.multiSiteModule = new Module(new Configuration(config, new Config()));
+ this.multiSiteModule =
+ new Module(new Configuration(config, new Config()), new KafkaConfiguration(config), true);
}
@Override