Implement Zookeeper backend for consistency check

Initial Zookeeper-based SharedDfsStorage implementation based on
the original implementation of Dave Borowitz of zookeeper-refdb
(https://gerrit-review.googlesource.com/c/plugins/zookeeper-refdb/+/35787)

Feature: Issue 10554
Change-Id: Ica87fe2c8a2c4aca5f9331bc0f7dcfb834d94727
diff --git a/BUILD b/BUILD
index 38da77a..e132e67 100644
--- a/BUILD
+++ b/BUILD
@@ -19,6 +19,10 @@
     deps = [
         "@commons-lang3//jar",
         "@kafka_client//jar",
+        "@curator-framework//jar",
+        "@curator-recipes//jar",
+        "@curator-client//jar",
+        "@zookeeper//jar",
     ],
 )
 
@@ -46,5 +50,9 @@
         "@kafka_client//jar",
         "@testcontainers-kafka//jar",
         "//lib/testcontainers",
+        "@curator-framework//jar",
+        "@curator-recipes//jar",
+        "@curator-test//jar",
+        "@zookeeper//jar",
     ],
 )
diff --git a/README.md b/README.md
index 345fefb..9f03102 100644
--- a/README.md
+++ b/README.md
@@ -84,6 +84,12 @@
 
 [kafka "subscriber"]
   enabled = true
+
+[split-brain]
+  enabled = true
+
+[split-brain "zookeeper"]
+  connectString = "localhost:2181"
 ```
 
 For more details on the configuration settings, please refer to the
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 678edd5..a8fabbb 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -19,6 +19,8 @@
     )
 
     BYTE_BUDDY_VER = "1.8.15"
+    CURATOR_VER = "4.2.0"
+    CURATOR_TEST_VER = "2.12.0"
 
     maven_jar(
         name = "byte_buddy",
@@ -55,3 +57,33 @@
         artifact = "org.apache.commons:commons-lang3:3.6",
         sha1 = "9d28a6b23650e8a7e9063c04588ace6cf7012c17",
     )
+
+    maven_jar(
+        name = "curator-test",
+        artifact = "org.apache.curator:curator-test:" + CURATOR_TEST_VER,
+        sha1 = "0a797be57ba95b67688a7615f7ad41ee6b3ceff0"
+    )
+
+    maven_jar(
+        name = "curator-framework",
+        artifact = "org.apache.curator:curator-framework:" + CURATOR_VER,
+        sha1 = "5b1cc87e17b8fe4219b057f6025662a693538861"
+    )
+
+    maven_jar(
+        name = "curator-recipes",
+        artifact = "org.apache.curator:curator-recipes:" + CURATOR_VER,
+        sha1 = "7f775be5a7062c2477c51533b9d008f70411ba8e"
+    )
+
+    maven_jar(
+        name = "curator-client",
+        artifact = "org.apache.curator:curator-client:" + CURATOR_VER,
+        sha1 = "d5d50930b8dd189f92c40258a6ba97675fea3e15"
+        )
+
+    maven_jar(
+        name = "zookeeper",
+        artifact = "org.apache.zookeeper:zookeeper:3.4.8",
+        sha1 = "933ea2ed15e6a0e24b788973e3d128ff163c3136"
+    )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index acfaae2..da4ea93 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.multisite;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.CaseFormat;
 import com.google.common.base.Strings;
@@ -23,6 +25,7 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -30,6 +33,10 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.eclipse.jgit.lib.Config;
 import org.slf4j.Logger;
@@ -55,6 +62,7 @@
   static final boolean DEFAULT_ENABLE_PROCESSING = true;
   static final String KAFKA_SECTION = "kafka";
   public static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
+  public static final Boolean DEFAULT_SPLIT_BRAIN = false;
 
   private final KafkaPublisher publisher;
   private final Cache cache;
@@ -62,6 +70,7 @@
   private final Index index;
   private final KafkaSubscriber subscriber;
   private final Kafka kafka;
+  private final SplitBrain splitBrain;
 
   @Inject
   Configuration(PluginConfigFactory pluginConfigFactory, @PluginName String pluginName) {
@@ -76,6 +85,11 @@
     cache = new Cache(cfg);
     event = new Event(cfg);
     index = new Index(cfg);
+    splitBrain = new SplitBrain(cfg);
+  }
+
+  public SplitBrain getSplitBrain() {
+    return splitBrain;
   }
 
   public Kafka getKafka() {
@@ -102,9 +116,15 @@
     return subscriber;
   }
 
-  private static int getInt(Config cfg, String section, String name, int defaultValue) {
+  private static boolean getBoolean(
+      Config cfg, String section, String subsection, String name, boolean defaultValue) {
+    return cfg.getBoolean(section, subsection, name, defaultValue);
+  }
+
+  private static int getInt(
+      Config cfg, String section, String subSection, String name, int defaultValue) {
     try {
-      return cfg.getInt(section, name, defaultValue);
+      return cfg.getInt(section, subSection, name, defaultValue);
     } catch (IllegalArgumentException e) {
       log.error("invalid value for {}; using default value {}", name, defaultValue);
       log.debug("Failed to retrieve integer value: {}", e.getMessage(), e);
@@ -330,7 +350,8 @@
 
     private Cache(Config cfg) {
       super(cfg, CACHE_SECTION);
-      threadPoolSize = getInt(cfg, CACHE_SECTION, THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE);
+      threadPoolSize =
+          getInt(cfg, CACHE_SECTION, null, THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE);
       patterns = Arrays.asList(cfg.getStringList(CACHE_SECTION, null, PATTERN_KEY));
     }
 
@@ -364,10 +385,13 @@
 
     private Index(Config cfg) {
       super(cfg, INDEX_SECTION);
-      threadPoolSize = getInt(cfg, INDEX_SECTION, THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE);
-      retryInterval = getInt(cfg, INDEX_SECTION, RETRY_INTERVAL_KEY, DEFAULT_INDEX_RETRY_INTERVAL);
-      maxTries = getInt(cfg, INDEX_SECTION, MAX_TRIES_KEY, DEFAULT_INDEX_MAX_TRIES);
-      numStripedLocks = getInt(cfg, INDEX_SECTION, NUM_STRIPED_LOCKS, DEFAULT_NUM_STRIPED_LOCKS);
+      threadPoolSize =
+          getInt(cfg, INDEX_SECTION, null, THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE);
+      retryInterval =
+          getInt(cfg, INDEX_SECTION, null, RETRY_INTERVAL_KEY, DEFAULT_INDEX_RETRY_INTERVAL);
+      maxTries = getInt(cfg, INDEX_SECTION, null, MAX_TRIES_KEY, DEFAULT_INDEX_MAX_TRIES);
+      numStripedLocks =
+          getInt(cfg, INDEX_SECTION, null, NUM_STRIPED_LOCKS, DEFAULT_NUM_STRIPED_LOCKS);
     }
 
     public int threadPoolSize() {
@@ -386,4 +410,135 @@
       return numStripedLocks;
     }
   }
+
+  public static class Zookeeper {
+    public static final int defaultSessionTimeoutMs;
+    public static final int defaultConnectionTimeoutMs;
+    public static final String DEFAULT_ZK_CONNECT = "localhost:2181";
+    private final int DEFAULT_LOCK_TIMEOUT_MS = 10000;
+    private final int DEFAULT_RETRY_POLICY_BASE_SLEEP_TIME_MS = 1000;
+    private final int DEFAULT_RETRY_POLICY_MAX_SLEEP_TIME_MS = 3000;
+    private final int DEFAULT_RETRY_POLICY_MAX_RETRIES = 3;
+
+    static {
+      CuratorFrameworkFactory.Builder b = CuratorFrameworkFactory.builder();
+      defaultSessionTimeoutMs = b.getSessionTimeoutMs();
+      defaultConnectionTimeoutMs = b.getConnectionTimeoutMs();
+    }
+
+    public static final String SUBSECTION = "zookeeper";
+    public static final String KEY_CONNECT_STRING = "connectString";
+    public static final String KEY_SESSION_TIMEOUT_MS = "sessionTimeoutMs";
+    public static final String KEY_CONNECTION_TIMEOUT_MS = "connectionTimeoutMs";
+    public static final String KEY_RETRY_POLICY_BASE_SLEEP_TIME_MS = "retryPolicyBaseSleepTimeMs";
+    public static final String KEY_RETRY_POLICY_MAX_SLEEP_TIME_MS = "retryPolicyMaxSleepTimeMs";
+    public static final String KEY_RETRY_POLICY_MAX_RETRIES = "retryPolicyMaxRetries";
+    public static final String KEY_LOCK_TIMEOUT_MS = "lockTimeoutMs";
+    public static final String KEY_ROOT_NODE = "rootNode";
+
+    private final String connectionString;
+    private final String root;
+    private final int sessionTimeoutMs;
+    private final int connectionTimeoutMs;
+    private final int baseSleepTimeMs;
+    private final int maxSleepTimeMs;
+    private final int maxRetries;
+    private CuratorFramework build;
+
+    public Duration getLockTimeout() {
+      return lockTimeout;
+    }
+
+    private final Duration lockTimeout;
+
+    private Zookeeper(Config cfg) {
+      connectionString =
+          getString(cfg, SplitBrain.SECTION, SUBSECTION, KEY_CONNECT_STRING, DEFAULT_ZK_CONNECT);
+      root = getString(cfg, SplitBrain.SECTION, SUBSECTION, KEY_ROOT_NODE, "gerrit/multi-site");
+      sessionTimeoutMs =
+          getInt(
+              cfg, SplitBrain.SECTION, SUBSECTION, KEY_SESSION_TIMEOUT_MS, defaultSessionTimeoutMs);
+      connectionTimeoutMs =
+          getInt(
+              cfg,
+              SplitBrain.SECTION,
+              SUBSECTION,
+              KEY_CONNECTION_TIMEOUT_MS,
+              defaultConnectionTimeoutMs);
+
+      baseSleepTimeMs =
+          getInt(
+              cfg,
+              SplitBrain.SECTION,
+              SUBSECTION,
+              KEY_RETRY_POLICY_BASE_SLEEP_TIME_MS,
+              DEFAULT_RETRY_POLICY_BASE_SLEEP_TIME_MS);
+
+      maxSleepTimeMs =
+          getInt(
+              cfg,
+              SplitBrain.SECTION,
+              SUBSECTION,
+              KEY_RETRY_POLICY_MAX_SLEEP_TIME_MS,
+              DEFAULT_RETRY_POLICY_MAX_SLEEP_TIME_MS);
+
+      maxRetries =
+          getInt(
+              cfg,
+              SplitBrain.SECTION,
+              SUBSECTION,
+              KEY_RETRY_POLICY_MAX_RETRIES,
+              DEFAULT_RETRY_POLICY_MAX_RETRIES);
+
+      lockTimeout =
+          Duration.ofMillis(
+              getInt(
+                  cfg,
+                  SplitBrain.SECTION,
+                  SUBSECTION,
+                  KEY_LOCK_TIMEOUT_MS,
+                  DEFAULT_LOCK_TIMEOUT_MS));
+
+      checkArgument(StringUtils.isNotEmpty(connectionString), "zookeeper.%s contains no servers");
+    }
+
+    public CuratorFramework buildCurator() {
+      if (build == null) {
+        this.build =
+            CuratorFrameworkFactory.builder()
+                .connectString(connectionString)
+                .sessionTimeoutMs(sessionTimeoutMs)
+                .connectionTimeoutMs(connectionTimeoutMs)
+                .retryPolicy(
+                    new BoundedExponentialBackoffRetry(baseSleepTimeMs, maxSleepTimeMs, maxRetries))
+                .namespace(root)
+                .build();
+        this.build.start();
+      }
+
+      return this.build;
+    }
+  }
+
+  public static class SplitBrain {
+    private final boolean enabled;
+
+    private final Zookeeper zookeeper;
+    static final String SECTION = "split-brain";
+    static final String ENABLED_KEY = "enabled";
+
+    private SplitBrain(Config cfg) {
+
+      this.enabled = getBoolean(cfg, SECTION, null, ENABLED_KEY, DEFAULT_SPLIT_BRAIN);
+      zookeeper = this.enabled ? new Zookeeper(cfg) : null;
+    }
+
+    public boolean enabled() {
+      return enabled;
+    }
+
+    public Zookeeper getZookeeper() {
+      return zookeeper;
+    }
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index 0d9ab52..4170239 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -82,7 +82,9 @@
       install(new BrokerForwarderModule(config.kafkaPublisher()));
     }
 
-    install(new ValidationModule());
+    if (config.getSplitBrain().enabled()) {
+      install(new ValidationModule(config));
+    }
 
     bind(Gson.class).toProvider(GsonProvider.class).in(Singleton.class);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
index f2a6b94..c1b80eb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
@@ -17,15 +17,29 @@
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.git.validators.RefOperationValidationListener;
 import com.google.inject.AbstractModule;
-import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.NoOpDfsRefDatabase;
+import com.google.inject.name.Names;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefDatabase;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkSharedRefDatabase;
+import java.time.Duration;
+import org.apache.curator.framework.CuratorFramework;
 
 public class ValidationModule extends AbstractModule {
 
+  private Configuration cfg;
+
+  public ValidationModule(Configuration cfg) {
+    this.cfg = cfg;
+  }
+
   @Override
   protected void configure() {
     DynamicSet.bind(binder(), RefOperationValidationListener.class).to(InSyncChangeValidator.class);
 
-    bind(SharedRefDatabase.class).to(NoOpDfsRefDatabase.class);
+    bind(SharedRefDatabase.class).to(ZkSharedRefDatabase.class);
+    bind(CuratorFramework.class).toInstance(cfg.getSplitBrain().getZookeeper().buildCurator());
+    bind(Duration.class)
+        .annotatedWith(Names.named("ZkLockTimeout"))
+        .toInstance(cfg.getSplitBrain().getZookeeper().getLockTimeout());
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/NoOpDfsRefDatabase.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/NoOpDfsRefDatabase.java
index 801fc3b..defa6ab 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/NoOpDfsRefDatabase.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/NoOpDfsRefDatabase.java
@@ -15,17 +15,11 @@
 package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb;
 
 import java.io.IOException;
-import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.Ref;
 
 public class NoOpDfsRefDatabase implements SharedRefDatabase {
 
   @Override
-  public Ref newRef(String refName, ObjectId objectId) {
-    return null;
-  }
-
-  @Override
   public boolean compareAndPut(String project, Ref oldRef, Ref newRef) throws IOException {
     return true;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/SharedRefDatabase.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/SharedRefDatabase.java
index b995df9..c886751 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/SharedRefDatabase.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/SharedRefDatabase.java
@@ -16,6 +16,7 @@
 
 import java.io.IOException;
 import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectIdRef;
 import org.eclipse.jgit.lib.Ref;
 
 public interface SharedRefDatabase {
@@ -69,14 +70,16 @@
    * @param refName ref name
    * @param objectId object id
    */
-  Ref newRef(String refName, ObjectId objectId);
+  default Ref newRef(String refName, ObjectId objectId) {
+    return new ObjectIdRef.Unpeeled(Ref.Storage.NETWORK, refName, objectId);
+  }
 
   /**
    * Utility method for new refs.
    *
    * @param project project name of the ref
    * @param newRef new reference to store.
-   * @return
+   * @return true if the operation was successful; false otherwise.
    * @throws IOException
    */
   default boolean compareAndCreate(String project, Ref newRef) throws IOException {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfo.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfo.java
new file mode 100644
index 0000000..ab34aca
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfo.java
@@ -0,0 +1,80 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
+
+import com.google.common.base.Objects;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+
+public class ZkRefInfo {
+
+  private final String refName;
+  private final String projectName;
+  private final ObjectId objectId;
+
+  public ZkRefInfo(final String projectName, final String refName, final ObjectId objectId) {
+    this.projectName = projectName;
+    this.objectId = objectId;
+    this.refName = refName;
+  }
+
+  public ZkRefInfo(final String projectName, final Ref ref) {
+    this(projectName, ref.getName(), ref.getObjectId());
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+    ZkRefInfo zkRefInfo = (ZkRefInfo) other;
+    return Objects.equal(refName, zkRefInfo.refName)
+        && Objects.equal(projectName, zkRefInfo.projectName)
+        && Objects.equal(objectId, zkRefInfo.objectId);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(refName, projectName, objectId);
+  }
+
+  public String refName() {
+    return refName;
+  }
+
+  public String projectName() {
+    return projectName;
+  }
+
+  public ObjectId objectId() {
+    return objectId;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoDAO.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoDAO.java
new file mode 100644
index 0000000..85d8227
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoDAO.java
@@ -0,0 +1,141 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
+
+import static org.apache.zookeeper.CreateMode.PERSISTENT;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.PathAndBytesable;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+
+public class ZkRefInfoDAO {
+  public static final String OBJECT_ID_PATH = "objectId";
+
+  public static String pathFor(ZkRefInfo info) {
+    return "/" + info.projectName() + "/" + info.refName();
+  }
+
+  public static String pathFor(String projectName, Ref ref) {
+    return pathFor(projectName, ref.getName());
+  }
+
+  public static String pathFor(String projectName, String refName) {
+    return "/" + projectName + "/" + refName;
+  }
+
+  private final CuratorFramework client;
+
+  public ZkRefInfoDAO(CuratorFramework client) {
+    this.client = client;
+  }
+
+  public Optional<ZkRefInfo> read(String projectName, String refName) throws Exception {
+    final String rootPath = pathFor(projectName, refName);
+
+    if (!exists(rootPath)) {
+      return Optional.empty();
+    }
+
+    final ObjectId objectId = readObjectIdAt(rootPath + "/" + OBJECT_ID_PATH);
+
+    return Optional.of(new ZkRefInfo(projectName, refName, objectId));
+  }
+
+  public void update(ZkRefInfo info) throws Exception {
+    writeInTransaction(info, () -> client.transactionOp().setData());
+  }
+
+  public void create(ZkRefInfo info) throws Exception {
+    client.createContainers(pathFor(info));
+    writeInTransaction(info, () -> client.transactionOp().create().withMode(PERSISTENT));
+  }
+
+  private void writeInTransaction(
+      ZkRefInfo info, Supplier<PathAndBytesable<CuratorOp>> writeOpBuilderSupplier)
+      throws Exception {
+    String commonPath = pathFor(info);
+    final List<CuratorTransactionResult> curatorTransactionResults =
+        client
+            .transaction()
+            .forOperations(
+                writeOpBuilderSupplier
+                    .get()
+                    .forPath(commonPath + "/" + OBJECT_ID_PATH, writeObjectId(info.objectId())));
+
+    for (CuratorTransactionResult result : curatorTransactionResults) {
+      if (result.getError() != 0)
+        throw new IOException(
+            String.format(
+                "Error with code %d trying to write path %s ",
+                result.getError(), result.getForPath()));
+    }
+  }
+
+  private boolean exists(String path) throws Exception {
+    return client.checkExists().forPath(path) != null;
+  }
+
+  private ObjectId readObjectIdAt(String path) throws Exception {
+    Optional<ObjectId> objectId = parseAt(path, ObjectId::fromRaw);
+    if (!objectId.isPresent()) {
+      throw new CorruptedZkStorageException(
+          String.format("Not able to read objectId from zookeeper path %s", path));
+    }
+    return objectId.get();
+  }
+
+  private byte[] writeObjectId(ObjectId value) throws IOException {
+    final ByteArrayOutputStream out = new ByteArrayOutputStream();
+    final DataOutputStream stream = new DataOutputStream(out);
+    value.copyRawTo(stream);
+    return out.toByteArray();
+  }
+
+  private <T> Optional<T> parseAt(String path, Function<byte[], T> parser) throws Exception {
+    if (client.checkExists().forPath(path) == null) return Optional.empty();
+    return Optional.ofNullable(client.getData().forPath(path)).map(parser);
+  }
+
+  static class CorruptedZkStorageException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public CorruptedZkStorageException(String message) {
+      super(message);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java
new file mode 100644
index 0000000..7f858c4
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java
@@ -0,0 +1,172 @@
+// Copyright (C) 2012 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefDatabase;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Optional;
+import javax.inject.Named;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.framework.recipes.locks.Locker;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+
+public class ZkSharedRefDatabase implements SharedRefDatabase {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final CuratorFramework client;
+  private final Duration lockTimeout;
+
+  @Inject
+  public ZkSharedRefDatabase(
+      CuratorFramework client, @Named("ZkLockTimeout") Duration lockTimeout) {
+    this.client = client;
+    this.lockTimeout = lockTimeout;
+  }
+
+  @Override
+  public boolean compareAndRemove(String project, Ref oldRef) throws IOException {
+    return compareAndPut(project, oldRef, TombstoneRef.forRef(oldRef));
+  }
+
+  @Override
+  public boolean compareAndPut(String projectName, Ref oldRef, Ref newRef) throws IOException {
+    boolean isCreate = oldRef == NULL_REF;
+
+    final ZkRefInfoDAO marshaller = new ZkRefInfoDAO(client);
+
+    final InterProcessMutex refPathMutex =
+        new InterProcessMutex(client, "/locks" + ZkRefInfoDAO.pathFor(projectName, newRef));
+
+    try (Locker locker = new Locker(refPathMutex, lockTimeout.toMillis(), MILLISECONDS)) {
+      final Optional<ZkRefInfo> infoCurrentlyInZkMaybe =
+          marshaller.read(projectName, newRef.getName());
+      final ZkRefInfo newRefInfo = new ZkRefInfo(projectName, newRef);
+
+      if (isCreate) {
+        return doCreate(marshaller, infoCurrentlyInZkMaybe, newRefInfo);
+      }
+      return doUpdate(oldRef, marshaller, infoCurrentlyInZkMaybe, newRefInfo);
+
+    } catch (Exception e) {
+      throw new IOException(
+          String.format(
+              "Error trying to perform CAS at path %s", ZkRefInfoDAO.pathFor(projectName, newRef)),
+          e);
+    }
+  }
+
+  private boolean doUpdate(
+      Ref oldRef,
+      ZkRefInfoDAO marshaller,
+      Optional<ZkRefInfo> infoCurrentlyInZkMaybe,
+      ZkRefInfo newRefInfo)
+      throws Exception {
+    if (!infoCurrentlyInZkMaybe.isPresent()) {
+      logger.atWarning().log(
+          "Asked to update ref %s but it is not in ZK at path %s",
+          newRefInfo.refName(), ZkRefInfoDAO.pathFor(newRefInfo));
+      return false;
+    }
+
+    if (!infoCurrentlyInZkMaybe.get().objectId().equals(oldRef.getObjectId())) {
+      logger.atWarning().log(
+          "Old Ref %s does not match the current Rf content in Zookeeper %s. Not applying the update.",
+          oldRef.getObjectId(), infoCurrentlyInZkMaybe.get().objectId());
+      return false;
+    }
+
+    marshaller.update(newRefInfo);
+
+    return true;
+  }
+
+  private boolean doCreate(
+      ZkRefInfoDAO marshaller, Optional<ZkRefInfo> infoCurrentlyInZkMaybe, ZkRefInfo newRefInfo)
+      throws Exception {
+    if (infoCurrentlyInZkMaybe.isPresent()) {
+      logger.atWarning().log(
+          "Asked to create ref %s but it is already in ZK at path %s",
+          newRefInfo.refName(), ZkRefInfoDAO.pathFor(newRefInfo));
+      return false;
+    }
+
+    marshaller.create(newRefInfo);
+
+    return true;
+  }
+
+  /**
+   * When deleting a Ref this temporary Ref Tombstone will be created and then cleaned-up at a later
+   * stage by the garbage collection
+   */
+  static class TombstoneRef implements Ref {
+    static TombstoneRef forRef(final Ref targetRef) {
+      return new TombstoneRef(targetRef.getName());
+    }
+
+    private final String name;
+
+    private TombstoneRef(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public String getName() {
+      return name;
+    }
+
+    @Override
+    public boolean isSymbolic() {
+      return false;
+    }
+
+    @Override
+    public Ref getLeaf() {
+      return null;
+    }
+
+    @Override
+    public Ref getTarget() {
+      return null;
+    }
+
+    @Override
+    public ObjectId getObjectId() {
+      return ObjectId.zeroId();
+    }
+
+    @Override
+    public ObjectId getPeeledObjectId() {
+      return null;
+    }
+
+    @Override
+    public boolean isPeeled() {
+      return false;
+    }
+
+    @Override
+    public Storage getStorage() {
+      return Storage.NETWORK;
+    }
+  }
+}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index b10d790..a66383b 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -42,6 +42,19 @@
   cacheEventEnabled = true
   projectListEventEnabled = true
   streamEventEnabled = true
+
+[split-brain]
+  enabled = true
+  
+[split-brain "zookeeper"]
+  connectString = "localhost:2181"
+  rootNode = "/gerrit/multi-site"  
+  sessionTimeoutMs = 1000
+  connectionTimeoutMs = 1000
+  retryPolicyBaseSleepTimeMs = 1000
+  retryPolicyMaxSleepTimeMs = 3000
+  retryPolicyMaxRetries = 3
+  lockTimeoutMs = 10000
 ```
 
 ## Configuration parameters
@@ -151,6 +164,41 @@
 :   Polling interval for checking incoming events
     Defaults: 1000
 
+```split-brain.zookeeper.connectString```
+:   Connection string to  zookeeper
+
+```split-brain.zookeeper.rootNode```
+:   Root node to use under Zookeeper to store/retrieve information
+    Defaults: "/gerrit/multi-site"
+
+
+```split-brain.zookeeper.sessionTimeoutMs```
+:   Root node to use under Zookeeper to store/retrieve information
+    Defaults: 1000
+
+```split-brain.zookeeper.connectionTimeoutMs```
+:   Root node to use under Zookeeper to store/retrieve information
+    Defaults: 1000
+
+```split-brain.zookeeper.retryPolicyBaseSleepTimeMs```
+:   Configuration for the base sleep timeout (iun ms) to use to create the BoundedExponentialBackoffRetry policy
+used for the Zookeeper connection
+    Defaults: 1000
+
+```split-brain.zookeeper.retryPolicyMaxSleepTimeMs```
+:   Configuration for the max sleep timeout (iun ms) to use to create the BoundedExponentialBackoffRetry policy
+used for the Zookeeper connection
+    Defaults: 3000
+
+```split-brain.zookeeper.retryPolicyMaxRetries```
+:   Configuration for the max number of retries to use to create the BoundedExponentialBackoffRetry policy
+used for the Zookeeper connection
+    Defaults: 3
+
+```split-brain.zookeeper.lockTimeoutMs```
+:   Configuration for InterProcessMutex lock timeout
+    Defaults: 10000
+
 #### Custom kafka properties:
 
 In addition to the above settings, custom Kafka properties can be explicitly set for `publisher` and `subscriber`.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
index a4bebb7..a066094 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
@@ -17,6 +17,7 @@
 import static com.google.common.truth.Truth.assertThat;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Cache.CACHE_SECTION;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Cache.PATTERN_KEY;
+import static com.googlesource.gerrit.plugins.multisite.Configuration.DEFAULT_SPLIT_BRAIN;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.DEFAULT_THREAD_POOL_SIZE;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.ENABLE_KEY;
 import static com.googlesource.gerrit.plugins.multisite.Configuration.Event.EVENT_SECTION;
@@ -32,6 +33,8 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.gerrit.server.config.PluginConfigFactory;
+import com.googlesource.gerrit.plugins.multisite.Configuration.SplitBrain;
+import com.googlesource.gerrit.plugins.multisite.Configuration.Zookeeper;
 import org.eclipse.jgit.lib.Config;
 import org.junit.Before;
 import org.junit.Test;
@@ -45,6 +48,7 @@
   private static final String INVALID_INT = "invalidInt";
   private static final String PLUGIN_NAME = "multi-site";
   private static final int THREAD_POOL_SIZE = 1;
+  private static final boolean SPLIT_BRAIN_ENABLED = true;
 
   @Mock private PluginConfigFactory pluginConfigFactoryMock;
   private Config globalPluginConfig;
@@ -96,6 +100,22 @@
   }
 
   @Test
+  public void testGetEnabledSplitBrain() throws Exception {
+    assertThat(getConfiguration().getSplitBrain().enabled()).isEqualTo(DEFAULT_SPLIT_BRAIN);
+
+    globalPluginConfig.setBoolean(
+        SplitBrain.SECTION, null, SplitBrain.ENABLED_KEY, SPLIT_BRAIN_ENABLED);
+    // If split-brain enabled, zookeeper 'connect' is required
+    globalPluginConfig.setString(
+        SplitBrain.SECTION,
+        Zookeeper.SUBSECTION,
+        Zookeeper.KEY_CONNECT_STRING,
+        Zookeeper.DEFAULT_ZK_CONNECT);
+
+    assertThat(getConfiguration().getSplitBrain().enabled()).isEqualTo(SPLIT_BRAIN_ENABLED);
+  }
+
+  @Test
   public void testGetCacheSynchronize() throws Exception {
     assertThat(getConfiguration().cache().synchronize()).isEqualTo(DEFAULT_SYNCHRONIZE);
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index 69bd4e9..449c272 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -104,6 +104,7 @@
       config.setString("kafka", null, "bootstrapServers", kafka.getBootstrapServers());
       config.setBoolean("kafka", "publisher", "enabled", true);
       config.setBoolean("kafka", "subscriber", "enabled", true);
+      config.setBoolean("split-brain", null, "enabled", false);
       Configuration multiSiteConfig = new Configuration(config);
       bind(Configuration.class).toInstance(multiSiteConfig);
       install(new Module(multiSiteConfig, noteDb));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationIT.java
index 0c6833d..43377f8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationIT.java
@@ -14,34 +14,87 @@
 
 package com.googlesource.gerrit.plugins.multisite.validation;
 
-import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.LogThreshold;
 import com.google.gerrit.acceptance.NoHttpd;
 import com.google.gerrit.acceptance.PushOneCommit;
 import com.google.gerrit.acceptance.TestPlugin;
-import com.google.inject.AbstractModule;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.Module;
+import com.googlesource.gerrit.plugins.multisite.NoteDbStatus;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZookeeperTestContainerSupport;
+import org.junit.Before;
 import org.junit.Test;
 
 @NoHttpd
 @LogThreshold(level = "INFO")
 @TestPlugin(
     name = "multi-site",
-    sysModule = "com.googlesource.gerrit.plugins.multisite.validation.ValidationIT$Module")
+    sysModule =
+        "com.googlesource.gerrit.plugins.multisite.validation.ValidationIT$ZookeeperTestModule")
 public class ValidationIT extends LightweightPluginDaemonTest {
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
-  public static class Module extends AbstractModule {
+  static {
+    System.setProperty("gerrit.notedb", "ON");
+  }
+
+  public static class ZookeeperTestModule extends LifecycleModule {
+
+    public class ZookeeperStopAtShutdown implements LifecycleListener {
+      private final ZookeeperTestContainerSupport zookeeperContainer;
+
+      public ZookeeperStopAtShutdown(ZookeeperTestContainerSupport zk) {
+        this.zookeeperContainer = zk;
+      }
+
+      @Override
+      public void stop() {
+        zookeeperContainer.cleanup();
+      }
+
+      @Override
+      public void start() {
+        // Do nothing
+      }
+    }
+
+    private final NoteDbStatus noteDb;
+
+    @Inject
+    public ZookeeperTestModule(NoteDbStatus noteDb) {
+      this.noteDb = noteDb;
+    }
+
     @Override
     protected void configure() {
-      install(new ValidationModule());
+      ZookeeperTestContainerSupport zookeeperContainer = new ZookeeperTestContainerSupport();
+      Configuration multiSiteConfig = zookeeperContainer.getConfig();
+      bind(Configuration.class).toInstance(multiSiteConfig);
+      install(new Module(multiSiteConfig, noteDb));
+
+      listener().toInstance(new ZookeeperStopAtShutdown(zookeeperContainer));
+    }
+  }
+
+  @Override
+  @Before
+  public void setUpTestPlugin() throws Exception {
+    super.setUpTestPlugin();
+
+    if (!notesMigration.commitChangeWrites()) {
+      throw new IllegalStateException("NoteDb is mandatory for running the multi-site plugin");
     }
   }
 
   @Test
   public void inSyncChangeValidatorShouldAcceptNewChange() throws Exception {
-    final PushOneCommit.Result change = createChange("refs/for/master");
-
+    // FIXME: The code does not work for already existing refs (need a migration step for first
+    // run - T0). Using "refs/heads/master2" in this test for now
+    final PushOneCommit.Result change =
+        createCommitAndPush(testRepo, "refs/heads/master2", "msg", "file", "content");
     change.assertOkStatus();
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/RefSharedDatabaseTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/RefSharedDatabaseTest.java
new file mode 100644
index 0000000..e0f0a9f
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/RefSharedDatabaseTest.java
@@ -0,0 +1,73 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.RefFixture;
+import java.io.IOException;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Ref.Storage;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class RefSharedDatabaseTest implements RefFixture {
+  @Rule public TestName nameRule = new TestName();
+
+  @Override
+  public String testBranch() {
+    return "branch_" + nameRule.getMethodName();
+  }
+
+  @Test
+  public void shouldCreateANewRef() {
+
+    ObjectId objectId = AN_OBJECT_ID_1;
+    String refName = aBranchRef();
+
+    Ref aNewRef =
+        new SharedRefDatabase() {
+
+          @Override
+          public boolean compareAndRemove(String project, Ref oldRef) throws IOException {
+            return false;
+          }
+
+          @Override
+          public boolean compareAndPut(String project, Ref oldRef, Ref newRef) throws IOException {
+            return false;
+          }
+        }.newRef(refName, objectId);
+
+    assertThat(aNewRef.getName()).isEqualTo(refName);
+    assertThat(aNewRef.getObjectId()).isEqualTo(objectId);
+    assertThat(aNewRef.getStorage()).isEqualTo(Storage.NETWORK);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/RefFixture.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/RefFixture.java
new file mode 100644
index 0000000..1e27337
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/RefFixture.java
@@ -0,0 +1,55 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
+
+import com.google.gerrit.reviewdb.client.RefNames;
+import org.eclipse.jgit.lib.ObjectId;
+import org.junit.Ignore;
+
+@Ignore
+public interface RefFixture {
+
+  static final String ALLOWED_CHARS = "abcdefghilmnopqrstuvz";
+  static final String ALLOWED_DIGITS = "1234567890";
+  static final String ALLOWED_NAME_CHARS =
+      ALLOWED_CHARS + ALLOWED_CHARS.toUpperCase() + ALLOWED_DIGITS;
+  static final String A_TEST_PROJECT_NAME = "A_TEST_PROJECT_NAME";
+  static final ObjectId AN_OBJECT_ID_1 = new ObjectId(1, 2, 3, 4, 5);
+  static final ObjectId AN_OBJECT_ID_2 = new ObjectId(1, 2, 3, 4, 6);
+  static final ObjectId AN_OBJECT_ID_3 = new ObjectId(1, 2, 3, 4, 7);
+
+  default ZkRefInfo aZkRefInfo(ObjectId objectId) {
+    return new ZkRefInfo(A_TEST_PROJECT_NAME, aBranchRef(), objectId);
+  }
+
+  default String aBranchRef() {
+    return RefNames.REFS_HEADS + testBranch();
+  }
+
+  String testBranch();
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZKRefsSharedDatabaseTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZKRefsSharedDatabaseTest.java
new file mode 100644
index 0000000..bc3323c
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZKRefsSharedDatabaseTest.java
@@ -0,0 +1,141 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkSharedRefDatabase.TombstoneRef;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Optional;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class ZKRefsSharedDatabaseTest implements RefFixture {
+  @Rule public TestName nameRule = new TestName();
+
+  ZookeeperTestContainerSupport zookeeperContainer;
+  ZkSharedRefDatabase zkSharedRefDatabase;
+  ZkRefInfoDAO marshaller;
+
+  @Before
+  public void setup() {
+    zookeeperContainer = new ZookeeperTestContainerSupport();
+    zkSharedRefDatabase =
+        new ZkSharedRefDatabase(zookeeperContainer.getCurator(), Duration.ofMinutes(10));
+    marshaller = zookeeperContainer.getMarshaller();
+  }
+
+  @After
+  public void cleanup() {
+    zookeeperContainer.cleanup();
+  }
+
+  @Test
+  public void shouldCompareAndPutSuccessfully() throws Exception {
+    Ref oldRef = refOf(AN_OBJECT_ID_1);
+    Ref newRef = refOf(AN_OBJECT_ID_2);
+    String projectName = RefFixture.A_TEST_PROJECT_NAME;
+
+    marshaller.create(new ZkRefInfo(projectName, oldRef));
+
+    assertThat(zkSharedRefDatabase.compareAndPut(projectName, oldRef, newRef)).isTrue();
+  }
+
+  @Test
+  public void compareAndPutShouldFailIfTheObjectionHasNotTheExpectedValue() throws Exception {
+    String projectName = RefFixture.A_TEST_PROJECT_NAME;
+
+    Ref oldRef = refOf(AN_OBJECT_ID_1);
+    Ref expectedRef = refOf(AN_OBJECT_ID_2);
+
+    marshaller.create(new ZkRefInfo(projectName, oldRef));
+
+    assertThat(zkSharedRefDatabase.compareAndPut(projectName, expectedRef, refOf(AN_OBJECT_ID_3)))
+        .isFalse();
+  }
+
+  @Test
+  public void compareAndPutShouldFaiIfTheObjectionDoesNotExist() throws IOException {
+    Ref oldRef = refOf(AN_OBJECT_ID_1);
+    assertThat(
+            zkSharedRefDatabase.compareAndPut(
+                RefFixture.A_TEST_PROJECT_NAME, oldRef, refOf(AN_OBJECT_ID_2)))
+        .isFalse();
+  }
+
+  @Test
+  public void shouldCompareAndRemoveSuccessfully() throws Exception {
+    Ref oldRef = refOf(AN_OBJECT_ID_1);
+    String projectName = RefFixture.A_TEST_PROJECT_NAME;
+
+    marshaller.create(new ZkRefInfo(projectName, oldRef));
+
+    assertThat(zkSharedRefDatabase.compareAndRemove(projectName, oldRef)).isTrue();
+  }
+
+  @Test
+  public void shouldReplaceTheRefWithATombstoneAfterCompareAndPutRemove() throws Exception {
+    Ref oldRef = refOf(AN_OBJECT_ID_1);
+    String projectName = RefFixture.A_TEST_PROJECT_NAME;
+
+    marshaller.create(new ZkRefInfo(projectName, oldRef));
+
+    assertThat(zkSharedRefDatabase.compareAndRemove(projectName, oldRef)).isTrue();
+
+    Optional<ZkRefInfo> inZk = marshaller.read(projectName, oldRef.getName());
+    assertThat(inZk.isPresent()).isTrue();
+    inZk.get().equals(TombstoneRef.forRef(oldRef));
+  }
+
+  @Test
+  public void shouldNotCompareAndPutSuccessfullyAfterACompareAndRemove() throws Exception {
+    Ref oldRef = refOf(AN_OBJECT_ID_1);
+    String projectName = RefFixture.A_TEST_PROJECT_NAME;
+
+    marshaller.create(new ZkRefInfo(projectName, oldRef));
+
+    zkSharedRefDatabase.compareAndRemove(projectName, oldRef);
+    assertThat(zkSharedRefDatabase.compareAndPut(projectName, oldRef, refOf(AN_OBJECT_ID_2)))
+        .isFalse();
+  }
+
+  private Ref refOf(ObjectId objectId) {
+    return zkSharedRefDatabase.newRef(aBranchRef(), objectId);
+  }
+
+  @Override
+  public String testBranch() {
+    return "branch_" + nameRule.getMethodName();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoMarshallerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoMarshallerTest.java
new file mode 100644
index 0000000..e50cd9b
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoMarshallerTest.java
@@ -0,0 +1,102 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.hamcrest.CoreMatchers.nullValue;
+
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkRefInfoDAO.CorruptedZkStorageException;
+import java.time.Duration;
+import java.util.Optional;
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestName;
+
+public class ZkRefInfoMarshallerTest implements RefFixture {
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+  @Rule public TestName nameRule = new TestName();
+
+  ZookeeperTestContainerSupport zookeeperContainer;
+  ZkSharedRefDatabase zkSharedRefDatabase;
+  ZkRefInfoDAO marshaller;
+  CuratorFramework curator;
+
+  @Before
+  public void setup() {
+    zookeeperContainer = new ZookeeperTestContainerSupport();
+    zkSharedRefDatabase =
+        new ZkSharedRefDatabase(zookeeperContainer.getCurator(), Duration.ofMinutes(10));
+    marshaller = zookeeperContainer.getMarshaller();
+    curator = zookeeperContainer.getCurator();
+  }
+
+  @After
+  public void cleanup() {
+    zookeeperContainer.cleanup();
+  }
+
+  @Test
+  public void shouldCreateAZRefInfo() throws Exception {
+    ZkRefInfo refInfo = aZkRefInfo(AN_OBJECT_ID_1);
+
+    marshaller.create(refInfo);
+
+    Optional<ZkRefInfo> readRefInfo = marshaller.read(refInfo.projectName(), refInfo.refName());
+
+    assertThat(readRefInfo).isEqualTo(Optional.of(refInfo));
+  }
+
+  @Test
+  public void shouldReturnEmptyIfARefDoesNotExist() throws Exception {
+    assertThat(marshaller.read(A_TEST_PROJECT_NAME, aBranchRef())).isEqualTo(Optional.empty());
+  }
+
+  @Test
+  public void shouldUpdateAZrefInfo() throws Exception {
+    ZkRefInfo newRefInfo = aZkRefInfo(AN_OBJECT_ID_1);
+    ZkRefInfo updateRefInfo =
+        new ZkRefInfo(newRefInfo.projectName(), newRefInfo.refName(), AN_OBJECT_ID_2);
+
+    marshaller.create(newRefInfo);
+    marshaller.update(updateRefInfo);
+
+    Optional<ZkRefInfo> readUpdatedRefInfo =
+        marshaller.read(updateRefInfo.projectName(), updateRefInfo.refName());
+
+    assertThat(readUpdatedRefInfo).isEqualTo(Optional.of(updateRefInfo));
+  }
+
+  @Test
+  public void shouldFailToReadZkRefInfoIfSomeOfTheInfoIsMissing() throws Exception {
+    String projectName = A_TEST_PROJECT_NAME;
+    String refName = aBranchRef();
+
+    curator.createContainers(ZkRefInfoDAO.pathFor(projectName, refName));
+
+    expectedException.expect(CorruptedZkStorageException.class);
+    expectedException.expectCause(nullValue(Exception.class));
+
+    marshaller.read(projectName, refName);
+  }
+
+  @Override
+  public String testBranch() {
+    return "branch_" + nameRule.getMethodName();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZookeeperTestContainerSupport.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZookeeperTestContainerSupport.java
new file mode 100644
index 0000000..dff2aab
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZookeeperTestContainerSupport.java
@@ -0,0 +1,90 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
+
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import org.apache.curator.framework.CuratorFramework;
+import org.eclipse.jgit.lib.Config;
+import org.junit.Ignore;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+@Ignore
+public class ZookeeperTestContainerSupport {
+
+  static class ZookeeperContainer extends GenericContainer<ZookeeperContainer> {
+    public static String ZOOKEEPER_VERSION = "3.4.13";
+
+    public ZookeeperContainer() {
+      super("zookeeper:" + ZOOKEEPER_VERSION);
+    }
+  }
+
+  private ZookeeperContainer container;
+  private ZkRefInfoDAO marshaller;
+  private Configuration configuration;
+  private CuratorFramework curator;
+
+  public CuratorFramework getCurator() {
+    return curator;
+  }
+
+  public ZookeeperContainer getContainer() {
+    return container;
+  }
+
+  public ZkRefInfoDAO getMarshaller() {
+    return marshaller;
+  }
+
+  public Configuration getConfig() {
+    return configuration;
+  }
+
+  public ZookeeperTestContainerSupport() {
+    container = new ZookeeperContainer();
+    container.addExposedPorts(2181);
+    container.waitingFor(Wait.forListeningPort());
+    container.start();
+    Integer zkHostPort = container.getMappedPort(2181);
+    Config splitBrainconfig = new Config();
+    String connectString = "localhost:" + zkHostPort;
+    splitBrainconfig.setBoolean("split-brain", null, "enabled", true);
+    splitBrainconfig.setString("split-brain", "zookeeper", "connectString", connectString);
+
+    configuration = new Configuration(splitBrainconfig);
+    this.curator = configuration.getSplitBrain().getZookeeper().buildCurator();
+
+    this.marshaller = new ZkRefInfoDAO(this.curator);
+  }
+
+  public void cleanup() {
+    this.curator.delete();
+    this.container.stop();
+  }
+}