Decouple the multi-site engine from broker implementation

Use DynamicItem and DynamicSet with proper abstraction to decouple
multi-site engine from Kafka broker implementation.
Bind a NoOp implementation of the broker in The multi-site libModule
so that can be replaced by any plugin later on.

Inside the multi-site.jar there is a Kafka implementation of the broker
that can be loaded as a plugin.

Feature: Issue 10829
Change-Id: Ib05beb2c52bf5012f017c3bc114b7a892a1b648d
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index b201942..5f4f635 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -15,7 +15,6 @@
 package com.googlesource.gerrit.plugins.multisite;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.gson.Gson;
@@ -23,21 +22,16 @@
 import com.google.inject.Inject;
 import com.google.inject.Provides;
 import com.google.inject.ProvisionException;
-import com.google.inject.Scopes;
 import com.google.inject.Singleton;
 import com.google.inject.spi.Message;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
 import com.googlesource.gerrit.plugins.multisite.broker.GsonProvider;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
 import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
 import com.googlesource.gerrit.plugins.multisite.event.EventModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.RouterModule;
 import com.googlesource.gerrit.plugins.multisite.index.IndexModule;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerApi;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
 import com.googlesource.gerrit.plugins.multisite.validation.ValidationModule;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
@@ -54,17 +48,12 @@
   private static final Logger log = LoggerFactory.getLogger(Module.class);
   private Configuration config;
   private NoteDbStatus noteDb;
-  private KafkaForwardedEventRouterModule kafkaForwardedEventRouterModule;
-  private KafkaBrokerForwarderModule kafkaBrokerForwarderModule;
+  private BrokerModule brokerModule;
   private final boolean disableGitRepositoryValidation;
 
   @Inject
-  public Module(
-      Configuration config,
-      NoteDbStatus noteDb,
-      KafkaForwardedEventRouterModule forwardedEeventRouterModule,
-      KafkaBrokerForwarderModule brokerForwarderModule) {
-    this(config, noteDb, forwardedEeventRouterModule, brokerForwarderModule, false);
+  public Module(Configuration config, NoteDbStatus noteDb, BrokerModule brokerModule) {
+    this(config, noteDb, brokerModule, false);
   }
 
   // TODO: It is not possible to properly test the libModules in Gerrit.
@@ -75,22 +64,12 @@
   public Module(
       Configuration config,
       NoteDbStatus noteDb,
-      KafkaForwardedEventRouterModule forwardedEeventRouterModule,
-      KafkaBrokerForwarderModule brokerForwarderModule,
+      BrokerModule brokerModule,
       boolean disableGitRepositoryValidation) {
-    init(config, noteDb, forwardedEeventRouterModule, brokerForwarderModule);
-    this.disableGitRepositoryValidation = disableGitRepositoryValidation;
-  }
-
-  private void init(
-      Configuration config,
-      NoteDbStatus noteDb,
-      KafkaForwardedEventRouterModule forwardedEeventRouterModule,
-      KafkaBrokerForwarderModule brokerForwarderModule) {
     this.config = config;
     this.noteDb = noteDb;
-    this.kafkaForwardedEventRouterModule = forwardedEeventRouterModule;
-    this.kafkaBrokerForwarderModule = brokerForwarderModule;
+    this.brokerModule = brokerModule;
+    this.disableGitRepositoryValidation = disableGitRepositoryValidation;
   }
 
   @Override
@@ -121,15 +100,10 @@
       install(new IndexModule());
     }
 
-    install(new BrokerModule());
-    DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
-    listener().to(KafkaBrokerApi.class);
+    install(brokerModule);
 
     install(new RouterModule());
 
-    install(kafkaForwardedEventRouterModule);
-    install(kafkaBrokerForwarderModule);
-
     install(
         new ValidationModule(
             config, disableGitRepositoryValidation || !config.getSharedRefDb().isEnabled()));
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
index ff8bd13..637ceef 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
@@ -15,8 +15,14 @@
 package com.googlesource.gerrit.plugins.multisite;
 
 import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.inject.Inject;
+import com.google.inject.Scopes;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerApi;
+import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
 
 public class PluginModule extends LifecycleModule {
@@ -24,11 +30,19 @@
 
   private Configuration config;
   private ZkValidationModule zkValidationModule;
+  private KafkaForwardedEventRouterModule kafkaForwardedEventRouterModule;
+  private KafkaBrokerForwarderModule kafkaBrokerForwarderModule;
 
   @Inject
-  public PluginModule(Configuration config, ZkValidationModule zkValidationModule) {
+  public PluginModule(
+      Configuration config,
+      ZkValidationModule zkValidationModule,
+      KafkaForwardedEventRouterModule forwardedEeventRouterModule,
+      KafkaBrokerForwarderModule brokerForwarderModule) {
     this.config = config;
     this.zkValidationModule = zkValidationModule;
+    this.kafkaForwardedEventRouterModule = forwardedEeventRouterModule;
+    this.kafkaBrokerForwarderModule = brokerForwarderModule;
   }
 
   @Override
@@ -37,5 +51,11 @@
       logger.atInfo().log("Shared ref-db engine: Zookeeper");
       install(zkValidationModule);
     }
+
+    DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
+    listener().to(KafkaBrokerApi.class);
+
+    install(kafkaForwardedEventRouterModule);
+    install(kafkaBrokerForwarderModule);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
new file mode 100644
index 0000000..19cf0f7
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import com.google.gerrit.server.events.Event;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import java.util.function.Consumer;
+
+public class BrokerApiNoOp implements BrokerApi {
+
+  @Override
+  public boolean send(String topic, Event event) {
+    return true;
+  }
+
+  @Override
+  public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {}
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
index f1ab1f6..6983984 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
@@ -24,6 +24,7 @@
   @Override
   protected void configure() {
     DynamicItem.itemOf(binder(), BrokerApi.class);
+    DynamicItem.bind(binder(), BrokerApi.class).to(BrokerApiNoOp.class).in(Scopes.SINGLETON);
 
     bind(BrokerApiWrapper.class).in(Scopes.SINGLETON);
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index 6e70299..2308c0e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -23,11 +23,11 @@
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
-
 import java.io.IOException;
 import java.util.UUID;
 
@@ -43,7 +43,7 @@
   private SubscriberMetrics subscriberMetrics;
 
   public AbstractSubcriber(
-      BrokerApi brokerApi,
+      BrokerApiWrapper brokerApi,
       ForwardedEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @BrokerGson Gson gson,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
index fc62e6d..d096148 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
@@ -20,7 +20,7 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
@@ -30,7 +30,7 @@
 public class CacheEvictionEventSubscriber extends AbstractSubcriber {
   @Inject
   public CacheEvictionEventSubscriber(
-      BrokerApi brokerApi,
+      BrokerApiWrapper brokerApi,
       StreamEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @BrokerGson Gson gsonProvider,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
index c201f65..df55040 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
@@ -20,7 +20,7 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
@@ -30,7 +30,7 @@
 public class IndexEventSubscriber extends AbstractSubcriber {
   @Inject
   public IndexEventSubscriber(
-      BrokerApi brokerApi,
+      BrokerApiWrapper brokerApi,
       IndexEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @BrokerGson Gson gsonProvider,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
index 4157609..5c42ea6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
@@ -20,7 +20,7 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
@@ -30,7 +30,7 @@
 public class ProjectUpdateEventSubscriber extends AbstractSubcriber {
   @Inject
   public ProjectUpdateEventSubscriber(
-      BrokerApi brokerApi,
+      BrokerApiWrapper brokerApi,
       ProjectListUpdateRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @BrokerGson Gson gson,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
index af0f3e7..b48ab31 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
@@ -20,7 +20,7 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
@@ -30,7 +30,7 @@
 public class StreamEventSubscriber extends AbstractSubcriber {
   @Inject
   public StreamEventSubscriber(
-      BrokerApi brokerApi,
+      BrokerApiWrapper brokerApi,
       StreamEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @BrokerGson Gson gson,
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
index e93d5f3..8b7a17b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
@@ -17,8 +17,7 @@
 import static com.google.common.truth.Truth.assertThat;
 
 import com.google.gerrit.server.config.SitePaths;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
 import java.io.File;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -39,9 +38,7 @@
   private Configuration configMock;
 
   @Mock private NoteDbStatus noteDb;
-
-  @Mock private KafkaForwardedEventRouterModule routerModule;
-  @Mock private KafkaBrokerForwarderModule brokerForwarderModule;
+  @Mock private BrokerModule brokerModule;
 
   @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
 
@@ -49,7 +46,7 @@
 
   @Before
   public void setup() {
-    module = new Module(configMock, noteDb, routerModule, brokerForwarderModule);
+    module = new Module(configMock, noteDb, brokerModule);
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index b69f9e6..9596946 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -24,6 +24,7 @@
 import com.google.gerrit.acceptance.UseLocalDisk;
 import com.google.gerrit.extensions.api.changes.ReviewInput;
 import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.gerrit.server.config.SitePaths;
@@ -36,17 +37,24 @@
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Key;
+import com.google.inject.Scopes;
 import com.google.inject.TypeLiteral;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.Module;
 import com.googlesource.gerrit.plugins.multisite.NoteDbStatus;
+import com.googlesource.gerrit.plugins.multisite.PluginModule;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
 import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
 import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
 import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -98,8 +106,19 @@
       }
     }
 
+    public static class TestBrokerModule extends BrokerModule {
+      @Override
+      protected void configure() {
+        DynamicItem.itemOf(binder(), BrokerApi.class);
+        bind(BrokerApiWrapper.class).in(Scopes.SINGLETON);
+
+        install(new SubscriberModule());
+      }
+    }
+
     private final FileBasedConfig config;
     private final Module multiSiteModule;
+    private final PluginModule pluginModule;
 
     @Inject
     public KafkaTestContainerModule(SitePaths sitePaths, NoteDbStatus noteDb) throws IOException {
@@ -112,15 +131,15 @@
       config.save();
 
       Configuration multiSiteConfig = new Configuration(config, new Config());
-      KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(multiSiteConfig);
-      this.multiSiteModule =
-          new Module(
+      this.multiSiteModule = new Module(multiSiteConfig, noteDb, new TestBrokerModule(), true);
+      this.pluginModule =
+          new PluginModule(
               multiSiteConfig,
-              noteDb,
+              new ZkValidationModule(multiSiteConfig),
               new KafkaForwardedEventRouterModule(
-                  kafkaConfiguration, new KafkaConsumerModule(kafkaConfiguration)),
-              new KafkaBrokerForwarderModule(kafkaConfiguration),
-              true);
+                  new KafkaConfiguration(multiSiteConfig),
+                  new KafkaConsumerModule(new KafkaConfiguration(multiSiteConfig))),
+              new KafkaBrokerForwarderModule(new KafkaConfiguration(multiSiteConfig)));
     }
 
     @Override
@@ -131,6 +150,7 @@
         listener().toInstance(new KafkaStopAtShutdown(kafka));
 
         install(multiSiteModule);
+        install(pluginModule);
 
       } catch (IOException e) {
         throw new IllegalStateException(e);