Implement Zookeeper backend for consistency check
Initial Zookeeper-based SharedDfsStorage implementation based on
the original implementation of Dave Borowitz of zookeeper-refdb
(https://gerrit-review.googlesource.com/c/plugins/zookeeper-refdb/+/35787)
Feature: Issue 10554
Change-Id: Ica87fe2c8a2c4aca5f9331bc0f7dcfb834d94727
diff --git a/BUILD b/BUILD
index 38da77a..e132e67 100644
--- a/BUILD
+++ b/BUILD
@@ -19,6 +19,10 @@
deps = [
"@commons-lang3//jar",
"@kafka_client//jar",
+ "@curator-framework//jar",
+ "@curator-recipes//jar",
+ "@curator-client//jar",
+ "@zookeeper//jar",
],
)
@@ -46,5 +50,9 @@
"@kafka_client//jar",
"@testcontainers-kafka//jar",
"//lib/testcontainers",
+ "@curator-framework//jar",
+ "@curator-recipes//jar",
+ "@curator-test//jar",
+ "@zookeeper//jar",
],
)
diff --git a/README.md b/README.md
index 345fefb..9f03102 100644
--- a/README.md
+++ b/README.md
@@ -84,6 +84,12 @@
[kafka "subscriber"]
enabled = true
+
+[split-brain]
+ enabled = true
+
+[split-brain "zookeeper"]
+ connectString = "localhost:2181"
```
For more details on the configuration settings, please refer to the
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 678edd5..a8fabbb 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -19,6 +19,8 @@
)
BYTE_BUDDY_VER = "1.8.15"
+ CURATOR_VER = "4.2.0"
+ CURATOR_TEST_VER = "2.12.0"
maven_jar(
name = "byte_buddy",
@@ -55,3 +57,33 @@
artifact = "org.apache.commons:commons-lang3:3.6",
sha1 = "9d28a6b23650e8a7e9063c04588ace6cf7012c17",
)
+
+ maven_jar(
+ name = "curator-test",
+ artifact = "org.apache.curator:curator-test:" + CURATOR_TEST_VER,
+ sha1 = "0a797be57ba95b67688a7615f7ad41ee6b3ceff0"
+ )
+
+ maven_jar(
+ name = "curator-framework",
+ artifact = "org.apache.curator:curator-framework:" + CURATOR_VER,
+ sha1 = "5b1cc87e17b8fe4219b057f6025662a693538861"
+ )
+
+ maven_jar(
+ name = "curator-recipes",
+ artifact = "org.apache.curator:curator-recipes:" + CURATOR_VER,
+ sha1 = "7f775be5a7062c2477c51533b9d008f70411ba8e"
+ )
+
+ maven_jar(
+ name = "curator-client",
+ artifact = "org.apache.curator:curator-client:" + CURATOR_VER,
+ sha1 = "d5d50930b8dd189f92c40258a6ba97675fea3e15"
+ )
+
+ maven_jar(
+ name = "zookeeper",
+ artifact = "org.apache.zookeeper:zookeeper:3.4.8",
+ sha1 = "933ea2ed15e6a0e24b788973e3d128ff163c3136"
+ )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index acfaae2..da4ea93 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -14,6 +14,8 @@
package com.googlesource.gerrit.plugins.multisite;
+import static com.google.common.base.Preconditions.checkArgument;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
import com.google.common.base.Strings;
@@ -23,6 +25,7 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -30,6 +33,10 @@
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.jgit.lib.Config;
import org.slf4j.Logger;
@@ -55,6 +62,7 @@
static final boolean DEFAULT_ENABLE_PROCESSING = true;
static final String KAFKA_SECTION = "kafka";
public static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
+ public static final Boolean DEFAULT_SPLIT_BRAIN = false;
private final KafkaPublisher publisher;
private final Cache cache;
@@ -62,6 +70,7 @@
private final Index index;
private final KafkaSubscriber subscriber;
private final Kafka kafka;
+ private final SplitBrain splitBrain;
@Inject
Configuration(PluginConfigFactory pluginConfigFactory, @PluginName String pluginName) {
@@ -76,6 +85,11 @@
cache = new Cache(cfg);
event = new Event(cfg);
index = new Index(cfg);
+ splitBrain = new SplitBrain(cfg);
+ }
+
+ public SplitBrain getSplitBrain() {
+ return splitBrain;
}
public Kafka getKafka() {
@@ -102,9 +116,15 @@
return subscriber;
}
- private static int getInt(Config cfg, String section, String name, int defaultValue) {
+ private static boolean getBoolean(
+ Config cfg, String section, String subsection, String name, boolean defaultValue) {
+ return cfg.getBoolean(section, subsection, name, defaultValue);
+ }
+
+ private static int getInt(
+ Config cfg, String section, String subSection, String name, int defaultValue) {
try {
- return cfg.getInt(section, name, defaultValue);
+ return cfg.getInt(section, subSection, name, defaultValue);
} catch (IllegalArgumentException e) {
log.error("invalid value for {}; using default value {}", name, defaultValue);
log.debug("Failed to retrieve integer value: {}", e.getMessage(), e);
@@ -330,7 +350,8 @@
private Cache(Config cfg) {
super(cfg, CACHE_SECTION);
- threadPoolSize = getInt(cfg, CACHE_SECTION, THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE);
+ threadPoolSize =
+ getInt(cfg, CACHE_SECTION, null, THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE);
patterns = Arrays.asList(cfg.getStringList(CACHE_SECTION, null, PATTERN_KEY));
}
@@ -364,10 +385,13 @@
private Index(Config cfg) {
super(cfg, INDEX_SECTION);
- threadPoolSize = getInt(cfg, INDEX_SECTION, THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE);
- retryInterval = getInt(cfg, INDEX_SECTION, RETRY_INTERVAL_KEY, DEFAULT_INDEX_RETRY_INTERVAL);
- maxTries = getInt(cfg, INDEX_SECTION, MAX_TRIES_KEY, DEFAULT_INDEX_MAX_TRIES);
- numStripedLocks = getInt(cfg, INDEX_SECTION, NUM_STRIPED_LOCKS, DEFAULT_NUM_STRIPED_LOCKS);
+ threadPoolSize =
+ getInt(cfg, INDEX_SECTION, null, THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE);
+ retryInterval =
+ getInt(cfg, INDEX_SECTION, null, RETRY_INTERVAL_KEY, DEFAULT_INDEX_RETRY_INTERVAL);
+ maxTries = getInt(cfg, INDEX_SECTION, null, MAX_TRIES_KEY, DEFAULT_INDEX_MAX_TRIES);
+ numStripedLocks =
+ getInt(cfg, INDEX_SECTION, null, NUM_STRIPED_LOCKS, DEFAULT_NUM_STRIPED_LOCKS);
}
public int threadPoolSize() {
@@ -386,4 +410,135 @@
return numStripedLocks;
}
}
+
+ public static class Zookeeper {
+ public static final int defaultSessionTimeoutMs;
+ public static final int defaultConnectionTimeoutMs;
+ public static final String DEFAULT_ZK_CONNECT = "localhost:2181";
+ private final int DEFAULT_LOCK_TIMEOUT_MS = 10000;
+ private final int DEFAULT_RETRY_POLICY_BASE_SLEEP_TIME_MS = 1000;
+ private final int DEFAULT_RETRY_POLICY_MAX_SLEEP_TIME_MS = 3000;
+ private final int DEFAULT_RETRY_POLICY_MAX_RETRIES = 3;
+
+ static {
+ CuratorFrameworkFactory.Builder b = CuratorFrameworkFactory.builder();
+ defaultSessionTimeoutMs = b.getSessionTimeoutMs();
+ defaultConnectionTimeoutMs = b.getConnectionTimeoutMs();
+ }
+
+ public static final String SUBSECTION = "zookeeper";
+ public static final String KEY_CONNECT_STRING = "connectString";
+ public static final String KEY_SESSION_TIMEOUT_MS = "sessionTimeoutMs";
+ public static final String KEY_CONNECTION_TIMEOUT_MS = "connectionTimeoutMs";
+ public static final String KEY_RETRY_POLICY_BASE_SLEEP_TIME_MS = "retryPolicyBaseSleepTimeMs";
+ public static final String KEY_RETRY_POLICY_MAX_SLEEP_TIME_MS = "retryPolicyMaxSleepTimeMs";
+ public static final String KEY_RETRY_POLICY_MAX_RETRIES = "retryPolicyMaxRetries";
+ public static final String KEY_LOCK_TIMEOUT_MS = "lockTimeoutMs";
+ public static final String KEY_ROOT_NODE = "rootNode";
+
+ private final String connectionString;
+ private final String root;
+ private final int sessionTimeoutMs;
+ private final int connectionTimeoutMs;
+ private final int baseSleepTimeMs;
+ private final int maxSleepTimeMs;
+ private final int maxRetries;
+ private CuratorFramework build;
+
+ public Duration getLockTimeout() {
+ return lockTimeout;
+ }
+
+ private final Duration lockTimeout;
+
+ private Zookeeper(Config cfg) {
+ connectionString =
+ getString(cfg, SplitBrain.SECTION, SUBSECTION, KEY_CONNECT_STRING, DEFAULT_ZK_CONNECT);
+ root = getString(cfg, SplitBrain.SECTION, SUBSECTION, KEY_ROOT_NODE, "gerrit/multi-site");
+ sessionTimeoutMs =
+ getInt(
+ cfg, SplitBrain.SECTION, SUBSECTION, KEY_SESSION_TIMEOUT_MS, defaultSessionTimeoutMs);
+ connectionTimeoutMs =
+ getInt(
+ cfg,
+ SplitBrain.SECTION,
+ SUBSECTION,
+ KEY_CONNECTION_TIMEOUT_MS,
+ defaultConnectionTimeoutMs);
+
+ baseSleepTimeMs =
+ getInt(
+ cfg,
+ SplitBrain.SECTION,
+ SUBSECTION,
+ KEY_RETRY_POLICY_BASE_SLEEP_TIME_MS,
+ DEFAULT_RETRY_POLICY_BASE_SLEEP_TIME_MS);
+
+ maxSleepTimeMs =
+ getInt(
+ cfg,
+ SplitBrain.SECTION,
+ SUBSECTION,
+ KEY_RETRY_POLICY_MAX_SLEEP_TIME_MS,
+ DEFAULT_RETRY_POLICY_MAX_SLEEP_TIME_MS);
+
+ maxRetries =
+ getInt(
+ cfg,
+ SplitBrain.SECTION,
+ SUBSECTION,
+ KEY_RETRY_POLICY_MAX_RETRIES,
+ DEFAULT_RETRY_POLICY_MAX_RETRIES);
+
+ lockTimeout =
+ Duration.ofMillis(
+ getInt(
+ cfg,
+ SplitBrain.SECTION,
+ SUBSECTION,
+ KEY_LOCK_TIMEOUT_MS,
+ DEFAULT_LOCK_TIMEOUT_MS));
+
+ checkArgument(StringUtils.isNotEmpty(connectionString), "zookeeper.%s contains no servers");
+ }
+
+ public CuratorFramework buildCurator() {
+ if (build == null) {
+ this.build =
+ CuratorFrameworkFactory.builder()
+ .connectString(connectionString)
+ .sessionTimeoutMs(sessionTimeoutMs)
+ .connectionTimeoutMs(connectionTimeoutMs)
+ .retryPolicy(
+ new BoundedExponentialBackoffRetry(baseSleepTimeMs, maxSleepTimeMs, maxRetries))
+ .namespace(root)
+ .build();
+ this.build.start();
+ }
+
+ return this.build;
+ }
+ }
+
+ public static class SplitBrain {
+ private final boolean enabled;
+
+ private final Zookeeper zookeeper;
+ static final String SECTION = "split-brain";
+ static final String ENABLED_KEY = "enabled";
+
+ private SplitBrain(Config cfg) {
+
+ this.enabled = getBoolean(cfg, SECTION, null, ENABLED_KEY, DEFAULT_SPLIT_BRAIN);
+ zookeeper = this.enabled ? new Zookeeper(cfg) : null;
+ }
+
+ public boolean enabled() {
+ return enabled;
+ }
+
+ public Zookeeper getZookeeper() {
+ return zookeeper;
+ }
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index 0d9ab52..4170239 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -82,7 +82,9 @@
install(new BrokerForwarderModule(config.kafkaPublisher()));
}
- install(new ValidationModule());
+ if (config.getSplitBrain().enabled()) {
+ install(new ValidationModule(config));
+ }
bind(Gson.class).toProvider(GsonProvider.class).in(Singleton.class);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
index f2a6b94..c1b80eb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
@@ -17,15 +17,29 @@
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.git.validators.RefOperationValidationListener;
import com.google.inject.AbstractModule;
-import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.NoOpDfsRefDatabase;
+import com.google.inject.name.Names;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefDatabase;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkSharedRefDatabase;
+import java.time.Duration;
+import org.apache.curator.framework.CuratorFramework;
public class ValidationModule extends AbstractModule {
+ private Configuration cfg;
+
+ public ValidationModule(Configuration cfg) {
+ this.cfg = cfg;
+ }
+
@Override
protected void configure() {
DynamicSet.bind(binder(), RefOperationValidationListener.class).to(InSyncChangeValidator.class);
- bind(SharedRefDatabase.class).to(NoOpDfsRefDatabase.class);
+ bind(SharedRefDatabase.class).to(ZkSharedRefDatabase.class);
+ bind(CuratorFramework.class).toInstance(cfg.getSplitBrain().getZookeeper().buildCurator());
+ bind(Duration.class)
+ .annotatedWith(Names.named("ZkLockTimeout"))
+ .toInstance(cfg.getSplitBrain().getZookeeper().getLockTimeout());
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/NoOpDfsRefDatabase.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/NoOpDfsRefDatabase.java
index 801fc3b..defa6ab 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/NoOpDfsRefDatabase.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/NoOpDfsRefDatabase.java
@@ -15,17 +15,11 @@
package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb;
import java.io.IOException;
-import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Ref;
public class NoOpDfsRefDatabase implements SharedRefDatabase {
@Override
- public Ref newRef(String refName, ObjectId objectId) {
- return null;
- }
-
- @Override
public boolean compareAndPut(String project, Ref oldRef, Ref newRef) throws IOException {
return true;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/SharedRefDatabase.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/SharedRefDatabase.java
index b995df9..c886751 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/SharedRefDatabase.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/SharedRefDatabase.java
@@ -16,6 +16,7 @@
import java.io.IOException;
import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectIdRef;
import org.eclipse.jgit.lib.Ref;
public interface SharedRefDatabase {
@@ -69,14 +70,16 @@
* @param refName ref name
* @param objectId object id
*/
- Ref newRef(String refName, ObjectId objectId);
+ default Ref newRef(String refName, ObjectId objectId) {
+ return new ObjectIdRef.Unpeeled(Ref.Storage.NETWORK, refName, objectId);
+ }
/**
* Utility method for new refs.
*
* @param project project name of the ref
* @param newRef new reference to store.
- * @return
+ * @return true if the operation was successful; false otherwise.
* @throws IOException
*/
default boolean compareAndCreate(String project, Ref newRef) throws IOException {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfo.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfo.java
new file mode 100644
index 0000000..ab34aca
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfo.java
@@ -0,0 +1,80 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
+
+import com.google.common.base.Objects;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+
+public class ZkRefInfo {
+
+ private final String refName;
+ private final String projectName;
+ private final ObjectId objectId;
+
+ public ZkRefInfo(final String projectName, final String refName, final ObjectId objectId) {
+ this.projectName = projectName;
+ this.objectId = objectId;
+ this.refName = refName;
+ }
+
+ public ZkRefInfo(final String projectName, final Ref ref) {
+ this(projectName, ref.getName(), ref.getObjectId());
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+ ZkRefInfo zkRefInfo = (ZkRefInfo) other;
+ return Objects.equal(refName, zkRefInfo.refName)
+ && Objects.equal(projectName, zkRefInfo.projectName)
+ && Objects.equal(objectId, zkRefInfo.objectId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(refName, projectName, objectId);
+ }
+
+ public String refName() {
+ return refName;
+ }
+
+ public String projectName() {
+ return projectName;
+ }
+
+ public ObjectId objectId() {
+ return objectId;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoDAO.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoDAO.java
new file mode 100644
index 0000000..85d8227
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoDAO.java
@@ -0,0 +1,141 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
+
+import static org.apache.zookeeper.CreateMode.PERSISTENT;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.PathAndBytesable;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+
+public class ZkRefInfoDAO {
+ public static final String OBJECT_ID_PATH = "objectId";
+
+ public static String pathFor(ZkRefInfo info) {
+ return "/" + info.projectName() + "/" + info.refName();
+ }
+
+ public static String pathFor(String projectName, Ref ref) {
+ return pathFor(projectName, ref.getName());
+ }
+
+ public static String pathFor(String projectName, String refName) {
+ return "/" + projectName + "/" + refName;
+ }
+
+ private final CuratorFramework client;
+
+ public ZkRefInfoDAO(CuratorFramework client) {
+ this.client = client;
+ }
+
+ public Optional<ZkRefInfo> read(String projectName, String refName) throws Exception {
+ final String rootPath = pathFor(projectName, refName);
+
+ if (!exists(rootPath)) {
+ return Optional.empty();
+ }
+
+ final ObjectId objectId = readObjectIdAt(rootPath + "/" + OBJECT_ID_PATH);
+
+ return Optional.of(new ZkRefInfo(projectName, refName, objectId));
+ }
+
+ public void update(ZkRefInfo info) throws Exception {
+ writeInTransaction(info, () -> client.transactionOp().setData());
+ }
+
+ public void create(ZkRefInfo info) throws Exception {
+ client.createContainers(pathFor(info));
+ writeInTransaction(info, () -> client.transactionOp().create().withMode(PERSISTENT));
+ }
+
+ private void writeInTransaction(
+ ZkRefInfo info, Supplier<PathAndBytesable<CuratorOp>> writeOpBuilderSupplier)
+ throws Exception {
+ String commonPath = pathFor(info);
+ final List<CuratorTransactionResult> curatorTransactionResults =
+ client
+ .transaction()
+ .forOperations(
+ writeOpBuilderSupplier
+ .get()
+ .forPath(commonPath + "/" + OBJECT_ID_PATH, writeObjectId(info.objectId())));
+
+ for (CuratorTransactionResult result : curatorTransactionResults) {
+ if (result.getError() != 0)
+ throw new IOException(
+ String.format(
+ "Error with code %d trying to write path %s ",
+ result.getError(), result.getForPath()));
+ }
+ }
+
+ private boolean exists(String path) throws Exception {
+ return client.checkExists().forPath(path) != null;
+ }
+
+ private ObjectId readObjectIdAt(String path) throws Exception {
+ Optional<ObjectId> objectId = parseAt(path, ObjectId::fromRaw);
+ if (!objectId.isPresent()) {
+ throw new CorruptedZkStorageException(
+ String.format("Not able to read objectId from zookeeper path %s", path));
+ }
+ return objectId.get();
+ }
+
+ private byte[] writeObjectId(ObjectId value) throws IOException {
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ final DataOutputStream stream = new DataOutputStream(out);
+ value.copyRawTo(stream);
+ return out.toByteArray();
+ }
+
+ private <T> Optional<T> parseAt(String path, Function<byte[], T> parser) throws Exception {
+ if (client.checkExists().forPath(path) == null) return Optional.empty();
+ return Optional.ofNullable(client.getData().forPath(path)).map(parser);
+ }
+
+ static class CorruptedZkStorageException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public CorruptedZkStorageException(String message) {
+ super(message);
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java
new file mode 100644
index 0000000..7f858c4
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java
@@ -0,0 +1,172 @@
+// Copyright (C) 2012 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefDatabase;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Optional;
+import javax.inject.Named;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.framework.recipes.locks.Locker;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+
+public class ZkSharedRefDatabase implements SharedRefDatabase {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ private final CuratorFramework client;
+ private final Duration lockTimeout;
+
+ @Inject
+ public ZkSharedRefDatabase(
+ CuratorFramework client, @Named("ZkLockTimeout") Duration lockTimeout) {
+ this.client = client;
+ this.lockTimeout = lockTimeout;
+ }
+
+ @Override
+ public boolean compareAndRemove(String project, Ref oldRef) throws IOException {
+ return compareAndPut(project, oldRef, TombstoneRef.forRef(oldRef));
+ }
+
+ @Override
+ public boolean compareAndPut(String projectName, Ref oldRef, Ref newRef) throws IOException {
+ boolean isCreate = oldRef == NULL_REF;
+
+ final ZkRefInfoDAO marshaller = new ZkRefInfoDAO(client);
+
+ final InterProcessMutex refPathMutex =
+ new InterProcessMutex(client, "/locks" + ZkRefInfoDAO.pathFor(projectName, newRef));
+
+ try (Locker locker = new Locker(refPathMutex, lockTimeout.toMillis(), MILLISECONDS)) {
+ final Optional<ZkRefInfo> infoCurrentlyInZkMaybe =
+ marshaller.read(projectName, newRef.getName());
+ final ZkRefInfo newRefInfo = new ZkRefInfo(projectName, newRef);
+
+ if (isCreate) {
+ return doCreate(marshaller, infoCurrentlyInZkMaybe, newRefInfo);
+ }
+ return doUpdate(oldRef, marshaller, infoCurrentlyInZkMaybe, newRefInfo);
+
+ } catch (Exception e) {
+ throw new IOException(
+ String.format(
+ "Error trying to perform CAS at path %s", ZkRefInfoDAO.pathFor(projectName, newRef)),
+ e);
+ }
+ }
+
+ private boolean doUpdate(
+ Ref oldRef,
+ ZkRefInfoDAO marshaller,
+ Optional<ZkRefInfo> infoCurrentlyInZkMaybe,
+ ZkRefInfo newRefInfo)
+ throws Exception {
+ if (!infoCurrentlyInZkMaybe.isPresent()) {
+ logger.atWarning().log(
+ "Asked to update ref %s but it is not in ZK at path %s",
+ newRefInfo.refName(), ZkRefInfoDAO.pathFor(newRefInfo));
+ return false;
+ }
+
+ if (!infoCurrentlyInZkMaybe.get().objectId().equals(oldRef.getObjectId())) {
+ logger.atWarning().log(
+ "Old Ref %s does not match the current Rf content in Zookeeper %s. Not applying the update.",
+ oldRef.getObjectId(), infoCurrentlyInZkMaybe.get().objectId());
+ return false;
+ }
+
+ marshaller.update(newRefInfo);
+
+ return true;
+ }
+
+ private boolean doCreate(
+ ZkRefInfoDAO marshaller, Optional<ZkRefInfo> infoCurrentlyInZkMaybe, ZkRefInfo newRefInfo)
+ throws Exception {
+ if (infoCurrentlyInZkMaybe.isPresent()) {
+ logger.atWarning().log(
+ "Asked to create ref %s but it is already in ZK at path %s",
+ newRefInfo.refName(), ZkRefInfoDAO.pathFor(newRefInfo));
+ return false;
+ }
+
+ marshaller.create(newRefInfo);
+
+ return true;
+ }
+
+ /**
+ * When deleting a Ref this temporary Ref Tombstone will be created and then cleaned-up at a later
+ * stage by the garbage collection
+ */
+ static class TombstoneRef implements Ref {
+ static TombstoneRef forRef(final Ref targetRef) {
+ return new TombstoneRef(targetRef.getName());
+ }
+
+ private final String name;
+
+ private TombstoneRef(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public boolean isSymbolic() {
+ return false;
+ }
+
+ @Override
+ public Ref getLeaf() {
+ return null;
+ }
+
+ @Override
+ public Ref getTarget() {
+ return null;
+ }
+
+ @Override
+ public ObjectId getObjectId() {
+ return ObjectId.zeroId();
+ }
+
+ @Override
+ public ObjectId getPeeledObjectId() {
+ return null;
+ }
+
+ @Override
+ public boolean isPeeled() {
+ return false;
+ }
+
+ @Override
+ public Storage getStorage() {
+ return Storage.NETWORK;
+ }
+ }
+}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index b10d790..a66383b 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -42,6 +42,19 @@
cacheEventEnabled = true
projectListEventEnabled = true
streamEventEnabled = true
+
+[split-brain]
+ enabled = true
+
+[split-brain "zookeeper"]
+ connectString = "localhost:2181"
+ rootNode = "/gerrit/multi-site"
+ sessionTimeoutMs = 1000
+ connectionTimeoutMs = 1000
+ retryPolicyBaseSleepTimeMs = 1000
+ retryPolicyMaxSleepTimeMs = 3000
+ retryPolicyMaxRetries = 3
+ lockTimeoutMs = 10000
```
## Configuration parameters
@@ -151,6 +164,41 @@
: Polling interval for checking incoming events
Defaults: 1000
+```split-brain.zookeeper.connectString```
+: Connection string to zookeeper
+
+```split-brain.zookeeper.rootNode```
+: Root node to use under Zookeeper to store/retrieve information
+ Defaults: "/gerrit/multi-site"
+
+
+```split-brain.zookeeper.sessionTimeoutMs```
+: Root node to use under Zookeeper to store/retrieve information
+ Defaults: 1000
+
+```split-brain.zookeeper.connectionTimeoutMs```
+: Root node to use under Zookeeper to store/retrieve information
+ Defaults: 1000
+
+```split-brain.zookeeper.retryPolicyBaseSleepTimeMs```
+: Configuration for the base sleep timeout (iun ms) to use to create the BoundedExponentialBackoffRetry policy
+used for the Zookeeper connection
+ Defaults: 1000
+
+```split-brain.zookeeper.retryPolicyMaxSleepTimeMs```
+: Configuration for the max sleep timeout (iun ms) to use to create the BoundedExponentialBackoffRetry policy
+used for the Zookeeper connection
+ Defaults: 3000
+
+```split-brain.zookeeper.retryPolicyMaxRetries```
+: Configuration for the max number of retries to use to create the BoundedExponentialBackoffRetry policy
+used for the Zookeeper connection
+ Defaults: 3
+
+```split-brain.zookeeper.lockTimeoutMs```
+: Configuration for InterProcessMutex lock timeout
+ Defaults: 10000
+
#### Custom kafka properties:
In addition to the above settings, custom Kafka properties can be explicitly set for `publisher` and `subscriber`.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
index a4bebb7..a066094 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
@@ -17,6 +17,7 @@
import static com.google.common.truth.Truth.assertThat;
import static com.googlesource.gerrit.plugins.multisite.Configuration.Cache.CACHE_SECTION;
import static com.googlesource.gerrit.plugins.multisite.Configuration.Cache.PATTERN_KEY;
+import static com.googlesource.gerrit.plugins.multisite.Configuration.DEFAULT_SPLIT_BRAIN;
import static com.googlesource.gerrit.plugins.multisite.Configuration.DEFAULT_THREAD_POOL_SIZE;
import static com.googlesource.gerrit.plugins.multisite.Configuration.ENABLE_KEY;
import static com.googlesource.gerrit.plugins.multisite.Configuration.Event.EVENT_SECTION;
@@ -32,6 +33,8 @@
import com.google.common.collect.ImmutableList;
import com.google.gerrit.server.config.PluginConfigFactory;
+import com.googlesource.gerrit.plugins.multisite.Configuration.SplitBrain;
+import com.googlesource.gerrit.plugins.multisite.Configuration.Zookeeper;
import org.eclipse.jgit.lib.Config;
import org.junit.Before;
import org.junit.Test;
@@ -45,6 +48,7 @@
private static final String INVALID_INT = "invalidInt";
private static final String PLUGIN_NAME = "multi-site";
private static final int THREAD_POOL_SIZE = 1;
+ private static final boolean SPLIT_BRAIN_ENABLED = true;
@Mock private PluginConfigFactory pluginConfigFactoryMock;
private Config globalPluginConfig;
@@ -96,6 +100,22 @@
}
@Test
+ public void testGetEnabledSplitBrain() throws Exception {
+ assertThat(getConfiguration().getSplitBrain().enabled()).isEqualTo(DEFAULT_SPLIT_BRAIN);
+
+ globalPluginConfig.setBoolean(
+ SplitBrain.SECTION, null, SplitBrain.ENABLED_KEY, SPLIT_BRAIN_ENABLED);
+ // If split-brain enabled, zookeeper 'connect' is required
+ globalPluginConfig.setString(
+ SplitBrain.SECTION,
+ Zookeeper.SUBSECTION,
+ Zookeeper.KEY_CONNECT_STRING,
+ Zookeeper.DEFAULT_ZK_CONNECT);
+
+ assertThat(getConfiguration().getSplitBrain().enabled()).isEqualTo(SPLIT_BRAIN_ENABLED);
+ }
+
+ @Test
public void testGetCacheSynchronize() throws Exception {
assertThat(getConfiguration().cache().synchronize()).isEqualTo(DEFAULT_SYNCHRONIZE);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index 69bd4e9..449c272 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -104,6 +104,7 @@
config.setString("kafka", null, "bootstrapServers", kafka.getBootstrapServers());
config.setBoolean("kafka", "publisher", "enabled", true);
config.setBoolean("kafka", "subscriber", "enabled", true);
+ config.setBoolean("split-brain", null, "enabled", false);
Configuration multiSiteConfig = new Configuration(config);
bind(Configuration.class).toInstance(multiSiteConfig);
install(new Module(multiSiteConfig, noteDb));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationIT.java
index 0c6833d..43377f8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationIT.java
@@ -14,34 +14,87 @@
package com.googlesource.gerrit.plugins.multisite.validation;
-import com.google.common.flogger.FluentLogger;
import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
import com.google.gerrit.acceptance.LogThreshold;
import com.google.gerrit.acceptance.NoHttpd;
import com.google.gerrit.acceptance.PushOneCommit;
import com.google.gerrit.acceptance.TestPlugin;
-import com.google.inject.AbstractModule;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.Module;
+import com.googlesource.gerrit.plugins.multisite.NoteDbStatus;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZookeeperTestContainerSupport;
+import org.junit.Before;
import org.junit.Test;
@NoHttpd
@LogThreshold(level = "INFO")
@TestPlugin(
name = "multi-site",
- sysModule = "com.googlesource.gerrit.plugins.multisite.validation.ValidationIT$Module")
+ sysModule =
+ "com.googlesource.gerrit.plugins.multisite.validation.ValidationIT$ZookeeperTestModule")
public class ValidationIT extends LightweightPluginDaemonTest {
- private static final FluentLogger logger = FluentLogger.forEnclosingClass();
- public static class Module extends AbstractModule {
+ static {
+ System.setProperty("gerrit.notedb", "ON");
+ }
+
+ public static class ZookeeperTestModule extends LifecycleModule {
+
+ public class ZookeeperStopAtShutdown implements LifecycleListener {
+ private final ZookeeperTestContainerSupport zookeeperContainer;
+
+ public ZookeeperStopAtShutdown(ZookeeperTestContainerSupport zk) {
+ this.zookeeperContainer = zk;
+ }
+
+ @Override
+ public void stop() {
+ zookeeperContainer.cleanup();
+ }
+
+ @Override
+ public void start() {
+ // Do nothing
+ }
+ }
+
+ private final NoteDbStatus noteDb;
+
+ @Inject
+ public ZookeeperTestModule(NoteDbStatus noteDb) {
+ this.noteDb = noteDb;
+ }
+
@Override
protected void configure() {
- install(new ValidationModule());
+ ZookeeperTestContainerSupport zookeeperContainer = new ZookeeperTestContainerSupport();
+ Configuration multiSiteConfig = zookeeperContainer.getConfig();
+ bind(Configuration.class).toInstance(multiSiteConfig);
+ install(new Module(multiSiteConfig, noteDb));
+
+ listener().toInstance(new ZookeeperStopAtShutdown(zookeeperContainer));
+ }
+ }
+
+ @Override
+ @Before
+ public void setUpTestPlugin() throws Exception {
+ super.setUpTestPlugin();
+
+ if (!notesMigration.commitChangeWrites()) {
+ throw new IllegalStateException("NoteDb is mandatory for running the multi-site plugin");
}
}
@Test
public void inSyncChangeValidatorShouldAcceptNewChange() throws Exception {
- final PushOneCommit.Result change = createChange("refs/for/master");
-
+ // FIXME: The code does not work for already existing refs (need a migration step for first
+ // run - T0). Using "refs/heads/master2" in this test for now
+ final PushOneCommit.Result change =
+ createCommitAndPush(testRepo, "refs/heads/master2", "msg", "file", "content");
change.assertOkStatus();
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/RefSharedDatabaseTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/RefSharedDatabaseTest.java
new file mode 100644
index 0000000..e0f0a9f
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/RefSharedDatabaseTest.java
@@ -0,0 +1,73 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.RefFixture;
+import java.io.IOException;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Ref.Storage;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class RefSharedDatabaseTest implements RefFixture {
+ @Rule public TestName nameRule = new TestName();
+
+ @Override
+ public String testBranch() {
+ return "branch_" + nameRule.getMethodName();
+ }
+
+ @Test
+ public void shouldCreateANewRef() {
+
+ ObjectId objectId = AN_OBJECT_ID_1;
+ String refName = aBranchRef();
+
+ Ref aNewRef =
+ new SharedRefDatabase() {
+
+ @Override
+ public boolean compareAndRemove(String project, Ref oldRef) throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean compareAndPut(String project, Ref oldRef, Ref newRef) throws IOException {
+ return false;
+ }
+ }.newRef(refName, objectId);
+
+ assertThat(aNewRef.getName()).isEqualTo(refName);
+ assertThat(aNewRef.getObjectId()).isEqualTo(objectId);
+ assertThat(aNewRef.getStorage()).isEqualTo(Storage.NETWORK);
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/RefFixture.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/RefFixture.java
new file mode 100644
index 0000000..1e27337
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/RefFixture.java
@@ -0,0 +1,55 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
+
+import com.google.gerrit.reviewdb.client.RefNames;
+import org.eclipse.jgit.lib.ObjectId;
+import org.junit.Ignore;
+
+@Ignore
+public interface RefFixture {
+
+ static final String ALLOWED_CHARS = "abcdefghilmnopqrstuvz";
+ static final String ALLOWED_DIGITS = "1234567890";
+ static final String ALLOWED_NAME_CHARS =
+ ALLOWED_CHARS + ALLOWED_CHARS.toUpperCase() + ALLOWED_DIGITS;
+ static final String A_TEST_PROJECT_NAME = "A_TEST_PROJECT_NAME";
+ static final ObjectId AN_OBJECT_ID_1 = new ObjectId(1, 2, 3, 4, 5);
+ static final ObjectId AN_OBJECT_ID_2 = new ObjectId(1, 2, 3, 4, 6);
+ static final ObjectId AN_OBJECT_ID_3 = new ObjectId(1, 2, 3, 4, 7);
+
+ default ZkRefInfo aZkRefInfo(ObjectId objectId) {
+ return new ZkRefInfo(A_TEST_PROJECT_NAME, aBranchRef(), objectId);
+ }
+
+ default String aBranchRef() {
+ return RefNames.REFS_HEADS + testBranch();
+ }
+
+ String testBranch();
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZKRefsSharedDatabaseTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZKRefsSharedDatabaseTest.java
new file mode 100644
index 0000000..bc3323c
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZKRefsSharedDatabaseTest.java
@@ -0,0 +1,141 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkSharedRefDatabase.TombstoneRef;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Optional;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class ZKRefsSharedDatabaseTest implements RefFixture {
+ @Rule public TestName nameRule = new TestName();
+
+ ZookeeperTestContainerSupport zookeeperContainer;
+ ZkSharedRefDatabase zkSharedRefDatabase;
+ ZkRefInfoDAO marshaller;
+
+ @Before
+ public void setup() {
+ zookeeperContainer = new ZookeeperTestContainerSupport();
+ zkSharedRefDatabase =
+ new ZkSharedRefDatabase(zookeeperContainer.getCurator(), Duration.ofMinutes(10));
+ marshaller = zookeeperContainer.getMarshaller();
+ }
+
+ @After
+ public void cleanup() {
+ zookeeperContainer.cleanup();
+ }
+
+ @Test
+ public void shouldCompareAndPutSuccessfully() throws Exception {
+ Ref oldRef = refOf(AN_OBJECT_ID_1);
+ Ref newRef = refOf(AN_OBJECT_ID_2);
+ String projectName = RefFixture.A_TEST_PROJECT_NAME;
+
+ marshaller.create(new ZkRefInfo(projectName, oldRef));
+
+ assertThat(zkSharedRefDatabase.compareAndPut(projectName, oldRef, newRef)).isTrue();
+ }
+
+ @Test
+ public void compareAndPutShouldFailIfTheObjectionHasNotTheExpectedValue() throws Exception {
+ String projectName = RefFixture.A_TEST_PROJECT_NAME;
+
+ Ref oldRef = refOf(AN_OBJECT_ID_1);
+ Ref expectedRef = refOf(AN_OBJECT_ID_2);
+
+ marshaller.create(new ZkRefInfo(projectName, oldRef));
+
+ assertThat(zkSharedRefDatabase.compareAndPut(projectName, expectedRef, refOf(AN_OBJECT_ID_3)))
+ .isFalse();
+ }
+
+ @Test
+ public void compareAndPutShouldFaiIfTheObjectionDoesNotExist() throws IOException {
+ Ref oldRef = refOf(AN_OBJECT_ID_1);
+ assertThat(
+ zkSharedRefDatabase.compareAndPut(
+ RefFixture.A_TEST_PROJECT_NAME, oldRef, refOf(AN_OBJECT_ID_2)))
+ .isFalse();
+ }
+
+ @Test
+ public void shouldCompareAndRemoveSuccessfully() throws Exception {
+ Ref oldRef = refOf(AN_OBJECT_ID_1);
+ String projectName = RefFixture.A_TEST_PROJECT_NAME;
+
+ marshaller.create(new ZkRefInfo(projectName, oldRef));
+
+ assertThat(zkSharedRefDatabase.compareAndRemove(projectName, oldRef)).isTrue();
+ }
+
+ @Test
+ public void shouldReplaceTheRefWithATombstoneAfterCompareAndPutRemove() throws Exception {
+ Ref oldRef = refOf(AN_OBJECT_ID_1);
+ String projectName = RefFixture.A_TEST_PROJECT_NAME;
+
+ marshaller.create(new ZkRefInfo(projectName, oldRef));
+
+ assertThat(zkSharedRefDatabase.compareAndRemove(projectName, oldRef)).isTrue();
+
+ Optional<ZkRefInfo> inZk = marshaller.read(projectName, oldRef.getName());
+ assertThat(inZk.isPresent()).isTrue();
+ inZk.get().equals(TombstoneRef.forRef(oldRef));
+ }
+
+ @Test
+ public void shouldNotCompareAndPutSuccessfullyAfterACompareAndRemove() throws Exception {
+ Ref oldRef = refOf(AN_OBJECT_ID_1);
+ String projectName = RefFixture.A_TEST_PROJECT_NAME;
+
+ marshaller.create(new ZkRefInfo(projectName, oldRef));
+
+ zkSharedRefDatabase.compareAndRemove(projectName, oldRef);
+ assertThat(zkSharedRefDatabase.compareAndPut(projectName, oldRef, refOf(AN_OBJECT_ID_2)))
+ .isFalse();
+ }
+
+ private Ref refOf(ObjectId objectId) {
+ return zkSharedRefDatabase.newRef(aBranchRef(), objectId);
+ }
+
+ @Override
+ public String testBranch() {
+ return "branch_" + nameRule.getMethodName();
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoMarshallerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoMarshallerTest.java
new file mode 100644
index 0000000..e50cd9b
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoMarshallerTest.java
@@ -0,0 +1,102 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.hamcrest.CoreMatchers.nullValue;
+
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkRefInfoDAO.CorruptedZkStorageException;
+import java.time.Duration;
+import java.util.Optional;
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestName;
+
+public class ZkRefInfoMarshallerTest implements RefFixture {
+ @Rule public ExpectedException expectedException = ExpectedException.none();
+ @Rule public TestName nameRule = new TestName();
+
+ ZookeeperTestContainerSupport zookeeperContainer;
+ ZkSharedRefDatabase zkSharedRefDatabase;
+ ZkRefInfoDAO marshaller;
+ CuratorFramework curator;
+
+ @Before
+ public void setup() {
+ zookeeperContainer = new ZookeeperTestContainerSupport();
+ zkSharedRefDatabase =
+ new ZkSharedRefDatabase(zookeeperContainer.getCurator(), Duration.ofMinutes(10));
+ marshaller = zookeeperContainer.getMarshaller();
+ curator = zookeeperContainer.getCurator();
+ }
+
+ @After
+ public void cleanup() {
+ zookeeperContainer.cleanup();
+ }
+
+ @Test
+ public void shouldCreateAZRefInfo() throws Exception {
+ ZkRefInfo refInfo = aZkRefInfo(AN_OBJECT_ID_1);
+
+ marshaller.create(refInfo);
+
+ Optional<ZkRefInfo> readRefInfo = marshaller.read(refInfo.projectName(), refInfo.refName());
+
+ assertThat(readRefInfo).isEqualTo(Optional.of(refInfo));
+ }
+
+ @Test
+ public void shouldReturnEmptyIfARefDoesNotExist() throws Exception {
+ assertThat(marshaller.read(A_TEST_PROJECT_NAME, aBranchRef())).isEqualTo(Optional.empty());
+ }
+
+ @Test
+ public void shouldUpdateAZrefInfo() throws Exception {
+ ZkRefInfo newRefInfo = aZkRefInfo(AN_OBJECT_ID_1);
+ ZkRefInfo updateRefInfo =
+ new ZkRefInfo(newRefInfo.projectName(), newRefInfo.refName(), AN_OBJECT_ID_2);
+
+ marshaller.create(newRefInfo);
+ marshaller.update(updateRefInfo);
+
+ Optional<ZkRefInfo> readUpdatedRefInfo =
+ marshaller.read(updateRefInfo.projectName(), updateRefInfo.refName());
+
+ assertThat(readUpdatedRefInfo).isEqualTo(Optional.of(updateRefInfo));
+ }
+
+ @Test
+ public void shouldFailToReadZkRefInfoIfSomeOfTheInfoIsMissing() throws Exception {
+ String projectName = A_TEST_PROJECT_NAME;
+ String refName = aBranchRef();
+
+ curator.createContainers(ZkRefInfoDAO.pathFor(projectName, refName));
+
+ expectedException.expect(CorruptedZkStorageException.class);
+ expectedException.expectCause(nullValue(Exception.class));
+
+ marshaller.read(projectName, refName);
+ }
+
+ @Override
+ public String testBranch() {
+ return "branch_" + nameRule.getMethodName();
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZookeeperTestContainerSupport.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZookeeperTestContainerSupport.java
new file mode 100644
index 0000000..dff2aab
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZookeeperTestContainerSupport.java
@@ -0,0 +1,90 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
+
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import org.apache.curator.framework.CuratorFramework;
+import org.eclipse.jgit.lib.Config;
+import org.junit.Ignore;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+@Ignore
+public class ZookeeperTestContainerSupport {
+
+ static class ZookeeperContainer extends GenericContainer<ZookeeperContainer> {
+ public static String ZOOKEEPER_VERSION = "3.4.13";
+
+ public ZookeeperContainer() {
+ super("zookeeper:" + ZOOKEEPER_VERSION);
+ }
+ }
+
+ private ZookeeperContainer container;
+ private ZkRefInfoDAO marshaller;
+ private Configuration configuration;
+ private CuratorFramework curator;
+
+ public CuratorFramework getCurator() {
+ return curator;
+ }
+
+ public ZookeeperContainer getContainer() {
+ return container;
+ }
+
+ public ZkRefInfoDAO getMarshaller() {
+ return marshaller;
+ }
+
+ public Configuration getConfig() {
+ return configuration;
+ }
+
+ public ZookeeperTestContainerSupport() {
+ container = new ZookeeperContainer();
+ container.addExposedPorts(2181);
+ container.waitingFor(Wait.forListeningPort());
+ container.start();
+ Integer zkHostPort = container.getMappedPort(2181);
+ Config splitBrainconfig = new Config();
+ String connectString = "localhost:" + zkHostPort;
+ splitBrainconfig.setBoolean("split-brain", null, "enabled", true);
+ splitBrainconfig.setString("split-brain", "zookeeper", "connectString", connectString);
+
+ configuration = new Configuration(splitBrainconfig);
+ this.curator = configuration.getSplitBrain().getZookeeper().buildCurator();
+
+ this.marshaller = new ZkRefInfoDAO(this.curator);
+ }
+
+ public void cleanup() {
+ this.curator.delete();
+ this.container.stop();
+ }
+}