Simplify the Zookeeper-based SharedDB code
Store the ObjectId instead of a complex object, massively
reduce the amount of code needed.
Feature: Issue 10554
Change-Id: I050d85584e0970f18e09c3107ca4e392affb6027
diff --git a/BUILD b/BUILD
index e132e67..c2c508b 100644
--- a/BUILD
+++ b/BUILD
@@ -53,6 +53,7 @@
"@curator-framework//jar",
"@curator-recipes//jar",
"@curator-test//jar",
+ "@curator-client//jar",
"@zookeeper//jar",
],
)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index da4ea93..8bb2a8b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -25,7 +25,6 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -34,6 +33,7 @@
import java.util.Properties;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
+import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
@@ -419,6 +419,9 @@
private final int DEFAULT_RETRY_POLICY_BASE_SLEEP_TIME_MS = 1000;
private final int DEFAULT_RETRY_POLICY_MAX_SLEEP_TIME_MS = 3000;
private final int DEFAULT_RETRY_POLICY_MAX_RETRIES = 3;
+ private final int DEFAULT_CAS_RETRY_POLICY_BASE_SLEEP_TIME_MS = 100;
+ private final int DEFAULT_CAS_RETRY_POLICY_MAX_SLEEP_TIME_MS = 300;
+ private final int DEFAULT_CAS_RETRY_POLICY_MAX_RETRIES = 3;
static {
CuratorFrameworkFactory.Builder b = CuratorFrameworkFactory.builder();
@@ -435,6 +438,9 @@
public static final String KEY_RETRY_POLICY_MAX_RETRIES = "retryPolicyMaxRetries";
public static final String KEY_LOCK_TIMEOUT_MS = "lockTimeoutMs";
public static final String KEY_ROOT_NODE = "rootNode";
+ public final String KEY_CAS_RETRY_POLICY_BASE_SLEEP_TIME_MS = "casRetryPolicyBaseSleepTimeMs";
+ public final String KEY_CAS_RETRY_POLICY_MAX_SLEEP_TIME_MS = "casRetryPolicyMaxSleepTimeMs";
+ public final String KEY_CAS_RETRY_POLICY_MAX_RETRIES = "casRetryPolicyMaxRetries";
private final String connectionString;
private final String root;
@@ -443,14 +449,12 @@
private final int baseSleepTimeMs;
private final int maxSleepTimeMs;
private final int maxRetries;
+ private final int casBaseSleepTimeMs;
+ private final int casMaxSleepTimeMs;
+ private final int casMaxRetries;
+
private CuratorFramework build;
- public Duration getLockTimeout() {
- return lockTimeout;
- }
-
- private final Duration lockTimeout;
-
private Zookeeper(Config cfg) {
connectionString =
getString(cfg, SplitBrain.SECTION, SUBSECTION, KEY_CONNECT_STRING, DEFAULT_ZK_CONNECT);
@@ -490,14 +494,29 @@
KEY_RETRY_POLICY_MAX_RETRIES,
DEFAULT_RETRY_POLICY_MAX_RETRIES);
- lockTimeout =
- Duration.ofMillis(
- getInt(
- cfg,
- SplitBrain.SECTION,
- SUBSECTION,
- KEY_LOCK_TIMEOUT_MS,
- DEFAULT_LOCK_TIMEOUT_MS));
+ casBaseSleepTimeMs =
+ getInt(
+ cfg,
+ SplitBrain.SECTION,
+ SUBSECTION,
+ KEY_CAS_RETRY_POLICY_BASE_SLEEP_TIME_MS,
+ DEFAULT_CAS_RETRY_POLICY_BASE_SLEEP_TIME_MS);
+
+ casMaxSleepTimeMs =
+ getInt(
+ cfg,
+ SplitBrain.SECTION,
+ SUBSECTION,
+ KEY_CAS_RETRY_POLICY_MAX_SLEEP_TIME_MS,
+ DEFAULT_CAS_RETRY_POLICY_MAX_SLEEP_TIME_MS);
+
+ casMaxRetries =
+ getInt(
+ cfg,
+ SplitBrain.SECTION,
+ SUBSECTION,
+ KEY_CAS_RETRY_POLICY_MAX_RETRIES,
+ DEFAULT_CAS_RETRY_POLICY_MAX_RETRIES);
checkArgument(StringUtils.isNotEmpty(connectionString), "zookeeper.%s contains no servers");
}
@@ -518,6 +537,11 @@
return this.build;
}
+
+ public RetryPolicy buildCasRetryPolicy() {
+ return new BoundedExponentialBackoffRetry(
+ casBaseSleepTimeMs, casMaxSleepTimeMs, casMaxRetries);
+ }
}
public static class SplitBrain {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
index c1b80eb..b3f99a6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
@@ -17,12 +17,8 @@
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.git.validators.RefOperationValidationListener;
import com.google.inject.AbstractModule;
-import com.google.inject.name.Names;
import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefDatabase;
-import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkSharedRefDatabase;
-import java.time.Duration;
-import org.apache.curator.framework.CuratorFramework;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkValidationModule;
public class ValidationModule extends AbstractModule {
@@ -36,10 +32,6 @@
protected void configure() {
DynamicSet.bind(binder(), RefOperationValidationListener.class).to(InSyncChangeValidator.class);
- bind(SharedRefDatabase.class).to(ZkSharedRefDatabase.class);
- bind(CuratorFramework.class).toInstance(cfg.getSplitBrain().getZookeeper().buildCurator());
- bind(Duration.class)
- .annotatedWith(Names.named("ZkLockTimeout"))
- .toInstance(cfg.getSplitBrain().getZookeeper().getLockTimeout());
+ install(new ZkValidationModule(cfg));
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfo.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfo.java
deleted file mode 100644
index ab34aca..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfo.java
+++ /dev/null
@@ -1,80 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-// Copyright (C) 2018 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
-
-import com.google.common.base.Objects;
-import org.eclipse.jgit.lib.ObjectId;
-import org.eclipse.jgit.lib.Ref;
-
-public class ZkRefInfo {
-
- private final String refName;
- private final String projectName;
- private final ObjectId objectId;
-
- public ZkRefInfo(final String projectName, final String refName, final ObjectId objectId) {
- this.projectName = projectName;
- this.objectId = objectId;
- this.refName = refName;
- }
-
- public ZkRefInfo(final String projectName, final Ref ref) {
- this(projectName, ref.getName(), ref.getObjectId());
- }
-
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- }
- if (other == null || getClass() != other.getClass()) {
- return false;
- }
- ZkRefInfo zkRefInfo = (ZkRefInfo) other;
- return Objects.equal(refName, zkRefInfo.refName)
- && Objects.equal(projectName, zkRefInfo.projectName)
- && Objects.equal(objectId, zkRefInfo.objectId);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(refName, projectName, objectId);
- }
-
- public String refName() {
- return refName;
- }
-
- public String projectName() {
- return projectName;
- }
-
- public ObjectId objectId() {
- return objectId;
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoDAO.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoDAO.java
deleted file mode 100644
index 85d8227..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoDAO.java
+++ /dev/null
@@ -1,141 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-// Copyright (C) 2018 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
-
-import static org.apache.zookeeper.CreateMode.PERSISTENT;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.List;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.PathAndBytesable;
-import org.apache.curator.framework.api.transaction.CuratorOp;
-import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
-import org.eclipse.jgit.lib.ObjectId;
-import org.eclipse.jgit.lib.Ref;
-
-public class ZkRefInfoDAO {
- public static final String OBJECT_ID_PATH = "objectId";
-
- public static String pathFor(ZkRefInfo info) {
- return "/" + info.projectName() + "/" + info.refName();
- }
-
- public static String pathFor(String projectName, Ref ref) {
- return pathFor(projectName, ref.getName());
- }
-
- public static String pathFor(String projectName, String refName) {
- return "/" + projectName + "/" + refName;
- }
-
- private final CuratorFramework client;
-
- public ZkRefInfoDAO(CuratorFramework client) {
- this.client = client;
- }
-
- public Optional<ZkRefInfo> read(String projectName, String refName) throws Exception {
- final String rootPath = pathFor(projectName, refName);
-
- if (!exists(rootPath)) {
- return Optional.empty();
- }
-
- final ObjectId objectId = readObjectIdAt(rootPath + "/" + OBJECT_ID_PATH);
-
- return Optional.of(new ZkRefInfo(projectName, refName, objectId));
- }
-
- public void update(ZkRefInfo info) throws Exception {
- writeInTransaction(info, () -> client.transactionOp().setData());
- }
-
- public void create(ZkRefInfo info) throws Exception {
- client.createContainers(pathFor(info));
- writeInTransaction(info, () -> client.transactionOp().create().withMode(PERSISTENT));
- }
-
- private void writeInTransaction(
- ZkRefInfo info, Supplier<PathAndBytesable<CuratorOp>> writeOpBuilderSupplier)
- throws Exception {
- String commonPath = pathFor(info);
- final List<CuratorTransactionResult> curatorTransactionResults =
- client
- .transaction()
- .forOperations(
- writeOpBuilderSupplier
- .get()
- .forPath(commonPath + "/" + OBJECT_ID_PATH, writeObjectId(info.objectId())));
-
- for (CuratorTransactionResult result : curatorTransactionResults) {
- if (result.getError() != 0)
- throw new IOException(
- String.format(
- "Error with code %d trying to write path %s ",
- result.getError(), result.getForPath()));
- }
- }
-
- private boolean exists(String path) throws Exception {
- return client.checkExists().forPath(path) != null;
- }
-
- private ObjectId readObjectIdAt(String path) throws Exception {
- Optional<ObjectId> objectId = parseAt(path, ObjectId::fromRaw);
- if (!objectId.isPresent()) {
- throw new CorruptedZkStorageException(
- String.format("Not able to read objectId from zookeeper path %s", path));
- }
- return objectId.get();
- }
-
- private byte[] writeObjectId(ObjectId value) throws IOException {
- final ByteArrayOutputStream out = new ByteArrayOutputStream();
- final DataOutputStream stream = new DataOutputStream(out);
- value.copyRawTo(stream);
- return out.toByteArray();
- }
-
- private <T> Optional<T> parseAt(String path, Function<byte[], T> parser) throws Exception {
- if (client.checkExists().forPath(path) == null) return Optional.empty();
- return Optional.ofNullable(client.getData().forPath(path)).map(parser);
- }
-
- static class CorruptedZkStorageException extends Exception {
- private static final long serialVersionUID = 1L;
-
- public CorruptedZkStorageException(String message) {
- super(message);
- }
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java
index 7f858c4..c1272a7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java
@@ -14,18 +14,18 @@
package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
+import com.google.common.base.MoreObjects;
import com.google.common.flogger.FluentLogger;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefDatabase;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
-import java.time.Duration;
-import java.util.Optional;
import javax.inject.Named;
+import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.framework.recipes.locks.Locker;
+import org.apache.curator.framework.recipes.atomic.AtomicValue;
+import org.apache.curator.framework.recipes.atomic.DistributedAtomicValue;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Ref;
@@ -33,140 +33,62 @@
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final CuratorFramework client;
- private final Duration lockTimeout;
+ private final RetryPolicy retryPolicy;
@Inject
public ZkSharedRefDatabase(
- CuratorFramework client, @Named("ZkLockTimeout") Duration lockTimeout) {
+ CuratorFramework client, @Named("ZkLockRetryPolicy") RetryPolicy retryPolicy) {
this.client = client;
- this.lockTimeout = lockTimeout;
+ this.retryPolicy = retryPolicy;
}
@Override
public boolean compareAndRemove(String project, Ref oldRef) throws IOException {
- return compareAndPut(project, oldRef, TombstoneRef.forRef(oldRef));
+ return compareAndPut(project, oldRef, NULL_REF);
}
@Override
public boolean compareAndPut(String projectName, Ref oldRef, Ref newRef) throws IOException {
- boolean isCreate = oldRef == NULL_REF;
+ final DistributedAtomicValue distributedRefValue =
+ new DistributedAtomicValue(client, pathFor(projectName, oldRef, newRef), retryPolicy);
- final ZkRefInfoDAO marshaller = new ZkRefInfoDAO(client);
-
- final InterProcessMutex refPathMutex =
- new InterProcessMutex(client, "/locks" + ZkRefInfoDAO.pathFor(projectName, newRef));
-
- try (Locker locker = new Locker(refPathMutex, lockTimeout.toMillis(), MILLISECONDS)) {
- final Optional<ZkRefInfo> infoCurrentlyInZkMaybe =
- marshaller.read(projectName, newRef.getName());
- final ZkRefInfo newRefInfo = new ZkRefInfo(projectName, newRef);
-
- if (isCreate) {
- return doCreate(marshaller, infoCurrentlyInZkMaybe, newRefInfo);
+ try {
+ if (oldRef == NULL_REF) {
+ return distributedRefValue.initialize(writeObjectId(newRef.getObjectId()));
}
- return doUpdate(oldRef, marshaller, infoCurrentlyInZkMaybe, newRefInfo);
+ final ObjectId newValue =
+ newRef.getObjectId() == null ? ObjectId.zeroId() : newRef.getObjectId();
+ final AtomicValue<byte[]> newDistributedValue =
+ distributedRefValue.compareAndSet(
+ writeObjectId(oldRef.getObjectId()), writeObjectId(newValue));
+ return newDistributedValue.succeeded();
} catch (Exception e) {
+ logger.atWarning().withCause(e).log(
+ "Error trying to perform CAS at path %s", pathFor(projectName, oldRef, newRef));
throw new IOException(
String.format(
- "Error trying to perform CAS at path %s", ZkRefInfoDAO.pathFor(projectName, newRef)),
+ "Error trying to perform CAS at path %s", pathFor(projectName, oldRef, newRef)),
e);
}
}
- private boolean doUpdate(
- Ref oldRef,
- ZkRefInfoDAO marshaller,
- Optional<ZkRefInfo> infoCurrentlyInZkMaybe,
- ZkRefInfo newRefInfo)
- throws Exception {
- if (!infoCurrentlyInZkMaybe.isPresent()) {
- logger.atWarning().log(
- "Asked to update ref %s but it is not in ZK at path %s",
- newRefInfo.refName(), ZkRefInfoDAO.pathFor(newRefInfo));
- return false;
- }
-
- if (!infoCurrentlyInZkMaybe.get().objectId().equals(oldRef.getObjectId())) {
- logger.atWarning().log(
- "Old Ref %s does not match the current Rf content in Zookeeper %s. Not applying the update.",
- oldRef.getObjectId(), infoCurrentlyInZkMaybe.get().objectId());
- return false;
- }
-
- marshaller.update(newRefInfo);
-
- return true;
+ static String pathFor(String projectName, Ref oldRef, Ref newRef) {
+ return pathFor(projectName, MoreObjects.firstNonNull(oldRef.getName(), newRef.getName()));
}
- private boolean doCreate(
- ZkRefInfoDAO marshaller, Optional<ZkRefInfo> infoCurrentlyInZkMaybe, ZkRefInfo newRefInfo)
- throws Exception {
- if (infoCurrentlyInZkMaybe.isPresent()) {
- logger.atWarning().log(
- "Asked to create ref %s but it is already in ZK at path %s",
- newRefInfo.refName(), ZkRefInfoDAO.pathFor(newRefInfo));
- return false;
- }
-
- marshaller.create(newRefInfo);
-
- return true;
+ static String pathFor(String projectName, String refName) {
+ return "/" + projectName + "/" + refName;
}
- /**
- * When deleting a Ref this temporary Ref Tombstone will be created and then cleaned-up at a later
- * stage by the garbage collection
- */
- static class TombstoneRef implements Ref {
- static TombstoneRef forRef(final Ref targetRef) {
- return new TombstoneRef(targetRef.getName());
- }
+ static ObjectId readObjectId(byte[] value) {
+ return ObjectId.fromRaw(value);
+ }
- private final String name;
-
- private TombstoneRef(String name) {
- this.name = name;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public boolean isSymbolic() {
- return false;
- }
-
- @Override
- public Ref getLeaf() {
- return null;
- }
-
- @Override
- public Ref getTarget() {
- return null;
- }
-
- @Override
- public ObjectId getObjectId() {
- return ObjectId.zeroId();
- }
-
- @Override
- public ObjectId getPeeledObjectId() {
- return null;
- }
-
- @Override
- public boolean isPeeled() {
- return false;
- }
-
- @Override
- public Storage getStorage() {
- return Storage.NETWORK;
- }
+ static byte[] writeObjectId(ObjectId value) throws IOException {
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ final DataOutputStream stream = new DataOutputStream(out);
+ value.copyRawTo(stream);
+ return out.toByteArray();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkValidationModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkValidationModule.java
new file mode 100644
index 0000000..d4a3126
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkValidationModule.java
@@ -0,0 +1,40 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefDatabase;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+
+public class ZkValidationModule extends AbstractModule {
+
+ private Configuration cfg;
+
+ public ZkValidationModule(Configuration cfg) {
+ this.cfg = cfg;
+ }
+
+ @Override
+ protected void configure() {
+ bind(SharedRefDatabase.class).to(ZkSharedRefDatabase.class);
+ bind(CuratorFramework.class).toInstance(cfg.getSplitBrain().getZookeeper().buildCurator());
+ bind(RetryPolicy.class)
+ .annotatedWith(Names.named("ZkLockRetryPolicy"))
+ .toInstance(cfg.getSplitBrain().getZookeeper().buildCasRetryPolicy());
+ }
+}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index a66383b..ca3f87c 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -45,16 +45,18 @@
[split-brain]
enabled = true
-
+
[split-brain "zookeeper"]
connectString = "localhost:2181"
- rootNode = "/gerrit/multi-site"
+ rootNode = "/gerrit/multi-site"
sessionTimeoutMs = 1000
connectionTimeoutMs = 1000
retryPolicyBaseSleepTimeMs = 1000
retryPolicyMaxSleepTimeMs = 3000
retryPolicyMaxRetries = 3
- lockTimeoutMs = 10000
+ casRetryPolicyBaseSleepTimeMs = 100
+ casRetryPolicyMaxSleepTimeMs = 100
+ casRetryPolicyMaxRetries = 3
```
## Configuration parameters
@@ -136,7 +138,7 @@
: Enable publication of project list events, ignored when `kafka.publisher.enabled` is false
Defaults: true
-```kafka.publisher.streamEventEnabled```
+```kafka.publisher.streamEventEnabled```
: Enable publication of stream events, ignored when `kafka.publisher.enabled` is false
Defaults: true
@@ -195,10 +197,21 @@
used for the Zookeeper connection
Defaults: 3
-```split-brain.zookeeper.lockTimeoutMs```
-: Configuration for InterProcessMutex lock timeout
- Defaults: 10000
-
+```split-brain.zookeeper.casRetryPolicyBaseSleepTimeMs```
+: Configuration for the base sleep timeout (iun ms) to use to create the BoundedExponentialBackoffRetry policy
+used for the Compare and Swap operations on Zookeeper
+ Defaults: 1000
+
+```split-brain.zookeeper.casRetryPolicyMaxSleepTimeMs```
+: Configuration for the max sleep timeout (iun ms) to use to create the BoundedExponentialBackoffRetry policy
+used for the Compare and Swap operations on Zookeeper
+ Defaults: 3000
+
+```split-brain.zookeeper.casRetryPolicyMaxRetries```
+: Configuration for the max number of retries to use to create the BoundedExponentialBackoffRetry policy
+used for the Compare and Swap operations on Zookeeper
+ Defaults: 3
+
#### Custom kafka properties:
In addition to the above settings, custom Kafka properties can be explicitly set for `publisher` and `subscriber`.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationIT.java
index 43377f8..fb63793 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationIT.java
@@ -95,6 +95,7 @@
// run - T0). Using "refs/heads/master2" in this test for now
final PushOneCommit.Result change =
createCommitAndPush(testRepo, "refs/heads/master2", "msg", "file", "content");
+
change.assertOkStatus();
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/RefFixture.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/RefFixture.java
index 1e27337..f2b0493 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/RefFixture.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/RefFixture.java
@@ -43,10 +43,6 @@
static final ObjectId AN_OBJECT_ID_2 = new ObjectId(1, 2, 3, 4, 6);
static final ObjectId AN_OBJECT_ID_3 = new ObjectId(1, 2, 3, 4, 7);
- default ZkRefInfo aZkRefInfo(ObjectId objectId) {
- return new ZkRefInfo(A_TEST_PROJECT_NAME, aBranchRef(), objectId);
- }
-
default String aBranchRef() {
return RefNames.REFS_HEADS + testBranch();
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoMarshallerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoMarshallerTest.java
deleted file mode 100644
index e50cd9b..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkRefInfoMarshallerTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.hamcrest.CoreMatchers.nullValue;
-
-import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkRefInfoDAO.CorruptedZkStorageException;
-import java.time.Duration;
-import java.util.Optional;
-import org.apache.curator.framework.CuratorFramework;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TestName;
-
-public class ZkRefInfoMarshallerTest implements RefFixture {
- @Rule public ExpectedException expectedException = ExpectedException.none();
- @Rule public TestName nameRule = new TestName();
-
- ZookeeperTestContainerSupport zookeeperContainer;
- ZkSharedRefDatabase zkSharedRefDatabase;
- ZkRefInfoDAO marshaller;
- CuratorFramework curator;
-
- @Before
- public void setup() {
- zookeeperContainer = new ZookeeperTestContainerSupport();
- zkSharedRefDatabase =
- new ZkSharedRefDatabase(zookeeperContainer.getCurator(), Duration.ofMinutes(10));
- marshaller = zookeeperContainer.getMarshaller();
- curator = zookeeperContainer.getCurator();
- }
-
- @After
- public void cleanup() {
- zookeeperContainer.cleanup();
- }
-
- @Test
- public void shouldCreateAZRefInfo() throws Exception {
- ZkRefInfo refInfo = aZkRefInfo(AN_OBJECT_ID_1);
-
- marshaller.create(refInfo);
-
- Optional<ZkRefInfo> readRefInfo = marshaller.read(refInfo.projectName(), refInfo.refName());
-
- assertThat(readRefInfo).isEqualTo(Optional.of(refInfo));
- }
-
- @Test
- public void shouldReturnEmptyIfARefDoesNotExist() throws Exception {
- assertThat(marshaller.read(A_TEST_PROJECT_NAME, aBranchRef())).isEqualTo(Optional.empty());
- }
-
- @Test
- public void shouldUpdateAZrefInfo() throws Exception {
- ZkRefInfo newRefInfo = aZkRefInfo(AN_OBJECT_ID_1);
- ZkRefInfo updateRefInfo =
- new ZkRefInfo(newRefInfo.projectName(), newRefInfo.refName(), AN_OBJECT_ID_2);
-
- marshaller.create(newRefInfo);
- marshaller.update(updateRefInfo);
-
- Optional<ZkRefInfo> readUpdatedRefInfo =
- marshaller.read(updateRefInfo.projectName(), updateRefInfo.refName());
-
- assertThat(readUpdatedRefInfo).isEqualTo(Optional.of(updateRefInfo));
- }
-
- @Test
- public void shouldFailToReadZkRefInfoIfSomeOfTheInfoIsMissing() throws Exception {
- String projectName = A_TEST_PROJECT_NAME;
- String refName = aBranchRef();
-
- curator.createContainers(ZkRefInfoDAO.pathFor(projectName, refName));
-
- expectedException.expect(CorruptedZkStorageException.class);
- expectedException.expectCause(nullValue(Exception.class));
-
- marshaller.read(projectName, refName);
- }
-
- @Override
- public String testBranch() {
- return "branch_" + nameRule.getMethodName();
- }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZKRefsSharedDatabaseTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabaseTest.java
similarity index 75%
rename from src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZKRefsSharedDatabaseTest.java
rename to src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabaseTest.java
index bc3323c..b5ac1e4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZKRefsSharedDatabaseTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabaseTest.java
@@ -29,10 +29,8 @@
import static com.google.common.truth.Truth.assertThat;
-import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkSharedRefDatabase.TombstoneRef;
import java.io.IOException;
-import java.time.Duration;
-import java.util.Optional;
+import org.apache.curator.retry.RetryNTimes;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Ref;
import org.junit.After;
@@ -41,19 +39,17 @@
import org.junit.Test;
import org.junit.rules.TestName;
-public class ZKRefsSharedDatabaseTest implements RefFixture {
+public class ZkSharedRefDatabaseTest implements RefFixture {
@Rule public TestName nameRule = new TestName();
ZookeeperTestContainerSupport zookeeperContainer;
ZkSharedRefDatabase zkSharedRefDatabase;
- ZkRefInfoDAO marshaller;
@Before
public void setup() {
zookeeperContainer = new ZookeeperTestContainerSupport();
zkSharedRefDatabase =
- new ZkSharedRefDatabase(zookeeperContainer.getCurator(), Duration.ofMinutes(10));
- marshaller = zookeeperContainer.getMarshaller();
+ new ZkSharedRefDatabase(zookeeperContainer.getCurator(), new RetryNTimes(5, 30));
}
@After
@@ -62,24 +58,34 @@
}
@Test
+ public void shouldCompareAndCreateSuccessfully() throws Exception {
+ Ref ref = refOf(AN_OBJECT_ID_1);
+
+ assertThat(zkSharedRefDatabase.compareAndCreate(A_TEST_PROJECT_NAME, ref)).isTrue();
+
+ assertThat(zookeeperContainer.readRefValueFromZk(A_TEST_PROJECT_NAME, ref))
+ .isEqualTo(ref.getObjectId());
+ }
+
+ @Test
public void shouldCompareAndPutSuccessfully() throws Exception {
Ref oldRef = refOf(AN_OBJECT_ID_1);
Ref newRef = refOf(AN_OBJECT_ID_2);
- String projectName = RefFixture.A_TEST_PROJECT_NAME;
+ String projectName = A_TEST_PROJECT_NAME;
- marshaller.create(new ZkRefInfo(projectName, oldRef));
+ zookeeperContainer.createRefInZk(projectName, oldRef);
assertThat(zkSharedRefDatabase.compareAndPut(projectName, oldRef, newRef)).isTrue();
}
@Test
public void compareAndPutShouldFailIfTheObjectionHasNotTheExpectedValue() throws Exception {
- String projectName = RefFixture.A_TEST_PROJECT_NAME;
+ String projectName = A_TEST_PROJECT_NAME;
Ref oldRef = refOf(AN_OBJECT_ID_1);
Ref expectedRef = refOf(AN_OBJECT_ID_2);
- marshaller.create(new ZkRefInfo(projectName, oldRef));
+ zookeeperContainer.createRefInZk(projectName, oldRef);
assertThat(zkSharedRefDatabase.compareAndPut(projectName, expectedRef, refOf(AN_OBJECT_ID_3)))
.isFalse();
@@ -89,17 +95,16 @@
public void compareAndPutShouldFaiIfTheObjectionDoesNotExist() throws IOException {
Ref oldRef = refOf(AN_OBJECT_ID_1);
assertThat(
- zkSharedRefDatabase.compareAndPut(
- RefFixture.A_TEST_PROJECT_NAME, oldRef, refOf(AN_OBJECT_ID_2)))
+ zkSharedRefDatabase.compareAndPut(A_TEST_PROJECT_NAME, oldRef, refOf(AN_OBJECT_ID_2)))
.isFalse();
}
@Test
public void shouldCompareAndRemoveSuccessfully() throws Exception {
Ref oldRef = refOf(AN_OBJECT_ID_1);
- String projectName = RefFixture.A_TEST_PROJECT_NAME;
+ String projectName = A_TEST_PROJECT_NAME;
- marshaller.create(new ZkRefInfo(projectName, oldRef));
+ zookeeperContainer.createRefInZk(projectName, oldRef);
assertThat(zkSharedRefDatabase.compareAndRemove(projectName, oldRef)).isTrue();
}
@@ -107,23 +112,22 @@
@Test
public void shouldReplaceTheRefWithATombstoneAfterCompareAndPutRemove() throws Exception {
Ref oldRef = refOf(AN_OBJECT_ID_1);
- String projectName = RefFixture.A_TEST_PROJECT_NAME;
+ String projectName = A_TEST_PROJECT_NAME;
- marshaller.create(new ZkRefInfo(projectName, oldRef));
+ zookeeperContainer.createRefInZk(projectName, oldRef);
assertThat(zkSharedRefDatabase.compareAndRemove(projectName, oldRef)).isTrue();
- Optional<ZkRefInfo> inZk = marshaller.read(projectName, oldRef.getName());
- assertThat(inZk.isPresent()).isTrue();
- inZk.get().equals(TombstoneRef.forRef(oldRef));
+ assertThat(zookeeperContainer.readRefValueFromZk(projectName, oldRef))
+ .isEqualTo(ObjectId.zeroId());
}
@Test
public void shouldNotCompareAndPutSuccessfullyAfterACompareAndRemove() throws Exception {
Ref oldRef = refOf(AN_OBJECT_ID_1);
- String projectName = RefFixture.A_TEST_PROJECT_NAME;
+ String projectName = A_TEST_PROJECT_NAME;
- marshaller.create(new ZkRefInfo(projectName, oldRef));
+ zookeeperContainer.createRefInZk(projectName, oldRef);
zkSharedRefDatabase.compareAndRemove(projectName, oldRef);
assertThat(zkSharedRefDatabase.compareAndPut(projectName, oldRef, refOf(AN_OBJECT_ID_2)))
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZookeeperTestContainerSupport.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZookeeperTestContainerSupport.java
index dff2aab..3d041ae 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZookeeperTestContainerSupport.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZookeeperTestContainerSupport.java
@@ -27,9 +27,15 @@
package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper;
+import static com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefDatabase.NULL_REF;
+import static com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkSharedRefDatabase.pathFor;
+import static com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.ZkSharedRefDatabase.writeObjectId;
+
import com.googlesource.gerrit.plugins.multisite.Configuration;
import org.apache.curator.framework.CuratorFramework;
import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
import org.junit.Ignore;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
@@ -46,7 +52,6 @@
}
private ZookeeperContainer container;
- private ZkRefInfoDAO marshaller;
private Configuration configuration;
private CuratorFramework curator;
@@ -58,18 +63,13 @@
return container;
}
- public ZkRefInfoDAO getMarshaller() {
- return marshaller;
- }
-
public Configuration getConfig() {
return configuration;
}
+ @SuppressWarnings("resource")
public ZookeeperTestContainerSupport() {
- container = new ZookeeperContainer();
- container.addExposedPorts(2181);
- container.waitingFor(Wait.forListeningPort());
+ container = new ZookeeperContainer().withExposedPorts(2181).waitingFor(Wait.forListeningPort());
container.start();
Integer zkHostPort = container.getMappedPort(2181);
Config splitBrainconfig = new Config();
@@ -79,12 +79,22 @@
configuration = new Configuration(splitBrainconfig);
this.curator = configuration.getSplitBrain().getZookeeper().buildCurator();
-
- this.marshaller = new ZkRefInfoDAO(this.curator);
}
public void cleanup() {
this.curator.delete();
this.container.stop();
}
+
+ public ObjectId readRefValueFromZk(String projectName, Ref ref) throws Exception {
+ final byte[] bytes = curator.getData().forPath(pathFor(projectName, NULL_REF, ref));
+ return ZkSharedRefDatabase.readObjectId(bytes);
+ }
+
+ public void createRefInZk(String projectName, Ref ref) throws Exception {
+ curator
+ .create()
+ .creatingParentContainersIfNeeded()
+ .forPath(pathFor(projectName, NULL_REF, ref), writeObjectId(ref.getObjectId()));
+ }
}