Merge branch 'stable-3.0'

* stable-3.0:
  Broker publisher metric for number of message published
  Use KafkaConfiguration for Kafka related properties and functionality
  Fix setup.sh script and use bazel-bin directory
  Use bazel-bin to look for plugin's output
  Move ProjectDeletedSharedDbCleanup to global Module
  Decouple Kafka configuration class
  Add LB (HAProxy) to dockerised environment
  Add health-check plugin to dockerised environment
  Add multi-site plugin to dockerised environment
  Move CURATOR_VER constant to right before its use
  Fix plugins URL in the multi-site local environment
  Zookeeper module reads it's own configuration
  Update design proposal to abstract broker and Ref-DB
  Move Zookeeper-specifics into its own Module
  Don't import Id/Key/NameKey directly
  Module: Simplify BufferedReader creation
  DefaultSharedRefEnforcement: Fix MutableConstantField warning
  Fix ClassCanBeStatic warnings
  Fix DefaultCharset warnings
  ChangeCheckerImpl: Fix OperatorPrecedence warning
  Module: Fix DefaultCharset warning
  Configuration: Fix MutableConstantField warning
  KafkaSubscriber: Fix ClassCanBeStatic warning
  Bazel: Drop dependency on commons-lang3 library
  Bazel: Remove unneeded dependencies from test rule
  Bazel: Harmonize library names
  Format build files with buildifier
  Upgrade zookeeper to 3.4.14
  Update test dependencies
  Consolidate source of generated image files
  Allow restart of Gerrit instances
  Add header to the custom Kafka properties

Change-Id: Icb489c3bec2ae02e161d03655518fbe39f5b095b
diff --git a/BUILD b/BUILD
index 52f037b..1de15db 100644
--- a/BUILD
+++ b/BUILD
@@ -17,11 +17,10 @@
     ],
     resources = glob(["src/main/resources/**/*"]),
     deps = [
-        "@commons-lang3//jar",
-        "@kafka_client//jar",
+        "@curator-client//jar",
         "@curator-framework//jar",
         "@curator-recipes//jar",
-        "@curator-client//jar",
+        "@kafka-client//jar",
         "@zookeeper//jar",
     ],
 )
@@ -46,7 +45,7 @@
     exports = PLUGIN_DEPS + PLUGIN_TEST_DEPS + [
         ":multi-site__plugin",
         "@wiremock//jar",
-        "@kafka_client//jar",
+        "@kafka-client//jar",
         "@testcontainers-kafka//jar",
         "//lib/testcontainers",
         "@curator-framework//jar",
diff --git a/DESIGN.md b/DESIGN.md
index 248d8cc..0995a14 100644
--- a/DESIGN.md
+++ b/DESIGN.md
@@ -257,8 +257,18 @@
 The multi-site solution described here depends upon the combined use of different
 components:
 
-- **multi-site plugin**: Enables the replication of Gerrit _indexes_, _caches_,
-  and _stream events_ across sites.
+- **multi-site libModule**: exports interfaces as DynamicItems to plug in specific
+implementations of the `Broker` and `Global Ref-DB` plugins.
+
+- **broker plugin**: an implementation of the broker interface, which enables the
+replication of Gerrit _indexes_, _caches_, and _stream events_ across sites.
+When no specific implementation is provided, the [Broker Noop implementation](#broker-noop-implementation)
+is used and the libModule interfaces are mapped to internal no-op implementations.
+
+- **Global Ref-DB plugin**: an implementation of the Global Ref-DB interface,
+which enables the detection of out-of-sync refs across Gerrit sites.
+When no specific implementation is provided, the [Global Ref-DB Noop implementation](#global-ref-db-noop-implementation)
+is used and the libModule interfaces are mapped to internal no-op implementations.
 
 - **replication plugin**: enables the replication of the _Git repositories_ across
   sites.
@@ -273,14 +283,82 @@
 
 The interactions between these components are illustrated in the following diagram:
 
-![Initial Multi-Site Plugin Architecture](./images/architecture-first-iteration.png)
+![Initial Multi-Site Plugin Architecture](images/architecture-first-iteration.png)
 
 ## Implementation Details
 
-### Message brokers
-The multi-site plugin adopts an event-sourcing pattern and is based on an
-external message broker. The current implementation uses Apache Kafka.
-It is, however, potentially extensible to others, like RabbitMQ or NATS.
+### Multi-site libModule
+As mentioned earlier, there are different components behind the overarching
+architecture of this distributed multi-site Gerrit installation, each one fulfilling
+a specific goal. However, whilst the goal of each component is well-defined, the
+mechanics of how each component achieves that goal is not: the choice of a specific
+message broker or Ref-DB can depend on different factors,
+such as scalability, maintainability, business standards and costs, to name a few.
+
+For this reason, the multi-site component is designed to be explicitly agnostic to
+the specific choice of broker and Global Ref-DB implementations: it does not
+care how they, specifically, fulfill their task.
+
+Instead, this component takes on only three responsibilities:
+
+* Wrapping the GitRepositoryManager so that every interaction with Git can be
+verified by the Global Ref-DB plugin.
+
+* Exposing DynamicItem bindings onto which concrete _Broker_ and _Global Ref-DB_
+plugins can register their specific implementations (see the sketch below).
+When no such plugins are installed, the initial bindings point to no-op implementations.
+
+* Detecting out-of-sync refs across multiple Gerrit sites:
+each operation attempting to mutate a ref is checked against the Ref-DB to
+guarantee that each node has an up-to-date view of the repository state.
+
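+A minimal sketch of how these bindings could be exposed (assuming a hypothetical
+`BrokerApi` extension point and no-op default; the names are illustrative, not
+the actual plugin API):
+
+```java
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.inject.AbstractModule;
+
+// Hypothetical broker extension point exposed by the libModule.
+interface BrokerApi {
+  void publish(String topic, String event);
+}
+
+// Default binding used when no broker plugin is installed: do nothing.
+class NoopBrokerApi implements BrokerApi {
+  @Override
+  public void publish(String topic, String event) {}
+}
+
+public class MultiSiteLibModule extends AbstractModule {
+  @Override
+  protected void configure() {
+    // Create the DynamicItem and point its initial binding to the no-op;
+    // a concrete broker plugin can then override it at runtime.
+    DynamicItem.itemOf(binder(), BrokerApi.class);
+    DynamicItem.bind(binder(), BrokerApi.class).to(NoopBrokerApi.class);
+  }
+}
+```
+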
+### Message brokers plugin
+Each Gerrit node in the cluster needs to inform, and be informed by, all the other
+nodes about fundamental events, such as indexing of new changes, cache evictions and
+stream events. This component will provide a specific pub/sub broker implementation
+that is able to do so.
+
+When provided, the message broker plugin will override the DynamicItem binding exposed
+by the multi-site module with a specific implementation, such as Kafka, RabbitMQ, NATS, etc.
+
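+As a sketch, a broker plugin module could override that binding as follows
+(reusing the hypothetical `BrokerApi` from the previous example; the actual
+Kafka producer code is omitted):
+
+```java
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.inject.AbstractModule;
+
+// Hypothetical Kafka-backed implementation of the BrokerApi sketched above.
+class KafkaBrokerApi implements BrokerApi {
+  @Override
+  public void publish(String topic, String event) {
+    // ... produce the event to the configured Kafka topic ...
+  }
+}
+
+public class KafkaBrokerModule extends AbstractModule {
+  @Override
+  protected void configure() {
+    // Replace the libModule's no-op default with the Kafka implementation.
+    DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class);
+  }
+}
+```
+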
+#### Broker Noop implementation
+The default `Noop` implementation provided by the `Multi-site` libModule does nothing
+when publishing or consuming events. This is useful for setting up a test environment
+and allows the multi-site library to be installed independently of any additional
+plugins or of a specific broker installation.
+The Noop implementation can also be useful when there is no need for coordination
+with remote nodes, since it avoids maintaining an external broker altogether:
+for example, when using the multi-site plugin purely to replicate the Git
+repository to a disaster-recovery site and nothing else.
+
+### Global Ref-DB plugin
+Whilst the replication plugin allows the propagation of the Git repositories across
+sites and the broker plugin provides a mechanism to propagate events, the Global
+Ref-DB ensures correct alignment of refs of the multi-site nodes.
+
+It is the responsibility of this plugin to atomically store key/value pairs of
+refs in order to allow the libModule to detect out-of-sync refs across sites
+(aka split brain). This is achieved by storing the most recent `sha` for each
+mutable `ref`, using an atomic _Compare and Set_ operation.
+
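+A minimal sketch of such a compare-and-set contract (a hypothetical interface;
+the actual plugin API may differ):
+
+```java
+// Hypothetical Global Ref-DB contract: a ref update is accepted only when
+// the SHA currently stored for the ref still matches the expected one.
+public interface GlobalRefDatabase {
+  /**
+   * Atomically replace the SHA stored for {@code refName} in {@code project}
+   * with {@code newSha}, but only if the current value equals
+   * {@code expectedSha}; a false return signals an out-of-sync ref.
+   */
+  boolean compareAndPut(String project, String refName,
+      String expectedSha, String newSha);
+}
+```
+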
+We mentioned earlier the [CAP theorem](https://en.wikipedia.org/wiki/CAP_theorem),
+which in a nutshell states that a distributed system can only provide two of these
+three properties: _Consistency_, _Availability_ and _Partition tolerance_. The Global
+Ref-DB helps achieve _Consistency_ and _Partition tolerance_ (thus sacrificing
+_Availability_).
+
+See [Prevent split brain thanks to Global Ref-DB](#prevent-split-brain-thanks-to-global-ref-db)
+for a thorough example of this.
+
+When provided, the Global Ref-DB plugin will override the DynamicItem binding
+exposed by the multi-site module with a specific implementation, such as Zookeeper,
+etcd, MySQL, Mongo, etc.
+
+#### Global Ref-DB Noop implementation
+The default `Noop` implementation provided by the `Multi-site` libModule accepts
+any ref update without checking for consistency. This is useful for setting up a test
+environment and allows the multi-site library to be installed independently of any
+additional plugins or of a specific Ref-DB installation.
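+
+As a sketch, such a no-op implementation boils down to the following (reusing
+the hypothetical `GlobalRefDatabase` interface from the earlier example):
+
+```java
+// Accept every ref update without any consistency check.
+public class NoopGlobalRefDatabase implements GlobalRefDatabase {
+  @Override
+  public boolean compareAndPut(String project, String refName,
+      String expectedSha, String newSha) {
+    return true; // always considered in sync
+  }
+}
+```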
 
 ### Eventual consistency on Git, indexes, caches, and stream events
 
@@ -348,7 +426,7 @@
 
 #### The diagram below illustrates the happy path with crash recovery returning the system to a healthy state.
 
-![Healthy Use Case](src/main/resources/Documentation/git-replication-healthy.png)
+![Healthy Use Case](images/git-replication-healthy.png)
 
 In this case we are considering two different clients each doing a `push` on top of
 the same reference. This could be a new commit in a branch or the change of an existing commit.
@@ -376,7 +454,7 @@
 
 #### The Split Brain situation is illustrated in the following diagram.
 
-![Split Brain Use Case](src/main/resources/Documentation/git-replication-split-brain.png)
+![Split Brain Use Case](images/git-replication-split-brain.png)
 
 In this case the steps are very similar except that `Instance1` fails after acknowledging the
 push of `W0 -> W1` but before having replicated the status to `Instance2`.
@@ -408,24 +486,12 @@
 
 **NOTE**: The two options are not exclusive.
 
-#### Introduce a `DfsRefDatabase`
+#### Prevent split brain thanks to Global Ref-DB
 
-An implementation of the out-of-sync detection logic could be based on a central
-coordinator holding the _last known status_ of a _mutable ref_ (immutable refs won't
-have to be stored here). This would be, essentially, a DFS base `RefDatabase` or `DfsRefDatabase`.
+The above scenario can be prevented by using an implementation of the Global Ref-DB
+interface, which will operate as follows:
 
-This component would:
- 
-- Contain a subset of the local `RefDatabase` data:
-  - Store only _mutable _ `refs`
-  - Keep only the most recent `sha` for each specific `ref`
-- Require that atomic _Compare and Set_ operations can be performed on a
-key -> value storage.  For example, it could be implemented using `Zookeeper`. (One implementation
-was done by Dave Borowitz some years ago.)
-
-This interaction is illustrated in the diagram below:
-
-![Split Brain Prevented](src/main/resources/Documentation/git-replication-split-brain-detected.png)
+![Split Brain Prevented](images/git-replication-split-brain-detected.png)
 
 The difference, in respect to the split brain use case, is that now, whenever a change of a
 _mutable ref_ is requested, the Gerrit server verifies with the central RefDB that its
@@ -469,17 +535,6 @@
   able to differentiate the type of traffic and, thus, is forced always to use the
   RW site, even though the operation is RO.
 
-- **Support for different brokers**: Currently, the multi-site plugin supports Kafka.
-  More brokers need to be supported in a fashion similar to the
-  [ITS-* plugins framework](https://gerrit-review.googlesource.com/admin/repos/q/filter:plugins%252Fits).
-  Explicit references to Kafka must be removed from the multi-site plugin.  Other plugins may contribute
-  implementations to the broker extension point.
-
-- **Split the publishing and subscribing**:  Create two separate
-  plugins.  Combine the generation of the events into the current kafka-
-  events plugin.  The multi-site plugin will focus on
-  consumption of, and sorting of, the replication issues.
-
 ## Step-2: Move to multi-site Stage #8.
 
 - Auto-reconfigure HAProxy rules based on the projects sharding policy
@@ -487,5 +542,3 @@
 - Serve RW/RW traffic based on the project name/ref-name.
 
 - Balance traffic with "locally-aware" policies based on historical data
-
-- Preventing split-brain in case of temporary sites isolation
diff --git a/README.md b/README.md
index 7c731af..6e55d86 100644
--- a/README.md
+++ b/README.md
@@ -53,7 +53,7 @@
 bazel build plugins/multi-site
 ```
 
-The multi-site.jar plugin is generated to `bazel-genfiles/plugins/multi-site/multi-site.jar`.
+The multi-site.jar plugin is generated to `bazel-bin/plugins/multi-site/multi-site.jar`.
 
 Example of testing the multi-site plugin:
 
diff --git a/dockerised_local_env/.gitignore b/dockerised_local_env/.gitignore
index 714b661..c88a11c 100644
--- a/dockerised_local_env/.gitignore
+++ b/dockerised_local_env/.gitignore
@@ -6,6 +6,9 @@
 gerrit-1/data
 gerrit-1/bin
 gerrit-1/etc
+gerrit-1/tmp
+gerrit-1/lib
+gerrit-1/ssh/known_hosts
 gerrit-2/plugins
 gerrit-2/db
 gerrit-2/git
@@ -14,3 +17,9 @@
 gerrit-2/data
 gerrit-2/bin
 gerrit-2/etc
+gerrit-2/tmp
+gerrit-2/lib
+gerrit-2/ssh/known_hosts
+syslog-sidecar/logs
+syslog-sidecar/socket
diff --git a/dockerised_local_env/Makefile b/dockerised_local_env/Makefile
index d4f5c39..0b6b7e3 100644
--- a/dockerised_local_env/Makefile
+++ b/dockerised_local_env/Makefile
@@ -1,9 +1,11 @@
 GERRIT_JOB=Gerrit-bazel-stable-2.16
-BUILD_NUM=259
-GERRIT_1_PLUGINS_DIRECTORY=./gerrit-1/plugins
-GERRIT_2_PLUGINS_DIRECTORY=./gerrit-2/plugins
+BUILD_NUM=377
 GERRIT_1_BIN_DIRECTORY=./gerrit-1/bin
 GERRIT_2_BIN_DIRECTORY=./gerrit-2/bin
+GERRIT_1_LIB_DIRECTORY=./gerrit-1/lib
+GERRIT_2_LIB_DIRECTORY=./gerrit-2/lib
+GERRIT_1_ETC_DIRECTORY=./gerrit-1/etc
+GERRIT_2_ETC_DIRECTORY=./gerrit-2/etc
 GERRIT_1_PLUGINS_DIRECTORY=./gerrit-1/plugins
 GERRIT_2_PLUGINS_DIRECTORY=./gerrit-2/plugins
 CORE_PLUGINS=replication
@@ -14,7 +16,7 @@
 all: prepare download build
 
 prepare:
-	-mkdir -p $(GERRIT_1_PLUGINS_DIRECTORY) $(GERRIT_2_PLUGINS_DIRECTORY) $(GERRIT_1_BIN_DIRECTORY) $(GERRIT_2_BIN_DIRECTORY)
+	-mkdir -p $(GERRIT_1_PLUGINS_DIRECTORY) $(GERRIT_2_PLUGINS_DIRECTORY) $(GERRIT_1_BIN_DIRECTORY) $(GERRIT_2_BIN_DIRECTORY) $(GERRIT_1_ETC_DIRECTORY) $(GERRIT_2_ETC_DIRECTORY) $(GERRIT_1_LIB_DIRECTORY) $(GERRIT_2_LIB_DIRECTORY)
 
 download: gerrit plugin_websession_flatfile \
 	plugin_healthcheck \
@@ -24,19 +26,19 @@
 gerrit: prepare
 	$(WGET) $(CI_URL)/$(GERRIT_JOB)/lastSuccessfulBuild/artifact/gerrit/bazel-bin/release.war -P $(GERRIT_1_BIN_DIRECTORY)
 	cp $(GERRIT_1_BIN_DIRECTORY)/*.war $(GERRIT_2_BIN_DIRECTORY)
-	for plugin in $(CORE_PLUGINS); do $(WGET) $(CI_URL)/$(GERRIT_JOB)/$(BUILD_NUM)/artifact/gerrit/bazel-genfiles/plugins/$$plugin/$$plugin.jar -P $(GERRIT_1_PLUGINS_DIRECTORY); done
+	for plugin in $(CORE_PLUGINS); do $(WGET) $(CI_URL)/$(GERRIT_JOB)/lastSuccessfulBuild/artifact/gerrit/bazel-bin/plugins/$$plugin/$$plugin.jar -P $(GERRIT_1_PLUGINS_DIRECTORY); done
 	cp $(GERRIT_1_PLUGINS_DIRECTORY)/*.jar $(GERRIT_2_PLUGINS_DIRECTORY)
 
 plugin_websession_flatfile: prepare
-	$(WGET) $(CI_URL)/plugin-websession-flatfile-bazel-master-stable-2.16/lastSuccessfulBuild/artifact/bazel-genfiles/plugins/websession-flatfile/websession-flatfile.jar -P $(GERRIT_1_PLUGINS_DIRECTORY)
+	$(WGET) $(CI_URL)/plugin-websession-flatfile-bazel-master-stable-2.16/lastSuccessfulBuild/artifact/bazel-bin/plugins/websession-flatfile/websession-flatfile.jar -P $(GERRIT_1_PLUGINS_DIRECTORY)
 	cp $(GERRIT_1_PLUGINS_DIRECTORY)/websession-flatfile.jar $(GERRIT_2_PLUGINS_DIRECTORY)/websession-flatfile.jar
 
 plugin_multi_site: prepare
-	$(WGET) $(CI_URL)/plugin-multi-site-bazel-stable-2.16/lastSuccessfulBuild/artifact/bazel-genfiles/plugins/multi-site/multi-site.jar -P $(GERRIT_1_PLUGINS_DIRECTORY)
-	cp $(GERRIT_1_PLUGINS_DIRECTORY)/multi-site.jar $(GERRIT_2_PLUGINS_DIRECTORY)/multi-site.jar
+	$(WGET) $(CI_URL)/plugin-multi-site-bazel-stable-2.16/lastSuccessfulBuild/artifact/bazel-bin/plugins/multi-site/multi-site.jar -P $(GERRIT_1_LIB_DIRECTORY)
+	cp $(GERRIT_1_LIB_DIRECTORY)/multi-site.jar $(GERRIT_2_LIB_DIRECTORY)/multi-site.jar
 
 plugin_healthcheck: prepare
-	$(WGET) $(CI_URL)/plugin-healthcheck-bazel-stable-2.16/lastSuccessfulBuild/artifact/bazel-genfiles/plugins/healthcheck/healthcheck.jar -P $(GERRIT_1_PLUGINS_DIRECTORY)
+	$(WGET) $(CI_URL)/plugin-healthcheck-bazel-stable-2.16/lastSuccessfulBuild/artifact/bazel-bin/plugins/healthcheck/healthcheck.jar -P $(GERRIT_1_PLUGINS_DIRECTORY)
 	cp $(GERRIT_1_PLUGINS_DIRECTORY)/healthcheck.jar $(GERRIT_2_PLUGINS_DIRECTORY)/healthcheck.jar
 
 build:
@@ -44,13 +46,11 @@
 	docker build -t $(MYDIR) ./gerrit-2
 
 clean_gerrit: prepare
-	rm -fr gerrit-1/db/ gerrit-1/data/ gerrit-1/cache/ gerrit-1/db/ gerrit-1/git/ gerrit-1/indexes/ gerrit-1/etc/
-	rm -fr gerrit-2/db/ gerrit-2/data/ gerrit-2/cache/ gerrit-2/db/ gerrit-2/git/ gerrit-2/indexes/ gerrit-1/etc/
-	-mkdir -p gerrit-{1,2}/etc/
-	export GERRIT_REPLICATION_INSTANCE=gerrit-2; cat ./gerrit-common/replication.config.template | envsubst '$${GERRIT_REPLICATION_INSTANCE}'  > ./gerrit-1/etc/replication.config
-	export GERRIT_REPLICATION_INSTANCE=gerrit-1; cat ./gerrit-common/replication.config.template | envsubst '$${GERRIT_REPLICATION_INSTANCE}'  > ./gerrit-2/etc/replication.config
-	cp ./gerrit-common/gerrit.config ./gerrit-1/etc
-	cp ./gerrit-common/gerrit.config ./gerrit-2/etc
+	-rm -fr gerrit-{1,2}/{db,data,cache,git,index,etc,bin,tmp,plugins,lib}/*
+	export GERRIT_REPLICATION_INSTANCE=gerrit-2; cat ./gerrit-common/replication.config.template | envsubst '$${GERRIT_REPLICATION_INSTANCE}' > ./gerrit-1/etc/replication.config
+	export GERRIT_REPLICATION_INSTANCE=gerrit-1; cat ./gerrit-common/replication.config.template | envsubst '$${GERRIT_REPLICATION_INSTANCE}' > ./gerrit-2/etc/replication.config
+	cp ./gerrit-common/*.config ./gerrit-1/etc
+	cp ./gerrit-common/*.config ./gerrit-2/etc
 	cp ./gerrit-common/git-daemon.sh ./gerrit-1/bin
 	cp ./gerrit-common/git-daemon.sh ./gerrit-2/bin
 
@@ -58,4 +58,12 @@
 	docker-compose down && docker-compose build gerrit-1 && docker-compose build gerrit-2 && docker-compose up -d gerrit-1 && docker-compose up -d gerrit-2
 
 init_all: clean_gerrit download
-		docker-compose down && docker-compose build && docker-compose up -d
+		docker-compose down && docker-compose build && INIT=1 docker-compose up -d
+restart_gerrit_1:
+		cp ./gerrit-common/*.db ./gerrit-1/db
+		-docker-compose kill gerrit-1
+		sleep 3; INIT=0 docker-compose up -d gerrit-1
+restart_gerrit_2:
+		cp ./gerrit-common/*.db ./gerrit-2/db
+		-docker-compose kill gerrit-2
+		sleep 3; INIT=0 docker-compose up -d gerrit-2
diff --git a/dockerised_local_env/README.md b/dockerised_local_env/README.md
index bf7d8e0..099be72 100644
--- a/dockerised_local_env/README.md
+++ b/dockerised_local_env/README.md
@@ -1,5 +1,16 @@
 # Dockerised test environment
 
+## Prerequisites
+
+ * envsubst:
+
+```bash
+brew install gettext
+brew link --force gettext
+```
+
+## Instructions
+
 The docker compose provided in this directory is meant to orchestrate the spin up
 of a dockerised test environment with the latest stable Gerrit version.
 Run it with:
@@ -15,3 +26,10 @@
 ```
 
 *NOTE:* If you want to run any ssh command as admin you can use the ssh keys into the *gerrit-{1,2}/ssh* directory.
+
+If you need to restart one of the Gerrit instances to simulate, for example,
+an upgrade, you can do it this way:
+
+```bash
+make restart_gerrit_1 # (or make restart_gerrit_2)
+```
diff --git a/dockerised_local_env/docker-compose.yaml b/dockerised_local_env/docker-compose.yaml
index 4e66ec9..b15684d 100644
--- a/dockerised_local_env/docker-compose.yaml
+++ b/dockerised_local_env/docker-compose.yaml
@@ -4,33 +4,54 @@
     build: ./gerrit-1
     networks:
       gerrit-net:
+    environment:
+      INIT: ${INIT:-1}
     volumes:
        - ./gerrit-1/git:/var/gerrit/git
        - ./gerrit-1/logs:/var/gerrit/logs
        - ./gerrit-1/ssh:/var/gerrit/.ssh
        - ./gerrit-1/index:/var/gerrit/index
        - ./gerrit-1/data:/var/gerrit/data
+       - ./gerrit-1/etc:/var/gerrit/etc
+       - ./gerrit-1/db:/var/gerrit/db
+       - ./gerrit-1/plugins:/var/gerrit/plugins
+       - ./gerrit-1/lib:/var/gerrit/lib
+       - ./gerrit-1/tmp:/var/gerrit/tmp
+       - ./gerrit-common/shared-dir:/var/gerrit/shared-dir
     ports:
-       - "29418:29418"
-       - "8080:8080"
+       - "39418:29418"
+       - "8081:8080"
     depends_on:
-      - gerrit-2
       - sshd
+      - zookeeper
+      - kafka-broker
+    container_name: gerrit-1
   gerrit-2:
     build: ./gerrit-2
     networks:
       gerrit-net:
+    environment:
+      INIT: ${INIT:-1}
     volumes:
        - ./gerrit-2/git:/var/gerrit/git
        - ./gerrit-2/logs:/var/gerrit/logs
        - ./gerrit-2/ssh:/var/gerrit/.ssh
        - ./gerrit-2/index:/var/gerrit/index
        - ./gerrit-2/data:/var/gerrit/data
+       - ./gerrit-2/etc:/var/gerrit/etc
+       - ./gerrit-2/db:/var/gerrit/db
+       - ./gerrit-2/plugins:/var/gerrit/plugins
+       - ./gerrit-2/lib:/var/gerrit/lib
+       - ./gerrit-2/tmp:/var/gerrit/tmp
+       - ./gerrit-common/shared-dir:/var/gerrit/shared-dir
     ports:
-       - "39418:29418"
-       - "8081:8080"
+       - "49418:29418"
+       - "8082:8080"
     depends_on:
       - sshd
+      - zookeeper
+      - kafka-broker
+    container_name: gerrit-2
   sshd:
     build: ./sshd
     networks:
@@ -39,6 +60,52 @@
        - ./gerrit-2/git:/var/gerrit-2/git
        - ./gerrit-2/ssh:/root/.ssh
        - ./gerrit-1/git:/var/gerrit-1/git
+    container_name: sshd
+  zookeeper:
+    image: wurstmeister/zookeeper:latest
+    networks:
+      gerrit-net:
+    ports:
+      - "2181:2181"
+  kafka-broker:
+    image: wurstmeister/kafka:2.12-2.1.0
+    networks:
+      gerrit-net:
+    ports:
+      - "9092:9092"
+    container_name: kafka-broker
+    environment:
+      KAFKA_ADVERTISED_HOST_NAME: kafka-broker
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+  haproxy:
+    build: haproxy
+    ports:
+      - "8080:8080"
+      - "29418:29418"
+    networks:
+      gerrit-net:
+    depends_on:
+      - syslog-sidecar
+      - gerrit-1
+      - gerrit-2
+    environment:
+      - SYSLOG_SIDECAR=syslog-sidecar
+      - GERRIT_1=gerrit-1
+      - GERRIT_1_SSH=29418
+      - GERRIT_1_HTTP=8080
+      - GERRIT_2=gerrit-2
+      - GERRIT_2_SSH=29418
+      - GERRIT_2_HTTP=8080
+      - HAPROXY_HTTP_PORT=8080
+      - HAPROXY_SSH_PORT=29418
+  syslog-sidecar:
+    image: balabit/syslog-ng:3.19.1
+    volumes:
+      - "./syslog-sidecar/logs:/var/log/syslog-ng"
+      - "./syslog-sidecar/socket:/var/run/syslog-ng"
+      - "./syslog-sidecar/config/:/etc/syslog-ng"
+    networks:
+      gerrit-net:
 networks:
   gerrit-net:
     driver: bridge
diff --git a/dockerised_local_env/gerrit-1/Dockerfile b/dockerised_local_env/gerrit-1/Dockerfile
index d71b6c2..0842933 100644
--- a/dockerised_local_env/gerrit-1/Dockerfile
+++ b/dockerised_local_env/gerrit-1/Dockerfile
@@ -6,7 +6,7 @@
     apk add --no-cache git-daemon
 
 COPY --chown=gerrit:gerrit bin/release.war /var/gerrit/bin/gerrit.war
-COPY --chown=gerrit:gerrit plugins/* /var/gerrit/plugins/
+COPY --chown=gerrit:gerrit plugins /var/gerrit/plugins
 COPY --chown=gerrit:gerrit etc /var/gerrit/etc
 
 ADD bin/git-daemon.sh /usr/local/bin/git-daemon.sh
diff --git a/dockerised_local_env/gerrit-1/docker-entrypoint.sh b/dockerised_local_env/gerrit-1/docker-entrypoint.sh
index 5429a52..d0eb5f2 100755
--- a/dockerised_local_env/gerrit-1/docker-entrypoint.sh
+++ b/dockerised_local_env/gerrit-1/docker-entrypoint.sh
@@ -1,16 +1,13 @@
 #!/bin/bash -ex
 
-java -Xmx100g -jar /var/gerrit/bin/gerrit.war init -d /var/gerrit --batch --dev --install-all-plugins
-java -Xmx100g -jar /var/gerrit/bin/gerrit.war reindex -d /var/gerrit --index accounts
-java -Xmx100g -jar /var/gerrit/bin/gerrit.war reindex -d /var/gerrit --index groups
-
-echo "Gerrit configuration:"
-cat /var/gerrit/etc/gerrit.config
-echo "Replication plugin configuration:"
-cat /var/gerrit/etc/replication.config
-
 echo "Starting git daemon"
 /usr/local/bin/git-daemon.sh &
 
-sed -i -e 's/\-\-console-log//g' /var/gerrit/bin/gerrit.sh
-/var/gerrit/bin/gerrit.sh run
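+# INIT=1 (the docker-compose default) runs the one-time site initialization;
+# INIT=0 skips it, allowing a plain restart of an already-initialized site.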
+if [[ $INIT == 1 ]]; then
+  echo "Initializing Gerrit..."
+  java -jar /var/gerrit/bin/gerrit.war init -d /var/gerrit --batch --dev --install-all-plugins --no-auto-start
+  java -jar /var/gerrit/bin/gerrit.war reindex -d /var/gerrit --index accounts
+  java -jar /var/gerrit/bin/gerrit.war reindex -d /var/gerrit --index groups
+fi
+
+java -jar /var/gerrit/bin/gerrit.war daemon
diff --git a/dockerised_local_env/gerrit-1/ssh/known_hosts b/dockerised_local_env/gerrit-1/ssh/known_hosts
deleted file mode 100644
index 012fa68..0000000
--- a/dockerised_local_env/gerrit-1/ssh/known_hosts
+++ /dev/null
@@ -1,2 +0,0 @@
-
-sshd,* ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBKZRTdCgJ0FFTpP3ZzRgMMbYgaNmm6PDlg0e9QtOXzCG63GE41EExz2RWw7K9e6eRz+hSVf4hC/KMPgH3Clgo6w=
diff --git a/dockerised_local_env/gerrit-2/docker-entrypoint.sh b/dockerised_local_env/gerrit-2/docker-entrypoint.sh
index 01f983b..532377c 100755
--- a/dockerised_local_env/gerrit-2/docker-entrypoint.sh
+++ b/dockerised_local_env/gerrit-2/docker-entrypoint.sh
@@ -1,19 +1,19 @@
 #!/bin/bash -ex
 
-java -Xmx100g -jar /var/gerrit/bin/gerrit.war init -d /var/gerrit --batch --dev --no-auto-start --install-all-plugins
-#
-echo "Gerrit configuration:"
-cat /var/gerrit/etc/gerrit.config
-echo "Replication plugin configuration:"
-cat /var/gerrit/etc/replication.config
-
-echo "Remove git repos created during init phase"
-rm -fr /var/gerrit/git/*
-
 echo "Starting git daemon"
 /usr/local/bin/git-daemon.sh &
 
-echo "Waiting for initial replication"
-sleep 120
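+# On first run (INIT=1) initialize the site, drop the repos created by the init
+# phase, and seed them from gerrit-1 by triggering replication over SSH.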
+if [[ $INIT == 1 ]]; then
+  java -jar /var/gerrit/bin/gerrit.war init -d /var/gerrit --batch --dev --no-auto-start --install-all-plugins
 
-java -Xmx100g -jar /var/gerrit/bin/gerrit.war daemon
+  echo "Remove git repos created during init phase"
+  rm -fr /var/gerrit/git/*
+
+  echo "Waiting for gerrit1 server to become available."
+  sleep 120
+  ssh -p 29418 admin@gerrit-1 replication start
+  echo "Waiting for replication to complete."
+  sleep 30
+fi
+
+java -jar /var/gerrit/bin/gerrit.war daemon
diff --git a/dockerised_local_env/gerrit-2/ssh/known_hosts b/dockerised_local_env/gerrit-2/ssh/known_hosts
deleted file mode 100644
index c6f2bdd..0000000
--- a/dockerised_local_env/gerrit-2/ssh/known_hosts
+++ /dev/null
@@ -1,2 +0,0 @@
-[localhost]:39418 ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBHAfiYyOe3Df8h+nT1axmB5F4cQQg/qnzvBEJsfKHt3uCYjkOLjjadYjujCnkzb74LToaw4pToTfAnCJ42jw5Bk=
-[gerrit-1]:22 ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBHAfiYyOe3Df8h+nT1axmB5F4cQQg/qnzvBEJsfKHt3uCYjkOLjjadYjujCnkzb74LToaw4pToTfAnCJ42jw5Bk=
diff --git a/dockerised_local_env/gerrit-common/ReviewDB.h2.db b/dockerised_local_env/gerrit-common/ReviewDB.h2.db
new file mode 100644
index 0000000..d23d444
--- /dev/null
+++ b/dockerised_local_env/gerrit-common/ReviewDB.h2.db
Binary files differ
diff --git a/dockerised_local_env/gerrit-common/ReviewDB.trace.db b/dockerised_local_env/gerrit-common/ReviewDB.trace.db
new file mode 100644
index 0000000..52681d8
--- /dev/null
+++ b/dockerised_local_env/gerrit-common/ReviewDB.trace.db
@@ -0,0 +1,3 @@
+11-23 08:24:02 jdbc[3]: exception
+org.h2.jdbc.JdbcSQLException: Table "SCHEMA_VERSION" not found; SQL statement:
+SELECT T.version_nbr,T.singleton FROM schema_version T WHERE T.singleton=? [42102-176]
diff --git a/dockerised_local_env/gerrit-common/gerrit.config b/dockerised_local_env/gerrit-common/gerrit.config
index c21242f..0a47bea 100644
--- a/dockerised_local_env/gerrit-common/gerrit.config
+++ b/dockerised_local_env/gerrit-common/gerrit.config
@@ -2,10 +2,12 @@
 	basePath = git
 	serverId = ff17821f-9571-42df-b690-30660f2d6e20
 	canonicalWebUrl = http://localhost:8080/
+	installModule = com.googlesource.gerrit.plugins.multisite.Module
 [database]
 	type = h2
-	database = /var/gerrit/db-volume-1/db/ReviewDB
+	database = db/ReviewDB
 [noteDb "changes"]
+	autoMigrate = true
 	disableReviewDb = true
 	primaryStorage = note db
 	read = true
@@ -26,9 +28,12 @@
 	smtpServer = localhost
 [sshd]
 	listenAddress = *:29418
+	advertisedAddress = *:29418
 [httpd]
-	listenUrl = http://*:8080/
+	listenUrl = proxy-http://*:8080/
 [cache]
 	directory = cache
 [plugins]
 	allowRemoteAdmin = true
+[plugin "websession-flatfile"]
+    directory = /var/gerrit/shared-dir
diff --git a/dockerised_local_env/gerrit-common/healthcheck.config b/dockerised_local_env/gerrit-common/healthcheck.config
new file mode 100644
index 0000000..849e23f
--- /dev/null
+++ b/dockerised_local_env/gerrit-common/healthcheck.config
@@ -0,0 +1,9 @@
+[healthcheck]
+	timeout = 10s
+
+[healthcheck "querychanges"]
+	# No changes available when Gerrit is installed from scratch
+	enabled = false
+
+[healthcheck "auth"]
+	username = "admin"
\ No newline at end of file
diff --git a/dockerised_local_env/gerrit-common/multi-site.config b/dockerised_local_env/gerrit-common/multi-site.config
new file mode 100644
index 0000000..04b9c2c
--- /dev/null
+++ b/dockerised_local_env/gerrit-common/multi-site.config
@@ -0,0 +1,25 @@
+[index]
+  maxTries = 6
+  retryInterval = 30000
+  numStripedLocks = 100
+
+[kafka]
+	bootstrapServers = kafka-broker:9092
+	securityProtocol = PLAINTEXT
+	indexEventTopic = gerrit_index
+	streamEventTopic = gerrit_stream
+	projectListEventTopic = gerrit_list_project
+	cacheEventTopic = gerrit_cache_eviction
+
+[kafka "subscriber"]
+	enabled = true
+	pollingIntervalMs = 1000
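+	# Keys prefixed with "KafkaProp-" are passed through to the Kafka client,
+	# with the camelCase suffix mapped to the dotted client property name
+	# (e.g. enableAutoCommit becomes enable.auto.commit).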
+	KafkaProp-enableAutoCommit = true
+	KafkaProp-autoCommitIntervalMs = 1000
+	KafkaProp-autoOffsetReset = latest
+
+[kafka "publisher"]
+	enabled = true
+
+[ref-database "zookeeper"]
+	connectString = "zookeeper:2181"
diff --git a/dockerised_local_env/gerrit-common/replication.config.template b/dockerised_local_env/gerrit-common/replication.config.template
index 537e8d8..3864e92 100644
--- a/dockerised_local_env/gerrit-common/replication.config.template
+++ b/dockerised_local_env/gerrit-common/replication.config.template
@@ -11,7 +11,7 @@
     replicateHiddenProjects = true
 [gerrit]
     autoReload = true
-    replicateOnStartup = true
+    replicateOnStartup = false
 [replication]
     lockErrorMaxRetries = 5
     maxRetries = 5
diff --git a/dockerised_local_env/haproxy/Dockerfile b/dockerised_local_env/haproxy/Dockerfile
new file mode 100644
index 0000000..dd3f9cd
--- /dev/null
+++ b/dockerised_local_env/haproxy/Dockerfile
@@ -0,0 +1,13 @@
+FROM haproxy:1.9.4
+
+RUN apt-get update && \
+    apt-get -y install gettext-base netcat && \
+    rm -rf /var/lib/apt/lists/* && \
+    mkdir /var/lib/haproxy && \
+    mkdir /var/run/haproxy && \
+    useradd haproxy && \
+    chown haproxy: /var/lib/haproxy /var/run/haproxy
+
+COPY haproxy.cfg /usr/local/etc/haproxy/haproxy.cfg.template
+
+COPY docker-entrypoint.sh /
diff --git a/dockerised_local_env/haproxy/docker-entrypoint.sh b/dockerised_local_env/haproxy/docker-entrypoint.sh
new file mode 100755
index 0000000..b78a994
--- /dev/null
+++ b/dockerised_local_env/haproxy/docker-entrypoint.sh
@@ -0,0 +1,3 @@
+#!/bin/sh
+envsubst < /usr/local/etc/haproxy/haproxy.cfg.template > /usr/local/etc/haproxy/haproxy.cfg
+haproxy -f /usr/local/etc/haproxy/haproxy.cfg
diff --git a/dockerised_local_env/haproxy/haproxy.cfg b/dockerised_local_env/haproxy/haproxy.cfg
new file mode 100644
index 0000000..000ed9e
--- /dev/null
+++ b/dockerised_local_env/haproxy/haproxy.cfg
@@ -0,0 +1,63 @@
+global
+    log $SYSLOG_SIDECAR local0
+    maxconn 2048
+
+defaults
+  log     global
+  mode    http
+  option  httplog
+  option  dontlognull
+  timeout connect 5000
+  timeout client  900000
+  timeout server  900000
+  timeout check 30000
+
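+# Git reads (git-upload-pack, clone.bundle) go to the read backend; pushes
+# (git-receive-pack) go to the write backend; everything else defaults to reads.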
+frontend haproxynode
+    bind *:$HAPROXY_HTTP_PORT
+    mode http
+    acl redirect_reads url_reg -i git-upload-pack
+    acl redirect_reads url_reg -i clone.bundle
+    acl redirect_writes url_reg -i git-receive-pack
+    use_backend read-backendnodes if redirect_reads
+    use_backend write-backendnodes if redirect_writes
+    default_backend read-backendnodes
+
+frontend git_ssh
+    bind *:$HAPROXY_SSH_PORT
+    option tcplog
+    mode tcp
+    timeout client  5m
+    default_backend ssh
+
+backend read-backendnodes
+    mode http
+    balance source
+    option forwardfor
+    http-request set-header X-Forwarded-Port %[dst_port]
+    default-server inter 10s fall 3 rise 2
+    option httpchk GET /config/server/healthcheck~status HTTP/1.0
+    http-check expect status 200
+    server node1 $GERRIT_1:$GERRIT_1_HTTP check inter 10s
+    server node2 $GERRIT_2:$GERRIT_2_HTTP check inter 10s
+
+backend write-backendnodes
+    mode http
+    balance roundrobin
+    option forwardfor
+    http-request set-header X-Forwarded-Port %[dst_port]
+    default-server inter 10s fall 3 rise 2
+    option httpchk GET /config/server/healthcheck~status HTTP/1.0
+    http-check expect status 200
+    server node1 $GERRIT_1:$GERRIT_1_HTTP check inter 10s
+    server node2 $GERRIT_2:$GERRIT_2_HTTP check inter 10s backup
+
+backend ssh
+    mode tcp
+    option redispatch
+    option httpchk GET /config/server/healthcheck~status HTTP/1.0
+    http-check expect status 200
+    balance source
+    timeout connect 10s
+    timeout server 5m
+    server ssh_node1 $GERRIT_1:$GERRIT_1_SSH check inter 10s check port $GERRIT_1_HTTP inter 10s
+    server ssh_node2 $GERRIT_2:$GERRIT_2_SSH check inter 10s check port $GERRIT_2_HTTP inter 10s backup
diff --git a/dockerised_local_env/syslog-sidecar/config/syslog-ng.conf b/dockerised_local_env/syslog-sidecar/config/syslog-ng.conf
new file mode 100644
index 0000000..bbd27b6
--- /dev/null
+++ b/dockerised_local_env/syslog-sidecar/config/syslog-ng.conf
@@ -0,0 +1,31 @@
+@version: 3.11
+
+options {
+    keep_hostname(yes);
+    create_dirs(yes);
+    ts_format(iso);
+    time_reopen (10);
+    chain_hostnames (no);
+};
+
+source s_net {
+    tcp(
+     ip("0.0.0.0")
+    );
+    udp(
+     ip("0.0.0.0")
+    );
+    syslog(
+     ip("0.0.0.0")
+    );
+    unix-stream("/var/run/lock/syslog-ng.sock");
+};
+
+destination logfiles {
+    file("/var/log/syslog-ng/$PROGRAM.log");
+};
+
+log {
+    source(s_net);
+    destination(logfiles);
+};
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 1c4d0bd..eb53541 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -3,57 +3,50 @@
 def external_plugin_deps():
     maven_jar(
         name = "wiremock",
-        artifact = "com.github.tomakehurst:wiremock-standalone:2.18.0",
-        sha1 = "cf7776dc7a0176d4f4a990155d819279078859f9",
+        artifact = "com.github.tomakehurst:wiremock-standalone:2.23.2",
+        sha1 = "4a920d6c04fd2444c7bc94880adc8313f5b31ba3",
     )
 
-    CURATOR_VER = "4.2.0"
-    CURATOR_TEST_VER = "2.12.0"
-
     maven_jar(
-        name = "kafka_client",
+        name = "kafka-client",
         artifact = "org.apache.kafka:kafka-clients:2.1.0",
         sha1 = "34d9983705c953b97abb01e1cd04647f47272fe5",
     )
 
     maven_jar(
         name = "testcontainers-kafka",
-        artifact = "org.testcontainers:kafka:1.10.6",
-        sha1 = "5984e31306bd6c84a36092cdd19e0ef7e2268d98",
+        artifact = "org.testcontainers:kafka:1.11.3",
+        sha1 = "932d1baa2541f218b1b44a0546ae83d530011468",
     )
 
-    maven_jar(
-        name = "commons-lang3",
-        artifact = "org.apache.commons:commons-lang3:3.6",
-        sha1 = "9d28a6b23650e8a7e9063c04588ace6cf7012c17",
-    )
+    CURATOR_VER = "4.2.0"
 
     maven_jar(
         name = "curator-test",
-        artifact = "org.apache.curator:curator-test:" + CURATOR_TEST_VER,
-        sha1 = "0a797be57ba95b67688a7615f7ad41ee6b3ceff0"
+        artifact = "org.apache.curator:curator-test:" + CURATOR_VER,
+        sha1 = "98ac2dd69b8c07dcaab5e5473f93fdb9e320cd73",
     )
 
     maven_jar(
         name = "curator-framework",
         artifact = "org.apache.curator:curator-framework:" + CURATOR_VER,
-        sha1 = "5b1cc87e17b8fe4219b057f6025662a693538861"
+        sha1 = "5b1cc87e17b8fe4219b057f6025662a693538861",
     )
 
     maven_jar(
         name = "curator-recipes",
         artifact = "org.apache.curator:curator-recipes:" + CURATOR_VER,
-        sha1 = "7f775be5a7062c2477c51533b9d008f70411ba8e"
+        sha1 = "7f775be5a7062c2477c51533b9d008f70411ba8e",
     )
 
     maven_jar(
         name = "curator-client",
         artifact = "org.apache.curator:curator-client:" + CURATOR_VER,
-        sha1 = "d5d50930b8dd189f92c40258a6ba97675fea3e15"
-        )
+        sha1 = "d5d50930b8dd189f92c40258a6ba97675fea3e15",
+    )
 
     maven_jar(
         name = "zookeeper",
-        artifact = "org.apache.zookeeper:zookeeper:3.4.8",
-        sha1 = "933ea2ed15e6a0e24b788973e3d128ff163c3136"
+        artifact = "org.apache.zookeeper:zookeeper:3.4.14",
+        sha1 = "c114c1e1c8172a7cd3f6ae39209a635f7a06c1a1",
     )
diff --git a/images/architecture-first-iteration.png b/images/architecture-first-iteration.png
index 1a9fe36..841ee40 100644
--- a/images/architecture-first-iteration.png
+++ b/images/architecture-first-iteration.png
Binary files differ
diff --git a/src/main/resources/Documentation/git-replication-healthy.png b/images/git-replication-healthy.png
similarity index 100%
rename from src/main/resources/Documentation/git-replication-healthy.png
rename to images/git-replication-healthy.png
Binary files differ
diff --git a/images/git-replication-split-brain-detected.png b/images/git-replication-split-brain-detected.png
new file mode 100644
index 0000000..a49b9be
--- /dev/null
+++ b/images/git-replication-split-brain-detected.png
Binary files differ
diff --git a/src/main/resources/Documentation/git-replication-split-brain.png b/images/git-replication-split-brain.png
similarity index 100%
rename from src/main/resources/Documentation/git-replication-split-brain.png
rename to images/git-replication-split-brain.png
Binary files differ
diff --git a/setup_local_env/setup.sh b/setup_local_env/setup.sh
index 94fc972..87fb2ea 100755
--- a/setup_local_env/setup.sh
+++ b/setup_local_env/setup.sh
@@ -294,7 +294,7 @@
 HA_PROXY_CERTIFICATES_DIR="$HA_PROXY_CONFIG_DIR/certificates"
 
 RELEASE_WAR_FILE_LOCATION=${RELEASE_WAR_FILE_LOCATION:-bazel-bin/release.war}
-MULTISITE_LIB_LOCATION=${MULTISITE_LIB_LOCATION:-bazel-genfiles/plugins/multi-site/multi-site.jar}
+MULTISITE_LIB_LOCATION=${MULTISITE_LIB_LOCATION:-bazel-bin/plugins/multi-site/multi-site.jar}
 
 
 export FAKE_NFS=$COMMON_LOCATION/fake_nfs
@@ -317,11 +317,11 @@
 	cp -f $MULTISITE_LIB_LOCATION $DEPLOYMENT_LOCATION/multi-site.jar  >/dev/null 2>&1 || { echo >&2 "$MULTISITE_LIB_LOCATION: Not able to copy the file. Aborting"; exit 1; }
 fi
 if [ $DOWNLOAD_WEBSESSION_FLATFILE = "true" ];then
-	echo "Downloading websession-flatfile plugin stable 3.0"
-	wget https://gerrit-ci.gerritforge.com/view/Plugins-stable-3.0/job/plugin-websession-flatfile-bazel-master-stable-3.0/lastSuccessfulBuild/artifact/bazel-genfiles/plugins/websession-flatfile/websession-flatfile.jar \
+	echo "Downloading websession-flatfile plugin master"
+	wget https://gerrit-ci.gerritforge.com/view/Plugins-master/job/plugin-websession-flatfile-bazel-master-master/lastSuccessfulBuild/artifact/bazel-bin/plugins/websession-flatfile/websession-flatfile.jar \
 	-O $DEPLOYMENT_LOCATION/websession-flatfile.jar || { echo >&2 "Cannot download websession-flatfile plugin: Check internet connection. Abort\
 ing"; exit 1; }
-	wget https://gerrit-ci.gerritforge.com/view/Plugins-stable-3.0/job/plugin-healthcheck-bazel-stable-3.0/lastSuccessfulBuild/artifact/bazel-genfiles/plugins/healthcheck/healthcheck.jar \
+	wget https://gerrit-ci.gerritforge.com/view/Plugins-master/job/plugin-healthcheck-bazel-master/lastSuccessfulBuild/artifact/bazel-bin/plugins/healthcheck/healthcheck.jar \
 	-O $DEPLOYMENT_LOCATION/healthcheck.jar || { echo >&2 "Cannot download healthcheck plugin: Check internet connection. Abort\
 ing"; exit 1; }
 else
@@ -414,4 +414,4 @@
 echo "$HTTPS_CLONE_MSG"
 echo
 
-exit $?
\ No newline at end of file
+exit $?
diff --git a/src/.DS_Store b/src/.DS_Store
new file mode 100644
index 0000000..d549e6c
--- /dev/null
+++ b/src/.DS_Store
Binary files differ
diff --git a/src/main/.DS_Store b/src/main/.DS_Store
new file mode 100644
index 0000000..c068634
--- /dev/null
+++ b/src/main/.DS_Store
Binary files differ
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index 72fec47..7348137 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -14,39 +14,24 @@
 
 package com.googlesource.gerrit.plugins.multisite;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Suppliers.memoize;
 import static com.google.common.base.Suppliers.ofInstance;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.CaseFormat;
-import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.MultimapBuilder;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.google.inject.spi.Message;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefEnforcement.EnforcePolicy;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import org.apache.commons.lang.StringUtils;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.BoundedExponentialBackoffRetry;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
@@ -66,27 +51,19 @@
 
   // common parameters to cache and index sections
   static final String THREAD_POOL_SIZE_KEY = "threadPoolSize";
-
   static final int DEFAULT_INDEX_MAX_TRIES = 2;
   static final int DEFAULT_INDEX_RETRY_INTERVAL = 30000;
-  private static final int DEFAULT_POLLING_INTERVAL_MS = 1000;
   static final int DEFAULT_THREAD_POOL_SIZE = 4;
   static final String NUM_STRIPED_LOCKS = "numStripedLocks";
   static final int DEFAULT_NUM_STRIPED_LOCKS = 10;
   static final String ENABLE_KEY = "enabled";
-  static final String DEFAULT_KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
-  static final boolean DEFAULT_ENABLE_PROCESSING = true;
-  static final String KAFKA_SECTION = "kafka";
-  public static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
 
-  private final Supplier<KafkaPublisher> publisher;
   private final Supplier<Cache> cache;
   private final Supplier<Event> event;
   private final Supplier<Index> index;
-  private final Supplier<KafkaSubscriber> subscriber;
-  private final Supplier<Kafka> kafka;
-  private final Supplier<ZookeeperConfig> zookeeperConfig;
+  private final Supplier<SharedRefDatabase> sharedRefDb;
   private final Supplier<Collection<Message>> replicationConfigValidation;
+  private final Config multiSiteConfig;
 
   @Inject
   Configuration(SitePaths sitePaths) {
@@ -95,27 +72,21 @@
 
   @VisibleForTesting
   public Configuration(Config multiSiteConfig, Config replicationConfig) {
-    Supplier<Config> lazyCfg = lazyLoad(multiSiteConfig);
+    Supplier<Config> lazyMultiSiteCfg = lazyLoad(multiSiteConfig);
+    this.multiSiteConfig = multiSiteConfig;
     replicationConfigValidation = lazyValidateReplicatioConfig(replicationConfig);
-    kafka = memoize(() -> new Kafka(lazyCfg));
-    publisher = memoize(() -> new KafkaPublisher(lazyCfg));
-    subscriber = memoize(() -> new KafkaSubscriber(lazyCfg));
-    cache = memoize(() -> new Cache(lazyCfg));
-    event = memoize(() -> new Event(lazyCfg));
-    index = memoize(() -> new Index(lazyCfg));
-    zookeeperConfig = memoize(() -> new ZookeeperConfig(lazyCfg));
+    cache = memoize(() -> new Cache(lazyMultiSiteCfg));
+    event = memoize(() -> new Event(lazyMultiSiteCfg));
+    index = memoize(() -> new Index(lazyMultiSiteCfg));
+    sharedRefDb = memoize(() -> new SharedRefDatabase(lazyMultiSiteCfg));
   }
 
-  public ZookeeperConfig getZookeeperConfig() {
-    return zookeeperConfig.get();
+  public Config getMultiSiteConfig() {
+    return multiSiteConfig;
   }
 
-  public Kafka getKafka() {
-    return kafka.get();
-  }
-
-  public KafkaPublisher kafkaPublisher() {
-    return publisher.get();
+  public SharedRefDatabase getSharedRefDb() {
+    return sharedRefDb.get();
   }
 
   public Cache cache() {
@@ -130,10 +101,6 @@
     return index.get();
   }
 
-  public KafkaSubscriber kafkaSubscriber() {
-    return subscriber.get();
-  }
-
   public Collection<Message> validate() {
     return replicationConfigValidation.get();
   }
@@ -193,210 +160,34 @@
     }
   }
 
-  private static long getLong(
-      Supplier<Config> cfg, String section, String subSection, String name, long defaultValue) {
-    try {
-      return cfg.get().getLong(section, subSection, name, defaultValue);
-    } catch (IllegalArgumentException e) {
-      log.error("invalid value for {}; using default value {}", name, defaultValue);
-      log.debug("Failed to retrieve long value: {}", e.getMessage(), e);
-      return defaultValue;
-    }
-  }
-
-  private static String getString(
-      Supplier<Config> cfg, String section, String subsection, String name, String defaultValue) {
-    String value = cfg.get().getString(section, subsection, name);
-    if (!Strings.isNullOrEmpty(value)) {
-      return value;
-    }
-    return defaultValue;
-  }
-
-  private static Map<EventFamily, Boolean> eventsEnabled(
-      Supplier<Config> config, String subsection) {
-    Map<EventFamily, Boolean> eventsEnabled = new HashMap<>();
-    for (EventFamily eventFamily : EventFamily.values()) {
-      String enabledConfigKey = eventFamily.lowerCamelName() + "Enabled";
-
-      eventsEnabled.put(
-          eventFamily,
-          config
-              .get()
-              .getBoolean(KAFKA_SECTION, subsection, enabledConfigKey, DEFAULT_ENABLE_PROCESSING));
-    }
-    return eventsEnabled;
-  }
-
-  private static void applyKafkaConfig(
-      Supplier<Config> configSupplier, String subsectionName, Properties target) {
-    Config config = configSupplier.get();
-    for (String section : config.getSubsections(KAFKA_SECTION)) {
-      if (section.equals(subsectionName)) {
-        for (String name : config.getNames(KAFKA_SECTION, section, true)) {
-          if (name.startsWith(KAFKA_PROPERTY_PREFIX)) {
-            Object value = config.getString(KAFKA_SECTION, subsectionName, name);
-            String configProperty = name.replaceFirst(KAFKA_PROPERTY_PREFIX, "");
-            String propName =
-                CaseFormat.LOWER_CAMEL
-                    .to(CaseFormat.LOWER_HYPHEN, configProperty)
-                    .replaceAll("-", ".");
-            log.info("[{}] Setting kafka property: {} = {}", subsectionName, propName, value);
-            target.put(propName, value);
-          }
-        }
-      }
-    }
-    target.put(
-        "bootstrap.servers",
-        getString(
-            configSupplier,
-            KAFKA_SECTION,
-            null,
-            "bootstrapServers",
-            DEFAULT_KAFKA_BOOTSTRAP_SERVERS));
-  }
-
-  public static class Kafka {
-    private final Map<EventFamily, String> eventTopics;
-    private final String bootstrapServers;
-
-    private static final Map<EventFamily, String> EVENT_TOPICS =
-        ImmutableMap.of(
-            EventFamily.INDEX_EVENT,
-            "GERRIT.EVENT.INDEX",
-            EventFamily.STREAM_EVENT,
-            "GERRIT.EVENT.STREAM",
-            EventFamily.CACHE_EVENT,
-            "GERRIT.EVENT.CACHE",
-            EventFamily.PROJECT_LIST_EVENT,
-            "GERRIT.EVENT.PROJECT.LIST");
-
-    Kafka(Supplier<Config> config) {
-      this.bootstrapServers =
-          getString(
-              config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS);
-
-      this.eventTopics = new HashMap<>();
-      for (Map.Entry<EventFamily, String> topicDefault : EVENT_TOPICS.entrySet()) {
-        String topicConfigKey = topicDefault.getKey().lowerCamelName() + "Topic";
-        eventTopics.put(
-            topicDefault.getKey(),
-            getString(config, KAFKA_SECTION, null, topicConfigKey, topicDefault.getValue()));
-      }
-    }
-
-    public String getTopic(EventFamily eventType) {
-      return eventTopics.get(eventType);
-    }
-
-    public String getBootstrapServers() {
-      return bootstrapServers;
-    }
-  }
-
-  public static class KafkaPublisher extends Properties {
-    private static final long serialVersionUID = 0L;
-
-    public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
-
-    public static final String KAFKA_PUBLISHER_SUBSECTION = "publisher";
-    public static final boolean DEFAULT_BROKER_ENABLED = false;
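+  /** Settings of the "ref-database" section of the multi-site configuration. */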
+  public static class SharedRefDatabase {
+    public static final String SECTION = "ref-database";
+    public static final String SUBSECTION_ENFORCEMENT_RULES = "enforcementRules";
 
     private final boolean enabled;
-    private final Map<EventFamily, Boolean> eventsEnabled;
+    private final Multimap<EnforcePolicy, String> enforcementRules;
 
-    private KafkaPublisher(Supplier<Config> cfg) {
-      enabled =
-          cfg.get()
-              .getBoolean(
-                  KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, DEFAULT_BROKER_ENABLED);
+    private SharedRefDatabase(Supplier<Config> cfg) {
+      enabled = getBoolean(cfg, SECTION, null, ENABLE_KEY, true);
 
-      eventsEnabled = eventsEnabled(cfg, KAFKA_PUBLISHER_SUBSECTION);
-
-      if (enabled) {
-        setDefaults();
-        applyKafkaConfig(cfg, KAFKA_PUBLISHER_SUBSECTION, this);
+      enforcementRules = MultimapBuilder.hashKeys().arrayListValues().build();
+      for (EnforcePolicy policy : EnforcePolicy.values()) {
+        enforcementRules.putAll(
+            policy, getList(cfg, SECTION, SUBSECTION_ENFORCEMENT_RULES, policy.name()));
       }
     }
 
-    private void setDefaults() {
-      put("acks", "all");
-      put("retries", 0);
-      put("batch.size", 16384);
-      put("linger.ms", 1);
-      put("buffer.memory", 33554432);
-      put("key.serializer", KAFKA_STRING_SERIALIZER);
-      put("value.serializer", KAFKA_STRING_SERIALIZER);
-      put("reconnect.backoff.ms", 5000L);
-    }
-
-    public boolean enabled() {
+    public boolean isEnabled() {
       return enabled;
     }
 
-    public boolean enabledEvent(EventFamily eventType) {
-      return eventsEnabled.get(eventType);
-    }
-  }
-
-  public class KafkaSubscriber extends Properties {
-    private static final long serialVersionUID = 1L;
-
-    static final String KAFKA_SUBSCRIBER_SUBSECTION = "subscriber";
-
-    private final boolean enabled;
-    private final Integer pollingInterval;
-    private Map<EventFamily, Boolean> eventsEnabled;
-    private final Config cfg;
-
-    public KafkaSubscriber(Supplier<Config> configSupplier) {
-      this.cfg = configSupplier.get();
-
-      this.pollingInterval =
-          cfg.getInt(
-              KAFKA_SECTION,
-              KAFKA_SUBSCRIBER_SUBSECTION,
-              "pollingIntervalMs",
-              DEFAULT_POLLING_INTERVAL_MS);
-
-      enabled = cfg.getBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, false);
-
-      eventsEnabled = eventsEnabled(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION);
-
-      if (enabled) {
-        applyKafkaConfig(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION, this);
-      }
+    public Multimap<EnforcePolicy, String> getEnforcementRules() {
+      return enforcementRules;
     }
 
-    public boolean enabled() {
-      return enabled;
-    }
-
-    public boolean enabledEvent(EventFamily eventFamily) {
-      return eventsEnabled.get(eventFamily);
-    }
-
-    public Properties initPropsWith(UUID instanceId) {
-      String groupId =
-          getString(
-              cfg, KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, "groupId", instanceId.toString());
-      this.put("group.id", groupId);
-
-      return this;
-    }
-
-    public Integer getPollingInterval() {
-      return pollingInterval;
-    }
-
-    private String getString(
-        Config cfg, String section, String subsection, String name, String defaultValue) {
-      String value = cfg.getString(section, subsection, name);
-      if (!Strings.isNullOrEmpty(value)) {
-        return value;
-      }
-      return defaultValue;
+    private List<String> getList(
+        Supplier<Config> cfg, String section, String subsection, String name) {
+      return ImmutableList.copyOf(cfg.get().getStringList(section, subsection, name));
     }
   }
 
@@ -487,177 +278,6 @@
     }
   }
 
-  public static class ZookeeperConfig {
-    public static final String SECTION = "ref-database";
-    public static final int defaultSessionTimeoutMs;
-    public static final int defaultConnectionTimeoutMs;
-    public static final String DEFAULT_ZK_CONNECT = "localhost:2181";
-    private final int DEFAULT_RETRY_POLICY_BASE_SLEEP_TIME_MS = 1000;
-    private final int DEFAULT_RETRY_POLICY_MAX_SLEEP_TIME_MS = 3000;
-    private final int DEFAULT_RETRY_POLICY_MAX_RETRIES = 3;
-    private final int DEFAULT_CAS_RETRY_POLICY_BASE_SLEEP_TIME_MS = 100;
-    private final int DEFAULT_CAS_RETRY_POLICY_MAX_SLEEP_TIME_MS = 300;
-    private final int DEFAULT_CAS_RETRY_POLICY_MAX_RETRIES = 3;
-    private final int DEFAULT_TRANSACTION_LOCK_TIMEOUT = 1000;
-
-    static {
-      CuratorFrameworkFactory.Builder b = CuratorFrameworkFactory.builder();
-      defaultSessionTimeoutMs = b.getSessionTimeoutMs();
-      defaultConnectionTimeoutMs = b.getConnectionTimeoutMs();
-    }
-
-    public static final String SUBSECTION = "zookeeper";
-    public static final String KEY_CONNECT_STRING = "connectString";
-    public static final String KEY_SESSION_TIMEOUT_MS = "sessionTimeoutMs";
-    public static final String KEY_CONNECTION_TIMEOUT_MS = "connectionTimeoutMs";
-    public static final String KEY_RETRY_POLICY_BASE_SLEEP_TIME_MS = "retryPolicyBaseSleepTimeMs";
-    public static final String KEY_RETRY_POLICY_MAX_SLEEP_TIME_MS = "retryPolicyMaxSleepTimeMs";
-    public static final String KEY_RETRY_POLICY_MAX_RETRIES = "retryPolicyMaxRetries";
-    public static final String KEY_LOCK_TIMEOUT_MS = "lockTimeoutMs";
-    public static final String KEY_ROOT_NODE = "rootNode";
-    public final String KEY_CAS_RETRY_POLICY_BASE_SLEEP_TIME_MS = "casRetryPolicyBaseSleepTimeMs";
-    public final String KEY_CAS_RETRY_POLICY_MAX_SLEEP_TIME_MS = "casRetryPolicyMaxSleepTimeMs";
-    public final String KEY_CAS_RETRY_POLICY_MAX_RETRIES = "casRetryPolicyMaxRetries";
-    public static final String KEY_MIGRATE = "migrate";
-    public final String TRANSACTION_LOCK_TIMEOUT_KEY = "transactionLockTimeoutMs";
-
-    public static final String SUBSECTION_ENFORCEMENT_RULES = "enforcementRules";
-
-    private final String connectionString;
-    private final String root;
-    private final int sessionTimeoutMs;
-    private final int connectionTimeoutMs;
-    private final int baseSleepTimeMs;
-    private final int maxSleepTimeMs;
-    private final int maxRetries;
-    private final int casBaseSleepTimeMs;
-    private final int casMaxSleepTimeMs;
-    private final int casMaxRetries;
-    private final boolean enabled;
-
-    private final Multimap<EnforcePolicy, String> enforcementRules;
-
-    private final Long transactionLockTimeOut;
-
-    private CuratorFramework build;
-
-    private ZookeeperConfig(Supplier<Config> cfg) {
-      connectionString =
-          getString(cfg, SECTION, SUBSECTION, KEY_CONNECT_STRING, DEFAULT_ZK_CONNECT);
-      root = getString(cfg, SECTION, SUBSECTION, KEY_ROOT_NODE, "gerrit/multi-site");
-      sessionTimeoutMs =
-          getInt(cfg, SECTION, SUBSECTION, KEY_SESSION_TIMEOUT_MS, defaultSessionTimeoutMs);
-      connectionTimeoutMs =
-          getInt(cfg, SECTION, SUBSECTION, KEY_CONNECTION_TIMEOUT_MS, defaultConnectionTimeoutMs);
-
-      baseSleepTimeMs =
-          getInt(
-              cfg,
-              SECTION,
-              SUBSECTION,
-              KEY_RETRY_POLICY_BASE_SLEEP_TIME_MS,
-              DEFAULT_RETRY_POLICY_BASE_SLEEP_TIME_MS);
-
-      maxSleepTimeMs =
-          getInt(
-              cfg,
-              SECTION,
-              SUBSECTION,
-              KEY_RETRY_POLICY_MAX_SLEEP_TIME_MS,
-              DEFAULT_RETRY_POLICY_MAX_SLEEP_TIME_MS);
-
-      maxRetries =
-          getInt(
-              cfg,
-              SECTION,
-              SUBSECTION,
-              KEY_RETRY_POLICY_MAX_RETRIES,
-              DEFAULT_RETRY_POLICY_MAX_RETRIES);
-
-      casBaseSleepTimeMs =
-          getInt(
-              cfg,
-              SECTION,
-              SUBSECTION,
-              KEY_CAS_RETRY_POLICY_BASE_SLEEP_TIME_MS,
-              DEFAULT_CAS_RETRY_POLICY_BASE_SLEEP_TIME_MS);
-
-      casMaxSleepTimeMs =
-          getInt(
-              cfg,
-              SECTION,
-              SUBSECTION,
-              KEY_CAS_RETRY_POLICY_MAX_SLEEP_TIME_MS,
-              DEFAULT_CAS_RETRY_POLICY_MAX_SLEEP_TIME_MS);
-
-      casMaxRetries =
-          getInt(
-              cfg,
-              SECTION,
-              SUBSECTION,
-              KEY_CAS_RETRY_POLICY_MAX_RETRIES,
-              DEFAULT_CAS_RETRY_POLICY_MAX_RETRIES);
-
-      transactionLockTimeOut =
-          getLong(
-              cfg,
-              SECTION,
-              SUBSECTION,
-              TRANSACTION_LOCK_TIMEOUT_KEY,
-              DEFAULT_TRANSACTION_LOCK_TIMEOUT);
-
-      checkArgument(StringUtils.isNotEmpty(connectionString), "zookeeper.%s contains no servers");
-
-      enabled = Configuration.getBoolean(cfg, SECTION, null, ENABLE_KEY, true);
-
-      enforcementRules = MultimapBuilder.hashKeys().arrayListValues().build();
-      for (EnforcePolicy policy : EnforcePolicy.values()) {
-        enforcementRules.putAll(
-            policy,
-            Configuration.getList(cfg, SECTION, SUBSECTION_ENFORCEMENT_RULES, policy.name()));
-      }
-    }
-
-    public CuratorFramework buildCurator() {
-      if (build == null) {
-        this.build =
-            CuratorFrameworkFactory.builder()
-                .connectString(connectionString)
-                .sessionTimeoutMs(sessionTimeoutMs)
-                .connectionTimeoutMs(connectionTimeoutMs)
-                .retryPolicy(
-                    new BoundedExponentialBackoffRetry(baseSleepTimeMs, maxSleepTimeMs, maxRetries))
-                .namespace(root)
-                .build();
-        this.build.start();
-      }
-
-      return this.build;
-    }
-
-    public Long getZkInterProcessLockTimeOut() {
-      return transactionLockTimeOut;
-    }
-
-    public RetryPolicy buildCasRetryPolicy() {
-      return new BoundedExponentialBackoffRetry(
-          casBaseSleepTimeMs, casMaxSleepTimeMs, casMaxRetries);
-    }
-
-    public boolean isEnabled() {
-      return enabled;
-    }
-
-    public Multimap<EnforcePolicy, String> getEnforcementRules() {
-      return enforcementRules;
-    }
-  }
-
-  static List<String> getList(
-      Supplier<Config> cfg, String section, String subsection, String name) {
-    return ImmutableList.copyOf(cfg.get().getStringList(section, subsection, name));
-  }
-
   static boolean getBoolean(
       Supplier<Config> cfg, String section, String subsection, String name, boolean defaultValue) {
     try {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/GitModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/GitModule.java
index 83cc99b..e861fd5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/GitModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/GitModule.java
@@ -17,6 +17,7 @@
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.validation.ValidationModule;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
 
 public class GitModule extends AbstractModule {
   private final Configuration config;
@@ -28,8 +29,9 @@
 
   @Override
   protected void configure() {
-    if (config.getZookeeperConfig().isEnabled()) {
+    if (config.getSharedRefDb().isEnabled()) {
       install(new ValidationModule(config));
+      install(new ZkValidationModule(config));
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java
new file mode 100644
index 0000000..b88699a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java
@@ -0,0 +1,308 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite;
+
+import static com.google.common.base.Suppliers.memoize;
+import static com.google.common.base.Suppliers.ofInstance;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.CaseFormat;
+import com.google.common.base.Strings;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableMap;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Singleton
+public class KafkaConfiguration {
+
+  private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);
+  public static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
+  static final String KAFKA_SECTION = "kafka";
+  static final String ENABLE_KEY = "enabled";
+  static final String DEFAULT_KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
+  static final boolean DEFAULT_ENABLE_PROCESSING = true;
+  private static final int DEFAULT_POLLING_INTERVAL_MS = 1000;
+
+  private final Supplier<KafkaSubscriber> subscriber;
+  private final Supplier<Kafka> kafka;
+  private final Supplier<KafkaPublisher> publisher;
+
+  @Inject
+  KafkaConfiguration(SitePaths sitePaths) {
+    this(getConfigFile(sitePaths, Configuration.MULTI_SITE_CONFIG));
+  }
+
+  @VisibleForTesting
+  public KafkaConfiguration(Config kafkaConfig) {
+    Supplier<Config> lazyCfg = lazyLoad(kafkaConfig);
+    kafka = memoize(() -> new Kafka(lazyCfg));
+    publisher = memoize(() -> new KafkaPublisher(lazyCfg));
+    subscriber = memoize(() -> new KafkaSubscriber(lazyCfg));
+  }
+
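+  // Resolve the multi-site config file under the site's etc/ directory.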
+  private static FileBasedConfig getConfigFile(SitePaths sitePaths, String configFileName) {
+    return new FileBasedConfig(sitePaths.etc_dir.resolve(configFileName).toFile(), FS.DETECTED);
+  }
+
+  public Kafka getKafka() {
+    return kafka.get();
+  }
+
+  public KafkaSubscriber kafkaSubscriber() {
+    return subscriber.get();
+  }
+
+  private static void applyKafkaConfig(
+      Supplier<Config> configSupplier, String subsectionName, Properties target) {
+    Config config = configSupplier.get();
+    for (String section : config.getSubsections(KAFKA_SECTION)) {
+      if (section.equals(subsectionName)) {
+        for (String name : config.getNames(KAFKA_SECTION, section, true)) {
+          if (name.startsWith(KAFKA_PROPERTY_PREFIX)) {
+            Object value = config.getString(KAFKA_SECTION, subsectionName, name);
+            String configProperty = name.replaceFirst(KAFKA_PROPERTY_PREFIX, "");
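+            // Translate the camelCase property key into Kafka's dotted form,
+            // e.g. "KafkaProp-compressionType" becomes "compression.type".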
+            String propName =
+                CaseFormat.LOWER_CAMEL
+                    .to(CaseFormat.LOWER_HYPHEN, configProperty)
+                    .replaceAll("-", ".");
+            log.info("[{}] Setting kafka property: {} = {}", subsectionName, propName, value);
+            target.put(propName, value);
+          }
+        }
+      }
+    }
+    target.put(
+        "bootstrap.servers",
+        getString(
+            configSupplier,
+            KAFKA_SECTION,
+            null,
+            "bootstrapServers",
+            DEFAULT_KAFKA_BOOTSTRAP_SERVERS));
+  }
+
+  private static String getString(
+      Supplier<Config> cfg, String section, String subsection, String name, String defaultValue) {
+    String value = cfg.get().getString(section, subsection, name);
+    if (!Strings.isNullOrEmpty(value)) {
+      return value;
+    }
+    return defaultValue;
+  }
+
+  private static Map<EventFamily, Boolean> eventsEnabled(
+      Supplier<Config> config, String subsection) {
+    Map<EventFamily, Boolean> eventsEnabled = new HashMap<>();
+    for (EventFamily eventFamily : EventFamily.values()) {
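+      // Per-family enable flag, e.g. "indexEventEnabled"; defaults to true.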
+      String enabledConfigKey = eventFamily.lowerCamelName() + "Enabled";
+
+      eventsEnabled.put(
+          eventFamily,
+          config
+              .get()
+              .getBoolean(KAFKA_SECTION, subsection, enabledConfigKey, DEFAULT_ENABLE_PROCESSING));
+    }
+    return eventsEnabled;
+  }
+
+  public KafkaPublisher kafkaPublisher() {
+    return publisher.get();
+  }
+
+  private Supplier<Config> lazyLoad(Config config) {
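+    // Memoize loading of file-based configs so the file is read at most once,
+    // on first use; in-memory configs are returned as-is.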
+    if (config instanceof FileBasedConfig) {
+      return memoize(
+          () -> {
+            FileBasedConfig fileConfig = (FileBasedConfig) config;
+            String fileConfigFileName = fileConfig.getFile().getPath();
+            try {
+              log.info("Loading configuration from {}", fileConfigFileName);
+              fileConfig.load();
+            } catch (IOException | ConfigInvalidException e) {
+              log.error("Unable to load configuration from " + fileConfigFileName, e);
+            }
+            return fileConfig;
+          });
+    }
+    return ofInstance(config);
+  }
+
+  public static class Kafka {
+    private final Map<EventFamily, String> eventTopics;
+    private final String bootstrapServers;
+
+    private static final ImmutableMap<EventFamily, String> EVENT_TOPICS =
+        ImmutableMap.of(
+            EventFamily.INDEX_EVENT,
+            "GERRIT.EVENT.INDEX",
+            EventFamily.STREAM_EVENT,
+            "GERRIT.EVENT.STREAM",
+            EventFamily.CACHE_EVENT,
+            "GERRIT.EVENT.CACHE",
+            EventFamily.PROJECT_LIST_EVENT,
+            "GERRIT.EVENT.PROJECT.LIST");
+
+    Kafka(Supplier<Config> config) {
+      this.bootstrapServers =
+          getString(
+              config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS);
+
+      this.eventTopics = new HashMap<>();
+      for (Map.Entry<EventFamily, String> topicDefault : EVENT_TOPICS.entrySet()) {
+        String topicConfigKey = topicDefault.getKey().lowerCamelName() + "Topic";
+        eventTopics.put(
+            topicDefault.getKey(),
+            getString(config, KAFKA_SECTION, null, topicConfigKey, topicDefault.getValue()));
+      }
+    }
+
+    public String getTopic(EventFamily eventType) {
+      return eventTopics.get(eventType);
+    }
+
+    public String getBootstrapServers() {
+      return bootstrapServers;
+    }
+
+    private static String getString(
+        Supplier<Config> cfg, String section, String subsection, String name, String defaultValue) {
+      String value = cfg.get().getString(section, subsection, name);
+      if (!Strings.isNullOrEmpty(value)) {
+        return value;
+      }
+      return defaultValue;
+    }
+  }
+
+  public static class KafkaPublisher extends Properties {
+    private static final long serialVersionUID = 0L;
+
+    public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
+
+    public static final String KAFKA_PUBLISHER_SUBSECTION = "publisher";
+    public static final boolean DEFAULT_BROKER_ENABLED = false;
+
+    private final boolean enabled;
+    private final Map<EventFamily, Boolean> eventsEnabled;
+
+    private KafkaPublisher(Supplier<Config> cfg) {
+      enabled =
+          cfg.get()
+              .getBoolean(
+                  KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, DEFAULT_BROKER_ENABLED);
+
+      eventsEnabled = eventsEnabled(cfg, KAFKA_PUBLISHER_SUBSECTION);
+
+      if (enabled) {
+        setDefaults();
+        applyKafkaConfig(cfg, KAFKA_PUBLISHER_SUBSECTION, this);
+      }
+    }
+
+    private void setDefaults() {
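+      // Baseline producer settings; any of these can be overridden via
+      // "KafkaProp-" prefixed keys, which are applied after these defaults.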
+      put("acks", "all");
+      put("retries", 0);
+      put("batch.size", 16384);
+      put("linger.ms", 1);
+      put("buffer.memory", 33554432);
+      put("key.serializer", KAFKA_STRING_SERIALIZER);
+      put("value.serializer", KAFKA_STRING_SERIALIZER);
+      put("reconnect.backoff.ms", 5000L);
+    }
+
+    public boolean enabled() {
+      return enabled;
+    }
+
+    public boolean enabledEvent(EventFamily eventType) {
+      return eventsEnabled.get(eventType);
+    }
+  }
+
+  public static class KafkaSubscriber extends Properties {
+    private static final long serialVersionUID = 1L;
+
+    static final String KAFKA_SUBSCRIBER_SUBSECTION = "subscriber";
+
+    private final boolean enabled;
+    private final Integer pollingInterval;
+    private final Map<EventFamily, Boolean> eventsEnabled;
+    private final Config cfg;
+
+    public KafkaSubscriber(Supplier<Config> configSupplier) {
+      this.cfg = configSupplier.get();
+
+      this.pollingInterval =
+          cfg.getInt(
+              KAFKA_SECTION,
+              KAFKA_SUBSCRIBER_SUBSECTION,
+              "pollingIntervalMs",
+              DEFAULT_POLLING_INTERVAL_MS);
+
+      enabled = cfg.getBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, false);
+
+      eventsEnabled = eventsEnabled(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION);
+
+      if (enabled) {
+        applyKafkaConfig(configSupplier, KAFKA_SUBSCRIBER_SUBSECTION, this);
+      }
+    }
+
+    public boolean enabled() {
+      return enabled;
+    }
+
+    public boolean enabledEvent(EventFamily eventFamily) {
+      return eventsEnabled.get(eventFamily);
+    }
+
+    public Properties initPropsWith(UUID instanceId) {
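+      // Default the consumer group to this instance's ID so that, unless a
+      // groupId is configured, each Gerrit instance consumes every event.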
+      String groupId =
+          getString(
+              cfg, KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, "groupId", instanceId.toString());
+      this.put("group.id", groupId);
+
+      return this;
+    }
+
+    public Integer getPollingInterval() {
+      return pollingInterval;
+    }
+
+    private String getString(
+        Config cfg, String section, String subsection, String name, String defaultValue) {
+      String value = cfg.getString(section, subsection, name);
+      if (!Strings.isNullOrEmpty(value)) {
+        return value;
+      }
+      return defaultValue;
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index 6a005e6..b8256a9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -14,6 +14,9 @@
 
 package com.googlesource.gerrit.plugins.multisite;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.CreationException;
@@ -28,9 +31,9 @@
 import com.googlesource.gerrit.plugins.multisite.index.IndexModule;
 import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaConsumerModule;
 import com.googlesource.gerrit.plugins.multisite.kafka.router.ForwardedEventRouterModule;
+import com.googlesource.gerrit.plugins.multisite.validation.ProjectDeletedSharedDbCleanup;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
-import java.io.FileReader;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -42,11 +45,31 @@
 
 public class Module extends LifecycleModule {
   private static final Logger log = LoggerFactory.getLogger(Module.class);
-  private final Configuration config;
+  private Configuration config;
+  private final boolean disableGitRepositoryValidation;
+  private KafkaConfiguration kafkaConfig;
 
   @Inject
-  public Module(Configuration config) {
+  public Module(Configuration config, KafkaConfiguration kafkaConfig) {
+    this(config, kafkaConfig, false);
+  }
+
+  // TODO: It is currently not possible to properly test libModules in Gerrit.
+  // Disable the Git repository validation during integration tests until the
+  // necessary support is built into Gerrit.
+  @VisibleForTesting
+  public Module(
+      Configuration config,
+      KafkaConfiguration kafkaConfig,
+      boolean disableGitRepositoryValidation) {
+    init(config, kafkaConfig);
+    this.disableGitRepositoryValidation = disableGitRepositoryValidation;
+  }
+
+  private void init(Configuration config, KafkaConfiguration kafkaConfig) {
     this.config = config;
+    this.kafkaConfig = kafkaConfig;
   }
 
   @Override
@@ -72,12 +95,17 @@
       install(new IndexModule());
     }
 
-    if (config.kafkaSubscriber().enabled()) {
-      install(new KafkaConsumerModule(config.kafkaSubscriber()));
+    if (kafkaConfig.kafkaSubscriber().enabled()) {
+      install(new KafkaConsumerModule(kafkaConfig.kafkaSubscriber()));
       install(new ForwardedEventRouterModule());
     }
-    if (config.kafkaPublisher().enabled()) {
-      install(new BrokerForwarderModule(config.kafkaPublisher()));
+    if (kafkaConfig.kafkaPublisher().enabled()) {
+      install(new BrokerForwarderModule(kafkaConfig.kafkaPublisher()));
+    }
+
+    if (config.getSharedRefDb().isEnabled()) {
+      DynamicSet.bind(binder(), ProjectDeletedListener.class)
+          .to(ProjectDeletedSharedDbCleanup.class);
     }
   }
 
@@ -112,7 +140,7 @@
 
   private UUID tryToLoadSavedInstanceId(String serverIdFile) {
     if (Files.exists(Paths.get(serverIdFile))) {
-      try (BufferedReader br = new BufferedReader(new FileReader(serverIdFile))) {
+      try (BufferedReader br = Files.newBufferedReader(Paths.get(serverIdFile))) {
         return UUID.fromString(br.readLine());
       } catch (IOException e) {
         log.warn(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/ZookeeperConfig.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/ZookeeperConfig.java
new file mode 100644
index 0000000..35471fa
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/ZookeeperConfig.java
@@ -0,0 +1,247 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Suppliers.memoize;
+import static com.google.common.base.Suppliers.ofInstance;
+
+import com.google.common.base.Strings;
+import com.google.common.base.Supplier;
+import com.google.gerrit.server.config.SitePaths;
+import java.io.IOException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZookeeperConfig {
+  private static final Logger log = LoggerFactory.getLogger(ZookeeperConfig.class);
+  public static final int defaultSessionTimeoutMs;
+  public static final int defaultConnectionTimeoutMs;
+  public static final String DEFAULT_ZK_CONNECT = "localhost:2181";
+  private final int DEFAULT_RETRY_POLICY_BASE_SLEEP_TIME_MS = 1000;
+  private final int DEFAULT_RETRY_POLICY_MAX_SLEEP_TIME_MS = 3000;
+  private final int DEFAULT_RETRY_POLICY_MAX_RETRIES = 3;
+  private final int DEFAULT_CAS_RETRY_POLICY_BASE_SLEEP_TIME_MS = 100;
+  private final int DEFAULT_CAS_RETRY_POLICY_MAX_SLEEP_TIME_MS = 300;
+  private final int DEFAULT_CAS_RETRY_POLICY_MAX_RETRIES = 3;
+  private final int DEFAULT_TRANSACTION_LOCK_TIMEOUT = 1000;
+
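+  // Pick up Curator's library defaults for session and connection timeouts.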
+  static {
+    CuratorFrameworkFactory.Builder b = CuratorFrameworkFactory.builder();
+    defaultSessionTimeoutMs = b.getSessionTimeoutMs();
+    defaultConnectionTimeoutMs = b.getConnectionTimeoutMs();
+  }
+
+  public static final String SUBSECTION = "zookeeper";
+  public static final String KEY_CONNECT_STRING = "connectString";
+  public static final String KEY_SESSION_TIMEOUT_MS = "sessionTimeoutMs";
+  public static final String KEY_CONNECTION_TIMEOUT_MS = "connectionTimeoutMs";
+  public static final String KEY_RETRY_POLICY_BASE_SLEEP_TIME_MS = "retryPolicyBaseSleepTimeMs";
+  public static final String KEY_RETRY_POLICY_MAX_SLEEP_TIME_MS = "retryPolicyMaxSleepTimeMs";
+  public static final String KEY_RETRY_POLICY_MAX_RETRIES = "retryPolicyMaxRetries";
+  public static final String KEY_ROOT_NODE = "rootNode";
+  public static final String KEY_CAS_RETRY_POLICY_BASE_SLEEP_TIME_MS = "casRetryPolicyBaseSleepTimeMs";
+  public static final String KEY_CAS_RETRY_POLICY_MAX_SLEEP_TIME_MS = "casRetryPolicyMaxSleepTimeMs";
+  public static final String KEY_CAS_RETRY_POLICY_MAX_RETRIES = "casRetryPolicyMaxRetries";
+  public static final String TRANSACTION_LOCK_TIMEOUT_KEY = "transactionLockTimeoutMs";
+
+  private final String connectionString;
+  private final String root;
+  private final int sessionTimeoutMs;
+  private final int connectionTimeoutMs;
+  private final int baseSleepTimeMs;
+  private final int maxSleepTimeMs;
+  private final int maxRetries;
+  private final int casBaseSleepTimeMs;
+  private final int casMaxSleepTimeMs;
+  private final int casMaxRetries;
+
+  public static final String SECTION = "ref-database";
+  private final Long transactionLockTimeOut;
+
+  private CuratorFramework build;
+
+  public ZookeeperConfig(Config zkCfg) {
+    Supplier<Config> lazyZkConfig = lazyLoad(zkCfg);
+    connectionString =
+        getString(lazyZkConfig, SECTION, SUBSECTION, KEY_CONNECT_STRING, DEFAULT_ZK_CONNECT);
+    root = getString(lazyZkConfig, SECTION, SUBSECTION, KEY_ROOT_NODE, "gerrit/multi-site");
+    sessionTimeoutMs =
+        getInt(lazyZkConfig, SECTION, SUBSECTION, KEY_SESSION_TIMEOUT_MS, defaultSessionTimeoutMs);
+    connectionTimeoutMs =
+        getInt(
+            lazyZkConfig,
+            SECTION,
+            SUBSECTION,
+            KEY_CONNECTION_TIMEOUT_MS,
+            defaultConnectionTimeoutMs);
+
+    baseSleepTimeMs =
+        getInt(
+            lazyZkConfig,
+            SECTION,
+            SUBSECTION,
+            KEY_RETRY_POLICY_BASE_SLEEP_TIME_MS,
+            DEFAULT_RETRY_POLICY_BASE_SLEEP_TIME_MS);
+
+    maxSleepTimeMs =
+        getInt(
+            lazyZkConfig,
+            SECTION,
+            SUBSECTION,
+            KEY_RETRY_POLICY_MAX_SLEEP_TIME_MS,
+            DEFAULT_RETRY_POLICY_MAX_SLEEP_TIME_MS);
+
+    maxRetries =
+        getInt(
+            lazyZkConfig,
+            SECTION,
+            SUBSECTION,
+            KEY_RETRY_POLICY_MAX_RETRIES,
+            DEFAULT_RETRY_POLICY_MAX_RETRIES);
+
+    casBaseSleepTimeMs =
+        getInt(
+            lazyZkConfig,
+            SECTION,
+            SUBSECTION,
+            KEY_CAS_RETRY_POLICY_BASE_SLEEP_TIME_MS,
+            DEFAULT_CAS_RETRY_POLICY_BASE_SLEEP_TIME_MS);
+
+    casMaxSleepTimeMs =
+        getInt(
+            lazyZkConfig,
+            SECTION,
+            SUBSECTION,
+            KEY_CAS_RETRY_POLICY_MAX_SLEEP_TIME_MS,
+            DEFAULT_CAS_RETRY_POLICY_MAX_SLEEP_TIME_MS);
+
+    casMaxRetries =
+        getInt(
+            lazyZkConfig,
+            SECTION,
+            SUBSECTION,
+            KEY_CAS_RETRY_POLICY_MAX_RETRIES,
+            DEFAULT_CAS_RETRY_POLICY_MAX_RETRIES);
+
+    transactionLockTimeOut =
+        getLong(
+            lazyZkConfig,
+            SECTION,
+            SUBSECTION,
+            TRANSACTION_LOCK_TIMEOUT_KEY,
+            DEFAULT_TRANSACTION_LOCK_TIMEOUT);
+
+    checkArgument(
+        StringUtils.isNotEmpty(connectionString),
+        "zookeeper.%s contains no servers",
+        KEY_CONNECT_STRING);
+  }
+
+  public CuratorFramework buildCurator() {
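+    // Lazily create and start a single shared Curator client; subsequent calls
+    // return the same started instance.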
+    if (build == null) {
+      this.build =
+          CuratorFrameworkFactory.builder()
+              .connectString(connectionString)
+              .sessionTimeoutMs(sessionTimeoutMs)
+              .connectionTimeoutMs(connectionTimeoutMs)
+              .retryPolicy(
+                  new BoundedExponentialBackoffRetry(baseSleepTimeMs, maxSleepTimeMs, maxRetries))
+              .namespace(root)
+              .build();
+      this.build.start();
+    }
+
+    return this.build;
+  }
+
+  public Long getZkInterProcessLockTimeOut() {
+    return transactionLockTimeOut;
+  }
+
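+  // Bounded exponential backoff used when retrying compare-and-set operations.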
+  public RetryPolicy buildCasRetryPolicy() {
+    return new BoundedExponentialBackoffRetry(casBaseSleepTimeMs, casMaxSleepTimeMs, casMaxRetries);
+  }
+
+  private static FileBasedConfig getConfigFile(SitePaths sitePaths, String configFileName) {
+    return new FileBasedConfig(sitePaths.etc_dir.resolve(configFileName).toFile(), FS.DETECTED);
+  }
+
+  private long getLong(
+      Supplier<Config> cfg, String section, String subSection, String name, long defaultValue) {
+    try {
+      return cfg.get().getLong(section, subSection, name, defaultValue);
+    } catch (IllegalArgumentException e) {
+      log.error("invalid value for {}; using default value {}", name, defaultValue);
+      log.debug("Failed to retrieve long value: {}", e.getMessage(), e);
+      return defaultValue;
+    }
+  }
+
+  private int getInt(
+      Supplier<Config> cfg, String section, String subSection, String name, int defaultValue) {
+    try {
+      return cfg.get().getInt(section, subSection, name, defaultValue);
+    } catch (IllegalArgumentException e) {
+      log.error("invalid value for {}; using default value {}", name, defaultValue);
+      log.debug("Failed to retrieve integer value: {}", e.getMessage(), e);
+      return defaultValue;
+    }
+  }
+
+  private Supplier<Config> lazyLoad(Config config) {
+    if (config instanceof FileBasedConfig) {
+      return memoize(
+          () -> {
+            FileBasedConfig fileConfig = (FileBasedConfig) config;
+            String fileConfigFileName = fileConfig.getFile().getPath();
+            try {
+              log.info("Loading configuration from {}", fileConfigFileName);
+              fileConfig.load();
+            } catch (IOException | ConfigInvalidException e) {
+              log.error("Unable to load configuration from " + fileConfigFileName, e);
+            }
+            return fileConfig;
+          });
+    }
+    return ofInstance(config);
+  }
+
+  private boolean getBoolean(
+      Supplier<Config> cfg, String section, String subsection, String name, boolean defaultValue) {
+    try {
+      return cfg.get().getBoolean(section, subsection, name, defaultValue);
+    } catch (IllegalArgumentException e) {
+      log.error("invalid value for {}; using default value {}", name, defaultValue);
+      log.debug("Failed to retrieve boolean value: {}", e.getMessage(), e);
+      return defaultValue;
+    }
+  }
+
+  private String getString(
+      Supplier<Config> cfg, String section, String subsection, String name, String defaultValue) {
+    String value = cfg.get().getString(section, subsection, name);
+    if (!Strings.isNullOrEmpty(value)) {
+      return value;
+    }
+    return defaultValue;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerMetrics.java
new file mode 100644
index 0000000..f6be65a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerMetrics.java
@@ -0,0 +1,58 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import com.google.gerrit.metrics.Counter1;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Field;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+public class BrokerMetrics {
+  private static final String PUBLISHER_SUCCESS_COUNTER = "broker_msg_publisher_counter";
+  private static final String PUBLISHER_FAILURE_COUNTER = "broker_msg_publisher_failure_counter";
+
+  private final Counter1<String> brokerPublisherSuccessCounter;
+  private final Counter1<String> brokerPublisherFailureCounter;
+
+  @Inject
+  public BrokerMetrics(MetricMaker metricMaker) {
+
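+    // Each counter is keyed by a single fixed field value, so it is exposed
+    // under a stable sub-metric name (see about.md).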
+    this.brokerPublisherSuccessCounter =
+        metricMaker.newCounter(
+            "multi_site/broker/broker_message_publisher_counter",
+            new Description("Number of messages published by the broker publisher")
+                .setRate()
+                .setUnit("messages"),
+            Field.ofString(PUBLISHER_SUCCESS_COUNTER, "Broker message published count"));
+    this.brokerPublisherFailureCounter =
+        metricMaker.newCounter(
+            "multi_site/broker/broker_message_publisher_failure_counter",
+            new Description("Number of messages failed to publish by the broker publisher")
+                .setRate()
+                .setUnit("errors"),
+            Field.ofString(PUBLISHER_FAILURE_COUNTER, "Broker failed to publish message count"));
+  }
+
+  public void incrementBrokerPublishedMessage() {
+    brokerPublisherSuccessCounter.increment(PUBLISHER_SUCCESS_COUNTER);
+  }
+
+  public void incrementBrokerFailedToPublishMessage() {
+    brokerPublisherFailureCounter.increment(PUBLISHER_FAILURE_COUNTER);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java
index 5fe766a..744a558 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java
@@ -40,17 +40,20 @@
   private final Gson gson;
   private final UUID instanceId;
   private final MessageLogger msgLog;
+  private final BrokerMetrics brokerMetrics;
 
   @Inject
   public BrokerPublisher(
       BrokerSession session,
       @EventGson Gson gson,
       @InstanceId UUID instanceId,
-      MessageLogger msgLog) {
+      MessageLogger msgLog,
+      BrokerMetrics brokerMetrics) {
     this.session = session;
     this.gson = gson;
     this.instanceId = instanceId;
     this.msgLog = msgLog;
+    this.brokerMetrics = brokerMetrics;
   }
 
   @Override
@@ -74,7 +77,13 @@
 
     SourceAwareEventWrapper brokerEvent = toBrokerEvent(event);
     msgLog.log(Direction.PUBLISH, brokerEvent);
-    return session.publishEvent(eventType, getPayload(brokerEvent));
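+    // Record the outcome of each publish attempt on the broker metrics.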
+    boolean eventPublished = session.publishEvent(eventType, getPayload(brokerEvent));
+    if (eventPublished) {
+      brokerMetrics.incrementBrokerPublishedMessage();
+    } else {
+      brokerMetrics.incrementBrokerFailedToPublishMessage();
+    }
+    return eventPublished;
   }
 
   private String getPayload(SourceAwareEventWrapper event) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
index 6c3c3cf..7c83346 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
@@ -15,8 +15,8 @@
 package com.googlesource.gerrit.plugins.multisite.broker.kafka;
 
 import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import java.util.UUID;
@@ -31,13 +31,13 @@
 
 public class KafkaSession implements BrokerSession {
   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSession.class);
-  private final Configuration properties;
+  private final KafkaConfiguration properties;
   private final UUID instanceId;
   private volatile Producer<String, String> producer;
 
   @Inject
-  public KafkaSession(Configuration configuration, @InstanceId UUID instanceId) {
-    this.properties = configuration;
+  public KafkaSession(KafkaConfiguration kafkaConfig, @InstanceId UUID instanceId) {
+    this.properties = kafkaConfig;
     this.instanceId = instanceId;
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
index 7c6bef3..b7ee20b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
@@ -16,7 +16,7 @@
 
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
-import com.googlesource.gerrit.plugins.multisite.Configuration.KafkaPublisher;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaPublisher;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
 import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaSession;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
index e8f7d7b..4d171f6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
@@ -100,8 +100,8 @@
         .map(
             e ->
                 (computedChangeTs.get() > e.eventCreatedOn)
-                    || (computedChangeTs.get() == e.eventCreatedOn)
-                        && (Objects.equals(getBranchTargetSha(), e.targetSha)))
+                    || ((computedChangeTs.get() == e.eventCreatedOn)
+                        && (Objects.equals(getBranchTargetSha(), e.targetSha))))
         .orElse(true);
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
index 56af339..e4d3fff 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.exceptions.StorageException;
 import com.google.gerrit.extensions.registration.DynamicSet;
@@ -22,8 +24,8 @@
 import com.google.gerrit.server.util.ManualRequestContext;
 import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
@@ -50,12 +52,12 @@
   private final UUID instanceId;
   private final AtomicBoolean closed = new AtomicBoolean(false);
   private final Deserializer<SourceAwareEventWrapper> valueDeserializer;
-  private final Configuration configuration;
+  private final KafkaConfiguration configuration;
   private final OneOffRequestContext oneOffCtx;
   private final MessageLogger msgLog;
 
   public AbstractKafkaSubcriber(
-      Configuration configuration,
+      KafkaConfiguration configuration,
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       ForwardedEventRouter eventRouter,
@@ -133,7 +135,7 @@
       }
     } catch (Exception e) {
       logger.atSevere().withCause(e).log(
-          "Malformed event '%s': [Exception: %s]", new String(consumerRecord.value()));
+          "Malformed event '%s': [Exception: %s]", new String(consumerRecord.value(), UTF_8));
     }
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
index 194a49c..a5d4c59 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
@@ -20,8 +20,8 @@
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import com.googlesource.gerrit.plugins.multisite.kafka.router.StreamEventRouter;
@@ -32,7 +32,7 @@
 public class CacheEvictionEventSubscriber extends AbstractKafkaSubcriber {
   @Inject
   public CacheEvictionEventSubscriber(
-      Configuration configuration,
+      KafkaConfiguration configuration,
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       StreamEventRouter eventRouter,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
index 26c4b68..589afce 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
@@ -20,8 +20,8 @@
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import com.googlesource.gerrit.plugins.multisite.kafka.router.IndexEventRouter;
@@ -32,7 +32,7 @@
 public class IndexEventSubscriber extends AbstractKafkaSubcriber {
   @Inject
   public IndexEventSubscriber(
-      Configuration configuration,
+      KafkaConfiguration configuration,
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       IndexEventRouter eventRouter,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
index 5bc7669..3e0e5d7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
@@ -17,7 +17,7 @@
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.inject.TypeLiteral;
-import com.googlesource.gerrit.plugins.multisite.Configuration.KafkaSubscriber;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaSubscriber;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.MultiSiteEvent;
 import java.util.concurrent.Executor;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
index 7385b80..0845595 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
@@ -20,8 +20,8 @@
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import com.googlesource.gerrit.plugins.multisite.kafka.router.ProjectListUpdateRouter;
@@ -32,7 +32,7 @@
 public class ProjectUpdateEventSubscriber extends AbstractKafkaSubcriber {
   @Inject
   public ProjectUpdateEventSubscriber(
-      Configuration configuration,
+      KafkaConfiguration configuration,
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       ProjectListUpdateRouter eventRouter,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/SourceAwareEventWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/SourceAwareEventWrapper.java
index 09fbe0d..cc638c0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/SourceAwareEventWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/SourceAwareEventWrapper.java
@@ -14,11 +14,12 @@
 
 package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.gerrit.server.events.Event;
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import java.util.UUID;
-import org.apache.commons.lang3.Validate;
 
 public class SourceAwareEventWrapper {
 
@@ -67,9 +68,9 @@
     }
 
     public void validate() {
-      Validate.notNull(eventId, "EventId cannot be null");
-      Validate.notNull(eventType, "EventType cannot be null");
-      Validate.notNull(sourceInstanceId, "Source Instance ID cannot be null");
+      requireNonNull(eventId, "EventId cannot be null");
+      requireNonNull(eventType, "EventType cannot be null");
+      requireNonNull(sourceInstanceId, "Source Instance ID cannot be null");
     }
 
     @Override
@@ -94,8 +95,8 @@
   }
 
   public void validate() {
-    Validate.notNull(header, "Header cannot be null");
-    Validate.notNull(body, "Body cannot be null");
+    requireNonNull(header, "Header cannot be null");
+    requireNonNull(body, "Body cannot be null");
     header.validate();
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
index 2e0970f..2a1e155 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
@@ -20,8 +20,8 @@
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import com.googlesource.gerrit.plugins.multisite.kafka.router.StreamEventRouter;
@@ -32,7 +32,7 @@
 public class StreamEventSubscriber extends AbstractKafkaSubcriber {
   @Inject
   public StreamEventSubscriber(
-      Configuration configuration,
+      KafkaConfiguration configuration,
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       StreamEventRouter eventRouter,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultiSiteGitRepositoryManager.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultiSiteGitRepositoryManager.java
index f609c49..8ce1f59 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultiSiteGitRepositoryManager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultiSiteGitRepositoryManager.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.validation;
 
-import com.google.gerrit.reviewdb.client.Project.NameKey;
+import com.google.gerrit.reviewdb.client.Project;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.git.LocalDiskRepositoryManager;
 import com.google.gerrit.server.git.RepositoryCaseMismatchException;
@@ -39,22 +39,23 @@
   }
 
   @Override
-  public Repository openRepository(NameKey name) throws RepositoryNotFoundException, IOException {
+  public Repository openRepository(Project.NameKey name)
+      throws RepositoryNotFoundException, IOException {
     return wrap(name, gitRepositoryManager.openRepository(name));
   }
 
   @Override
-  public Repository createRepository(NameKey name)
+  public Repository createRepository(Project.NameKey name)
       throws RepositoryCaseMismatchException, RepositoryNotFoundException, IOException {
     return wrap(name, gitRepositoryManager.createRepository(name));
   }
 
   @Override
-  public SortedSet<NameKey> list() {
+  public SortedSet<Project.NameKey> list() {
     return gitRepositoryManager.list();
   }
 
-  private Repository wrap(NameKey projectName, Repository projectRepo) {
+  private Repository wrap(Project.NameKey projectName, Repository projectRepo) {
     return multiSiteRepoFactory.create(projectName.get(), projectRepo);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
index 001fe1b..c47b4d4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
@@ -21,7 +21,6 @@
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.CustomSharedRefEnforcementByProject;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.DefaultSharedRefEnforcement;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefEnforcement;
-import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
 
 public class ValidationModule extends FactoryModule {
   private final Configuration cfg;
@@ -40,14 +39,12 @@
     factory(BatchRefUpdateValidator.Factory.class);
 
     bind(GitRepositoryManager.class).to(MultiSiteGitRepositoryManager.class);
-    if (cfg.getZookeeperConfig().getEnforcementRules().isEmpty()) {
+    if (cfg.getSharedRefDb().getEnforcementRules().isEmpty()) {
       bind(SharedRefEnforcement.class).to(DefaultSharedRefEnforcement.class).in(Scopes.SINGLETON);
     } else {
       bind(SharedRefEnforcement.class)
           .to(CustomSharedRefEnforcementByProject.class)
           .in(Scopes.SINGLETON);
     }
-
-    install(new ZkValidationModule(cfg));
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/CustomSharedRefEnforcementByProject.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/CustomSharedRefEnforcementByProject.java
index 7a806a8..77a0c0b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/CustomSharedRefEnforcementByProject.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/CustomSharedRefEnforcementByProject.java
@@ -41,7 +41,7 @@
     Map<String, Map<String, EnforcePolicy>> enforcementMap = new HashMap<>();
 
     for (Map.Entry<EnforcePolicy, String> enforcementEntry :
-        config.getZookeeperConfig().getEnforcementRules().entries()) {
+        config.getSharedRefDb().getEnforcementRules().entries()) {
       parseEnforcementEntry(enforcementMap, enforcementEntry);
     }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/DefaultSharedRefEnforcement.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/DefaultSharedRefEnforcement.java
index 6b495fb..63bab09 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/DefaultSharedRefEnforcement.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/DefaultSharedRefEnforcement.java
@@ -16,11 +16,10 @@
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableMap;
-import java.util.Map;
 
 public class DefaultSharedRefEnforcement implements SharedRefEnforcement {
 
-  private static final Map<String, EnforcePolicy> PREDEF_ENFORCEMENTS =
+  private static final ImmutableMap<String, EnforcePolicy> PREDEF_ENFORCEMENTS =
       ImmutableMap.of("All-Users:refs/meta/external-ids", EnforcePolicy.DESIRED);
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkValidationModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkValidationModule.java
index 3e8f75a..1f41b1f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkValidationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkValidationModule.java
@@ -14,34 +14,28 @@
 
 package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
 
-import com.google.gerrit.extensions.events.ProjectDeletedListener;
-import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.inject.AbstractModule;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.validation.ProjectDeletedSharedDbCleanup;
+import com.googlesource.gerrit.plugins.multisite.ZookeeperConfig;
 import com.googlesource.gerrit.plugins.multisite.validation.ZkConnectionConfig;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefDatabase;
 import org.apache.curator.framework.CuratorFramework;
 
 public class ZkValidationModule extends AbstractModule {
 
-  private Configuration cfg;
+  private ZookeeperConfig cfg;
 
   public ZkValidationModule(Configuration cfg) {
-    this.cfg = cfg;
+    this.cfg = new ZookeeperConfig(cfg.getMultiSiteConfig());
   }
 
   @Override
   protected void configure() {
     bind(SharedRefDatabase.class).to(ZkSharedRefDatabase.class);
-    bind(CuratorFramework.class).toInstance(cfg.getZookeeperConfig().buildCurator());
+    bind(CuratorFramework.class).toInstance(cfg.buildCurator());
 
     bind(ZkConnectionConfig.class)
         .toInstance(
-            new ZkConnectionConfig(
-                cfg.getZookeeperConfig().buildCasRetryPolicy(),
-                cfg.getZookeeperConfig().getZkInterProcessLockTimeOut()));
-
-    DynamicSet.bind(binder(), ProjectDeletedListener.class).to(ProjectDeletedSharedDbCleanup.class);
+            new ZkConnectionConfig(cfg.buildCasRetryPolicy(), cfg.getZkInterProcessLockTimeOut()));
   }
 }
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index 5c5d75a..83082b2 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -86,3 +86,16 @@
 
 For further information and supported options, refer to [config](config.md)
 documentation.
+
+## Metrics
+
+The @PLUGIN@ plugin exposes the following metrics:
+
+### Broker message publisher
+* Broker message published count
+
+`metric=multi_site/broker/broker_message_publisher_counter/broker_msg_publisher_counter, type=com.codahale.metrics.Meter`
+
+* Broker failed to publish message count
+
+`metric=multi_site/broker/broker_message_publisher_failure_counter/broker_msg_publisher_failure_counter, type=com.codahale.metrics.Meter`
diff --git a/src/main/resources/Documentation/build.md b/src/main/resources/Documentation/build.md
index 1dfda95..9d004c2 100644
--- a/src/main/resources/Documentation/build.md
+++ b/src/main/resources/Documentation/build.md
@@ -20,7 +20,7 @@
 The output is created in
 
 ```
-  bazel-genfiles/@PLUGIN@.jar
+  bazel-bin/@PLUGIN@.jar
 ```
 
 To package the plugin sources run:
@@ -68,7 +68,7 @@
 The output is created in
 
 ```
-  bazel-genfiles/plugins/@PLUGIN@/@PLUGIN@.jar
+  bazel-bin/plugins/@PLUGIN@/@PLUGIN@.jar
 ```
 
 This project can be imported into the Eclipse IDE:
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index be32c27..227d0e8 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -280,6 +280,8 @@
 
     Defaults: 1000
 
+#### Custom Kafka properties
+
 In addition to the above settings, custom Kafka properties can be explicitly set
 for `publisher` and `subscriber`.
 In order to be acknowledged, these properties need to be prefixed with the
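+
+As an illustrative sketch (assuming the default `KafkaProp-` prefix defined in
+the plugin's `KafkaConfiguration`), a custom publisher property can be set as:
+
+```
+[kafka "publisher"]
+  ; illustrative custom property
+  KafkaProp-compressionType = gzip
+```
+
+which the plugin translates into the Kafka producer property
+`compression.type = gzip`.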
diff --git a/src/main/resources/Documentation/git-replication-split-brain-detected.png b/src/main/resources/Documentation/git-replication-split-brain-detected.png
deleted file mode 100644
index dba5a81..0000000
--- a/src/main/resources/Documentation/git-replication-split-brain-detected.png
+++ /dev/null
Binary files differ
diff --git a/src/main/resources/Documentation/sources/architecture-first-iteration.xml b/src/main/resources/Documentation/sources/architecture-first-iteration.xml
new file mode 100644
index 0000000..c3aa4f9
--- /dev/null
+++ b/src/main/resources/Documentation/sources/architecture-first-iteration.xml
@@ -0,0 +1,2 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<mxfile modified="2019-06-07T17:30:04.207Z" host="www.draw.io" agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36" etag="zx6JNhjWm9CAjpC8XxAK" version="10.7.5" type="google"><diagram id="aLS-0ix4CCXe6ffUliya" name="Page-1">7V1bl6M2Ev41/dg+SOIiHvuSmeSczGY2nezsPGIb2yS08WLc051fv8JGGFTCgI0Ad6tfxshcPFTVV6X6SqUb8vD8+jn2Nqsv0dwPb7Axf70hjzcYY0op+ycdeTuMIGTZh5FlHMyzsePAU/CPnw0a2egumPvb0olJFIVJsCkPzqL12p8lpTEvjqMf5dMWUVh+6sZb+mDgaeaFcPRbME9Wh1FqGcfxn/1gueJPRkb2zdSb/b2Mo906e946WvuHb549fpvs1O3Km0c/CkPkpxvyEEdRcvj0/Prgh+l75W/scN2nim/znxz766TJBc7T9u3lP+Zff5KXnx8/mX/e2d//fYsPd3nxwl32KrIfm7zxd7P/3/npTdANuf+xChL/aePN0m9/MHVgY6vkOcy+XgRh+BCFUcyO9++C3M+97Sq//Dl68ab7O6dHsb8N/ikeR4mXFI6ZnvnFY38eFA8zbSiMwHeSvaYXP07818JQ9o4++9Gzn8Rv7JTs21uuk5kuO1x+P46KgSx+0qqgFW425mXKuMzvfZQK+5AJpoWQiBZSjZAQgUJyTImMLKJKSEgLSRBSLgAuJGxDIdlSIZmKhGQCmfhz5giywyhOVtEyWnvhT8fR+6PUjLKE/Ncg+W86PLGyo++Fbx5fsyv2B2/ZwV9+krxl/s/bJREbOj711yjaSKR/g8liscCzGRvfJnH0t1/4Zm5PbeZs02+8OLlLfSH7YhZ6220w48OfgjD/0Wv2Hg+/Gln8+Pv+2MiPj799f/RWPPrqxwEThh9ng0Pp4TbaxTP/hKxpplnsBSz95JQPzLQiVYWTeh37oZcEL+WYQaaj+0uZLLy3wgmbKFgnW6DC+f0vgB5Dgj12mKTi33hr9nmZfv41YgEPO+3TE/+WPa14AjAOFrJs0o+ztzBgNhDXo9b0YCy/TvOBPED6bZewu3C5jh++bKuEXqbtAvSSgpcyB4OAfC4Ar6ZAdA7IjRYSEGkICRw7OoaE59evKQgcdcxyBCUTlOfwP8ouOupPNbTIH+MgWvbEhjBVEM53rVOnsw+HH9AtiMF4aaS+efTqrVxrDXMiKC6y3Ilb+HOc8k0PFgcUuTPlgVOkn+++xtHrG3d107jSy5WVqMbBXYHfMihwVHkSouipRLDpzlO94zB7KPkrt2nsTHDZpgkVpmEV3gjcyzStCXHEm7EJRuEP94sPFsCH37+x4z9ib8GmSaK2MktLykpYVohsPi6ZonthsFynETOT3z5eTu02YHH3XfbFczCf7/VdhjNlG7iy6X6egxGihlLizLAgDmFlOGRDqf+mpd6p1InjTOTR4oByd4Dc7zabMPC3smjA9p5TKayn2/Sf2cpbL/2tVo5uIEHQDVOWTDdIn7pBISb4TDdmXiJTj1wdaqNIrSDnoIdZ1g+LDq4fLtCPr7tpGGxXbPCnF38/zz5DGT6wkLElOAgeVw4mZAwTpjow6FjqVMwQGDB/2m9YgCFFp6XetcdHWJwFDC52DMT++ABz6VrcZ4jbIiVh2zIaHvcpa5gHvE+FyDy18ct6EXtMprtZsot9Lf9O5F/GeIdKjN3q1dhNoAAyKvRfmgNtFp8LaG4hmFtGVp+5ZQwzeVySi2ifhJzlWVv7f7voIFOyWBD2Vxw6iDv0vVSmR1043KRxWC8KLWPi2NnW/Y31mKLDLom2h0x0cR4Q+otEAhVJmppOVXUWrJd/7PPUtyZUlRGoBjKcsqMn0PZlmqHO9G0grw5YB/75UCLjcApCTjsci2v4weGyusKapnRFwxofgdWYez5dSFkNe0b96WJAJKov4mnK2HdfxHOZMjqdKiPXK1TWK2fsenWWFY1XGelgyign2mxIxBPHynm8trwddlzxdqAUUzFRh2F6VpeyXeyrTbccxpk2nKf1WsuGYZZVMzSDTOB4eiYvQBs6Ac9TCO+xeKR9jTa1cx9/8J2kxuX76zl/QKbdbCS79+hquEfjSE0beD7iGvlQ6wIYeLt0SLhbhSttW9xpOgK4U+v0j7NOnq+mupN0Wrx8jlVXxdB1c7OSzWYmVTTYSxCDznw5YkyplYppzLGw3dSE6agmZgSyAU9JlOaDDRh6fPOnTz4D6Wito41OmGDHFApCiIQx6DneIEMj0wjijQYxw4dYM8bnSbWoRmwlgUl/a8ZIM6JET7RbARzF5YU2Jl9ENtREm0C65PEBa2/WibBJee48OPtNYL2zJj8vYbhoubzBQpLCtV7ZT9ItraA5rlEEHU05ru6DjsuUkXaqjJrjGoUyNuW4FEXAIF2FiO1AkgudTXIhw8SQ5RIAWzHLRSD9oYPvy/21WI5oWrDhTK/RN48DNM01uGrwQDzP7kPV6DfvZA6eER9B3inPK5XTSjUuvy5fNVo6qzefiQnwcYTaE3ouoSW7oWlbihgt9jQByCk6/fOQRU9doIbTMmEeRXNa18xpuU05LR6/jWQiZsJVD5rT6i9LRBAgtQYPLq6mA8towaBxrzDeLEu5U0dI1nZJiQdu2zCMTb1OXqDIAVvXouXnRrrvwDp66qRHndEaBx3ENpSshmljG8MGmVU0RV395Ggtzh3bJNOlBuzy5doTdG5m1iWwbZhluHDWqjg7ayoh2c6a13EegesvqtHeJjZXyJ4cUzPHBAr6mHM/2tS8uB32VvlTEXEJPVptVNPS1Tl5viInBFfz6Pz3MM1Yyot1h1/mYUIKjPfhapaa+Lgrr4kB8g1AmI6E5lImS6vTJTsD+7CP2wIWuWIHWEzOXgLD0GRi19+uK8bAEDsNYj6g1MFZsKXU5yD9XY9MBzSInVoK4AoCy0GthqxXh2Kd0jnDotjc8unclKEYxVNiv2sUIyARZNvnL4lnoIhw7e06gjHMu0/mT+oBwiB59AtT6VcGIOx+D95sJeeReC9MjXHV1JDQ3VS61kkCccr2KLM6pYUGhrjxVvyqh7jKaOcMgHMmYqtOeLfO8I0KD+oB3+AKmFYU+EeGL7jcgfYakcE+7k+76XYWB9PUJzGouKgls84b1asAEZbAIB4lD5Y4sjpddTCsD/MRC9QdmQ9zbYd47zlMx6YJ8wNn7jdDsHvyPh15L/CYflIMMFG6j88bpcY/rvMSW9MOnl6w31WS1LcrkqSOOzWGJPrU45YtSS+cGXwTgmTZBTXBN+E9eHpMLtgwP1qdUfBfgllK/mhgOw1sZSoPw47MMlxTllOwYf5ot5l7iVzGgXZdtRJ2DZHgM3nRwWBCht1CtJAva7zI5ZcXOkESt18Rw8m2F
vGFk2ckmjFMqPQrYzUr9o0J7aQtlJ6MV+hcbVEa6qnKGvEtNY9NMhtFvq35/IrnqA1W4UxbI+CFy66xGMkgyIH1CoG8MEqckUBBHnEvfRs1bRQkaeMrS/UK+ZJ855eaojJ1glK0Ph6fVYaNr6kMe7zZmfoy7KYNGB01Hq+to7Jx2Wqc01XYYhcKpwe3xp/ZsSEVrOi4tKH/Zeof0ozqrYOMwjosodU4d/VV5iGeT8wT9gGvtuVPqwtOOzM0mAnTocU+6BMEQzDkYvqNLdQsj9eQOGZI7LtjsxykXEMogbJKNEzt+SRbndgMEl1MpE/rDRI7XaGvVzcORXqKhMT5lCcyxC0AlBGeJx6lNuCGbZh1HJB+S4XyT8I34xgsDlCz0lvHAWOOAzrfj+Y8cCJI7porgVO8gGSlmM0iAUSGDgXgWnCNilmSXBAs7/I+FCxSmCP/7MdxurBRZEF+/8bO+2XNoGfNBFRfhV0jzeDZW/p32w17yRmIbQocYZy++/Uy9AvE4RUImJRnv0iy+SYiEvkqFDAsyyotvhfF+DXcLQPJ+AnTNepNd/ySQ2LigpfQ1Vimsv0fKEwxFRezGJ+Yy2KImDKWWmaZFdFmRJZ1wsNfJDNkYCA0IIS04/pmTO8XG+Wrb7LK6Orgvlwcj2xJD0ze87r43qmp6L1T+Nq/7MIkuN0Ge0o/DKZfovkuNZZLLAL2j2B/stDY2P+l36y8+T4qH8SganS03s64XTUzK2VQiAzoxMZjVi3fcmtrc5o5Iqrq7buwVuY+1fc4fQ+Z5xnIqgaZfdaoaL1VDWdGkmA/jKb7jUd+9xe3j/cFkXYbTHw4Ib+VhTmUzF09v+sy4nRccY2sJPTpd4Ln6gles6UWYuoFN0u9qDNNPcFrKzPsQHPrdYLnwg4TGk4vmMCjEeIppPQ0nkqXvIwOUCHtoAG1RmiDIyrViNqlgM0RMhAaUM9aJjo8A6HxtK3MhmcgEJxVACFcPwNhNqIgHPjilVEQCMF185qDqNXS6+EgEAxUxmNYLd/y1XEQ+SJhTUI00tERkxAINp+oJiG0SFuIdCSUA0Iwbvw4SEls2Nyl32IIxDuL60ikieRpheTHG4nwH3BN9lX1lq8vEsknMToSaaKjI45EuNnoSKRrkY4lEsEfIhmCsCWwaJKkfs8xCNbZkBaWdX3ZEHx92ZDKt3yFMYjOhjQR+BVkQ7DOhigS6VhiEAKBse3CY+k6Xcl6XskCZf651QJlY4LscpdPftCmxef17FNXvyF7llGp3zH6cF53C44v1L3Le3JcoHvlbdCNZrqHSnpXuzj+PSlZQx3LuZCh++FxHM2Q1im3q6k9H9EediVAxB4P/DrnNYhw2tnAiJsb19pArtu1VoDH0dvBpoJSu6dbO4jnm24PXWoQubz5yUV+wNGOQIUjUNQT6owdudt5AnBBT67g8pb4Q7uCllZw1a6gcZsfYo7EDISuPXW+AFxg9tH6HhF3EDOo0Gj8foC9AWCridyhZpmWHJE737yh6kGVOi9e0A/08/TOFUP/R5oF4KazANJ5wuVMMzDLWo2NGugXL+Dbfik2A1i7oHdZVsHhVlJHLljwmKWHi1Rtr5ssS4rF/O3WW6YKwUmlS4iH8csEVfjK8loCWVc+S5FMkAOlAqRwoFcb//dbs5y3wnuhHD2K78VCMuJT3XtpUNDRqqdkieQqYYkEt0ZCguXK0YrX7FtUNqwCflqx/9v85shsXoIrY6QnW0gmb7ddjhuwZO9qOfioYywlTa0PknsM4vTi9SL2mKvfzZJdDOt3xhEWnLDraJ3wuP+qVcd2xJbncMtJZEk0R1kogTpost0T1y1mFahhMN8imS0RIiuHqHS79XMb3oSrPltgj4PiwETYxgL3MWOnsDBmFj2nb/xuPX/ykwa4U7Tl/fJ+uBljY8QJvakf3nuzv5d7dZVV0jS2dP4/uzhCSzdfNYQmHIejC/NHt4QI2JJXz/GbRIvF1lfSPRzRESVrGlfMmA4t5mtumWwQrknZyHfGLeJMNRxdgEB8Bd4QCHSZf5F0JHgnoJD3Ye8AFAi1URmxOwEFLOwaSvsDBHItgKDeeLkwlVMIIjlFxS32Otu6R9wdo5/4AmZY3guUdFbnzqAkX0yaCecWdRRg4NJtTSENoRJMBq2JHBmYOD2BidiWitrCyqOuwER8ECZ1REzFL1OLPpK2vO8Ffaoq9M9CH9LVfEZg2wTtUwk3g9aflkuw0Um4KUxmCLKLkxlj4tZVaQwzleFepJ4n7jxqqpjIsMM4ipKiEsXeZvUlmvvpGf8H</diagram></mxfile>
\ No newline at end of file
diff --git a/src/main/resources/Documentation/git-replication-healthy.txt b/src/main/resources/Documentation/sources/git-replication-healthy.txt
similarity index 100%
rename from src/main/resources/Documentation/git-replication-healthy.txt
rename to src/main/resources/Documentation/sources/git-replication-healthy.txt
diff --git a/src/main/resources/Documentation/git-replication-split-brain-detected.txt b/src/main/resources/Documentation/sources/git-replication-split-brain-detected.txt
similarity index 63%
rename from src/main/resources/Documentation/git-replication-split-brain-detected.txt
rename to src/main/resources/Documentation/sources/git-replication-split-brain-detected.txt
index b249b9f..79b0c69 100644
--- a/src/main/resources/Documentation/git-replication-split-brain-detected.txt
+++ b/src/main/resources/Documentation/sources/git-replication-split-brain-detected.txt
@@ -2,16 +2,16 @@
 
 participant Client1 
 participant Instance1
-participant Ref-DB Coordinator
+participant Global Ref-DB
 participant Instance2
 participant Client2
 
 state over Client1, Client2, Instance1, Instance2: W0
 state over Client1 : W0 -> W1
 Client1 -> +Instance1: Push W1
-Instance1 -> +Ref-DB Coordinator: CAS if state == W0 set state W0 -> W1
-state over Ref-DB Coordinator : W0 -> W1
-Ref-DB Coordinator -> -Instance1 : ACK
+Instance1 -> +Global Ref-DB: CAS if state == W0 set state W0 -> W1
+state over Global Ref-DB : W0 -> W1
+Global Ref-DB -> -Instance1 : ACK
 state over Instance1 : W0 -> W1
 Instance1 -> -Client1: Ack W1
 
@@ -20,8 +20,8 @@
 state over Client2 : W0 -> W2
 Client2 -> +Instance2: Push W2
 
-Instance2 -> +Ref-DB Coordinator: CAS if state == W0 set state W0 -> W2
-Ref-DB Coordinator -> -Instance2 : NACK
+Instance2 -> +Global Ref-DB: CAS if state == W0 set state W0 -> W2
+Global Ref-DB -> -Instance2 : NACK
 
 Instance2 -> -Client2 : Push failed -- RO Mode
 
@@ -36,9 +36,9 @@
 Client2 -> Instance2: Pull W1
 state over Client2 : W0 -> W1 -> W2
 Client2 -> Instance2: Push W2
-Instance2 -> +Ref-DB Coordinator: CAS if state == W1 set state W1 -> W2
-state over Ref-DB Coordinator: W0 -> W1 -> W2
-Ref-DB Coordinator -> -Instance2 : ACK
+Instance2 -> +Global Ref-DB: CAS if state == W1 set state W1 -> W2
+state over Global Ref-DB: W0 -> W1 -> W2
+Global Ref-DB -> -Instance2 : ACK
 state over Instance2: W0 -> W1 -> W2 
 Instance2 -> -Client2 : ACK 
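
The exchange above is a compare-and-set (CAS) on the shared ref state: a push is
acknowledged (ACK) only while the Global Ref-DB still holds the version the
instance last saw; otherwise it is refused (NACK) and the client gets a
read-only failure until the instance catches up. A minimal in-memory sketch of
that semantic, with hypothetical names (the plugin's real implementation is the
Zookeeper-backed ZkSharedRefDatabase):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Objects;

    // Hypothetical stand-in for the "Global Ref-DB" participant in the diagram above.
    class InMemoryGlobalRefDb {
      private final Map<String, String> state = new HashMap<>();

      // CAS: move `ref` from `expected` to `update`; true means ACK, false means NACK.
      synchronized boolean compareAndSet(String ref, String expected, String update) {
        if (!Objects.equals(state.get(ref), expected)) {
          return false; // NACK: another site already advanced this ref
        }
        state.put(ref, update); // e.g. W0 -> W1
        return true; // ACK
      }
    }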
 
diff --git a/src/main/resources/Documentation/git-replication-split-brain.txt b/src/main/resources/Documentation/sources/git-replication-split-brain.txt
similarity index 100%
rename from src/main/resources/Documentation/git-replication-split-brain.txt
rename to src/main/resources/Documentation/sources/git-replication-split-brain.txt
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
index 4ceead9..0863f55 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
@@ -18,15 +18,10 @@
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Cache.CACHE_SECTION;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Cache.PATTERN_KEY;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.DEFAULT_THREAD_POOL_SIZE;
-import static com.googlesource.gerrit.plugins.multisite.Configuration.ENABLE_KEY;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Event.EVENT_SECTION;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Forwarding.DEFAULT_SYNCHRONIZE;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Forwarding.SYNCHRONIZE_KEY;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Index.INDEX_SECTION;
-import static com.googlesource.gerrit.plugins.multisite.Configuration.KAFKA_PROPERTY_PREFIX;
-import static com.googlesource.gerrit.plugins.multisite.Configuration.KAFKA_SECTION;
-import static com.googlesource.gerrit.plugins.multisite.Configuration.KafkaPublisher.KAFKA_PUBLISHER_SUBSECTION;
-import static com.googlesource.gerrit.plugins.multisite.Configuration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.THREAD_POOL_SIZE_KEY;
 
 import com.google.common.collect.ImmutableList;
@@ -129,84 +124,6 @@
   }
 
   @Test
-  public void kafkaSubscriberPropertiesAreSetWhenSectionIsEnabled() {
-    final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
-    final String kafkaPropertyValue = "aValue";
-    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, true);
-    globalPluginConfig.setString(
-        KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
-    final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
-
-    assertThat(property.equals(kafkaPropertyValue)).isTrue();
-  }
-
-  @Test
-  public void kafkaSubscriberPropertiesAreNotSetWhenSectionIsDisabled() {
-    final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
-    final String kafkaPropertyValue = "aValue";
-    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, false);
-    globalPluginConfig.setString(
-        KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
-    final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
-
-    assertThat(property).isNull();
-  }
-
-  @Test
-  public void kafkaSubscriberPropertiesAreIgnoredWhenPrefixIsNotSet() {
-    final String kafkaPropertyName = "fooBarBaz";
-    final String kafkaPropertyValue = "aValue";
-    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, true);
-    globalPluginConfig.setString(
-        KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
-    final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
-
-    assertThat(property).isNull();
-  }
-
-  @Test
-  public void kafkaPublisherPropertiesAreSetWhenSectionIsEnabled() {
-    final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
-    final String kafkaPropertyValue = "aValue";
-    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, true);
-    globalPluginConfig.setString(
-        KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
-    final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
-
-    assertThat(property.equals(kafkaPropertyValue)).isTrue();
-  }
-
-  @Test
-  public void kafkaPublisherPropertiesAreIgnoredWhenPrefixIsNotSet() {
-    final String kafkaPropertyName = "fooBarBaz";
-    final String kafkaPropertyValue = "aValue";
-    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, true);
-    globalPluginConfig.setString(
-        KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
-    final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
-
-    assertThat(property).isNull();
-  }
-
-  @Test
-  public void kafkaPublisherPropertiesAreNotSetWhenSectionIsDisabled() {
-    final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
-    final String kafkaPropertyValue = "aValue";
-    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, false);
-    globalPluginConfig.setString(
-        KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
-    final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
-
-    assertThat(property).isNull();
-  }
-
-  @Test
   public void shouldReturnValidationErrorsWhenReplicationOnStartupIsEnabled() throws Exception {
     Config replicationConfig = new Config();
     replicationConfig.setBoolean("gerrit", null, "replicateOnStartup", true);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/KafkaConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/KafkaConfigurationTest.java
new file mode 100644
index 0000000..ab0d3bc
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/KafkaConfigurationTest.java
@@ -0,0 +1,121 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.googlesource.gerrit.plugins.multisite.Configuration.ENABLE_KEY;
+import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KAFKA_PROPERTY_PREFIX;
+import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KAFKA_SECTION;
+import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaPublisher.KAFKA_PUBLISHER_SUBSECTION;
+import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
+
+import org.eclipse.jgit.lib.Config;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaConfigurationTest {
+
+  private Config globalPluginConfig;
+
+  @Before
+  public void setUp() {
+    globalPluginConfig = new Config();
+  }
+
+  private KafkaConfiguration getConfiguration() {
+    return new KafkaConfiguration(globalPluginConfig);
+  }
+
+  @Test
+  public void kafkaSubscriberPropertiesAreSetWhenSectionIsEnabled() {
+    final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
+    final String kafkaPropertyValue = "aValue";
+    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, true);
+    globalPluginConfig.setString(
+        KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
+
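+    // The camelCase key suffix ("fooBarBaz") is expected to surface as the
+    // dotted Kafka client property name "foo.bar.baz".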
+    final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
+
+    assertThat(property).isEqualTo(kafkaPropertyValue);
+  }
+
+  @Test
+  public void kafkaSubscriberPropertiesAreNotSetWhenSectionIsDisabled() {
+    final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
+    final String kafkaPropertyValue = "aValue";
+    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, false);
+    globalPluginConfig.setString(
+        KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
+
+    final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
+
+    assertThat(property).isNull();
+  }
+
+  @Test
+  public void kafkaSubscriberPropertiesAreIgnoredWhenPrefixIsNotSet() {
+    final String kafkaPropertyName = "fooBarBaz";
+    final String kafkaPropertyValue = "aValue";
+    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, ENABLE_KEY, true);
+    globalPluginConfig.setString(
+        KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
+
+    final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
+
+    assertThat(property).isNull();
+  }
+
+  @Test
+  public void kafkaPublisherPropertiesAreSetWhenSectionIsEnabled() {
+    final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
+    final String kafkaPropertyValue = "aValue";
+    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, true);
+    globalPluginConfig.setString(
+        KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
+
+    final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
+
+    assertThat(property).isEqualTo(kafkaPropertyValue);
+  }
+
+  @Test
+  public void kafkaPublisherPropertiesAreIgnoredWhenPrefixIsNotSet() {
+    final String kafkaPropertyName = "fooBarBaz";
+    final String kafkaPropertyValue = "aValue";
+    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, true);
+    globalPluginConfig.setString(
+        KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
+
+    final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
+
+    assertThat(property).isNull();
+  }
+
+  @Test
+  public void kafkaPublisherPropertiesAreNotSetWhenSectionIsDisabled() {
+    final String kafkaPropertyName = KAFKA_PROPERTY_PREFIX + "fooBarBaz";
+    final String kafkaPropertyValue = "aValue";
+    globalPluginConfig.setBoolean(KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, ENABLE_KEY, false);
+    globalPluginConfig.setString(
+        KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
+
+    final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
+
+    assertThat(property).isNull();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
index 5281371..556917b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
@@ -36,13 +36,16 @@
   @Mock(answer = Answers.RETURNS_DEEP_STUBS)
   private Configuration configMock;
 
+  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+  private KafkaConfiguration kafkaConfig;
+
   @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
 
   private Module module;
 
   @Before
   public void setUp() {
-    module = new Module(configMock);
+    module = new Module(configMock, kafkaConfig);
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
index be5be85..fc14bed 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
@@ -15,6 +15,10 @@
 package com.googlesource.gerrit.plugins.multisite.broker.kafka;
 
 import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import com.google.gerrit.extensions.client.ChangeKind;
 import com.google.gerrit.reviewdb.client.Account;
@@ -23,6 +27,7 @@
 import com.google.gerrit.server.data.AccountAttribute;
 import com.google.gerrit.server.data.ApprovalAttribute;
 import com.google.gerrit.server.events.CommentAddedEvent;
+import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventGsonProvider;
 import com.google.gerrit.server.util.time.TimeUtil;
 import com.google.gson.Gson;
@@ -30,71 +35,51 @@
 import com.google.gson.JsonObject;
 import com.googlesource.gerrit.plugins.multisite.DisabledMessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerMetrics;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import java.util.UUID;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
+@RunWith(MockitoJUnitRunner.class)
 public class BrokerPublisherTest {
+
+  @Mock private BrokerMetrics brokerMetrics;
+  @Mock private BrokerSession session;
   private BrokerPublisher publisher;
+
   private MessageLogger NO_MSG_LOG = new DisabledMessageLogger();
   private Gson gson = new EventGsonProvider().get();
 
+  private String accountName = "Foo Bar";
+  private String accountEmail = "foo@bar.com";
+  private String accountUsername = "foobar";
+  private String approvalType = ChangeKind.REWORK.toString();
+
+  private String approvalDescription = "ApprovalDescription";
+  private String approvalValue = "+2";
+  private String oldApprovalValue = "+1";
+  private Long approvalGrantedOn = 123L;
+  private String commentDescription = "Patch Set 1: Code-Review+2";
+  private String projectName = "project";
+  private String refName = "refs/heads/master";
+  private String changeId = "Iabcd1234abcd1234abcd1234abcd1234abcd1234";
+  private Long eventCreatedOn = 123L;
+
   @Before
   public void setUp() {
-    publisher = new BrokerPublisher(new TestBrokerSession(), gson, UUID.randomUUID(), NO_MSG_LOG);
+    publisher = new BrokerPublisher(session, gson, UUID.randomUUID(), NO_MSG_LOG, brokerMetrics);
   }
 
   @Test
   public void shouldSerializeCommentAddedEvent() {
 
-    final String accountName = "Foo Bar";
-    final String accountEmail = "foo@bar.com";
-    final String accountUsername = "foobar";
-    final String approvalType = ChangeKind.REWORK.toString();
-
-    final String approvalDescription = "ApprovalDescription";
-    final String approvalValue = "+2";
-    final String oldApprovalValue = "+1";
-    final Long approvalGrantedOn = 123L;
-    final String commentDescription = "Patch Set 1: Code-Review+2";
-    final String projectName = "project";
-    final String refName = "refs/heads/master";
-    final String changeId = "Iabcd1234abcd1234abcd1234abcd1234abcd1234";
-    final Long eventCreatedOn = 123L;
-
-    final Change change =
-        new Change(
-            Change.key(changeId),
-            Change.id(1),
-            Account.id(1),
-            BranchNameKey.create(projectName, refName),
-            TimeUtil.nowTs());
-
-    CommentAddedEvent event = new CommentAddedEvent(change);
-    AccountAttribute accountAttribute = new AccountAttribute();
-    accountAttribute.email = accountEmail;
-    accountAttribute.name = accountName;
-    accountAttribute.username = accountUsername;
-
-    event.eventCreatedOn = eventCreatedOn;
-    event.approvals =
-        () -> {
-          ApprovalAttribute approvalAttribute = new ApprovalAttribute();
-          approvalAttribute.value = approvalValue;
-          approvalAttribute.oldValue = oldApprovalValue;
-          approvalAttribute.description = approvalDescription;
-          approvalAttribute.by = accountAttribute;
-          approvalAttribute.type = ChangeKind.REWORK.toString();
-          approvalAttribute.grantedOn = approvalGrantedOn;
-
-          return new ApprovalAttribute[] {approvalAttribute};
-        };
-
-    event.author = () -> accountAttribute;
-    event.comment = commentDescription;
+    Event event = createSampleEvent();
 
     String expectedSerializedCommentEvent =
         "{\"author\": {\"name\": \""
@@ -138,22 +123,55 @@
     assertThat(publisher.eventToJson(event)).isEqualTo(expectedCommentEventJsonObject);
   }
 
-  private class TestBrokerSession implements BrokerSession {
+  @Test
+  public void shouldIncrementBrokerMetricCounterWhenMessagePublished() {
+    Event event = createSampleEvent();
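+    // Stub a successful publish: only the published-message counter should be bumped.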
+    when(session.publishEvent(any(), any())).thenReturn(true);
+    publisher.publishEvent(EventFamily.INDEX_EVENT, event);
+    verify(brokerMetrics, only()).incrementBrokerPublishedMessage();
+  }
 
-    @Override
-    public boolean isOpen() {
-      return false;
-    }
+  @Test
+  public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishFails() {
+    Event event = createSampleEvent();
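+    // Stub a failed publish: only the failed-to-publish counter should be bumped.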
+    when(session.publishEvent(any(), any())).thenReturn(false);
 
-    @Override
-    public void connect() {}
+    publisher.publishEvent(EventFamily.INDEX_EVENT, event);
+    verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
+  }
 
-    @Override
-    public void disconnect() {}
+  private Event createSampleEvent() {
+    final Change change =
+        new Change(
+            Change.key(changeId),
+            Change.id(1),
+            Account.id(1),
+            BranchNameKey.create(projectName, refName),
+            TimeUtil.nowTs());
 
-    @Override
-    public boolean publishEvent(EventFamily eventFamily, String payload) {
-      return false;
-    }
+    CommentAddedEvent event = new CommentAddedEvent(change);
+    AccountAttribute accountAttribute = new AccountAttribute();
+    accountAttribute.email = accountEmail;
+    accountAttribute.name = accountName;
+    accountAttribute.username = accountUsername;
+
+    event.eventCreatedOn = eventCreatedOn;
+    event.approvals =
+        () -> {
+          ApprovalAttribute approvalAttribute = new ApprovalAttribute();
+          approvalAttribute.value = approvalValue;
+          approvalAttribute.oldValue = oldApprovalValue;
+          approvalAttribute.description = approvalDescription;
+          approvalAttribute.by = accountAttribute;
+          approvalAttribute.type = ChangeKind.REWORK.toString();
+          approvalAttribute.grantedOn = approvalGrantedOn;
+
+          return new ApprovalAttribute[] {approvalAttribute};
+        };
+
+    event.author = () -> accountAttribute;
+    event.comment = commentDescription;
+
+    return event;
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index 02d1b7e..60381ae 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -39,6 +39,7 @@
 import com.google.inject.Key;
 import com.google.inject.TypeLiteral;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
 import com.googlesource.gerrit.plugins.multisite.Module;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
 import java.io.IOException;
@@ -69,7 +70,7 @@
 
   public static class KafkaTestContainerModule extends LifecycleModule {
 
-    public class KafkaStopAtShutdown implements LifecycleListener {
+    public static class KafkaStopAtShutdown implements LifecycleListener {
       private final KafkaContainer kafka;
 
       public KafkaStopAtShutdown(KafkaContainer kafka) {
@@ -95,7 +96,8 @@
       this.config =
           new FileBasedConfig(
               sitePaths.etc_dir.resolve(Configuration.MULTI_SITE_CONFIG).toFile(), FS.DETECTED);
-      this.multiSiteModule = new Module(new Configuration(config, new Config()));
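+      // Kafka settings are read through their own KafkaConfiguration, built from
+      // the same multi-site configuration file.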
+      this.multiSiteModule =
+          new Module(new Configuration(config, new Config()), new KafkaConfiguration(config));
     }
 
     @Override
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java
index 30455b4..e44683d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java
@@ -15,6 +15,7 @@
 package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
 
 import static com.google.common.truth.Truth.assertThat;
+import static java.nio.charset.StandardCharsets.UTF_8;
 
 import com.google.gerrit.server.events.EventGsonProvider;
 import com.google.gson.Gson;
@@ -44,7 +45,8 @@
                 + "\"body\": {}"
                 + "}",
             eventId, eventType, sourceInstanceId, eventCreatedOn);
-    final SourceAwareEventWrapper event = deserializer.deserialize("ignored", eventJson.getBytes());
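+    // Encode with an explicit charset; bare getBytes() falls back to the platform default.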
+    final SourceAwareEventWrapper event =
+        deserializer.deserialize("ignored", eventJson.getBytes(UTF_8));
 
     assertThat(event.getBody().entrySet()).isEmpty();
     assertThat(event.getHeader().getEventId()).isEqualTo(eventId);
@@ -55,11 +57,11 @@
 
   @Test(expected = RuntimeException.class)
   public void kafkaEventDeserializerShouldFailForInvalidJson() {
-    deserializer.deserialize("ignored", "this is not a JSON string".getBytes());
+    deserializer.deserialize("ignored", "this is not a JSON string".getBytes(UTF_8));
   }
 
   @Test(expected = RuntimeException.class)
   public void kafkaEventDeserializerShouldFailForInvalidObjectButValidJSON() {
-    deserializer.deserialize("ignored", "{}".getBytes());
+    deserializer.deserialize("ignored", "{}".getBytes(UTF_8));
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/CustomSharedRefEnforcementByProjectTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/CustomSharedRefEnforcementByProjectTest.java
index e4d7861..d63436c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/CustomSharedRefEnforcementByProjectTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/CustomSharedRefEnforcementByProjectTest.java
@@ -18,7 +18,7 @@
 import static com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefDatabase.newRef;
 
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.Configuration.ZookeeperConfig;
+import com.googlesource.gerrit.plugins.multisite.Configuration.SharedRefDatabase;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.CustomSharedRefEnforcementByProject;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefEnforcement;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefEnforcement.EnforcePolicy;
@@ -34,22 +34,22 @@
 
   @Before
   public void setUp() {
-    Config multiSiteConfig = new Config();
-    multiSiteConfig.setStringList(
-        ZookeeperConfig.SECTION,
-        ZookeeperConfig.SUBSECTION_ENFORCEMENT_RULES,
+    Config sharedRefDbConfig = new Config();
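+    // Enforcement rules are "project:ref" values grouped under the policy name;
+    // the project part may be left empty (":refs/...").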
+    sharedRefDbConfig.setStringList(
+        SharedRefDatabase.SECTION,
+        SharedRefDatabase.SUBSECTION_ENFORCEMENT_RULES,
         EnforcePolicy.DESIRED.name(),
         Arrays.asList(
             "ProjectOne",
             "ProjectTwo:refs/heads/master/test",
             "ProjectTwo:refs/heads/master/test2"));
-    multiSiteConfig.setString(
-        ZookeeperConfig.SECTION,
-        ZookeeperConfig.SUBSECTION_ENFORCEMENT_RULES,
+    sharedRefDbConfig.setString(
+        SharedRefDatabase.SECTION,
+        SharedRefDatabase.SUBSECTION_ENFORCEMENT_RULES,
         EnforcePolicy.IGNORED.name(),
         ":refs/heads/master/test");
 
-    refEnforcement = newCustomRefEnforcement(multiSiteConfig);
+    refEnforcement = newCustomRefEnforcement(sharedRefDbConfig);
   }
 
   @Test
@@ -138,18 +138,18 @@
 
   private SharedRefEnforcement newCustomRefEnforcementWithValue(
       EnforcePolicy policy, String... projectAndRefs) {
-    Config multiSiteConfig = new Config();
-    multiSiteConfig.setStringList(
-        ZookeeperConfig.SECTION,
-        ZookeeperConfig.SUBSECTION_ENFORCEMENT_RULES,
+    Config sharedRefDbConfiguration = new Config();
+    sharedRefDbConfiguration.setStringList(
+        SharedRefDatabase.SECTION,
+        SharedRefDatabase.SUBSECTION_ENFORCEMENT_RULES,
         policy.name(),
         Arrays.asList(projectAndRefs));
-    return newCustomRefEnforcement(multiSiteConfig);
+    return newCustomRefEnforcement(sharedRefDbConfiguration);
   }
 
-  private SharedRefEnforcement newCustomRefEnforcement(Config multiSiteConfig) {
+  private SharedRefEnforcement newCustomRefEnforcement(Config sharedRefDbConfig) {
     return new CustomSharedRefEnforcementByProject(
-        new Configuration(multiSiteConfig, new Config()));
+        new Configuration(sharedRefDbConfig, new Config()));
   }
 
   @Override
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZookeeperTestContainerSupport.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZookeeperTestContainerSupport.java
index 70f6428..3237e3a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZookeeperTestContainerSupport.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZookeeperTestContainerSupport.java
@@ -17,7 +17,7 @@
 import static com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkSharedRefDatabase.pathFor;
 import static com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkSharedRefDatabase.writeObjectId;
 
-import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.ZookeeperConfig;
 import org.apache.curator.framework.CuratorFramework;
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.lib.ObjectId;
@@ -38,7 +38,7 @@
   }
 
   private ZookeeperContainer container;
-  private Configuration configuration;
+  private ZookeeperConfig configuration;
   private CuratorFramework curator;
 
   public CuratorFramework getCurator() {
@@ -49,32 +49,23 @@
     return container;
   }
 
-  public Configuration getConfig() {
-    return configuration;
-  }
-
   @SuppressWarnings("resource")
   public ZookeeperTestContainerSupport(boolean migrationMode) {
     container = new ZookeeperContainer().withExposedPorts(2181).waitingFor(Wait.forListeningPort());
     container.start();
     Integer zkHostPort = container.getMappedPort(2181);
-    Config splitBrainconfig = new Config();
+    Config sharedRefDbConfig = new Config();
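+    // The Zookeeper connect string lives under the "ref-database" section that
+    // ZookeeperConfig now reads on its own.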
     String connectString = container.getContainerIpAddress() + ":" + zkHostPort;
-    splitBrainconfig.setBoolean("ref-database", null, "enabled", true);
-    splitBrainconfig.setString("ref-database", "zookeeper", "connectString", connectString);
-    splitBrainconfig.setString(
+    sharedRefDbConfig.setBoolean("ref-database", null, "enabled", true);
+    sharedRefDbConfig.setString("ref-database", "zookeeper", "connectString", connectString);
+    sharedRefDbConfig.setString(
         "ref-database",
-        Configuration.ZookeeperConfig.SUBSECTION,
-        Configuration.ZookeeperConfig.KEY_CONNECT_STRING,
+        ZookeeperConfig.SUBSECTION,
+        ZookeeperConfig.KEY_CONNECT_STRING,
         connectString);
-    splitBrainconfig.setBoolean(
-        "ref-database",
-        Configuration.ZookeeperConfig.SUBSECTION,
-        Configuration.ZookeeperConfig.KEY_MIGRATE,
-        migrationMode);
 
-    configuration = new Configuration(splitBrainconfig, new Config());
-    this.curator = configuration.getZookeeperConfig().buildCurator();
+    configuration = new ZookeeperConfig(sharedRefDbConfig);
+    this.curator = configuration.buildCurator();
   }
 
   public void cleanup() {