Merge Kafka-related modules into single KafkaBrokerModule
Merge all Kafka-related modules in preparation to moving it
to a separate plugin.
Feature: Issue 10829
Change-Id: I4a06c4d7247512563219ff3a5edeba312888b535
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
index 637ceef..d445251 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
@@ -20,9 +20,8 @@
import com.google.inject.Inject;
import com.google.inject.Scopes;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerApi;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
public class PluginModule extends LifecycleModule {
@@ -30,19 +29,16 @@
private Configuration config;
private ZkValidationModule zkValidationModule;
- private KafkaForwardedEventRouterModule kafkaForwardedEventRouterModule;
- private KafkaBrokerForwarderModule kafkaBrokerForwarderModule;
+ private KafkaBrokerModule kafkaBrokerModule;
@Inject
public PluginModule(
Configuration config,
ZkValidationModule zkValidationModule,
- KafkaForwardedEventRouterModule forwardedEeventRouterModule,
- KafkaBrokerForwarderModule brokerForwarderModule) {
+ KafkaBrokerModule kafkaBrokerModule) {
this.config = config;
this.zkValidationModule = zkValidationModule;
- this.kafkaForwardedEventRouterModule = forwardedEeventRouterModule;
- this.kafkaBrokerForwarderModule = brokerForwarderModule;
+ this.kafkaBrokerModule = kafkaBrokerModule;
}
@Override
@@ -55,7 +51,6 @@
DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
listener().to(KafkaBrokerApi.class);
- install(kafkaForwardedEventRouterModule);
- install(kafkaBrokerForwarderModule);
+ install(kafkaBrokerModule);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
deleted file mode 100644
index a49aa8b..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
+++ /dev/null
@@ -1,62 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.broker.kafka;
-
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerCacheEvictionForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerIndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerProjectListUpdateForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerStreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-
-public class KafkaBrokerForwarderModule extends LifecycleModule {
- private final KafkaConfiguration config;
-
- @Inject
- public KafkaBrokerForwarderModule(KafkaConfiguration config) {
- this.config = config;
- }
-
- @Override
- protected void configure() {
- if (config.kafkaPublisher().enabled()) {
- listener().to(BrokerPublisher.class);
- bind(BrokerSession.class).to(KafkaSession.class);
-
- if (config.kafkaPublisher().enabledEvent(EventTopic.INDEX_TOPIC)) {
- DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
- }
- if (config.kafkaPublisher().enabledEvent(EventTopic.CACHE_TOPIC)) {
- DynamicSet.bind(binder(), CacheEvictionForwarder.class)
- .to(BrokerCacheEvictionForwarder.class);
- }
- if (config.kafkaPublisher().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
- DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
- .to(BrokerProjectListUpdateForwarder.class);
- }
- if (config.kafkaPublisher().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
- DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
- }
- }
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
new file mode 100644
index 0000000..be57b18
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
@@ -0,0 +1,95 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.kafka;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Inject;
+import com.google.inject.TypeLiteral;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.BrokerPublisher;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaSession;
+import com.googlesource.gerrit.plugins.multisite.consumer.AbstractSubcriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.CacheEvictionEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
+import com.googlesource.gerrit.plugins.multisite.consumer.IndexEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.ProjectUpdateEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.googlesource.gerrit.plugins.multisite.consumer.StreamEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerCacheEvictionForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerIndexEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerProjectListUpdateForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerStreamEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaEventDeserializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+
+public class KafkaBrokerModule extends LifecycleModule {
+ private KafkaConfiguration config;
+
+ @Inject
+ public KafkaBrokerModule(KafkaConfiguration config) {
+ this.config = config;
+ }
+
+ @Override
+ protected void configure() {
+ if (config.kafkaSubscriber().enabled()) {
+ bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
+ bind(new TypeLiteral<Deserializer<SourceAwareEventWrapper>>() {})
+ .to(KafkaEventDeserializer.class);
+
+ if (config.kafkaSubscriber().enabledEvent(EventTopic.INDEX_TOPIC)) {
+ DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
+ }
+ if (config.kafkaSubscriber().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
+ DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
+ }
+ if (config.kafkaSubscriber().enabledEvent(EventTopic.CACHE_TOPIC)) {
+ DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
+ }
+ if (config.kafkaSubscriber().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
+ DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
+ }
+
+ DynamicSet.setOf(binder(), DroppedEventListener.class);
+ }
+
+ if (config.kafkaPublisher().enabled()) {
+ listener().to(BrokerPublisher.class);
+ bind(BrokerSession.class).to(KafkaSession.class);
+
+ if (config.kafkaPublisher().enabledEvent(EventTopic.INDEX_TOPIC)) {
+ DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
+ }
+ if (config.kafkaPublisher().enabledEvent(EventTopic.CACHE_TOPIC)) {
+ DynamicSet.bind(binder(), CacheEvictionForwarder.class)
+ .to(BrokerCacheEvictionForwarder.class);
+ }
+ if (config.kafkaPublisher().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
+ DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
+ .to(BrokerProjectListUpdateForwarder.class);
+ }
+ if (config.kafkaPublisher().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
+ DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
deleted file mode 100644
index b7bcabd..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
+++ /dev/null
@@ -1,64 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
-
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.inject.Inject;
-import com.google.inject.TypeLiteral;
-import com.googlesource.gerrit.plugins.multisite.consumer.AbstractSubcriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.CacheEvictionEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
-import com.googlesource.gerrit.plugins.multisite.consumer.IndexEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.ProjectUpdateEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import com.googlesource.gerrit.plugins.multisite.consumer.StreamEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.Deserializer;
-
-public class KafkaConsumerModule extends LifecycleModule {
-
- private KafkaConfiguration config;
-
- @Inject
- public KafkaConsumerModule(KafkaConfiguration config) {
- this.config = config;
- }
-
- @Override
- protected void configure() {
-
- bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
- bind(new TypeLiteral<Deserializer<SourceAwareEventWrapper>>() {})
- .to(KafkaEventDeserializer.class);
-
- if (config.kafkaSubscriber().enabledEvent(EventTopic.INDEX_TOPIC)) {
- DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
- }
- if (config.kafkaSubscriber().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
- DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
- }
- if (config.kafkaSubscriber().enabledEvent(EventTopic.CACHE_TOPIC)) {
- DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
- }
- if (config.kafkaSubscriber().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
- DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
- }
-
- DynamicSet.setOf(binder(), DroppedEventListener.class);
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java
deleted file mode 100644
index 2e05de8..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java
+++ /dev/null
@@ -1,39 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaConsumerModule;
-
-public class KafkaForwardedEventRouterModule extends AbstractModule {
- private KafkaConfiguration kafkaConfig;
- private KafkaConsumerModule kafkaConsumerModule;
-
- @Inject
- public KafkaForwardedEventRouterModule(
- KafkaConfiguration config, KafkaConsumerModule kafkaConsumerModule) {
- this.kafkaConfig = config;
- this.kafkaConsumerModule = kafkaConsumerModule;
- }
-
- @Override
- protected void configure() {
- if (kafkaConfig.kafkaSubscriber().enabled()) {
- install(kafkaConsumerModule);
- }
- }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index 9596946..cbd192d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -47,13 +47,12 @@
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
import java.io.IOException;
import java.util.ArrayList;
@@ -136,10 +135,7 @@
new PluginModule(
multiSiteConfig,
new ZkValidationModule(multiSiteConfig),
- new KafkaForwardedEventRouterModule(
- new KafkaConfiguration(multiSiteConfig),
- new KafkaConsumerModule(new KafkaConfiguration(multiSiteConfig))),
- new KafkaBrokerForwarderModule(new KafkaConfiguration(multiSiteConfig)));
+ new KafkaBrokerModule(new KafkaConfiguration(multiSiteConfig)));
}
@Override