Decouple the multi-site engine from broker implementation
Use DynamicItem and DynamicSet with proper abstraction to decouple
multi-site engine from Kafka broker implementation.
Bind a NoOp implementation of the broker in The multi-site libModule
so that can be replaced by any plugin later on.
Inside the multi-site.jar there is a Kafka implementation of the broker
that can be loaded as a plugin.
Feature: Issue 10829
Change-Id: Ib05beb2c52bf5012f017c3bc114b7a892a1b648d
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index b201942..5f4f635 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -15,7 +15,6 @@
package com.googlesource.gerrit.plugins.multisite;
import com.google.common.annotations.VisibleForTesting;
-import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gerrit.server.config.SitePaths;
import com.google.gson.Gson;
@@ -23,21 +22,16 @@
import com.google.inject.Inject;
import com.google.inject.Provides;
import com.google.inject.ProvisionException;
-import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.spi.Message;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
import com.googlesource.gerrit.plugins.multisite.broker.GsonProvider;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
import com.googlesource.gerrit.plugins.multisite.event.EventModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.RouterModule;
import com.googlesource.gerrit.plugins.multisite.index.IndexModule;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerApi;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
import com.googlesource.gerrit.plugins.multisite.validation.ValidationModule;
import java.io.BufferedReader;
import java.io.BufferedWriter;
@@ -54,17 +48,12 @@
private static final Logger log = LoggerFactory.getLogger(Module.class);
private Configuration config;
private NoteDbStatus noteDb;
- private KafkaForwardedEventRouterModule kafkaForwardedEventRouterModule;
- private KafkaBrokerForwarderModule kafkaBrokerForwarderModule;
+ private BrokerModule brokerModule;
private final boolean disableGitRepositoryValidation;
@Inject
- public Module(
- Configuration config,
- NoteDbStatus noteDb,
- KafkaForwardedEventRouterModule forwardedEeventRouterModule,
- KafkaBrokerForwarderModule brokerForwarderModule) {
- this(config, noteDb, forwardedEeventRouterModule, brokerForwarderModule, false);
+ public Module(Configuration config, NoteDbStatus noteDb, BrokerModule brokerModule) {
+ this(config, noteDb, brokerModule, false);
}
// TODO: It is not possible to properly test the libModules in Gerrit.
@@ -75,22 +64,12 @@
public Module(
Configuration config,
NoteDbStatus noteDb,
- KafkaForwardedEventRouterModule forwardedEeventRouterModule,
- KafkaBrokerForwarderModule brokerForwarderModule,
+ BrokerModule brokerModule,
boolean disableGitRepositoryValidation) {
- init(config, noteDb, forwardedEeventRouterModule, brokerForwarderModule);
- this.disableGitRepositoryValidation = disableGitRepositoryValidation;
- }
-
- private void init(
- Configuration config,
- NoteDbStatus noteDb,
- KafkaForwardedEventRouterModule forwardedEeventRouterModule,
- KafkaBrokerForwarderModule brokerForwarderModule) {
this.config = config;
this.noteDb = noteDb;
- this.kafkaForwardedEventRouterModule = forwardedEeventRouterModule;
- this.kafkaBrokerForwarderModule = brokerForwarderModule;
+ this.brokerModule = brokerModule;
+ this.disableGitRepositoryValidation = disableGitRepositoryValidation;
}
@Override
@@ -121,15 +100,10 @@
install(new IndexModule());
}
- install(new BrokerModule());
- DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
- listener().to(KafkaBrokerApi.class);
+ install(brokerModule);
install(new RouterModule());
- install(kafkaForwardedEventRouterModule);
- install(kafkaBrokerForwarderModule);
-
install(
new ValidationModule(
config, disableGitRepositoryValidation || !config.getSharedRefDb().isEnabled()));
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
index ff8bd13..637ceef 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
@@ -15,8 +15,14 @@
package com.googlesource.gerrit.plugins.multisite;
import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.inject.Inject;
+import com.google.inject.Scopes;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerApi;
+import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
public class PluginModule extends LifecycleModule {
@@ -24,11 +30,19 @@
private Configuration config;
private ZkValidationModule zkValidationModule;
+ private KafkaForwardedEventRouterModule kafkaForwardedEventRouterModule;
+ private KafkaBrokerForwarderModule kafkaBrokerForwarderModule;
@Inject
- public PluginModule(Configuration config, ZkValidationModule zkValidationModule) {
+ public PluginModule(
+ Configuration config,
+ ZkValidationModule zkValidationModule,
+ KafkaForwardedEventRouterModule forwardedEeventRouterModule,
+ KafkaBrokerForwarderModule brokerForwarderModule) {
this.config = config;
this.zkValidationModule = zkValidationModule;
+ this.kafkaForwardedEventRouterModule = forwardedEeventRouterModule;
+ this.kafkaBrokerForwarderModule = brokerForwarderModule;
}
@Override
@@ -37,5 +51,11 @@
logger.atInfo().log("Shared ref-db engine: Zookeeper");
install(zkValidationModule);
}
+
+ DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
+ listener().to(KafkaBrokerApi.class);
+
+ install(kafkaForwardedEventRouterModule);
+ install(kafkaBrokerForwarderModule);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
new file mode 100644
index 0000000..19cf0f7
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import com.google.gerrit.server.events.Event;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import java.util.function.Consumer;
+
+public class BrokerApiNoOp implements BrokerApi {
+
+ @Override
+ public boolean send(String topic, Event event) {
+ return true;
+ }
+
+ @Override
+ public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {}
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
index f1ab1f6..6983984 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
@@ -24,6 +24,7 @@
@Override
protected void configure() {
DynamicItem.itemOf(binder(), BrokerApi.class);
+ DynamicItem.bind(binder(), BrokerApi.class).to(BrokerApiNoOp.class).in(Scopes.SINGLETON);
bind(BrokerApiWrapper.class).in(Scopes.SINGLETON);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index 6e70299..2308c0e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -23,11 +23,11 @@
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
-
import java.io.IOException;
import java.util.UUID;
@@ -43,7 +43,7 @@
private SubscriberMetrics subscriberMetrics;
public AbstractSubcriber(
- BrokerApi brokerApi,
+ BrokerApiWrapper brokerApi,
ForwardedEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
@BrokerGson Gson gson,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
index fc62e6d..d096148 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
@@ -20,7 +20,7 @@
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
@@ -30,7 +30,7 @@
public class CacheEvictionEventSubscriber extends AbstractSubcriber {
@Inject
public CacheEvictionEventSubscriber(
- BrokerApi brokerApi,
+ BrokerApiWrapper brokerApi,
StreamEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
@BrokerGson Gson gsonProvider,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
index c201f65..df55040 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
@@ -20,7 +20,7 @@
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
@@ -30,7 +30,7 @@
public class IndexEventSubscriber extends AbstractSubcriber {
@Inject
public IndexEventSubscriber(
- BrokerApi brokerApi,
+ BrokerApiWrapper brokerApi,
IndexEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
@BrokerGson Gson gsonProvider,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
index 4157609..5c42ea6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
@@ -20,7 +20,7 @@
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
@@ -30,7 +30,7 @@
public class ProjectUpdateEventSubscriber extends AbstractSubcriber {
@Inject
public ProjectUpdateEventSubscriber(
- BrokerApi brokerApi,
+ BrokerApiWrapper brokerApi,
ProjectListUpdateRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
@BrokerGson Gson gson,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
index af0f3e7..b48ab31 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
@@ -20,7 +20,7 @@
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
@@ -30,7 +30,7 @@
public class StreamEventSubscriber extends AbstractSubcriber {
@Inject
public StreamEventSubscriber(
- BrokerApi brokerApi,
+ BrokerApiWrapper brokerApi,
StreamEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
@BrokerGson Gson gson,
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
index e93d5f3..8b7a17b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
@@ -17,8 +17,7 @@
import static com.google.common.truth.Truth.assertThat;
import com.google.gerrit.server.config.SitePaths;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -39,9 +38,7 @@
private Configuration configMock;
@Mock private NoteDbStatus noteDb;
-
- @Mock private KafkaForwardedEventRouterModule routerModule;
- @Mock private KafkaBrokerForwarderModule brokerForwarderModule;
+ @Mock private BrokerModule brokerModule;
@Rule public TemporaryFolder tempFolder = new TemporaryFolder();
@@ -49,7 +46,7 @@
@Before
public void setup() {
- module = new Module(configMock, noteDb, routerModule, brokerForwarderModule);
+ module = new Module(configMock, noteDb, brokerModule);
}
@Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index b69f9e6..9596946 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -24,6 +24,7 @@
import com.google.gerrit.acceptance.UseLocalDisk;
import com.google.gerrit.extensions.api.changes.ReviewInput;
import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gerrit.server.config.SitePaths;
@@ -36,17 +37,24 @@
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Key;
+import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.Module;
import com.googlesource.gerrit.plugins.multisite.NoteDbStatus;
+import com.googlesource.gerrit.plugins.multisite.PluginModule;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
@@ -98,8 +106,19 @@
}
}
+ public static class TestBrokerModule extends BrokerModule {
+ @Override
+ protected void configure() {
+ DynamicItem.itemOf(binder(), BrokerApi.class);
+ bind(BrokerApiWrapper.class).in(Scopes.SINGLETON);
+
+ install(new SubscriberModule());
+ }
+ }
+
private final FileBasedConfig config;
private final Module multiSiteModule;
+ private final PluginModule pluginModule;
@Inject
public KafkaTestContainerModule(SitePaths sitePaths, NoteDbStatus noteDb) throws IOException {
@@ -112,15 +131,15 @@
config.save();
Configuration multiSiteConfig = new Configuration(config, new Config());
- KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(multiSiteConfig);
- this.multiSiteModule =
- new Module(
+ this.multiSiteModule = new Module(multiSiteConfig, noteDb, new TestBrokerModule(), true);
+ this.pluginModule =
+ new PluginModule(
multiSiteConfig,
- noteDb,
+ new ZkValidationModule(multiSiteConfig),
new KafkaForwardedEventRouterModule(
- kafkaConfiguration, new KafkaConsumerModule(kafkaConfiguration)),
- new KafkaBrokerForwarderModule(kafkaConfiguration),
- true);
+ new KafkaConfiguration(multiSiteConfig),
+ new KafkaConsumerModule(new KafkaConfiguration(multiSiteConfig))),
+ new KafkaBrokerForwarderModule(new KafkaConfiguration(multiSiteConfig)));
}
@Override
@@ -131,6 +150,7 @@
listener().toInstance(new KafkaStopAtShutdown(kafka));
install(multiSiteModule);
+ install(pluginModule);
} catch (IOException e) {
throw new IllegalStateException(e);