Simplify the Zookeeper-based SharedDB code

Store the ObjectId instead of a complex object, massively
reduce the amount of code needed.

Feature: Issue 10554
Change-Id: I050d85584e0970f18e09c3107ca4e392affb6027
diff --git a/BUILD b/BUILD
index e132e67..c2c508b 100644
--- a/BUILD
+++ b/BUILD
@@ -53,6 +53,7 @@
         "@curator-framework//jar",
         "@curator-recipes//jar",
         "@curator-test//jar",
+        "@curator-client//jar",
         "@zookeeper//jar",
     ],
 )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index da4ea93..8bb2a8b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -25,7 +25,6 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -34,6 +33,7 @@
 import java.util.Properties;
 import java.util.UUID;
 import org.apache.commons.lang.StringUtils;
+import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.BoundedExponentialBackoffRetry;
@@ -419,6 +419,9 @@
     private final int DEFAULT_RETRY_POLICY_BASE_SLEEP_TIME_MS = 1000;
     private final int DEFAULT_RETRY_POLICY_MAX_SLEEP_TIME_MS = 3000;
     private final int DEFAULT_RETRY_POLICY_MAX_RETRIES = 3;
+    private final int DEFAULT_CAS_RETRY_POLICY_BASE_SLEEP_TIME_MS = 100;
+    private final int DEFAULT_CAS_RETRY_POLICY_MAX_SLEEP_TIME_MS = 300;
+    private final int DEFAULT_CAS_RETRY_POLICY_MAX_RETRIES = 3;
 
     static {
       CuratorFrameworkFactory.Builder b = CuratorFrameworkFactory.builder();
@@ -435,6 +438,9 @@
     public static final String KEY_RETRY_POLICY_MAX_RETRIES = "retryPolicyMaxRetries";
     public static final String KEY_LOCK_TIMEOUT_MS = "lockTimeoutMs";
     public static final String KEY_ROOT_NODE = "rootNode";
+    public final String KEY_CAS_RETRY_POLICY_BASE_SLEEP_TIME_MS = "casRetryPolicyBaseSleepTimeMs";
+    public final String KEY_CAS_RETRY_POLICY_MAX_SLEEP_TIME_MS = "casRetryPolicyMaxSleepTimeMs";
+    public final String KEY_CAS_RETRY_POLICY_MAX_RETRIES = "casRetryPolicyMaxRetries";
 
     private final String connectionString;
     private final String root;
@@ -443,14 +449,12 @@
     private final int baseSleepTimeMs;
     private final int maxSleepTimeMs;
     private final int maxRetries;
+    private final int casBaseSleepTimeMs;
+    private final int casMaxSleepTimeMs;
+    private final int casMaxRetries;
+
     private CuratorFramework build;
 
-    public Duration getLockTimeout() {
-      return lockTimeout;
-    }
-
-    private final Duration lockTimeout;
-
     private Zookeeper(Config cfg) {
       connectionString =
           getString(cfg, SplitBrain.SECTION, SUBSECTION, KEY_CONNECT_STRING, DEFAULT_ZK_CONNECT);
@@ -490,14 +494,29 @@
               KEY_RETRY_POLICY_MAX_RETRIES,
               DEFAULT_RETRY_POLICY_MAX_RETRIES);
 
-      lockTimeout =
-          Duration.ofMillis(
-              getInt(
-                  cfg,
-                  SplitBrain.SECTION,
-                  SUBSECTION,
-                  KEY_LOCK_TIMEOUT_MS,
-                  DEFAULT_LOCK_TIMEOUT_MS));
+      casBaseSleepTimeMs =
+          getInt(
+              cfg,
+              SplitBrain.SECTION,
+              SUBSECTION,
+              KEY_CAS_RETRY_POLICY_BASE_SLEEP_TIME_MS,
+              DEFAULT_CAS_RETRY_POLICY_BASE_SLEEP_TIME_MS);
+
+      casMaxSleepTimeMs =
+          getInt(
+              cfg,
+              SplitBrain.SECTION,
+              SUBSECTION,
+              KEY_CAS_RETRY_POLICY_MAX_SLEEP_TIME_MS,
+              DEFAULT_CAS_RETRY_POLICY_MAX_SLEEP_TIME_MS);
+
+      casMaxRetries =
+          getInt(
+              cfg,
+              SplitBrain.SECTION,
+              SUBSECTION,
+              KEY_CAS_RETRY_POLICY_MAX_RETRIES,
+              DEFAULT_CAS_RETRY_POLICY_MAX_RETRIES);
 
       checkArgument(StringUtils.isNotEmpty(connectionString), "zookeeper.%s contains no servers");
     }
@@ -518,6 +537,11 @@
 
       return this.build;
     }
+
+    public RetryPolicy buildCasRetryPolicy() {
+      return new BoundedExponentialBackoffRetry(
+          casBaseSleepTimeMs, casMaxSleepTimeMs, casMaxRetries);
+    }
   }
 
   public static class SplitBrain {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
index c1b80eb..b3f99a6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
@@ -17,12 +17,8 @@
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.git.validators.RefOperationValidationListener;
 import com.google.inject.AbstractModule;
-import com.google.inject.name.Names;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefDatabase;
-import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkSharedRefDatabase;
-import java.time.Duration;
-import org.apache.curator.framework.CuratorFramework;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
 
 public class ValidationModule extends AbstractModule {
 
@@ -36,10 +32,6 @@
   protected void configure() {
     DynamicSet.bind(binder(), RefOperationValidationListener.class).to(InSyncChangeValidator.class);
 
-    bind(SharedRefDatabase.class).to(ZkSharedRefDatabase.class);
-    bind(CuratorFramework.class).toInstance(cfg.getSplitBrain().getZookeeper().buildCurator());
-    bind(Duration.class)
-        .annotatedWith(Names.named("ZkLockTimeout"))
-        .toInstance(cfg.getSplitBrain().getZookeeper().getLockTimeout());
+    install(new ZkValidationModule(cfg));
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfo.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfo.java
deleted file mode 100644
index ab34aca..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfo.java
+++ /dev/null
@@ -1,80 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-// Copyright (C) 2018 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
-
-import com.google.common.base.Objects;
-import org.eclipse.jgit.lib.ObjectId;
-import org.eclipse.jgit.lib.Ref;
-
-public class ZkRefInfo {
-
-  private final String refName;
-  private final String projectName;
-  private final ObjectId objectId;
-
-  public ZkRefInfo(final String projectName, final String refName, final ObjectId objectId) {
-    this.projectName = projectName;
-    this.objectId = objectId;
-    this.refName = refName;
-  }
-
-  public ZkRefInfo(final String projectName, final Ref ref) {
-    this(projectName, ref.getName(), ref.getObjectId());
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (this == other) {
-      return true;
-    }
-    if (other == null || getClass() != other.getClass()) {
-      return false;
-    }
-    ZkRefInfo zkRefInfo = (ZkRefInfo) other;
-    return Objects.equal(refName, zkRefInfo.refName)
-        && Objects.equal(projectName, zkRefInfo.projectName)
-        && Objects.equal(objectId, zkRefInfo.objectId);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(refName, projectName, objectId);
-  }
-
-  public String refName() {
-    return refName;
-  }
-
-  public String projectName() {
-    return projectName;
-  }
-
-  public ObjectId objectId() {
-    return objectId;
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoDAO.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoDAO.java
deleted file mode 100644
index 85d8227..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoDAO.java
+++ /dev/null
@@ -1,141 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-// Copyright (C) 2018 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
-
-import static org.apache.zookeeper.CreateMode.PERSISTENT;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.List;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.PathAndBytesable;
-import org.apache.curator.framework.api.transaction.CuratorOp;
-import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
-import org.eclipse.jgit.lib.ObjectId;
-import org.eclipse.jgit.lib.Ref;
-
-public class ZkRefInfoDAO {
-  public static final String OBJECT_ID_PATH = "objectId";
-
-  public static String pathFor(ZkRefInfo info) {
-    return "/" + info.projectName() + "/" + info.refName();
-  }
-
-  public static String pathFor(String projectName, Ref ref) {
-    return pathFor(projectName, ref.getName());
-  }
-
-  public static String pathFor(String projectName, String refName) {
-    return "/" + projectName + "/" + refName;
-  }
-
-  private final CuratorFramework client;
-
-  public ZkRefInfoDAO(CuratorFramework client) {
-    this.client = client;
-  }
-
-  public Optional<ZkRefInfo> read(String projectName, String refName) throws Exception {
-    final String rootPath = pathFor(projectName, refName);
-
-    if (!exists(rootPath)) {
-      return Optional.empty();
-    }
-
-    final ObjectId objectId = readObjectIdAt(rootPath + "/" + OBJECT_ID_PATH);
-
-    return Optional.of(new ZkRefInfo(projectName, refName, objectId));
-  }
-
-  public void update(ZkRefInfo info) throws Exception {
-    writeInTransaction(info, () -> client.transactionOp().setData());
-  }
-
-  public void create(ZkRefInfo info) throws Exception {
-    client.createContainers(pathFor(info));
-    writeInTransaction(info, () -> client.transactionOp().create().withMode(PERSISTENT));
-  }
-
-  private void writeInTransaction(
-      ZkRefInfo info, Supplier<PathAndBytesable<CuratorOp>> writeOpBuilderSupplier)
-      throws Exception {
-    String commonPath = pathFor(info);
-    final List<CuratorTransactionResult> curatorTransactionResults =
-        client
-            .transaction()
-            .forOperations(
-                writeOpBuilderSupplier
-                    .get()
-                    .forPath(commonPath + "/" + OBJECT_ID_PATH, writeObjectId(info.objectId())));
-
-    for (CuratorTransactionResult result : curatorTransactionResults) {
-      if (result.getError() != 0)
-        throw new IOException(
-            String.format(
-                "Error with code %d trying to write path %s ",
-                result.getError(), result.getForPath()));
-    }
-  }
-
-  private boolean exists(String path) throws Exception {
-    return client.checkExists().forPath(path) != null;
-  }
-
-  private ObjectId readObjectIdAt(String path) throws Exception {
-    Optional<ObjectId> objectId = parseAt(path, ObjectId::fromRaw);
-    if (!objectId.isPresent()) {
-      throw new CorruptedZkStorageException(
-          String.format("Not able to read objectId from zookeeper path %s", path));
-    }
-    return objectId.get();
-  }
-
-  private byte[] writeObjectId(ObjectId value) throws IOException {
-    final ByteArrayOutputStream out = new ByteArrayOutputStream();
-    final DataOutputStream stream = new DataOutputStream(out);
-    value.copyRawTo(stream);
-    return out.toByteArray();
-  }
-
-  private <T> Optional<T> parseAt(String path, Function<byte[], T> parser) throws Exception {
-    if (client.checkExists().forPath(path) == null) return Optional.empty();
-    return Optional.ofNullable(client.getData().forPath(path)).map(parser);
-  }
-
-  static class CorruptedZkStorageException extends Exception {
-    private static final long serialVersionUID = 1L;
-
-    public CorruptedZkStorageException(String message) {
-      super(message);
-    }
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java
index 7f858c4..c1272a7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java
@@ -14,18 +14,18 @@
 
 package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
+import com.google.common.base.MoreObjects;
 import com.google.common.flogger.FluentLogger;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefDatabase;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
-import java.time.Duration;
-import java.util.Optional;
 import javax.inject.Named;
+import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.framework.recipes.locks.Locker;
+import org.apache.curator.framework.recipes.atomic.AtomicValue;
+import org.apache.curator.framework.recipes.atomic.DistributedAtomicValue;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.Ref;
 
@@ -33,140 +33,62 @@
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
   private final CuratorFramework client;
-  private final Duration lockTimeout;
+  private final RetryPolicy retryPolicy;
 
   @Inject
   public ZkSharedRefDatabase(
-      CuratorFramework client, @Named("ZkLockTimeout") Duration lockTimeout) {
+      CuratorFramework client, @Named("ZkLockRetryPolicy") RetryPolicy retryPolicy) {
     this.client = client;
-    this.lockTimeout = lockTimeout;
+    this.retryPolicy = retryPolicy;
   }
 
   @Override
   public boolean compareAndRemove(String project, Ref oldRef) throws IOException {
-    return compareAndPut(project, oldRef, TombstoneRef.forRef(oldRef));
+    return compareAndPut(project, oldRef, NULL_REF);
   }
 
   @Override
   public boolean compareAndPut(String projectName, Ref oldRef, Ref newRef) throws IOException {
-    boolean isCreate = oldRef == NULL_REF;
+    final DistributedAtomicValue distributedRefValue =
+        new DistributedAtomicValue(client, pathFor(projectName, oldRef, newRef), retryPolicy);
 
-    final ZkRefInfoDAO marshaller = new ZkRefInfoDAO(client);
-
-    final InterProcessMutex refPathMutex =
-        new InterProcessMutex(client, "/locks" + ZkRefInfoDAO.pathFor(projectName, newRef));
-
-    try (Locker locker = new Locker(refPathMutex, lockTimeout.toMillis(), MILLISECONDS)) {
-      final Optional<ZkRefInfo> infoCurrentlyInZkMaybe =
-          marshaller.read(projectName, newRef.getName());
-      final ZkRefInfo newRefInfo = new ZkRefInfo(projectName, newRef);
-
-      if (isCreate) {
-        return doCreate(marshaller, infoCurrentlyInZkMaybe, newRefInfo);
+    try {
+      if (oldRef == NULL_REF) {
+        return distributedRefValue.initialize(writeObjectId(newRef.getObjectId()));
       }
-      return doUpdate(oldRef, marshaller, infoCurrentlyInZkMaybe, newRefInfo);
+      final ObjectId newValue =
+          newRef.getObjectId() == null ? ObjectId.zeroId() : newRef.getObjectId();
+      final AtomicValue<byte[]> newDistributedValue =
+          distributedRefValue.compareAndSet(
+              writeObjectId(oldRef.getObjectId()), writeObjectId(newValue));
 
+      return newDistributedValue.succeeded();
     } catch (Exception e) {
+      logger.atWarning().withCause(e).log(
+          "Error trying to perform CAS at path %s", pathFor(projectName, oldRef, newRef));
       throw new IOException(
           String.format(
-              "Error trying to perform CAS at path %s", ZkRefInfoDAO.pathFor(projectName, newRef)),
+              "Error trying to perform CAS at path %s", pathFor(projectName, oldRef, newRef)),
           e);
     }
   }
 
-  private boolean doUpdate(
-      Ref oldRef,
-      ZkRefInfoDAO marshaller,
-      Optional<ZkRefInfo> infoCurrentlyInZkMaybe,
-      ZkRefInfo newRefInfo)
-      throws Exception {
-    if (!infoCurrentlyInZkMaybe.isPresent()) {
-      logger.atWarning().log(
-          "Asked to update ref %s but it is not in ZK at path %s",
-          newRefInfo.refName(), ZkRefInfoDAO.pathFor(newRefInfo));
-      return false;
-    }
-
-    if (!infoCurrentlyInZkMaybe.get().objectId().equals(oldRef.getObjectId())) {
-      logger.atWarning().log(
-          "Old Ref %s does not match the current Rf content in Zookeeper %s. Not applying the update.",
-          oldRef.getObjectId(), infoCurrentlyInZkMaybe.get().objectId());
-      return false;
-    }
-
-    marshaller.update(newRefInfo);
-
-    return true;
+  static String pathFor(String projectName, Ref oldRef, Ref newRef) {
+    return pathFor(projectName, MoreObjects.firstNonNull(oldRef.getName(), newRef.getName()));
   }
 
-  private boolean doCreate(
-      ZkRefInfoDAO marshaller, Optional<ZkRefInfo> infoCurrentlyInZkMaybe, ZkRefInfo newRefInfo)
-      throws Exception {
-    if (infoCurrentlyInZkMaybe.isPresent()) {
-      logger.atWarning().log(
-          "Asked to create ref %s but it is already in ZK at path %s",
-          newRefInfo.refName(), ZkRefInfoDAO.pathFor(newRefInfo));
-      return false;
-    }
-
-    marshaller.create(newRefInfo);
-
-    return true;
+  static String pathFor(String projectName, String refName) {
+    return "/" + projectName + "/" + refName;
   }
 
-  /**
-   * When deleting a Ref this temporary Ref Tombstone will be created and then cleaned-up at a later
-   * stage by the garbage collection
-   */
-  static class TombstoneRef implements Ref {
-    static TombstoneRef forRef(final Ref targetRef) {
-      return new TombstoneRef(targetRef.getName());
-    }
+  static ObjectId readObjectId(byte[] value) {
+    return ObjectId.fromRaw(value);
+  }
 
-    private final String name;
-
-    private TombstoneRef(String name) {
-      this.name = name;
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public boolean isSymbolic() {
-      return false;
-    }
-
-    @Override
-    public Ref getLeaf() {
-      return null;
-    }
-
-    @Override
-    public Ref getTarget() {
-      return null;
-    }
-
-    @Override
-    public ObjectId getObjectId() {
-      return ObjectId.zeroId();
-    }
-
-    @Override
-    public ObjectId getPeeledObjectId() {
-      return null;
-    }
-
-    @Override
-    public boolean isPeeled() {
-      return false;
-    }
-
-    @Override
-    public Storage getStorage() {
-      return Storage.NETWORK;
-    }
+  static byte[] writeObjectId(ObjectId value) throws IOException {
+    final ByteArrayOutputStream out = new ByteArrayOutputStream();
+    final DataOutputStream stream = new DataOutputStream(out);
+    value.copyRawTo(stream);
+    return out.toByteArray();
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkValidationModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkValidationModule.java
new file mode 100644
index 0000000..d4a3126
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkValidationModule.java
@@ -0,0 +1,40 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefDatabase;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+
+public class ZkValidationModule extends AbstractModule {
+
+  private Configuration cfg;
+
+  public ZkValidationModule(Configuration cfg) {
+    this.cfg = cfg;
+  }
+
+  @Override
+  protected void configure() {
+    bind(SharedRefDatabase.class).to(ZkSharedRefDatabase.class);
+    bind(CuratorFramework.class).toInstance(cfg.getSplitBrain().getZookeeper().buildCurator());
+    bind(RetryPolicy.class)
+        .annotatedWith(Names.named("ZkLockRetryPolicy"))
+        .toInstance(cfg.getSplitBrain().getZookeeper().buildCasRetryPolicy());
+  }
+}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index a66383b..ca3f87c 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -45,16 +45,18 @@
 
 [split-brain]
   enabled = true
-  
+
 [split-brain "zookeeper"]
   connectString = "localhost:2181"
-  rootNode = "/gerrit/multi-site"  
+  rootNode = "/gerrit/multi-site"
   sessionTimeoutMs = 1000
   connectionTimeoutMs = 1000
   retryPolicyBaseSleepTimeMs = 1000
   retryPolicyMaxSleepTimeMs = 3000
   retryPolicyMaxRetries = 3
-  lockTimeoutMs = 10000
+  casRetryPolicyBaseSleepTimeMs = 100
+  casRetryPolicyMaxSleepTimeMs = 100
+  casRetryPolicyMaxRetries = 3
 ```
 
 ## Configuration parameters
@@ -136,7 +138,7 @@
 :   Enable publication of project list events, ignored when `kafka.publisher.enabled` is false
     Defaults: true
 
-```kafka.publisher.streamEventEnabled```    
+```kafka.publisher.streamEventEnabled``` 
 :   Enable publication of stream events, ignored when `kafka.publisher.enabled` is false
     Defaults: true
 
@@ -195,10 +197,21 @@
 used for the Zookeeper connection
     Defaults: 3
 
-```split-brain.zookeeper.lockTimeoutMs```
-:   Configuration for InterProcessMutex lock timeout
-    Defaults: 10000
-
+```split-brain.zookeeper.casRetryPolicyBaseSleepTimeMs```
+:   Configuration for the base sleep timeout (iun ms) to use to create the BoundedExponentialBackoffRetry policy
+used for the Compare and Swap operations on Zookeeper
+    Defaults: 1000
+    
+```split-brain.zookeeper.casRetryPolicyMaxSleepTimeMs```
+:   Configuration for the max sleep timeout (iun ms) to use to create the BoundedExponentialBackoffRetry policy
+used for the Compare and Swap operations on Zookeeper
+    Defaults: 3000
+    
+```split-brain.zookeeper.casRetryPolicyMaxRetries```
+:   Configuration for the max number of retries to use to create the BoundedExponentialBackoffRetry policy
+used for the Compare and Swap operations on Zookeeper
+    Defaults: 3
+    
 #### Custom kafka properties:
 
 In addition to the above settings, custom Kafka properties can be explicitly set for `publisher` and `subscriber`.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationIT.java
index 43377f8..fb63793 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationIT.java
@@ -95,6 +95,7 @@
     // run - T0). Using "refs/heads/master2" in this test for now
     final PushOneCommit.Result change =
         createCommitAndPush(testRepo, "refs/heads/master2", "msg", "file", "content");
+
     change.assertOkStatus();
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/RefFixture.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/RefFixture.java
index 1e27337..f2b0493 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/RefFixture.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/RefFixture.java
@@ -43,10 +43,6 @@
   static final ObjectId AN_OBJECT_ID_2 = new ObjectId(1, 2, 3, 4, 6);
   static final ObjectId AN_OBJECT_ID_3 = new ObjectId(1, 2, 3, 4, 7);
 
-  default ZkRefInfo aZkRefInfo(ObjectId objectId) {
-    return new ZkRefInfo(A_TEST_PROJECT_NAME, aBranchRef(), objectId);
-  }
-
   default String aBranchRef() {
     return RefNames.REFS_HEADS + testBranch();
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoMarshallerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoMarshallerTest.java
deleted file mode 100644
index e50cd9b..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoMarshallerTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.hamcrest.CoreMatchers.nullValue;
-
-import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkRefInfoDAO.CorruptedZkStorageException;
-import java.time.Duration;
-import java.util.Optional;
-import org.apache.curator.framework.CuratorFramework;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TestName;
-
-public class ZkRefInfoMarshallerTest implements RefFixture {
-  @Rule public ExpectedException expectedException = ExpectedException.none();
-  @Rule public TestName nameRule = new TestName();
-
-  ZookeeperTestContainerSupport zookeeperContainer;
-  ZkSharedRefDatabase zkSharedRefDatabase;
-  ZkRefInfoDAO marshaller;
-  CuratorFramework curator;
-
-  @Before
-  public void setup() {
-    zookeeperContainer = new ZookeeperTestContainerSupport();
-    zkSharedRefDatabase =
-        new ZkSharedRefDatabase(zookeeperContainer.getCurator(), Duration.ofMinutes(10));
-    marshaller = zookeeperContainer.getMarshaller();
-    curator = zookeeperContainer.getCurator();
-  }
-
-  @After
-  public void cleanup() {
-    zookeeperContainer.cleanup();
-  }
-
-  @Test
-  public void shouldCreateAZRefInfo() throws Exception {
-    ZkRefInfo refInfo = aZkRefInfo(AN_OBJECT_ID_1);
-
-    marshaller.create(refInfo);
-
-    Optional<ZkRefInfo> readRefInfo = marshaller.read(refInfo.projectName(), refInfo.refName());
-
-    assertThat(readRefInfo).isEqualTo(Optional.of(refInfo));
-  }
-
-  @Test
-  public void shouldReturnEmptyIfARefDoesNotExist() throws Exception {
-    assertThat(marshaller.read(A_TEST_PROJECT_NAME, aBranchRef())).isEqualTo(Optional.empty());
-  }
-
-  @Test
-  public void shouldUpdateAZrefInfo() throws Exception {
-    ZkRefInfo newRefInfo = aZkRefInfo(AN_OBJECT_ID_1);
-    ZkRefInfo updateRefInfo =
-        new ZkRefInfo(newRefInfo.projectName(), newRefInfo.refName(), AN_OBJECT_ID_2);
-
-    marshaller.create(newRefInfo);
-    marshaller.update(updateRefInfo);
-
-    Optional<ZkRefInfo> readUpdatedRefInfo =
-        marshaller.read(updateRefInfo.projectName(), updateRefInfo.refName());
-
-    assertThat(readUpdatedRefInfo).isEqualTo(Optional.of(updateRefInfo));
-  }
-
-  @Test
-  public void shouldFailToReadZkRefInfoIfSomeOfTheInfoIsMissing() throws Exception {
-    String projectName = A_TEST_PROJECT_NAME;
-    String refName = aBranchRef();
-
-    curator.createContainers(ZkRefInfoDAO.pathFor(projectName, refName));
-
-    expectedException.expect(CorruptedZkStorageException.class);
-    expectedException.expectCause(nullValue(Exception.class));
-
-    marshaller.read(projectName, refName);
-  }
-
-  @Override
-  public String testBranch() {
-    return "branch_" + nameRule.getMethodName();
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZKRefsSharedDatabaseTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabaseTest.java
similarity index 75%
rename from src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZKRefsSharedDatabaseTest.java
rename to src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabaseTest.java
index bc3323c..b5ac1e4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZKRefsSharedDatabaseTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabaseTest.java
@@ -29,10 +29,8 @@
 
 import static com.google.common.truth.Truth.assertThat;
 
-import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkSharedRefDatabase.TombstoneRef;
 import java.io.IOException;
-import java.time.Duration;
-import java.util.Optional;
+import org.apache.curator.retry.RetryNTimes;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.Ref;
 import org.junit.After;
@@ -41,19 +39,17 @@
 import org.junit.Test;
 import org.junit.rules.TestName;
 
-public class ZKRefsSharedDatabaseTest implements RefFixture {
+public class ZkSharedRefDatabaseTest implements RefFixture {
   @Rule public TestName nameRule = new TestName();
 
   ZookeeperTestContainerSupport zookeeperContainer;
   ZkSharedRefDatabase zkSharedRefDatabase;
-  ZkRefInfoDAO marshaller;
 
   @Before
   public void setup() {
     zookeeperContainer = new ZookeeperTestContainerSupport();
     zkSharedRefDatabase =
-        new ZkSharedRefDatabase(zookeeperContainer.getCurator(), Duration.ofMinutes(10));
-    marshaller = zookeeperContainer.getMarshaller();
+        new ZkSharedRefDatabase(zookeeperContainer.getCurator(), new RetryNTimes(5, 30));
   }
 
   @After
@@ -62,24 +58,34 @@
   }
 
   @Test
+  public void shouldCompareAndCreateSuccessfully() throws Exception {
+  	Ref ref = refOf(AN_OBJECT_ID_1);
+
+  	assertThat(zkSharedRefDatabase.compareAndCreate(A_TEST_PROJECT_NAME, ref)).isTrue();
+
+  	assertThat(zookeeperContainer.readRefValueFromZk(A_TEST_PROJECT_NAME, ref))
+  			.isEqualTo(ref.getObjectId());
+  }
+
+  @Test
   public void shouldCompareAndPutSuccessfully() throws Exception {
     Ref oldRef = refOf(AN_OBJECT_ID_1);
     Ref newRef = refOf(AN_OBJECT_ID_2);
-    String projectName = RefFixture.A_TEST_PROJECT_NAME;
+    String projectName = A_TEST_PROJECT_NAME;
 
-    marshaller.create(new ZkRefInfo(projectName, oldRef));
+    zookeeperContainer.createRefInZk(projectName, oldRef);
 
     assertThat(zkSharedRefDatabase.compareAndPut(projectName, oldRef, newRef)).isTrue();
   }
 
   @Test
   public void compareAndPutShouldFailIfTheObjectionHasNotTheExpectedValue() throws Exception {
-    String projectName = RefFixture.A_TEST_PROJECT_NAME;
+    String projectName = A_TEST_PROJECT_NAME;
 
     Ref oldRef = refOf(AN_OBJECT_ID_1);
     Ref expectedRef = refOf(AN_OBJECT_ID_2);
 
-    marshaller.create(new ZkRefInfo(projectName, oldRef));
+    zookeeperContainer.createRefInZk(projectName, oldRef);
 
     assertThat(zkSharedRefDatabase.compareAndPut(projectName, expectedRef, refOf(AN_OBJECT_ID_3)))
         .isFalse();
@@ -89,17 +95,16 @@
   public void compareAndPutShouldFaiIfTheObjectionDoesNotExist() throws IOException {
     Ref oldRef = refOf(AN_OBJECT_ID_1);
     assertThat(
-            zkSharedRefDatabase.compareAndPut(
-                RefFixture.A_TEST_PROJECT_NAME, oldRef, refOf(AN_OBJECT_ID_2)))
+            zkSharedRefDatabase.compareAndPut(A_TEST_PROJECT_NAME, oldRef, refOf(AN_OBJECT_ID_2)))
         .isFalse();
   }
 
   @Test
   public void shouldCompareAndRemoveSuccessfully() throws Exception {
     Ref oldRef = refOf(AN_OBJECT_ID_1);
-    String projectName = RefFixture.A_TEST_PROJECT_NAME;
+    String projectName = A_TEST_PROJECT_NAME;
 
-    marshaller.create(new ZkRefInfo(projectName, oldRef));
+    zookeeperContainer.createRefInZk(projectName, oldRef);
 
     assertThat(zkSharedRefDatabase.compareAndRemove(projectName, oldRef)).isTrue();
   }
@@ -107,23 +112,22 @@
   @Test
   public void shouldReplaceTheRefWithATombstoneAfterCompareAndPutRemove() throws Exception {
     Ref oldRef = refOf(AN_OBJECT_ID_1);
-    String projectName = RefFixture.A_TEST_PROJECT_NAME;
+    String projectName = A_TEST_PROJECT_NAME;
 
-    marshaller.create(new ZkRefInfo(projectName, oldRef));
+    zookeeperContainer.createRefInZk(projectName, oldRef);
 
     assertThat(zkSharedRefDatabase.compareAndRemove(projectName, oldRef)).isTrue();
 
-    Optional<ZkRefInfo> inZk = marshaller.read(projectName, oldRef.getName());
-    assertThat(inZk.isPresent()).isTrue();
-    inZk.get().equals(TombstoneRef.forRef(oldRef));
+    assertThat(zookeeperContainer.readRefValueFromZk(projectName, oldRef))
+        .isEqualTo(ObjectId.zeroId());
   }
 
   @Test
   public void shouldNotCompareAndPutSuccessfullyAfterACompareAndRemove() throws Exception {
     Ref oldRef = refOf(AN_OBJECT_ID_1);
-    String projectName = RefFixture.A_TEST_PROJECT_NAME;
+    String projectName = A_TEST_PROJECT_NAME;
 
-    marshaller.create(new ZkRefInfo(projectName, oldRef));
+    zookeeperContainer.createRefInZk(projectName, oldRef);
 
     zkSharedRefDatabase.compareAndRemove(projectName, oldRef);
     assertThat(zkSharedRefDatabase.compareAndPut(projectName, oldRef, refOf(AN_OBJECT_ID_2)))
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZookeeperTestContainerSupport.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZookeeperTestContainerSupport.java
index dff2aab..3d041ae 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZookeeperTestContainerSupport.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZookeeperTestContainerSupport.java
@@ -27,9 +27,15 @@
 
 package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
 
+import static com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefDatabase.NULL_REF;
+import static com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkSharedRefDatabase.pathFor;
+import static com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkSharedRefDatabase.writeObjectId;
+
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import org.apache.curator.framework.CuratorFramework;
 import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
 import org.junit.Ignore;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.wait.strategy.Wait;
@@ -46,7 +52,6 @@
   }
 
   private ZookeeperContainer container;
-  private ZkRefInfoDAO marshaller;
   private Configuration configuration;
   private CuratorFramework curator;
 
@@ -58,18 +63,13 @@
     return container;
   }
 
-  public ZkRefInfoDAO getMarshaller() {
-    return marshaller;
-  }
-
   public Configuration getConfig() {
     return configuration;
   }
 
+  @SuppressWarnings("resource")
   public ZookeeperTestContainerSupport() {
-    container = new ZookeeperContainer();
-    container.addExposedPorts(2181);
-    container.waitingFor(Wait.forListeningPort());
+    container = new ZookeeperContainer().withExposedPorts(2181).waitingFor(Wait.forListeningPort());
     container.start();
     Integer zkHostPort = container.getMappedPort(2181);
     Config splitBrainconfig = new Config();
@@ -79,12 +79,22 @@
 
     configuration = new Configuration(splitBrainconfig);
     this.curator = configuration.getSplitBrain().getZookeeper().buildCurator();
-
-    this.marshaller = new ZkRefInfoDAO(this.curator);
   }
 
   public void cleanup() {
     this.curator.delete();
     this.container.stop();
   }
+
+  public ObjectId readRefValueFromZk(String projectName, Ref ref) throws Exception {
+    final byte[] bytes = curator.getData().forPath(pathFor(projectName, NULL_REF, ref));
+    return ZkSharedRefDatabase.readObjectId(bytes);
+  }
+
+  public void createRefInZk(String projectName, Ref ref) throws Exception {
+    curator
+        .create()
+        .creatingParentContainersIfNeeded()
+        .forPath(pathFor(projectName, NULL_REF, ref), writeObjectId(ref.getObjectId()));
+  }
 }