Remove publisher/subscriber independent disabling properties

The publisher/subscriber properties for disabling individual event types
are no longer needed. Disabling the publisher and the subscriber
independently also brings no value, so those options are removed as well.
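
After this change the Kafka publisher and subscriber are always wired up
when the Kafka broker module is installed, and the [kafka] section only
carries connection, topic and client settings. Illustrative minimal
configuration (values taken from the sample configs and defaults updated
below; adjust host/port to your environment):

  [kafka]
    bootstrapServers = localhost:9092

  [kafka "subscriber"]
    pollingIntervalMs = 1000

  [kafka "publisher"]
    KafkaProp-compressionType = none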

Feature: Issue 10829
Change-Id: Ic002407c8eb195e27cf15022aac7519b565fefbd
diff --git a/README.md b/README.md
index 0f526a9..75fd07d 100644
--- a/README.md
+++ b/README.md
@@ -89,12 +89,6 @@
 [kafka]
   bootstrapServers = <kafka-host>:<kafka-port>
 
-[kafka "publisher"]
-  enabled = true
-
-[kafka "subscriber"]
-  enabled = true
-
 [ref-database]
   enabled = true
 
diff --git a/dockerised_local_env/gerrit-common/multi-site.config b/dockerised_local_env/gerrit-common/multi-site.config
index 04b9c2c..deec00f 100644
--- a/dockerised_local_env/gerrit-common/multi-site.config
+++ b/dockerised_local_env/gerrit-common/multi-site.config
@@ -12,14 +12,10 @@
 	cacheEventTopic = gerrit_cache_eviction
 
 [kafka "subscriber"]
-	enabled = true
 	pollingIntervalMs = 1000
 	KafkaProp-enableAutoCommit = true
 	KafkaProp-autoCommitIntervalMs = 1000
 	KafkaProp-autoOffsetReset = latest
 
-[kafka "publisher"]
-	enabled = true
-
 [ref-database "zookeeper"]
 	connectString = "zookeeper:2181"
diff --git a/setup_local_env/configs/multi-site.config b/setup_local_env/configs/multi-site.config
index ed9e6ad..5287f61 100644
--- a/setup_local_env/configs/multi-site.config
+++ b/setup_local_env/configs/multi-site.config
@@ -9,12 +9,9 @@
 	projectListEventTopic = gerrit_list_project
 	cacheEventTopic = gerrit_cache_eviction
 [kafka "subscriber"]
-	enabled = true
 	pollingIntervalMs = 1000
 	KafkaProp-enableAutoCommit = true
 	KafkaProp-autoCommitIntervalMs = 1000
 	KafkaProp-autoOffsetReset = latest
-[kafka "publisher"]
-	enabled = true
 [ref-database "zookeeper"]
 	connectString = localhost:$ZK_PORT
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index ca01471..14c6be6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -49,7 +49,6 @@
   static final String INSTANCE_ID_FILE = "instanceId.data";
   static final String THREAD_POOL_SIZE_KEY = "threadPoolSize";
   static final int DEFAULT_THREAD_POOL_SIZE = 4;
-  static final String ENABLE_KEY = "enabled";
 
   private static final String REPLICATION_CONFIG = "replication.config";
   // common parameters to cache and index sections
@@ -162,6 +161,7 @@
 
   public static class SharedRefDatabase {
     public static final String SECTION = "ref-database";
+    public static final String ENABLE_KEY = "enabled";
     public static final String SUBSECTION_ENFORCEMENT_RULES = "enforcementRules";
 
     private final boolean enabled;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
index 7d42acc..371abae 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
@@ -36,10 +36,6 @@
     return aliasKey + "Topic";
   }
 
-  public String enabledKey() {
-    return aliasKey + "Enabled";
-  }
-
   public static EventTopic of(String topicString) {
     EventTopic[] topics = EventTopic.values();
     for (EventTopic topic : topics) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
index 5fdb07e..665ad49 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
@@ -16,7 +16,6 @@
 
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.inject.Inject;
 import com.google.inject.TypeLiteral;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
 import com.googlesource.gerrit.plugins.multisite.broker.kafka.BrokerPublisher;
@@ -35,58 +34,30 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerIndexEventForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerProjectListUpdateForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerStreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaEventDeserializer;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 
 public class KafkaBrokerModule extends LifecycleModule {
-  private KafkaConfiguration config;
-
-  @Inject
-  public KafkaBrokerModule(KafkaConfiguration config) {
-    this.config = config;
-  }
 
   @Override
   protected void configure() {
-    if (config.kafkaSubscriber().enabled()) {
-      bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
-      bind(new TypeLiteral<Deserializer<SourceAwareEventWrapper>>() {})
-          .to(KafkaEventDeserializer.class);
+    bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
+    bind(new TypeLiteral<Deserializer<SourceAwareEventWrapper>>() {})
+        .to(KafkaEventDeserializer.class);
 
-      if (config.kafkaSubscriber().enabledEvent(EventTopic.INDEX_TOPIC)) {
-        DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
-      }
-      if (config.kafkaSubscriber().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
-        DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
-      }
-      if (config.kafkaSubscriber().enabledEvent(EventTopic.CACHE_TOPIC)) {
-        DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
-      }
-      if (config.kafkaSubscriber().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
-        DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
-      }
-    }
+    DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
+    DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
+    DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
+    DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
 
-    if (config.kafkaPublisher().enabled()) {
-      listener().to(BrokerPublisher.class);
-      bind(BrokerSession.class).to(KafkaSession.class);
+    listener().to(BrokerPublisher.class);
+    bind(BrokerSession.class).to(KafkaSession.class);
 
-      if (config.kafkaPublisher().enabledEvent(EventTopic.INDEX_TOPIC)) {
-        DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
-      }
-      if (config.kafkaPublisher().enabledEvent(EventTopic.CACHE_TOPIC)) {
-        DynamicSet.bind(binder(), CacheEvictionForwarder.class)
-            .to(BrokerCacheEvictionForwarder.class);
-      }
-      if (config.kafkaPublisher().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
-        DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
-            .to(BrokerProjectListUpdateForwarder.class);
-      }
-      if (config.kafkaPublisher().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
-        DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
-      }
-    }
+    DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
+    DynamicSet.bind(binder(), CacheEvictionForwarder.class).to(BrokerCacheEvictionForwarder.class);
+    DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
+        .to(BrokerProjectListUpdateForwarder.class);
+    DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
index dee7382..b6c01f3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
@@ -42,9 +42,7 @@
   private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);
   static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
   static final String KAFKA_SECTION = "kafka";
-  static final String ENABLE_KEY = "enabled";
   private static final String DEFAULT_KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
-  private static final boolean DEFAULT_ENABLE_PROCESSING = true;
   private static final int DEFAULT_POLLING_INTERVAL_MS = 1000;
 
   private final Supplier<KafkaSubscriber> subscriber;
@@ -105,20 +103,6 @@
     return defaultValue;
   }
 
-  private static Map<EventTopic, Boolean> eventsEnabled(
-      Supplier<Config> config, String subsection) {
-    Map<EventTopic, Boolean> eventsEnabled = new HashMap<>();
-    for (EventTopic topic : EventTopic.values()) {
-      eventsEnabled.put(
-          topic,
-          config
-              .get()
-              .getBoolean(
-                  KAFKA_SECTION, subsection, topic.enabledKey(), DEFAULT_ENABLE_PROCESSING));
-    }
-    return eventsEnabled;
-  }
-
   public KafkaPublisher kafkaPublisher() {
     return publisher.get();
   }
@@ -180,25 +164,11 @@
     private static final long serialVersionUID = 0L;
 
     public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
-
     public static final String KAFKA_PUBLISHER_SUBSECTION = "publisher";
-    public static final boolean DEFAULT_BROKER_ENABLED = false;
 
-    private final boolean enabled;
-    private final Map<EventTopic, Boolean> eventsEnabled;
-
-    private KafkaPublisher(Supplier<Config> cfg) {
-      enabled =
-          cfg.get()
-              .getBoolean(
-                  KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, DEFAULT_BROKER_ENABLED);
-
-      eventsEnabled = eventsEnabled(cfg, KAFKA_PUBLISHER_SUBSECTION);
-
-      if (enabled) {
-        setDefaults();
-        applyKafkaConfig(cfg, KAFKA_PUBLISHER_SUBSECTION, this);
-      }
+    private KafkaPublisher(Supplier<Config> kafkaConfig) {
+      setDefaults();
+      applyKafkaConfig(kafkaConfig, KAFKA_PUBLISHER_SUBSECTION, this);
     }
 
     private void setDefaults() {
@@ -211,14 +181,6 @@
       put("value.serializer", KAFKA_STRING_SERIALIZER);
       put("reconnect.backoff.ms", 5000L);
     }
-
-    public boolean enabled() {
-      return enabled;
-    }
-
-    public boolean enabledEvent(EventTopic eventType) {
-      return eventsEnabled.get(eventType);
-    }
   }
 
   public static class KafkaSubscriber extends Properties {
@@ -226,13 +188,11 @@
 
     static final String KAFKA_SUBSCRIBER_SUBSECTION = "subscriber";
 
-    private final boolean enabled;
     private final Integer pollingInterval;
-    private Map<EventTopic, Boolean> eventsEnabled;
     private final Config cfg;
 
-    public KafkaSubscriber(Supplier<Config> configSupplier) {
-      this.cfg = configSupplier.get();
+    public KafkaSubscriber(Supplier<Config> kafkaCfg) {
+      this.cfg = kafkaCfg.get();
 
       this.pollingInterval =
           cfg.getInt(
@@ -241,21 +201,7 @@
               "pollingIntervalMs",
               DEFAULT_POLLING_INTERVAL_MS);
 
-      enabled = cfg.getBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, false);
-
-      eventsEnabled = eventsEnabled(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION);
-
-      if (enabled) {
-        applyKafkaConfig(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION, this);
-      }
-    }
-
-    public boolean enabled() {
-      return enabled;
-    }
-
-    public boolean enabledEvent(EventTopic topic) {
-      return eventsEnabled.get(topic);
+      applyKafkaConfig(kafkaCfg, KAFKA_SUBSCRIBER_SUBSECTION, this);
     }
 
     public Properties initPropsWith(UUID instanceId) {
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index 204742e..c404ea1 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -72,18 +72,15 @@
   eventTopic = gerrit_index
 
 [kafka "publisher"]
-  enable = true
   indexEventTopic = gerrit_index
   streamEventTopic = gerrit_stream
   cacheEvictionEventTopic = gerrit_cache_eviction
 
 [kafka "subscriber"]
-  enable = true
   pollingIntervalMs = 1000
   autoCommitIntervalMs = 1000
 ```
 
-
 For further information and supported options, refer to [config](config.md)
 documentation.
 
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 227d0e8..b44d1dd 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -21,29 +21,16 @@
   projectListEventTopic = gerrit_project_list
 
 [kafka "publisher"]
-  enabled = true
-
-  indexEventEnabled = true
-  cacheEventEnabled = true
-  projectListEventEnabled = true
-  streamEventEnabled = true
-
   KafkaProp-compressionType = none
   KafkaProp-deliveryTimeoutMs = 60000
 
 [kafka "subscriber"]
-  enabled = true
   pollingIntervalMs = 1000
 
   KafkaProp-enableAutoCommit = true
   KafkaProp-autoCommitIntervalMs = 1000
   KafkaProp-autoCommitIntervalMs = 5000
 
-  indexEventEnabled = true
-  cacheEventEnabled = true
-  projectListEventEnabled = true
-  streamEventEnabled = true
-
 [ref-database "zookeeper"]
   connectString = "localhost:2181"
   rootNode = "/gerrit/multi-site"
@@ -126,58 +113,6 @@
 :   Name of the Kafka topic to use for publishing cache eviction events
     Defaults to GERRIT.EVENT.PROJECT.LIST
 
-```kafka.publisher.indexEventEnabled```
-:   Enable publication of index events, ignored when `kafka.publisher.enabled`
-    is false
-
-    Defaults: true
-
-```kafka.publisher.cacheEventEnabled```
-:   Enable publication of cache events, ignored when `kafka.publisher.enabled`
-    is false
-
-    Defaults: true
-
-```kafka.publisher.projectListEventEnabled```
-:   Enable publication of project list events, ignored when `kafka.publisher.enabled`
-    is false
-
-    Defaults: true
-
-```kafka.publisher.streamEventEnabled```
-:   Enable publication of stream events, ignored when `kafka.publisher.enabled`
-    is false
-
-    Defaults: true
-
-```kafka.subscriber.enabled```
-:   Enable consuming of events from Kafka
-    Defaults: false
-
-```kafka.subscriber.indexEventEnabled```
-:   Enable consumption of index events, ignored when `kafka.subscriber.enabled`
-    is false
-
-    Defaults: true
-
-```kafka.subscriber.cacheEventEnabled```
-:   Enable consumption of cache events, ignored when `kafka.subscriber.enabled`
-    is false
-
-    Defaults: true
-
-```kafka.subscriber.projectListEventEnabled```
-:   Enable consumption of project list events, ignored when `kafka.subscriber.enabled`
-    is false
-
-    Defaults: true
-
-```kafka.subscriber.streamEventEnabled```
-:   Enable consumption of stream events, ignored when `kafka.subscriber.enabled`
-    is false
-
-    Defaults: true
-
 ```kafka.subscriber.pollingIntervalMs```
 :   Polling interval in milliseconds for checking incoming events
 
@@ -290,11 +225,12 @@
 For example, if you want to set the `auto.commit.interval.ms` property for
 consumers, you need to configure this property as `KafkaProp-autoCommitIntervalMs`.
 
-**NOTE**: custom Kafka properties will be ignored when the relevant subsection is
-disabled (i.e. `kafka.subscriber.enabled` and/or `kafka.publisher.enabled` are
-set to `false`).
-
 The complete list of available settings can be found directly in the kafka website:
 
 * **Publisher**: https://kafka.apache.org/documentation/#producerconfigs
 * **Subscriber**: https://kafka.apache.org/documentation/#consumerconfigs
+
+#### Notes:
+* From version 3.0 the publisher and the subscriber can no longer be disabled independently.
+* From version 3.0 the handling of individual event types (e.g. cache evictions
+or re-indexing) can no longer be disabled.
\ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
index 84e1dd8..6d8c60e 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
@@ -15,8 +15,6 @@
 package com.googlesource.gerrit.plugins.multisite.kafka;
 
 import static com.google.common.truth.Truth.assertThat;
-import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.ENABLE_KEY;
-import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KAFKA_PROPERTY_PREFIX;
 import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KAFKA_SECTION;
 import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KafkaPublisher.KAFKA_PUBLISHER_SUBSECTION;
 import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
@@ -46,36 +44,9 @@
   }
 
   @Test
-  public void kafkaSubscriberPropertiesAreSetWhenSectionIsEnabled() {
-    final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
-    final String kafkaPropertyValue = "aValue";
-    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, true);
-    globalPluginConfig.setString(
-        KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
-    final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
-
-    assertThat(property.equals(kafkaPropertyValue)).isTrue();
-  }
-
-  @Test
-  public void kafkaSubscriberPropertiesAreNotSetWhenSectionIsDisabled() {
-    final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
-    final String kafkaPropertyValue = "aValue";
-    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, false);
-    globalPluginConfig.setString(
-        KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
-    final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
-
-    assertThat(property).isNull();
-  }
-
-  @Test
   public void kafkaSubscriberPropertiesAreIgnoredWhenPrefixIsNotSet() {
     final String kafkaPropertyName = "fooBarBaz";
     final String kafkaPropertyValue = "aValue";
-    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, true);
     globalPluginConfig.setString(
         KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
 
@@ -85,36 +56,9 @@
   }
 
   @Test
-  public void kafkaPublisherPropertiesAreSetWhenSectionIsEnabled() {
-    final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
-    final String kafkaPropertyValue = "aValue";
-    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, true);
-    globalPluginConfig.setString(
-        KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
-    final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
-
-    assertThat(property.equals(kafkaPropertyValue)).isTrue();
-  }
-
-  @Test
   public void kafkaPublisherPropertiesAreIgnoredWhenPrefixIsNotSet() {
     final String kafkaPropertyName = "fooBarBaz";
     final String kafkaPropertyValue = "aValue";
-    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, true);
-    globalPluginConfig.setString(
-        KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
-    final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
-
-    assertThat(property).isNull();
-  }
-
-  @Test
-  public void kafkaPublisherPropertiesAreNotSetWhenSectionIsDisabled() {
-    final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
-    final String kafkaPropertyValue = "aValue";
-    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, false);
     globalPluginConfig.setString(
         KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
 
@@ -157,44 +101,7 @@
     assertThat(property).isEqualTo("gerrit_cache");
   }
 
-  @Test
-  public void shouldReturnKafkaTopicEnabledForCacheEventTopic() {
-    setKafkaTopicEnabled("cacheEventEnabled", false);
-    final Boolean property =
-        getConfiguration().kafkaPublisher().enabledEvent(EventTopic.CACHE_TOPIC);
-    assertThat(property).isFalse();
-  }
-
-  @Test
-  public void shouldReturnKafkaTopicEnabledForIndexTopic() {
-    setKafkaTopicEnabled("indexEventEnabled", false);
-    final Boolean property =
-        getConfiguration().kafkaPublisher().enabledEvent(EventTopic.INDEX_TOPIC);
-    assertThat(property).isFalse();
-  }
-
-  @Test
-  public void shouldReturnKafkaTopicEnabledForStreamEventTopic() {
-    setKafkaTopicEnabled("streamEventEnabled", false);
-    final Boolean property =
-        getConfiguration().kafkaPublisher().enabledEvent(EventTopic.STREAM_EVENT_TOPIC);
-    assertThat(property).isFalse();
-  }
-
-  @Test
-  public void shouldReturnKafkaTopicEnabledForProjectListEventTopic() {
-    setKafkaTopicEnabled("projectListEventEnabled", false);
-    final Boolean property =
-        getConfiguration().kafkaPublisher().enabledEvent(EventTopic.PROJECT_LIST_TOPIC);
-    assertThat(property).isFalse();
-  }
-
   private void setKafkaTopicAlias(String topicKey, String topic) {
     globalPluginConfig.setString(KAFKA_SECTION, null, topicKey, topic);
   }
-
-  private void setKafkaTopicEnabled(String topicEnabledKey, Boolean isEnabled) {
-    globalPluginConfig.setBoolean(
-        KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, topicEnabledKey, isEnabled);
-  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index 58ac211..4dc915d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -56,7 +56,6 @@
 import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -125,8 +124,6 @@
       this.config =
           new FileBasedConfig(
               sitePaths.etc_dir.resolve(Configuration.MULTI_SITE_CONFIG).toFile(), FS.DETECTED);
-      config.setBoolean("kafka", "publisher", "enabled", true);
-      config.setBoolean("kafka", "subscriber", "enabled", true);
       config.setBoolean("ref-database", null, "enabled", false);
       config.save();
 
@@ -141,7 +138,7 @@
           new PluginModule(
               multiSiteConfig,
               new ZkValidationModule(zookeeperConfig),
-              new KafkaBrokerModule(new KafkaConfiguration(multiSiteConfig)));
+              new KafkaBrokerModule());
       this.gitModule = new GitModule(multiSiteConfig);
     }