Remove producer/subscriber independent disabling properties
Producer/subscriber properties for disabling event management are not
needed anymore. Also disabling publisher/subscriber independently is not
bringing any value so it can be removed.
Feature: Issue 10829
Change-Id: Ic002407c8eb195e27cf15022aac7519b565fefbd
diff --git a/README.md b/README.md
index 0f526a9..75fd07d 100644
--- a/README.md
+++ b/README.md
@@ -89,12 +89,6 @@
[kafka]
bootstrapServers = <kafka-host>:<kafka-port>
-[kafka "publisher"]
- enabled = true
-
-[kafka "subscriber"]
- enabled = true
-
[ref-database]
enabled = true
diff --git a/dockerised_local_env/gerrit-common/multi-site.config b/dockerised_local_env/gerrit-common/multi-site.config
index 04b9c2c..deec00f 100644
--- a/dockerised_local_env/gerrit-common/multi-site.config
+++ b/dockerised_local_env/gerrit-common/multi-site.config
@@ -12,14 +12,10 @@
cacheEventTopic = gerrit_cache_eviction
[kafka "subscriber"]
- enabled = true
pollingIntervalMs = 1000
KafkaProp-enableAutoCommit = true
KafkaProp-autoCommitIntervalMs = 1000
KafkaProp-autoOffsetReset = latest
-[kafka "publisher"]
- enabled = true
-
[ref-database "zookeeper"]
connectString = "zookeeper:2181"
diff --git a/setup_local_env/configs/multi-site.config b/setup_local_env/configs/multi-site.config
index ed9e6ad..5287f61 100644
--- a/setup_local_env/configs/multi-site.config
+++ b/setup_local_env/configs/multi-site.config
@@ -9,12 +9,9 @@
projectListEventTopic = gerrit_list_project
cacheEventTopic = gerrit_cache_eviction
[kafka "subscriber"]
- enabled = true
pollingIntervalMs = 1000
KafkaProp-enableAutoCommit = true
KafkaProp-autoCommitIntervalMs = 1000
KafkaProp-autoOffsetReset = latest
-[kafka "publisher"]
- enabled = true
[ref-database "zookeeper"]
connectString = localhost:$ZK_PORT
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index ca01471..14c6be6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -49,7 +49,6 @@
static final String INSTANCE_ID_FILE = "instanceId.data";
static final String THREAD_POOL_SIZE_KEY = "threadPoolSize";
static final int DEFAULT_THREAD_POOL_SIZE = 4;
- static final String ENABLE_KEY = "enabled";
private static final String REPLICATION_CONFIG = "replication.config";
// common parameters to cache and index sections
@@ -162,6 +161,7 @@
public static class SharedRefDatabase {
public static final String SECTION = "ref-database";
+ public static final String ENABLE_KEY = "enabled";
public static final String SUBSECTION_ENFORCEMENT_RULES = "enforcementRules";
private final boolean enabled;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
index 7d42acc..371abae 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
@@ -36,10 +36,6 @@
return aliasKey + "Topic";
}
- public String enabledKey() {
- return aliasKey + "Enabled";
- }
-
public static EventTopic of(String topicString) {
EventTopic[] topics = EventTopic.values();
for (EventTopic topic : topics) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
index 5fdb07e..665ad49 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
@@ -16,7 +16,6 @@
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.inject.Inject;
import com.google.inject.TypeLiteral;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
import com.googlesource.gerrit.plugins.multisite.broker.kafka.BrokerPublisher;
@@ -35,58 +34,30 @@
import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerIndexEventForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerProjectListUpdateForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerStreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaEventDeserializer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
public class KafkaBrokerModule extends LifecycleModule {
- private KafkaConfiguration config;
-
- @Inject
- public KafkaBrokerModule(KafkaConfiguration config) {
- this.config = config;
- }
@Override
protected void configure() {
- if (config.kafkaSubscriber().enabled()) {
- bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
- bind(new TypeLiteral<Deserializer<SourceAwareEventWrapper>>() {})
- .to(KafkaEventDeserializer.class);
+ bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
+ bind(new TypeLiteral<Deserializer<SourceAwareEventWrapper>>() {})
+ .to(KafkaEventDeserializer.class);
- if (config.kafkaSubscriber().enabledEvent(EventTopic.INDEX_TOPIC)) {
- DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
- }
- if (config.kafkaSubscriber().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
- DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
- }
- if (config.kafkaSubscriber().enabledEvent(EventTopic.CACHE_TOPIC)) {
- DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
- }
- if (config.kafkaSubscriber().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
- DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
- }
- }
+ DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
+ DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
+ DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
+ DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
- if (config.kafkaPublisher().enabled()) {
- listener().to(BrokerPublisher.class);
- bind(BrokerSession.class).to(KafkaSession.class);
+ listener().to(BrokerPublisher.class);
+ bind(BrokerSession.class).to(KafkaSession.class);
- if (config.kafkaPublisher().enabledEvent(EventTopic.INDEX_TOPIC)) {
- DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
- }
- if (config.kafkaPublisher().enabledEvent(EventTopic.CACHE_TOPIC)) {
- DynamicSet.bind(binder(), CacheEvictionForwarder.class)
- .to(BrokerCacheEvictionForwarder.class);
- }
- if (config.kafkaPublisher().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
- DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
- .to(BrokerProjectListUpdateForwarder.class);
- }
- if (config.kafkaPublisher().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
- DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
- }
- }
+ DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
+ DynamicSet.bind(binder(), CacheEvictionForwarder.class).to(BrokerCacheEvictionForwarder.class);
+ DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
+ .to(BrokerProjectListUpdateForwarder.class);
+ DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
index dee7382..b6c01f3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
@@ -42,9 +42,7 @@
private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);
static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
static final String KAFKA_SECTION = "kafka";
- static final String ENABLE_KEY = "enabled";
private static final String DEFAULT_KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
- private static final boolean DEFAULT_ENABLE_PROCESSING = true;
private static final int DEFAULT_POLLING_INTERVAL_MS = 1000;
private final Supplier<KafkaSubscriber> subscriber;
@@ -105,20 +103,6 @@
return defaultValue;
}
- private static Map<EventTopic, Boolean> eventsEnabled(
- Supplier<Config> config, String subsection) {
- Map<EventTopic, Boolean> eventsEnabled = new HashMap<>();
- for (EventTopic topic : EventTopic.values()) {
- eventsEnabled.put(
- topic,
- config
- .get()
- .getBoolean(
- KAFKA_SECTION, subsection, topic.enabledKey(), DEFAULT_ENABLE_PROCESSING));
- }
- return eventsEnabled;
- }
-
public KafkaPublisher kafkaPublisher() {
return publisher.get();
}
@@ -180,25 +164,11 @@
private static final long serialVersionUID = 0L;
public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
-
public static final String KAFKA_PUBLISHER_SUBSECTION = "publisher";
- public static final boolean DEFAULT_BROKER_ENABLED = false;
- private final boolean enabled;
- private final Map<EventTopic, Boolean> eventsEnabled;
-
- private KafkaPublisher(Supplier<Config> cfg) {
- enabled =
- cfg.get()
- .getBoolean(
- KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, DEFAULT_BROKER_ENABLED);
-
- eventsEnabled = eventsEnabled(cfg, KAFKA_PUBLISHER_SUBSECTION);
-
- if (enabled) {
- setDefaults();
- applyKafkaConfig(cfg, KAFKA_PUBLISHER_SUBSECTION, this);
- }
+ private KafkaPublisher(Supplier<Config> kafkaConfig) {
+ setDefaults();
+ applyKafkaConfig(kafkaConfig, KAFKA_PUBLISHER_SUBSECTION, this);
}
private void setDefaults() {
@@ -211,14 +181,6 @@
put("value.serializer", KAFKA_STRING_SERIALIZER);
put("reconnect.backoff.ms", 5000L);
}
-
- public boolean enabled() {
- return enabled;
- }
-
- public boolean enabledEvent(EventTopic eventType) {
- return eventsEnabled.get(eventType);
- }
}
public static class KafkaSubscriber extends Properties {
@@ -226,13 +188,11 @@
static final String KAFKA_SUBSCRIBER_SUBSECTION = "subscriber";
- private final boolean enabled;
private final Integer pollingInterval;
- private Map<EventTopic, Boolean> eventsEnabled;
private final Config cfg;
- public KafkaSubscriber(Supplier<Config> configSupplier) {
- this.cfg = configSupplier.get();
+ public KafkaSubscriber(Supplier<Config> kafkaCfg) {
+ this.cfg = kafkaCfg.get();
this.pollingInterval =
cfg.getInt(
@@ -241,21 +201,7 @@
"pollingIntervalMs",
DEFAULT_POLLING_INTERVAL_MS);
- enabled = cfg.getBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, false);
-
- eventsEnabled = eventsEnabled(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION);
-
- if (enabled) {
- applyKafkaConfig(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION, this);
- }
- }
-
- public boolean enabled() {
- return enabled;
- }
-
- public boolean enabledEvent(EventTopic topic) {
- return eventsEnabled.get(topic);
+ applyKafkaConfig(kafkaCfg, KAFKA_SUBSCRIBER_SUBSECTION, this);
}
public Properties initPropsWith(UUID instanceId) {
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index 204742e..c404ea1 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -72,18 +72,15 @@
eventTopic = gerrit_index
[kafka "publisher"]
- enable = true
indexEventTopic = gerrit_index
streamEventTopic = gerrit_stream
cacheEvictionEventTopic = gerrit_cache_eviction
[kafka "subscriber"]
- enable = true
pollingIntervalMs = 1000
autoCommitIntervalMs = 1000
```
-
For further information and supported options, refer to [config](config.md)
documentation.
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 227d0e8..b44d1dd 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -21,29 +21,16 @@
projectListEventTopic = gerrit_project_list
[kafka "publisher"]
- enabled = true
-
- indexEventEnabled = true
- cacheEventEnabled = true
- projectListEventEnabled = true
- streamEventEnabled = true
-
KafkaProp-compressionType = none
KafkaProp-deliveryTimeoutMs = 60000
[kafka "subscriber"]
- enabled = true
pollingIntervalMs = 1000
KafkaProp-enableAutoCommit = true
KafkaProp-autoCommitIntervalMs = 1000
KafkaProp-autoCommitIntervalMs = 5000
- indexEventEnabled = true
- cacheEventEnabled = true
- projectListEventEnabled = true
- streamEventEnabled = true
-
[ref-database "zookeeper"]
connectString = "localhost:2181"
rootNode = "/gerrit/multi-site"
@@ -126,58 +113,6 @@
: Name of the Kafka topic to use for publishing cache eviction events
Defaults to GERRIT.EVENT.PROJECT.LIST
-```kafka.publisher.indexEventEnabled```
-: Enable publication of index events, ignored when `kafka.publisher.enabled`
- is false
-
- Defaults: true
-
-```kafka.publisher.cacheEventEnabled```
-: Enable publication of cache events, ignored when `kafka.publisher.enabled`
- is false
-
- Defaults: true
-
-```kafka.publisher.projectListEventEnabled```
-: Enable publication of project list events, ignored when `kafka.publisher.enabled`
- is false
-
- Defaults: true
-
-```kafka.publisher.streamEventEnabled```
-: Enable publication of stream events, ignored when `kafka.publisher.enabled`
- is false
-
- Defaults: true
-
-```kafka.subscriber.enabled```
-: Enable consuming of events from Kafka
- Defaults: false
-
-```kafka.subscriber.indexEventEnabled```
-: Enable consumption of index events, ignored when `kafka.subscriber.enabled`
- is false
-
- Defaults: true
-
-```kafka.subscriber.cacheEventEnabled```
-: Enable consumption of cache events, ignored when `kafka.subscriber.enabled`
- is false
-
- Defaults: true
-
-```kafka.subscriber.projectListEventEnabled```
-: Enable consumption of project list events, ignored when `kafka.subscriber.enabled`
- is false
-
- Defaults: true
-
-```kafka.subscriber.streamEventEnabled```
-: Enable consumption of stream events, ignored when `kafka.subscriber.enabled`
- is false
-
- Defaults: true
-
```kafka.subscriber.pollingIntervalMs```
: Polling interval in milliseconds for checking incoming events
@@ -290,11 +225,12 @@
For example, if you want to set the `auto.commit.interval.ms` property for
consumers, you need to configure this property as `KafkaProp-autoCommitIntervalMs`.
-**NOTE**: custom Kafka properties will be ignored when the relevant subsection is
-disabled (i.e. `kafka.subscriber.enabled` and/or `kafka.publisher.enabled` are
-set to `false`).
-
The complete list of available settings can be found directly in the kafka website:
* **Publisher**: https://kafka.apache.org/documentation/#producerconfigs
* **Subscriber**: https://kafka.apache.org/documentation/#consumerconfigs
+
+#### Notes:
+* From version 3.0 publisher and subscribers cannot be disabled independently anymore.
+* From version 3.0 disabling the management of certain cache invalidations or re-indexing
+is not available anymore.
\ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
index 84e1dd8..6d8c60e 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
@@ -15,8 +15,6 @@
package com.googlesource.gerrit.plugins.multisite.kafka;
import static com.google.common.truth.Truth.assertThat;
-import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.ENABLE_KEY;
-import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KAFKA_PROPERTY_PREFIX;
import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KAFKA_SECTION;
import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KafkaPublisher.KAFKA_PUBLISHER_SUBSECTION;
import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
@@ -46,36 +44,9 @@
}
@Test
- public void kafkaSubscriberPropertiesAreSetWhenSectionIsEnabled() {
- final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
- final String kafkaPropertyValue = "aValue";
- globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, true);
- globalPluginConfig.setString(
- KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
- final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
-
- assertThat(property.equals(kafkaPropertyValue)).isTrue();
- }
-
- @Test
- public void kafkaSubscriberPropertiesAreNotSetWhenSectionIsDisabled() {
- final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
- final String kafkaPropertyValue = "aValue";
- globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, false);
- globalPluginConfig.setString(
- KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
- final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
-
- assertThat(property).isNull();
- }
-
- @Test
public void kafkaSubscriberPropertiesAreIgnoredWhenPrefixIsNotSet() {
final String kafkaPropertyName = "fooBarBaz";
final String kafkaPropertyValue = "aValue";
- globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, true);
globalPluginConfig.setString(
KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
@@ -85,36 +56,9 @@
}
@Test
- public void kafkaPublisherPropertiesAreSetWhenSectionIsEnabled() {
- final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
- final String kafkaPropertyValue = "aValue";
- globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, true);
- globalPluginConfig.setString(
- KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
- final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
-
- assertThat(property.equals(kafkaPropertyValue)).isTrue();
- }
-
- @Test
public void kafkaPublisherPropertiesAreIgnoredWhenPrefixIsNotSet() {
final String kafkaPropertyName = "fooBarBaz";
final String kafkaPropertyValue = "aValue";
- globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, true);
- globalPluginConfig.setString(
- KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
- final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
-
- assertThat(property).isNull();
- }
-
- @Test
- public void kafkaPublisherPropertiesAreNotSetWhenSectionIsDisabled() {
- final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
- final String kafkaPropertyValue = "aValue";
- globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, false);
globalPluginConfig.setString(
KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
@@ -157,44 +101,7 @@
assertThat(property).isEqualTo("gerrit_cache");
}
- @Test
- public void shouldReturnKafkaTopicEnabledForCacheEventTopic() {
- setKafkaTopicEnabled("cacheEventEnabled", false);
- final Boolean property =
- getConfiguration().kafkaPublisher().enabledEvent(EventTopic.CACHE_TOPIC);
- assertThat(property).isFalse();
- }
-
- @Test
- public void shouldReturnKafkaTopicEnabledForIndexTopic() {
- setKafkaTopicEnabled("indexEventEnabled", false);
- final Boolean property =
- getConfiguration().kafkaPublisher().enabledEvent(EventTopic.INDEX_TOPIC);
- assertThat(property).isFalse();
- }
-
- @Test
- public void shouldReturnKafkaTopicEnabledForStreamEventTopic() {
- setKafkaTopicEnabled("streamEventEnabled", false);
- final Boolean property =
- getConfiguration().kafkaPublisher().enabledEvent(EventTopic.STREAM_EVENT_TOPIC);
- assertThat(property).isFalse();
- }
-
- @Test
- public void shouldReturnKafkaTopicEnabledForProjectListEventTopic() {
- setKafkaTopicEnabled("projectListEventEnabled", false);
- final Boolean property =
- getConfiguration().kafkaPublisher().enabledEvent(EventTopic.PROJECT_LIST_TOPIC);
- assertThat(property).isFalse();
- }
-
private void setKafkaTopicAlias(String topicKey, String topic) {
globalPluginConfig.setString(KAFKA_SECTION, null, topicKey, topic);
}
-
- private void setKafkaTopicEnabled(String topicEnabledKey, Boolean isEnabled) {
- globalPluginConfig.setBoolean(
- KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, topicEnabledKey, isEnabled);
- }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index 58ac211..4dc915d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -56,7 +56,6 @@
import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
import java.io.IOException;
import java.util.ArrayList;
@@ -125,8 +124,6 @@
this.config =
new FileBasedConfig(
sitePaths.etc_dir.resolve(Configuration.MULTI_SITE_CONFIG).toFile(), FS.DETECTED);
- config.setBoolean("kafka", "publisher", "enabled", true);
- config.setBoolean("kafka", "subscriber", "enabled", true);
config.setBoolean("ref-database", null, "enabled", false);
config.save();
@@ -141,7 +138,7 @@
new PluginModule(
multiSiteConfig,
new ZkValidationModule(zookeeperConfig),
- new KafkaBrokerModule(new KafkaConfiguration(multiSiteConfig)));
+ new KafkaBrokerModule());
this.gitModule = new GitModule(multiSiteConfig);
}