Turn the multi-site plugin into a libModule

Abandon the plugin packaging style and turn the whole project into
a libModule. The @PluginName annotation is no longer available in a
libModule and thus needs to be replaced with a static string.
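
For illustration, the replacement boils down to this (names as in the
Configuration.java hunk below):

```
// Before (plugin packaging): the plugin loader injected the name.
@Inject
Configuration(
    PluginConfigFactory pluginConfigFactory, @PluginName String pluginName) { ... }

// After (libModule packaging): the name is a plain constant.
public static final String PLUGIN_NAME = "multi-site";
```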

The configuration needs to be lazy-loaded; otherwise it cannot be used
straight away in the Guice module and risks not being loaded at all.
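
Guava's Suppliers.memoize gives exactly this behaviour: nothing is read
at construction time, and the first get() parses the file and caches the
result. A minimal sketch of the pattern, mirroring Configuration.lazyLoad
in the diff below (fileConfig points at $GERRIT_SITE/etc/multi-site.config):

```
Supplier<Config> lazyCfg =
    memoize(
        () -> {
          try {
            // I/O happens on first access, not at Guice bind time.
            fileConfig.load();
          } catch (IOException | ConfigInvalidException e) {
            log.error("Unable to load configuration", e);
          }
          return fileConfig;
        });
```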

The EventConsumerIT test needs to be more realistic and use a real,
filesystem-based configuration, so as to simulate an actual injection
of the configuration into the Guice module.
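
In practice the test now writes a real etc/multi-site.config and lets the
module load it through the same code path as production, roughly as in the
EventConsumerIT hunk below:

```
FileBasedConfig config = new FileBasedConfig(multiSiteConfigFile, FS.DETECTED);
config.setString("kafka", null, "bootstrapServers", kafka.getBootstrapServers());
config.setBoolean("kafka", "publisher", "enabled", true);
config.setBoolean("kafka", "subscriber", "enabled", true);
config.save(); // persisted so Configuration picks it up via SitePaths
```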

The repackaging is needed in preparation for injecting an alternative
Git repository manager that intercepts and wraps all ref-updates and
checks them against the shared ref-db.

Re-binding the Git repository manager happens very deep in the Gerrit
daemon startup process and cannot be influenced through plugins.

The introduction of a libModule, by contrast, allows influencing the
Guice bindings at the core of the Gerrit daemon's startup.
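
Concretely, the module is no longer dropped into plugins/ but activated
from gerrit.config (see the README change below):

```
[gerrit]
  installModule = com.googlesource.gerrit.plugins.multisite.Module
```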

Change-Id: I302e2c303ca145f951426912a71e87b44082c85b
diff --git a/README.md b/README.md
index 4c84e72..fe8a86f 100644
--- a/README.md
+++ b/README.md
@@ -69,9 +69,16 @@
 
 ## How to configure
 
-Install the multi-site plugin into the `$GERRIT_SITE/plugins` directory of all
+Install the multi-site plugin into the `$GERRIT_SITE/lib` directory of all
 the Gerrit servers that are part of the multi-site cluster.
 
+Add the multi-site module to `$GERRIT_SITE/etc/gerrit.config` as follows:
+
+```
+[gerrit]
+  installModule = com.googlesource.gerrit.plugins.multisite.Module
+```
+
 Create the `$GERRIT_SITE/etc/multi-site.config` on all Gerrit servers with the
 following basic settings:
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index cd80bfa..f4c2c7a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -15,17 +15,20 @@
 package com.googlesource.gerrit.plugins.multisite;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Suppliers.memoize;
+import static com.google.common.base.Suppliers.ofInstance;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.CaseFormat;
 import com.google.common.base.Strings;
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
-import com.google.gerrit.extensions.annotations.PluginName;
-import com.google.gerrit.server.config.PluginConfigFactory;
+import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkSharedRefDatabase;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -39,7 +42,10 @@
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.BoundedExponentialBackoffRetry;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +53,9 @@
 public class Configuration {
   private static final Logger log = LoggerFactory.getLogger(Configuration.class);
 
+  public static final String PLUGIN_NAME = "multi-site";
+  public static final String MULTI_SITE_CONFIG = PLUGIN_NAME + ".config";
+
   static final String INSTANCE_ID_FILE = "instanceId.data";
 
   // common parameters to cache and index sections
@@ -64,67 +73,86 @@
   static final String KAFKA_SECTION = "kafka";
   public static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
 
-  private final KafkaPublisher publisher;
-  private final Cache cache;
-  private final Event event;
-  private final Index index;
-  private final KafkaSubscriber subscriber;
-  private final Kafka kafka;
-  private final ZookeeperConfig zookeeperConfig;
+  private final Supplier<KafkaPublisher> publisher;
+  private final Supplier<Cache> cache;
+  private final Supplier<Event> event;
+  private final Supplier<Index> index;
+  private final Supplier<KafkaSubscriber> subscriber;
+  private final Supplier<Kafka> kafka;
+  private final Supplier<ZookeeperConfig> zookeeperConfig;
 
   @Inject
-  Configuration(PluginConfigFactory pluginConfigFactory, @PluginName String pluginName) {
-    this(pluginConfigFactory.getGlobalPluginConfig(pluginName));
+  Configuration(SitePaths sitePaths) {
+    this(new FileBasedConfig(sitePaths.etc_dir.resolve(MULTI_SITE_CONFIG).toFile(), FS.DETECTED));
   }
 
   @VisibleForTesting
-  public Configuration(Config cfg) {
-    kafka = new Kafka(cfg);
-    publisher = new KafkaPublisher(cfg);
-    subscriber = new KafkaSubscriber(cfg);
-    cache = new Cache(cfg);
-    event = new Event(cfg);
-    index = new Index(cfg);
-    zookeeperConfig = new ZookeeperConfig(cfg);
+  public Configuration(final Config cfg) {
+    Supplier<Config> lazyCfg = lazyLoad(cfg);
+    kafka = memoize(() -> new Kafka(lazyCfg));
+    publisher = memoize(() -> new KafkaPublisher(lazyCfg));
+    subscriber = memoize(() -> new KafkaSubscriber(lazyCfg));
+    cache = memoize(() -> new Cache(lazyCfg));
+    event = memoize(() -> new Event(lazyCfg));
+    index = memoize(() -> new Index(lazyCfg));
+    zookeeperConfig = memoize(() -> new ZookeeperConfig(lazyCfg));
   }
 
   public ZookeeperConfig getZookeeperConfig() {
-    return zookeeperConfig;
+    return zookeeperConfig.get();
   }
 
   public Kafka getKafka() {
-    return kafka;
+    return kafka.get();
   }
 
   public KafkaPublisher kafkaPublisher() {
-    return publisher;
+    return publisher.get();
   }
 
   public Cache cache() {
-    return cache;
+    return cache.get();
   }
 
   public Event event() {
-    return event;
+    return event.get();
   }
 
   public Index index() {
-    return index;
+    return index.get();
   }
 
   public KafkaSubscriber kafkaSubscriber() {
-    return subscriber;
+    return subscriber.get();
+  }
+
+  private Supplier<Config> lazyLoad(Config config) {
+    if (config instanceof FileBasedConfig) {
+      return memoize(
+          () -> {
+            FileBasedConfig fileConfig = (FileBasedConfig) config;
+            String fileConfigFileName = fileConfig.getFile().getPath();
+            try {
+              log.info("Loading configuration from {}", fileConfigFileName);
+              fileConfig.load();
+            } catch (IOException | ConfigInvalidException e) {
+              log.error("Unable to load configuration from " + fileConfigFileName, e);
+            }
+            return fileConfig;
+          });
+    }
+    return ofInstance(config);
   }
 
   private static boolean getBoolean(
-      Config cfg, String section, String subsection, String name, boolean defaultValue) {
-    return cfg.getBoolean(section, subsection, name, defaultValue);
+      Supplier<Config> cfg, String section, String subsection, String name, boolean defaultValue) {
+    return cfg.get().getBoolean(section, subsection, name, defaultValue);
   }
 
   private static int getInt(
-      Config cfg, String section, String subSection, String name, int defaultValue) {
+      Supplier<Config> cfg, String section, String subSection, String name, int defaultValue) {
     try {
-      return cfg.getInt(section, subSection, name, defaultValue);
+      return cfg.get().getInt(section, subSection, name, defaultValue);
     } catch (IllegalArgumentException e) {
       log.error("invalid value for {}; using default value {}", name, defaultValue);
       log.debug("Failed to retrieve integer value: {}", e.getMessage(), e);
@@ -133,28 +161,32 @@
   }
 
   private static String getString(
-      Config cfg, String section, String subsection, String name, String defaultValue) {
-    String value = cfg.getString(section, subsection, name);
+      Supplier<Config> cfg, String section, String subsection, String name, String defaultValue) {
+    String value = cfg.get().getString(section, subsection, name);
     if (!Strings.isNullOrEmpty(value)) {
       return value;
     }
     return defaultValue;
   }
 
-  private static Map<EventFamily, Boolean> eventsEnabled(Config config, String subsection) {
+  private static Map<EventFamily, Boolean> eventsEnabled(
+      Supplier<Config> config, String subsection) {
     Map<EventFamily, Boolean> eventsEnabled = new HashMap<>();
     for (EventFamily eventFamily : EventFamily.values()) {
       String enabledConfigKey = eventFamily.lowerCamelName() + "Enabled";
 
       eventsEnabled.put(
           eventFamily,
-          config.getBoolean(
-              KAFKA_SECTION, subsection, enabledConfigKey, DEFAULT_ENABLE_PROCESSING));
+          config
+              .get()
+              .getBoolean(KAFKA_SECTION, subsection, enabledConfigKey, DEFAULT_ENABLE_PROCESSING));
     }
     return eventsEnabled;
   }
 
-  private static void applyKafkaConfig(Config config, String subsectionName, Properties target) {
+  private static void applyKafkaConfig(
+      Supplier<Config> configSupplier, String subsectionName, Properties target) {
+    Config config = configSupplier.get();
     for (String section : config.getSubsections(KAFKA_SECTION)) {
       if (section.equals(subsectionName)) {
         for (String name : config.getNames(KAFKA_SECTION, section, true)) {
@@ -174,7 +206,11 @@
     target.put(
         "bootstrap.servers",
         getString(
-            config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS));
+            configSupplier,
+            KAFKA_SECTION,
+            null,
+            "bootstrapServers",
+            DEFAULT_KAFKA_BOOTSTRAP_SERVERS));
   }
 
   public static class Kafka {
@@ -183,12 +219,16 @@
 
     private static final Map<EventFamily, String> EVENT_TOPICS =
         ImmutableMap.of(
-            EventFamily.INDEX_EVENT, "GERRIT.EVENT.INDEX",
-            EventFamily.STREAM_EVENT, "GERRIT.EVENT.STREAM",
-            EventFamily.CACHE_EVENT, "GERRIT.EVENT.CACHE",
-            EventFamily.PROJECT_LIST_EVENT, "GERRIT.EVENT.PROJECT.LIST");
+            EventFamily.INDEX_EVENT,
+            "GERRIT.EVENT.INDEX",
+            EventFamily.STREAM_EVENT,
+            "GERRIT.EVENT.STREAM",
+            EventFamily.CACHE_EVENT,
+            "GERRIT.EVENT.CACHE",
+            EventFamily.PROJECT_LIST_EVENT,
+            "GERRIT.EVENT.PROJECT.LIST");
 
-    Kafka(Config config) {
+    Kafka(Supplier<Config> config) {
       this.bootstrapServers =
           getString(
               config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS);
@@ -222,10 +262,11 @@
     private final boolean enabled;
     private final Map<EventFamily, Boolean> eventsEnabled;
 
-    private KafkaPublisher(Config cfg) {
+    private KafkaPublisher(Supplier<Config> cfg) {
       enabled =
-          cfg.getBoolean(
-              KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, DEFAULT_BROKER_ENABLED);
+          cfg.get()
+              .getBoolean(
+                  KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, DEFAULT_BROKER_ENABLED);
 
       eventsEnabled = eventsEnabled(cfg, KAFKA_PUBLISHER_SUBSECTION);
 
@@ -265,21 +306,22 @@
     private Map<EventFamily, Boolean> eventsEnabled;
     private final Config cfg;
 
-    public KafkaSubscriber(Config cfg) {
+    public KafkaSubscriber(Supplier<Config> configSupplier) {
+      this.cfg = configSupplier.get();
+
       this.pollingInterval =
           cfg.getInt(
               KAFKA_SECTION,
               KAFKA_SUBSCRIBER_SUBSECTION,
               "pollingIntervalMs",
               DEFAULT_POLLING_INTERVAL_MS);
-      this.cfg = cfg;
 
       enabled = cfg.getBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, false);
 
-      eventsEnabled = eventsEnabled(cfg, KAFKA_SUBSCRIBER_SUBSECTION);
+      eventsEnabled = eventsEnabled(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION);
 
       if (enabled) {
-        applyKafkaConfig(cfg, KAFKA_SUBSCRIBER_SUBSECTION, this);
+        applyKafkaConfig(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION, this);
       }
     }
 
@@ -321,14 +363,14 @@
 
     private final boolean synchronize;
 
-    private Forwarding(Config cfg, String section) {
+    private Forwarding(Supplier<Config> cfg, String section) {
       synchronize = getBoolean(cfg, section, SYNCHRONIZE_KEY, DEFAULT_SYNCHRONIZE);
     }
 
     private static boolean getBoolean(
-        Config cfg, String section, String name, boolean defaultValue) {
+        Supplier<Config> cfg, String section, String name, boolean defaultValue) {
       try {
-        return cfg.getBoolean(section, name, defaultValue);
+        return cfg.get().getBoolean(section, name, defaultValue);
       } catch (IllegalArgumentException e) {
         log.error("invalid value for {}; using default value {}", name, defaultValue);
         log.debug("Failed to retrieve boolean value: {}", e.getMessage(), e);
@@ -348,11 +390,11 @@
     private final int threadPoolSize;
     private final List<String> patterns;
 
-    private Cache(Config cfg) {
+    private Cache(Supplier<Config> cfg) {
       super(cfg, CACHE_SECTION);
       threadPoolSize =
           getInt(cfg, CACHE_SECTION, null, THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE);
-      patterns = Arrays.asList(cfg.getStringList(CACHE_SECTION, null, PATTERN_KEY));
+      patterns = Arrays.asList(cfg.get().getStringList(CACHE_SECTION, null, PATTERN_KEY));
     }
 
     public int threadPoolSize() {
@@ -367,7 +409,7 @@
   public static class Event extends Forwarding {
     static final String EVENT_SECTION = "event";
 
-    private Event(Config cfg) {
+    private Event(Supplier<Config> cfg) {
       super(cfg, EVENT_SECTION);
     }
   }
@@ -383,7 +425,7 @@
 
     private final int numStripedLocks;
 
-    private Index(Config cfg) {
+    private Index(Supplier<Config> cfg) {
       super(cfg, INDEX_SECTION);
       threadPoolSize =
           getInt(cfg, INDEX_SECTION, null, THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE);
@@ -458,7 +500,7 @@
 
     private CuratorFramework build;
 
-    private ZookeeperConfig(Config cfg) {
+    private ZookeeperConfig(Supplier<Config> cfg) {
       connectionString =
           getString(cfg, SECTION, SUBSECTION, KEY_CONNECT_STRING, DEFAULT_ZK_CONNECT);
       root = getString(cfg, SECTION, SUBSECTION, KEY_ROOT_NODE, "gerrit/multi-site");
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index 72c956e..d9d062f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -14,8 +14,8 @@
 
 package com.googlesource.gerrit.plugins.multisite;
 
-import com.google.gerrit.extensions.annotations.PluginData;
 import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.config.SitePaths;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Provides;
@@ -35,6 +35,7 @@
 import java.io.FileReader;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.UUID;
 import org.slf4j.Logger;
@@ -90,8 +91,12 @@
   @Provides
   @Singleton
   @InstanceId
-  UUID getInstanceId(@PluginData java.nio.file.Path dataDir) throws IOException {
+  UUID getInstanceId(SitePaths sitePaths) throws IOException {
     UUID instanceId = null;
+    Path dataDir = sitePaths.data_dir.resolve(Configuration.PLUGIN_NAME);
+    if (!dataDir.toFile().exists()) {
+      dataDir.toFile().mkdirs();
+    }
     String serverIdFile =
         dataDir.toAbsolutePath().toString() + "/" + Configuration.INSTANCE_ID_FILE;
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
index 20de0d8..0f7b30b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
@@ -28,36 +28,29 @@
 import static com.googlesource.gerrit.plugins.multisite.Configuration.KafkaPublisher.KAFKA_PUBLISHER_SUBSECTION;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.THREAD_POOL_SIZE_KEY;
-import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableList;
-import com.google.gerrit.server.config.PluginConfigFactory;
 import org.eclipse.jgit.lib.Config;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
 public class ConfigurationTest {
   private static final String INVALID_BOOLEAN = "invalidBoolean";
   private static final String INVALID_INT = "invalidInt";
-  private static final String PLUGIN_NAME = "multi-site";
   private static final int THREAD_POOL_SIZE = 1;
-  private static final boolean SPLIT_BRAIN_ENABLED = true;
 
-  @Mock private PluginConfigFactory pluginConfigFactoryMock;
   private Config globalPluginConfig;
 
   @Before
   public void setUp() {
     globalPluginConfig = new Config();
-    when(pluginConfigFactoryMock.getGlobalPluginConfig(PLUGIN_NAME)).thenReturn(globalPluginConfig);
   }
 
   private Configuration getConfiguration() {
-    return new Configuration(pluginConfigFactoryMock, PLUGIN_NAME);
+    return new Configuration(globalPluginConfig);
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
index 030d670..759fe7c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
@@ -16,6 +16,7 @@
 
 import static com.google.common.truth.Truth.assertThat;
 
+import com.google.gerrit.server.config.SitePaths;
 import java.io.File;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -48,15 +49,19 @@
 
   @Test
   public void shouldGetInstanceId() throws Exception {
-    File tmpConfigDirectory = tempFolder.newFolder();
-    Path path = Paths.get(tmpConfigDirectory.getPath(), Configuration.INSTANCE_ID_FILE);
+    File tmpSitePath = tempFolder.newFolder();
+    File tmpPluginDataPath =
+        Paths.get(tmpSitePath.getPath(), "data", Configuration.PLUGIN_NAME).toFile();
+    tmpPluginDataPath.mkdirs();
+    Path path = Paths.get(tmpPluginDataPath.getPath(), Configuration.INSTANCE_ID_FILE);
+    SitePaths sitePaths = new SitePaths(Paths.get(tmpSitePath.getPath()));
     assertThat(path.toFile().exists()).isFalse();
 
-    UUID gotUUID1 = module.getInstanceId(Paths.get(tmpConfigDirectory.getPath()));
+    UUID gotUUID1 = module.getInstanceId(sitePaths);
     assertThat(gotUUID1).isNotNull();
     assertThat(path.toFile().exists()).isTrue();
 
-    UUID gotUUID2 = module.getInstanceId(Paths.get(tmpConfigDirectory.getPath()));
+    UUID gotUUID2 = module.getInstanceId(sitePaths);
     assertThat(gotUUID1).isEqualTo(gotUUID2);
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index 9d9f087..1a1a3a0 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -25,6 +25,7 @@
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.config.SitePaths;
 import com.google.gerrit.server.data.PatchSetAttribute;
 import com.google.gerrit.server.events.CommentAddedEvent;
 import com.google.gerrit.server.events.Event;
@@ -36,9 +37,10 @@
 import com.google.inject.TypeLiteral;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.Module;
-import com.googlesource.gerrit.plugins.multisite.NoteDbStatus;
 import com.googlesource.gerrit.plugins.multisite.broker.GsonProvider;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
@@ -46,11 +48,12 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
-import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.revwalk.RevCommit;
 import org.eclipse.jgit.revwalk.RevWalk;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
 import org.junit.Before;
 import org.junit.Test;
 import org.testcontainers.containers.KafkaContainer;
@@ -88,28 +91,42 @@
       }
     }
 
-    private final NoteDbStatus noteDb;
+    private final File multiSiteConfigFile;
+    private final Module multiSiteModule;
 
     @Inject
-    public KafkaTestContainerModule(NoteDbStatus noteDb) {
-      this.noteDb = noteDb;
+    public KafkaTestContainerModule(SitePaths sitePaths, Module multiSiteModule) {
+      this.multiSiteConfigFile =
+          sitePaths.etc_dir.resolve(Configuration.MULTI_SITE_CONFIG).toFile();
+      this.multiSiteModule = multiSiteModule;
     }
 
     @Override
     protected void configure() {
-      final KafkaContainer kafka = new KafkaContainer();
-      kafka.start();
+      try {
+        final KafkaContainer kafka = startAndConfigureKafkaConnection();
 
-      Config config = new Config();
-      config.setString("kafka", null, "bootstrapServers", kafka.getBootstrapServers());
+        listener().toInstance(new KafkaStopAtShutdown(kafka));
+
+        install(multiSiteModule);
+
+      } catch (IOException e) {
+        throw new IllegalStateException(e);
+      }
+    }
+
+    private KafkaContainer startAndConfigureKafkaConnection() throws IOException {
+      KafkaContainer kafkaContainer = new KafkaContainer();
+      kafkaContainer.start();
+
+      FileBasedConfig config = new FileBasedConfig(multiSiteConfigFile, FS.DETECTED);
+      config.setString("kafka", null, "bootstrapServers", kafkaContainer.getBootstrapServers());
       config.setBoolean("kafka", "publisher", "enabled", true);
       config.setBoolean("kafka", "subscriber", "enabled", true);
       config.setBoolean("ref-database", null, "enabled", false);
-      Configuration multiSiteConfig = new Configuration(config);
-      bind(Configuration.class).toInstance(multiSiteConfig);
-      install(new Module(multiSiteConfig, noteDb));
+      config.save();
 
-      listener().toInstance(new KafkaStopAtShutdown(kafka));
+      return kafkaContainer;
     }
   }
 
@@ -137,6 +154,8 @@
     String patchsetRef = change.currentPatchSet().getRefName();
 
     Map<String, List<Event>> eventsByType = receiveEventsByType(droppedEventsQueue);
+    assertThat(eventsByType).isNotEmpty();
+
     assertThat(eventsByType.get("change-index"))
         .containsExactly(createChangeIndexEvent(project, changeNum, getParentCommit(change)));
 
@@ -144,9 +163,9 @@
             eventsByType.get("ref-updated").stream()
                 .map(e -> ((RefUpdatedEvent) e).getRefName())
                 .collect(toSet()))
-        .containsAllOf(
-            changeNotesRef,
-            patchsetRef); // 'refs/sequences/changes' not always updated thus not checked
+        .containsAllOf(changeNotesRef, patchsetRef); // 'refs/sequences/changes'
+    // not always updated thus
+    // not checked
 
     List<Event> patchSetCreatedEvents = eventsByType.get("patchset-created");
     assertThat(patchSetCreatedEvents).hasSize(1);
@@ -182,6 +201,8 @@
 
     Map<String, List<Event>> eventsByType = receiveEventsByType(droppedEventsQueue);
 
+    assertThat(eventsByType).isNotEmpty();
+
     assertThat(eventsByType.get("change-index"))
         .containsExactly(createChangeIndexEvent(project, changeNum, getParentCommit(change)));