Merge branch 'stable-2.16' into stable-3.0
* stable-2.16:
Move DroppedEventListener out of kafka domain
Merge Kafka-related modules into single KafkaBrokerModule
Decouple the multi-site engine from broker implementation
Fix NPE when source ref is null
Change-Id: I546786507c663230ac573ac0ab27face85ad917d
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index 03bff94..89c2253 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -14,9 +14,7 @@
package com.googlesource.gerrit.plugins.multisite;
-import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gerrit.server.config.SitePaths;
import com.google.gson.Gson;
@@ -26,19 +24,14 @@
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.spi.Message;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
import com.googlesource.gerrit.plugins.multisite.broker.GsonProvider;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
import com.googlesource.gerrit.plugins.multisite.event.EventModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.RouterModule;
import com.googlesource.gerrit.plugins.multisite.index.IndexModule;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerApi;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
-import com.googlesource.gerrit.plugins.multisite.validation.ProjectDeletedSharedDbCleanup;
import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.NoopSharedRefDatabase;
import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefDatabase;
import java.io.BufferedReader;
@@ -55,17 +48,12 @@
public class Module extends LifecycleModule {
private static final Logger log = LoggerFactory.getLogger(Module.class);
private Configuration config;
- private KafkaForwardedEventRouterModule kafkaForwardedEventRouterModule;
- private KafkaBrokerForwarderModule kafkaBrokerForwarderModule;
+ private BrokerModule brokerModule;
@Inject
- public Module(
- Configuration config,
- KafkaForwardedEventRouterModule forwardedEeventRouterModule,
- KafkaBrokerForwarderModule brokerForwarderModule) {
+ public Module(Configuration config, BrokerModule brokerModule) {
this.config = config;
- this.kafkaForwardedEventRouterModule = forwardedEeventRouterModule;
- this.kafkaBrokerForwarderModule = brokerForwarderModule;
+ this.brokerModule = brokerModule;
}
@Override
@@ -97,20 +85,10 @@
install(new IndexModule());
}
- install(new BrokerModule());
- DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
- listener().to(KafkaBrokerApi.class);
+ install(brokerModule);
install(new RouterModule());
- install(kafkaForwardedEventRouterModule);
- install(kafkaBrokerForwarderModule);
-
- if (config.getSharedRefDb().isEnabled()) {
- DynamicSet.bind(binder(), ProjectDeletedListener.class)
- .to(ProjectDeletedSharedDbCleanup.class);
- }
-
bind(Gson.class)
.annotatedWith(BrokerGson.class)
.toProvider(GsonProvider.class)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
index 22a8d2d..b27ae01 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
@@ -15,8 +15,13 @@
package com.googlesource.gerrit.plugins.multisite;
import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.inject.Inject;
+import com.google.inject.Scopes;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerApi;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
public class PluginModule extends LifecycleModule {
@@ -24,20 +29,29 @@
private Configuration config;
private ZkValidationModule zkValidationModule;
+ private KafkaBrokerModule kafkaBrokerModule;
@Inject
- public PluginModule(Configuration config, ZkValidationModule zkValidationModule) {
+ public PluginModule(
+ Configuration config,
+ ZkValidationModule zkValidationModule,
+ KafkaBrokerModule kafkaBrokerModule) {
this.config = config;
this.zkValidationModule = zkValidationModule;
+ this.kafkaBrokerModule = kafkaBrokerModule;
}
@Override
protected void configure() {
- listener().to(PluginStartup.class);
-
if (config.getSharedRefDb().isEnabled()) {
+ listener().to(PluginStartup.class);
logger.atInfo().log("Shared ref-db engine: Zookeeper");
install(zkValidationModule);
}
+
+ DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
+ listener().to(KafkaBrokerApi.class);
+
+ install(kafkaBrokerModule);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
new file mode 100644
index 0000000..19cf0f7
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import com.google.gerrit.server.events.Event;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import java.util.function.Consumer;
+
+public class BrokerApiNoOp implements BrokerApi {
+
+ @Override
+ public boolean send(String topic, Event event) {
+ return true;
+ }
+
+ @Override
+ public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {}
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
index f1ab1f6..6983984 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
@@ -24,6 +24,7 @@
@Override
protected void configure() {
DynamicItem.itemOf(binder(), BrokerApi.class);
+ DynamicItem.bind(binder(), BrokerApi.class).to(BrokerApiNoOp.class).in(Scopes.SINGLETON);
bind(BrokerApiWrapper.class).in(Scopes.SINGLETON);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
deleted file mode 100644
index a49aa8b..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
+++ /dev/null
@@ -1,62 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.broker.kafka;
-
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerCacheEvictionForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerIndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerProjectListUpdateForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerStreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-
-public class KafkaBrokerForwarderModule extends LifecycleModule {
- private final KafkaConfiguration config;
-
- @Inject
- public KafkaBrokerForwarderModule(KafkaConfiguration config) {
- this.config = config;
- }
-
- @Override
- protected void configure() {
- if (config.kafkaPublisher().enabled()) {
- listener().to(BrokerPublisher.class);
- bind(BrokerSession.class).to(KafkaSession.class);
-
- if (config.kafkaPublisher().enabledEvent(EventTopic.INDEX_TOPIC)) {
- DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
- }
- if (config.kafkaPublisher().enabledEvent(EventTopic.CACHE_TOPIC)) {
- DynamicSet.bind(binder(), CacheEvictionForwarder.class)
- .to(BrokerCacheEvictionForwarder.class);
- }
- if (config.kafkaPublisher().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
- DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
- .to(BrokerProjectListUpdateForwarder.class);
- }
- if (config.kafkaPublisher().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
- DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
- }
- }
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index 8e41690..7854aab 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -22,6 +22,7 @@
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
@@ -41,7 +42,7 @@
private SubscriberMetrics subscriberMetrics;
public AbstractSubcriber(
- BrokerApi brokerApi,
+ BrokerApiWrapper brokerApi,
ForwardedEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
@BrokerGson Gson gson,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
index fc62e6d..d096148 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
@@ -20,7 +20,7 @@
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
@@ -30,7 +30,7 @@
public class CacheEvictionEventSubscriber extends AbstractSubcriber {
@Inject
public CacheEvictionEventSubscriber(
- BrokerApi brokerApi,
+ BrokerApiWrapper brokerApi,
StreamEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
@BrokerGson Gson gsonProvider,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
index c201f65..df55040 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
@@ -20,7 +20,7 @@
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
@@ -30,7 +30,7 @@
public class IndexEventSubscriber extends AbstractSubcriber {
@Inject
public IndexEventSubscriber(
- BrokerApi brokerApi,
+ BrokerApiWrapper brokerApi,
IndexEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
@BrokerGson Gson gsonProvider,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
index 4157609..5c42ea6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
@@ -20,7 +20,7 @@
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
@@ -30,7 +30,7 @@
public class ProjectUpdateEventSubscriber extends AbstractSubcriber {
@Inject
public ProjectUpdateEventSubscriber(
- BrokerApi brokerApi,
+ BrokerApiWrapper brokerApi,
ProjectListUpdateRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
@BrokerGson Gson gson,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
index af0f3e7..b48ab31 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
@@ -20,7 +20,7 @@
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
@@ -30,7 +30,7 @@
public class StreamEventSubscriber extends AbstractSubcriber {
@Inject
public StreamEventSubscriber(
- BrokerApi brokerApi,
+ BrokerApiWrapper brokerApi,
StreamEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
@BrokerGson Gson gson,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
index 940ad1f..0a0c350 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
@@ -32,5 +32,6 @@
listener().to(MultiSiteConsumerRunner.class);
DynamicSet.setOf(binder(), AbstractSubcriber.class);
+ DynamicSet.setOf(binder(), DroppedEventListener.class);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
new file mode 100644
index 0000000..5fdb07e
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
@@ -0,0 +1,92 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.kafka;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Inject;
+import com.google.inject.TypeLiteral;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.BrokerPublisher;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaSession;
+import com.googlesource.gerrit.plugins.multisite.consumer.AbstractSubcriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.CacheEvictionEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.IndexEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.ProjectUpdateEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.googlesource.gerrit.plugins.multisite.consumer.StreamEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerCacheEvictionForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerIndexEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerProjectListUpdateForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerStreamEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaEventDeserializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+
+public class KafkaBrokerModule extends LifecycleModule {
+ private KafkaConfiguration config;
+
+ @Inject
+ public KafkaBrokerModule(KafkaConfiguration config) {
+ this.config = config;
+ }
+
+ @Override
+ protected void configure() {
+ if (config.kafkaSubscriber().enabled()) {
+ bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
+ bind(new TypeLiteral<Deserializer<SourceAwareEventWrapper>>() {})
+ .to(KafkaEventDeserializer.class);
+
+ if (config.kafkaSubscriber().enabledEvent(EventTopic.INDEX_TOPIC)) {
+ DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
+ }
+ if (config.kafkaSubscriber().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
+ DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
+ }
+ if (config.kafkaSubscriber().enabledEvent(EventTopic.CACHE_TOPIC)) {
+ DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
+ }
+ if (config.kafkaSubscriber().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
+ DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
+ }
+ }
+
+ if (config.kafkaPublisher().enabled()) {
+ listener().to(BrokerPublisher.class);
+ bind(BrokerSession.class).to(KafkaSession.class);
+
+ if (config.kafkaPublisher().enabledEvent(EventTopic.INDEX_TOPIC)) {
+ DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
+ }
+ if (config.kafkaPublisher().enabledEvent(EventTopic.CACHE_TOPIC)) {
+ DynamicSet.bind(binder(), CacheEvictionForwarder.class)
+ .to(BrokerCacheEvictionForwarder.class);
+ }
+ if (config.kafkaPublisher().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
+ DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
+ .to(BrokerProjectListUpdateForwarder.class);
+ }
+ if (config.kafkaPublisher().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
+ DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
deleted file mode 100644
index b7bcabd..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
+++ /dev/null
@@ -1,64 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
-
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.inject.Inject;
-import com.google.inject.TypeLiteral;
-import com.googlesource.gerrit.plugins.multisite.consumer.AbstractSubcriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.CacheEvictionEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
-import com.googlesource.gerrit.plugins.multisite.consumer.IndexEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.ProjectUpdateEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import com.googlesource.gerrit.plugins.multisite.consumer.StreamEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.Deserializer;
-
-public class KafkaConsumerModule extends LifecycleModule {
-
- private KafkaConfiguration config;
-
- @Inject
- public KafkaConsumerModule(KafkaConfiguration config) {
- this.config = config;
- }
-
- @Override
- protected void configure() {
-
- bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
- bind(new TypeLiteral<Deserializer<SourceAwareEventWrapper>>() {})
- .to(KafkaEventDeserializer.class);
-
- if (config.kafkaSubscriber().enabledEvent(EventTopic.INDEX_TOPIC)) {
- DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
- }
- if (config.kafkaSubscriber().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
- DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
- }
- if (config.kafkaSubscriber().enabledEvent(EventTopic.CACHE_TOPIC)) {
- DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
- }
- if (config.kafkaSubscriber().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
- DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
- }
-
- DynamicSet.setOf(binder(), DroppedEventListener.class);
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java
deleted file mode 100644
index 2e05de8..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java
+++ /dev/null
@@ -1,39 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaConsumerModule;
-
-public class KafkaForwardedEventRouterModule extends AbstractModule {
- private KafkaConfiguration kafkaConfig;
- private KafkaConsumerModule kafkaConsumerModule;
-
- @Inject
- public KafkaForwardedEventRouterModule(
- KafkaConfiguration config, KafkaConsumerModule kafkaConsumerModule) {
- this.kafkaConfig = config;
- this.kafkaConsumerModule = kafkaConsumerModule;
- }
-
- @Override
- protected void configure() {
- if (kafkaConfig.kafkaSubscriber().enabled()) {
- install(kafkaConsumerModule);
- }
- }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
index 59df7bb..10b4adb 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
@@ -17,8 +17,7 @@
import static com.google.common.truth.Truth.assertThat;
import com.google.gerrit.server.config.SitePaths;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -38,8 +37,7 @@
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private Configuration configMock;
- @Mock private KafkaForwardedEventRouterModule routerModule;
- @Mock private KafkaBrokerForwarderModule brokerForwarderModule;
+ @Mock private BrokerModule brokerModule;
@Rule public TemporaryFolder tempFolder = new TemporaryFolder();
@@ -47,7 +45,7 @@
@Before
public void setup() {
- module = new Module(configMock, routerModule, brokerForwarderModule);
+ module = new Module(configMock, brokerModule);
}
@Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index d1da1f0..1d3db68 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -24,6 +24,7 @@
import com.google.gerrit.acceptance.UseLocalDisk;
import com.google.gerrit.extensions.api.changes.ReviewInput;
import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gerrit.server.config.SitePaths;
@@ -36,16 +37,23 @@
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Key;
+import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.GitModule;
import com.googlesource.gerrit.plugins.multisite.Module;
+import com.googlesource.gerrit.plugins.multisite.PluginModule;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
@@ -93,8 +101,20 @@
}
}
+ public static class TestBrokerModule extends BrokerModule {
+ @Override
+ protected void configure() {
+ DynamicItem.itemOf(binder(), BrokerApi.class);
+ bind(BrokerApiWrapper.class).in(Scopes.SINGLETON);
+
+ install(new SubscriberModule());
+ }
+ }
+
private final FileBasedConfig config;
private final Module multiSiteModule;
+ private final PluginModule pluginModule;
+ private final GitModule gitModule;
@Inject
public KafkaTestContainerModule(SitePaths sitePaths) throws IOException {
@@ -107,13 +127,13 @@
config.save();
Configuration multiSiteConfig = new Configuration(config, new Config());
- KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(multiSiteConfig);
- this.multiSiteModule =
- new Module(
+ this.multiSiteModule = new Module(multiSiteConfig, new TestBrokerModule());
+ this.pluginModule =
+ new PluginModule(
multiSiteConfig,
- new KafkaForwardedEventRouterModule(
- kafkaConfiguration, new KafkaConsumerModule(kafkaConfiguration)),
- new KafkaBrokerForwarderModule(kafkaConfiguration));
+ new ZkValidationModule(multiSiteConfig),
+ new KafkaBrokerModule(new KafkaConfiguration(multiSiteConfig)));
+ this.gitModule = new GitModule(multiSiteConfig);
}
@Override
@@ -124,6 +144,8 @@
listener().toInstance(new KafkaStopAtShutdown(kafka));
install(multiSiteModule);
+ install(pluginModule);
+ install(gitModule);
} catch (IOException e) {
throw new IllegalStateException(e);