Merge branch 'stable-3.4' into stable-3.5
Also set target branch to stable-3.5
* stable-3.4:
Schedule reindexing when patchset ref is missing
Allow auto-refresh of replication lags for outdated repos
Add test for replication_lag update on ref-replicated event
Remove exceptions declared in throws but never thrown
Reload latest SHA1 from local when filtering RemoteRefUpdate
Mention that Gerrit's instanceId is a mandatory requirement
Restore tests accidentally removed
Exclude event-broker and global-refdb from shaded plugin jar
Download websession broker from its own branch
Fix issue with disabling ref-database
Bump wurstmeister/kafka to 2.13-2.6.3
Change-Id: Ib6aa8adbd01d0294653062bc5d7e92c46bf7ad69
diff --git a/BUILD b/BUILD
index d7bd3d8..de84a70 100644
--- a/BUILD
+++ b/BUILD
@@ -19,8 +19,8 @@
resources = glob(["src/main/resources/**/*"]),
deps = [
":replication-neverlink",
- "@events-broker//jar",
- "@global-refdb//jar",
+ "@events-broker//jar:neverlink",
+ "@global-refdb//jar:neverlink",
],
)
@@ -74,12 +74,6 @@
srcs = [
"e2e-tests/test.sh",
],
- data = [
- "//plugins/multi-site",
- "//plugins/multi-site:e2e_multi_site_test_dir",
- "//plugins/multi-site:e2e_multi_site_setup_local_env_dir",
- "external_plugin_deps.bzl",
- ] + glob(["setup_local_env/**/*"]) + glob(["e2e-tests/**/*"]),
args = [
"--multisite-lib-file $(location //plugins/multi-site)",
"--healthcheck-interval 5s",
@@ -88,6 +82,12 @@
"--location '$(location //plugins/multi-site:e2e_multi_site_test_dir)'",
"--local-env '$(location //plugins/multi-site:e2e_multi_site_setup_local_env_dir)'",
],
+ data = [
+ "//plugins/multi-site",
+ "//plugins/multi-site:e2e_multi_site_test_dir",
+ "//plugins/multi-site:e2e_multi_site_setup_local_env_dir",
+ "external_plugin_deps.bzl",
+ ] + glob(["setup_local_env/**/*"]) + glob(["e2e-tests/**/*"]),
tags = [
"e2e-multi-site",
],
diff --git a/README.md b/README.md
index 79094d3..dcae980 100644
--- a/README.md
+++ b/README.md
@@ -65,6 +65,21 @@
daemon running (/var/run/docker.sock accessible) or a DOCKER_HOST pointing to
a Docker server.
+## Pre-requisites
+
+Each Gerrit server of the cluster must be identified with a globally unique
+[instance-id](https://gerrit-documentation.storage.googleapis.com/Documentation/3.4.5/config-gerrit.html#gerrit.instanceId)
+defined in `$GERRIT_SITE/etc/gerrit.config`.
+When migrating from a multi-site configuration with Gerrit v3.3 or earlier,
+you must reuse the instance-id value stored under `$GERRIT_SITE/data/multi-site`.
+
+Example:
+
+```
+[gerrit]
+ instanceId = 758fe5b7-1869-46e6-942a-3ae0ae7e3bd2
+```
+
## How to configure
Install the multi-site plugin into the `$GERRIT_SITE/lib` directory of all
diff --git a/docker-compose.kafka-broker.yaml b/docker-compose.kafka-broker.yaml
index 927b4d4..d3fc713 100644
--- a/docker-compose.kafka-broker.yaml
+++ b/docker-compose.kafka-broker.yaml
@@ -5,7 +5,7 @@
ports:
- "2181:2181"
kafka:
- image: wurstmeister/kafka:2.12-2.1.0
+ image: wurstmeister/kafka:2.13-2.6.3
ports:
- "9092:9092"
environment:
diff --git a/e2e-tests/docker-compose-kafka.yaml b/e2e-tests/docker-compose-kafka.yaml
index da39cca..addb8d5 100644
--- a/e2e-tests/docker-compose-kafka.yaml
+++ b/e2e-tests/docker-compose-kafka.yaml
@@ -1,7 +1,7 @@
version: '3'
services:
kafka:
- image: wurstmeister/kafka:2.12-2.1.0
+ image: wurstmeister/kafka:2.13-2.6.3
depends_on:
- zookeeper
environment:
diff --git a/e2e-tests/test.sh b/e2e-tests/test.sh
index abdc5c5..5491ddd 100755
--- a/e2e-tests/test.sh
+++ b/e2e-tests/test.sh
@@ -16,7 +16,7 @@
LOCATION="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
LOCAL_ENV="$( cd "${LOCATION}/../setup_local_env" >/dev/null 2>&1 && pwd )"
-GERRIT_BRANCH=master
+GERRIT_BRANCH=stable-3.5
GERRIT_CI=https://gerrit-ci.gerritforge.com/view/Plugins-$GERRIT_BRANCH/job
LAST_BUILD=lastSuccessfulBuild/artifact/bazel-bin/plugins
DEF_MULTISITE_LOCATION=${LOCATION}/../../../bazel-bin/plugins/multi-site/multi-site.jar
@@ -228,7 +228,7 @@
{ echo >&2 "$MULTISITE_LIB_LOCATION: Not able to copy the file. Aborting"; exit 1; }
echo "Downloading websession-broker plugin $GERRIT_BRANCH"
-wget $GERRIT_CI/plugin-websession-broker-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/websession-broker/websession-broker.jar \
+wget $GERRIT_CI/plugin-websession-broker-bazel-$GERRIT_BRANCH/$LAST_BUILD/websession-broker/websession-broker.jar \
-O $COMMON_PLUGINS/websession-broker.jar || { echo >&2 "Cannot download websession-broker plugin: Check internet connection. Aborting"; exit 1; }
echo "Downloading healthcheck plugin $GERRIT_BRANCH"
diff --git a/setup_local_env/docker-compose-kafka.yaml b/setup_local_env/docker-compose-kafka.yaml
index 8a31502..2a5b0fc 100644
--- a/setup_local_env/docker-compose-kafka.yaml
+++ b/setup_local_env/docker-compose-kafka.yaml
@@ -1,7 +1,7 @@
version: '3'
services:
kafka:
- image: wurstmeister/kafka:2.12-2.1.0
+ image: wurstmeister/kafka:2.13-2.6.3
ports:
- "9092:9092"
container_name: kafka_test_node
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index 8497882..ee2932a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -23,15 +23,18 @@
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.config.ConfigUtil;
import com.google.gerrit.server.config.SitePaths;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.spi.Message;
import java.io.IOException;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.storage.file.FileBasedConfig;
@@ -50,6 +53,10 @@
static final String THREAD_POOL_SIZE_KEY = "threadPoolSize";
static final int DEFAULT_THREAD_POOL_SIZE = 4;
+ private static final String REF_DATABASE = "ref-database";
+ private static final String REPLICATION_LAG_REFRESH_INTERVAL = "replicationLagRefreshInterval";
+ private static final Duration REPLICATION_LAG_REFRESH_INTERVAL_DEFAULT = Duration.ofSeconds(60);
+
private static final String REPLICATION_CONFIG = "replication.config";
// common parameters to cache and index sections
private static final int DEFAULT_INDEX_MAX_TRIES = 2;
@@ -67,6 +74,7 @@
private final Supplier<Collection<Message>> replicationConfigValidation;
private final Supplier<Broker> broker;
private final Config multiSiteConfig;
+ private final Supplier<Duration> replicationLagRefreshInterval;
@Inject
Configuration(SitePaths sitePaths) {
@@ -88,6 +96,17 @@
new SharedRefDbConfiguration(
enableSharedRefDbByDefault(lazyMultiSiteCfg.get()), PLUGIN_NAME));
broker = memoize(() -> new Broker(lazyMultiSiteCfg));
+ replicationLagRefreshInterval =
+ memoize(
+ () ->
+ Duration.ofMillis(
+ ConfigUtil.getTimeUnit(
+ lazyMultiSiteCfg.get(),
+ REF_DATABASE,
+ null,
+ REPLICATION_LAG_REFRESH_INTERVAL,
+ REPLICATION_LAG_REFRESH_INTERVAL_DEFAULT.toMillis(),
+ TimeUnit.MILLISECONDS)));
}
public Config getMultiSiteConfig() {
@@ -118,6 +137,10 @@
return projects.get();
}
+ public Duration replicationLagRefreshInterval() {
+ return replicationLagRefreshInterval.get();
+ }
+
public Collection<Message> validate() {
return replicationConfigValidation.get();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/GitModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/GitModule.java
index 87e9d22..38cddd5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/GitModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/GitModule.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.multisite;
+import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDbConfiguration;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.multisite.validation.ValidationModule;
@@ -28,6 +29,8 @@
@Override
protected void configure() {
+ bind(SharedRefDbConfiguration.class).toInstance(config.getSharedRefDbConfiguration());
+ bind(ProjectVersionLogger.class).to(Log4jProjectVersionLogger.class);
if (config.getSharedRefDbConfiguration().getSharedRefDb().isEnabled()) {
install(new ValidationModule(config));
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
index 20aaef6..ea14e64 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
@@ -18,6 +18,7 @@
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import com.google.inject.Scopes;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
@@ -27,11 +28,13 @@
import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerForwarderModule;
public class PluginModule extends LifecycleModule {
- private Configuration config;
+ private final Configuration config;
+ private final WorkQueue workQueue;
@Inject
- public PluginModule(Configuration config) {
+ public PluginModule(Configuration config, WorkQueue workQueue) {
this.config = config;
+ this.workQueue = workQueue;
}
@Override
@@ -42,7 +45,7 @@
install(new BrokerForwarderModule());
listener().to(MultiSiteConsumerRunner.class);
- install(new ReplicationStatusModule());
+ install(new ReplicationStatusModule(workQueue));
if (config.getSharedRefDbConfiguration().getSharedRefDb().isEnabled()) {
listener().to(PluginStartup.class);
DynamicSet.bind(binder(), ProjectDeletedListener.class)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java
index 40168c5..acdd76e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java
@@ -23,12 +23,14 @@
import com.google.gerrit.server.cache.CacheModule;
import com.google.gerrit.server.cache.serialize.JavaCacheSerializer;
import com.google.gerrit.server.cache.serialize.StringCacheSerializer;
+import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.project.ProjectCache;
import com.google.inject.Inject;
import com.google.inject.Module;
-import com.google.inject.Provider;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
+import com.google.inject.name.Names;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
import java.util.Collection;
@@ -38,6 +40,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Singleton
@@ -45,36 +49,47 @@
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final Map<String, Long> replicationStatusPerProject = new HashMap<>();
- static final String REPLICATION_STATUS_CACHE = "replication_status";
+ static final String REPLICATION_STATUS = "replication_status";
- public static Module cacheModule() {
+ public static Module cacheModule(WorkQueue queue) {
return new CacheModule() {
@Override
protected void configure() {
- persist(REPLICATION_STATUS_CACHE, String.class, Long.class)
+ persist(REPLICATION_STATUS, String.class, Long.class)
.version(1)
.keySerializer(StringCacheSerializer.INSTANCE)
.valueSerializer(new JavaCacheSerializer<>());
+
+ bind(ScheduledExecutorService.class)
+ .annotatedWith(Names.named(REPLICATION_STATUS))
+ .toInstance(queue.createQueue(0, REPLICATION_STATUS));
}
};
}
private final Map<String, Long> localVersionPerProject = new HashMap<>();
private final Cache<String, Long> cache;
- private final Provider<ProjectVersionRefUpdate> projectVersionRefUpdate;
+ private final Optional<ProjectVersionRefUpdate> projectVersionRefUpdate;
private final ProjectVersionLogger verLogger;
private final ProjectCache projectCache;
+ private final ScheduledExecutorService statusScheduler;
+
+ private final Configuration config;
@Inject
public ReplicationStatus(
- @Named(REPLICATION_STATUS_CACHE) Cache<String, Long> cache,
- Provider<ProjectVersionRefUpdate> projectVersionRefUpdate,
+ @Named(REPLICATION_STATUS) Cache<String, Long> cache,
+ Optional<ProjectVersionRefUpdate> projectVersionRefUpdate,
ProjectVersionLogger verLogger,
- ProjectCache projectCache) {
+ ProjectCache projectCache,
+ @Named(REPLICATION_STATUS) ScheduledExecutorService statusScheduler,
+ Configuration config) {
this.cache = cache;
this.projectVersionRefUpdate = projectVersionRefUpdate;
this.verLogger = verLogger;
this.projectCache = projectCache;
+ this.statusScheduler = statusScheduler;
+ this.config = config;
}
public Long getMaxLag() {
@@ -99,9 +114,11 @@
public void updateReplicationLag(Project.NameKey projectName) {
Optional<Long> remoteVersion =
- projectVersionRefUpdate.get().getProjectRemoteVersion(projectName.get());
+ projectVersionRefUpdate.flatMap(
+ refUpdate -> refUpdate.getProjectRemoteVersion(projectName.get()));
Optional<Long> localVersion =
- projectVersionRefUpdate.get().getProjectLocalVersion(projectName.get());
+ projectVersionRefUpdate.flatMap(
+ refUpdate -> refUpdate.getProjectLocalVersion(projectName.get()));
if (remoteVersion.isPresent() && localVersion.isPresent()) {
long lag = remoteVersion.get() - localVersion.get();
@@ -153,6 +170,26 @@
@Override
public void start() {
loadAllFromCache();
+
+ long replicationLagPollingInterval = config.replicationLagRefreshInterval().toMillis();
+
+ if (replicationLagPollingInterval > 0) {
+ statusScheduler.scheduleAtFixedRate(
+ this::refreshProjectsWithLag,
+ replicationLagPollingInterval,
+ replicationLagPollingInterval,
+ TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @VisibleForTesting
+ public void refreshProjectsWithLag() {
+ logger.atFine().log("Refreshing projects version lags triggered ...");
+ replicationStatusPerProject.entrySet().stream()
+ .filter(entry -> entry.getValue() > 0)
+ .map(Map.Entry::getKey)
+ .map(Project::nameKey)
+ .forEach(this::updateReplicationLag);
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java
index 791517f..605e119 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java
@@ -17,13 +17,23 @@
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
import com.google.inject.Scopes;
public class ReplicationStatusModule extends LifecycleModule {
+
+ private final WorkQueue workQueue;
+
+ @Inject
+ public ReplicationStatusModule(WorkQueue workQueue) {
+ this.workQueue = workQueue;
+ }
+
@Override
protected void configure() {
bind(ReplicationStatus.class).in(Scopes.SINGLETON);
- install(ReplicationStatus.cacheModule());
+ install(ReplicationStatus.cacheModule(workQueue));
listener().to(ReplicationStatus.class);
DynamicSet.bind(binder(), ProjectDeletedListener.class).to(ReplicationStatus.class);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java
index b055f26..53bcbc0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java
@@ -20,9 +20,12 @@
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gerrit.server.events.EventListener;
import com.google.inject.Inject;
+import com.google.inject.Scopes;
+import com.google.inject.multibindings.OptionalBinder;
import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
+import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdateImpl;
public class EventModule extends LifecycleModule {
private final Configuration configuration;
@@ -34,7 +37,7 @@
@Override
protected void configure() {
- DynamicSet.bind(binder(), EventListener.class).to(ProjectVersionRefUpdate.class);
+ DynamicSet.bind(binder(), EventListener.class).to(ProjectVersionRefUpdateImpl.class);
bind(StreamEventPublisherConfig.class)
.toInstance(
@@ -43,5 +46,15 @@
configuration.broker().getStreamEventPublishTimeout()));
install(new StreamEventPublisherModule());
+
+ OptionalBinder<ProjectVersionRefUpdate> projectVersionRefUpdateBinder =
+ OptionalBinder.newOptionalBinder(binder(), ProjectVersionRefUpdate.class);
+ if (configuration.getSharedRefDbConfiguration().getSharedRefDb().isEnabled()) {
+ DynamicSet.bind(binder(), EventListener.class).to(ProjectVersionRefUpdateImpl.class);
+ projectVersionRefUpdateBinder
+ .setBinding()
+ .to(ProjectVersionRefUpdateImpl.class)
+ .in(Scopes.SINGLETON);
+ }
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
index 8d41500..522a78f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
@@ -63,17 +63,27 @@
protected void attemptToIndex(String id, Optional<ChangeIndexEvent> indexEvent, int retryCount) {
ChangeChecker checker = changeCheckerFactory.create(id);
Optional<ChangeNotes> changeNotes = checker.getChangeNotes();
- if (changeNotes.isPresent()) {
+ boolean changeIsPresent = changeNotes.isPresent();
+ boolean changeIsConsistent = checker.isChangeConsistent();
+ if (changeIsPresent && changeIsConsistent) {
reindexAndCheckIsUpToDate(id, indexEvent, checker, retryCount);
} else {
log.warn(
- "Change {} not present yet in local Git repository (event={}) after {} attempt(s)",
+ "Change {} {} in local Git repository (event={}) after {} attempt(s)",
id,
+ !changeIsPresent
+ ? "not present yet"
+ : (changeIsConsistent ? "is" : "is not") + " consistent",
indexEvent,
retryCount);
if (!rescheduleIndex(id, indexEvent, retryCount + 1)) {
log.error(
- "Change {} could not be found in the local Git repository (event={})", id, indexEvent);
+ "Change {} {} in the local Git repository (event={})",
+ id,
+ !changeIsPresent
+ ? "could not be found"
+ : (changeIsConsistent ? "was" : "was not") + " consistent",
+ indexEvent);
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeChecker.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeChecker.java
index 9ee59eb..3277150 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeChecker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeChecker.java
@@ -59,4 +59,11 @@
* @throws IOException if an I/O error occurred while reading the local Change
*/
public Optional<Long> getComputedChangeTs() throws IOException;
+
+ /**
+ * Check if the local Change contains current patchset refs
+ *
+ * @return true if local change contains meta and current patchset refs
+ */
+ public boolean isChangeConsistent();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
index 983a07b..4ef5b7f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
@@ -31,8 +31,11 @@
import java.sql.Timestamp;
import java.util.Objects;
import java.util.Optional;
+import org.eclipse.jgit.errors.MissingObjectException;
+import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevWalk;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -145,6 +148,34 @@
}
}
+ @Override
+ public boolean isChangeConsistent() {
+ Optional<ChangeNotes> notes = getChangeNotes();
+ if (!notes.isPresent()) {
+ log.warn("Unable to compute change notes for change {}", changeId);
+ return true;
+ }
+ ObjectId currentPatchSetCommitId = notes.get().getCurrentPatchSet().commitId();
+ try (Repository repo = gitRepoMgr.openRepository(changeNotes.get().getProjectName());
+ RevWalk walk = new RevWalk(repo)) {
+ walk.parseCommit(currentPatchSetCommitId);
+ } catch (StorageException | MissingObjectException e) {
+ log.warn(
+ String.format(
+ "Consistency check failed for change %s, missing current patchset commit %s",
+ changeId, currentPatchSetCommitId.getName()),
+ e);
+ return false;
+ } catch (IOException e) {
+ log.warn(
+ String.format(
+ "Cannot check consistency for change %s, current patchset commit %s. Assuming change is consistent",
+ changeId, currentPatchSetCommitId.getName()),
+ e);
+ }
+ return true;
+ }
+
private Optional<Long> computeLastChangeTs() {
return getChangeNotes().map(notes -> getTsFromChangeAndDraftComments(notes));
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
index 98f4897..c9300c8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
@@ -27,6 +27,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
@@ -66,19 +67,23 @@
gitRepositoryManager.openRepository(Project.nameKey(projectName))) {
List<RemoteRefUpdate> filteredRefUpdates =
remoteUpdatesList.stream()
- .filter(
+ .map(
refUpdate -> {
- boolean refUpToDate = isUpToDateWithRetry(projectName, repository, refUpdate);
- if (!refUpToDate) {
+ Optional<RemoteRefUpdate> updatedRefUpdate =
+ isUpToDateWithRetry(projectName, repository, refUpdate);
+ if (!updatedRefUpdate.isPresent()) {
repLog.warn(
- "{} is not up-to-date with the shared-refdb and thus will NOT BE replicated",
+ "{} is not up-to-date with the shared-refdb and thus will NOT BE"
+ + " replicated",
refUpdate);
if (refUpdate.getSrcRef().endsWith(REF_META_SUFFIX)) {
outdatedChanges.add(getRootChangeRefPrefix(refUpdate.getSrcRef()));
}
}
- return refUpToDate;
+ return updatedRefUpdate;
})
+ .filter(Optional::isPresent)
+ .map(Optional::get)
.collect(Collectors.toList());
return filteredRefUpdates.stream()
@@ -102,37 +107,56 @@
}
}
- private boolean isUpToDateWithRetry(
+ private Optional<RemoteRefUpdate> isUpToDateWithRetry(
String projectName, Repository repository, RemoteRefUpdate refUpdate) {
String ref = refUpdate.getSrcRef();
try {
if (sharedRefDb.isUpToDate(
Project.nameKey(projectName),
new ObjectIdRef.Unpeeled(Ref.Storage.NETWORK, ref, refUpdate.getNewObjectId()))) {
- return true;
+ return Optional.of(refUpdate);
}
randomSleepForMitigatingConditionWhereLocalRefHaveJustBeenChanged(
projectName, refUpdate, ref);
+ ObjectId reloadedNewObjectId = getNotNullExactRef(repository, ref);
+ RemoteRefUpdate refUpdateReloaded =
+ newRemoteRefUpdateWithObjectId(repository, refUpdate, reloadedNewObjectId);
return sharedRefDb.isUpToDate(
- Project.nameKey(projectName),
- new ObjectIdRef.Unpeeled(Ref.Storage.NETWORK, ref, getNotNullExactRef(repository, ref)));
+ Project.nameKey(projectName),
+ new ObjectIdRef.Unpeeled(
+ Ref.Storage.NETWORK, ref, refUpdateReloaded.getNewObjectId()))
+ ? Optional.of(refUpdateReloaded)
+ : Optional.empty();
} catch (GlobalRefDbLockException gle) {
String message =
String.format("%s is locked on shared-refdb and thus will NOT BE replicated", ref);
repLog.error(message);
logger.atSevere().withCause(gle).log(message);
- return false;
+ return Optional.empty();
} catch (IOException ioe) {
String message =
String.format("Error while extracting ref '%s' for project '%s'", ref, projectName);
repLog.error(message);
logger.atSevere().withCause(ioe).log(message);
- return false;
+ return Optional.empty();
}
}
+ private RemoteRefUpdate newRemoteRefUpdateWithObjectId(
+ Repository localDb, RemoteRefUpdate refUpdate, ObjectId reloadedNewObjectId)
+ throws IOException {
+ return new RemoteRefUpdate(
+ localDb,
+ refUpdate.getSrcRef(),
+ reloadedNewObjectId,
+ refUpdate.getRemoteName(),
+ refUpdate.isForceUpdate(),
+ null,
+ refUpdate.getExpectedOldObjectId());
+ }
+
private void randomSleepForMitigatingConditionWhereLocalRefHaveJustBeenChanged(
String projectName, RemoteRefUpdate refUpdate, String ref) {
int randomSleepTimeMsec =
@@ -140,7 +164,8 @@
+ new Random().nextInt(RANDOM_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS);
repLog.debug(
String.format(
- "'%s' is not up-to-date for project '%s' [local='%s']. Reload local ref in '%d ms' and re-check",
+ "'%s' is not up-to-date for project '%s' [local='%s']. Reload local ref in '%d ms' and"
+ + " re-check",
ref, projectName, refUpdate.getNewObjectId(), randomSleepTimeMsec));
try {
Thread.sleep(randomSleepTimeMsec);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java
index 11ea428..1526059 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdate.java
@@ -1,4 +1,4 @@
-// Copyright (C) 2020 The Android Open Source Project
+// Copyright (C) 2022 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -14,293 +14,19 @@
package com.googlesource.gerrit.plugins.multisite.validation;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.eclipse.jgit.lib.Constants.OBJ_BLOB;
-
-import com.gerritforge.gerrit.globalrefdb.GlobalRefDbSystemError;
-import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.entities.Project;
-import com.google.gerrit.entities.RefNames;
-import com.google.gerrit.server.config.GerritInstanceId;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.EventListener;
-import com.google.gerrit.server.events.RefUpdatedEvent;
-import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
-import com.google.gerrit.server.git.GitRepositoryManager;
-import com.google.gerrit.server.notedb.IntBlob;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
-import java.io.IOException;
import java.util.Optional;
-import org.eclipse.jgit.errors.RepositoryNotFoundException;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectIdRef;
-import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.Ref;
-import org.eclipse.jgit.lib.RefUpdate;
-import org.eclipse.jgit.lib.Repository;
-@Singleton
-public class ProjectVersionRefUpdate implements EventListener {
- private static final FluentLogger logger = FluentLogger.forEnclosingClass();
- private static final ImmutableSet<RefUpdate.Result> SUCCESSFUL_RESULTS =
- ImmutableSet.of(RefUpdate.Result.NEW, RefUpdate.Result.FORCED, RefUpdate.Result.NO_CHANGE);
+public interface ProjectVersionRefUpdate {
- public static final String MULTI_SITE_VERSIONING_REF = "refs/multi-site/version";
- public static final String MULTI_SITE_VERSIONING_VALUE_REF = "refs/multi-site/version/value";
- public static final Ref NULL_PROJECT_VERSION_REF =
+ String MULTI_SITE_VERSIONING_REF = "refs/multi-site/version";
+ String MULTI_SITE_VERSIONING_VALUE_REF = "refs/multi-site/version/value";
+ Ref NULL_PROJECT_VERSION_REF =
new ObjectIdRef.Unpeeled(Ref.Storage.NETWORK, MULTI_SITE_VERSIONING_REF, ObjectId.zeroId());
- private final GitRepositoryManager gitRepositoryManager;
- private final GitReferenceUpdated gitReferenceUpdated;
- private final ProjectVersionLogger verLogger;
- private final String nodeInstanceId;
+ Optional<Long> getProjectLocalVersion(String projectName);
- protected final SharedRefDatabaseWrapper sharedRefDb;
-
- @Inject
- public ProjectVersionRefUpdate(
- GitRepositoryManager gitRepositoryManager,
- SharedRefDatabaseWrapper sharedRefDb,
- GitReferenceUpdated gitReferenceUpdated,
- ProjectVersionLogger verLogger,
- @GerritInstanceId String nodeInstanceId) {
- this.gitRepositoryManager = gitRepositoryManager;
- this.sharedRefDb = sharedRefDb;
- this.gitReferenceUpdated = gitReferenceUpdated;
- this.verLogger = verLogger;
- this.nodeInstanceId = nodeInstanceId;
- }
-
- @Override
- public void onEvent(Event event) {
- logger.atFine().log("Processing event type: " + event.type);
- // Producer of the Event use RefUpdatedEvent to trigger the version update
- if (nodeInstanceId.equals(event.instanceId) && event instanceof RefUpdatedEvent) {
- updateProducerProjectVersionUpdate((RefUpdatedEvent) event);
- }
- }
-
- private boolean isSpecialRefName(String refName) {
- return refName.startsWith(RefNames.REFS_SEQUENCES)
- || refName.startsWith(RefNames.REFS_STARRED_CHANGES)
- || refName.equals(MULTI_SITE_VERSIONING_REF);
- }
-
- private void updateProducerProjectVersionUpdate(RefUpdatedEvent refUpdatedEvent) {
- String refName = refUpdatedEvent.getRefName();
-
- if (isSpecialRefName(refName)) {
- logger.atFine().log(
- "Found a special ref name %s, skipping update for %s",
- refName, refUpdatedEvent.getProjectNameKey().get());
- return;
- }
- try {
- Project.NameKey projectNameKey = refUpdatedEvent.getProjectNameKey();
- long newVersion = getCurrentGlobalVersionNumber();
-
- Optional<RefUpdate> newProjectVersionRefUpdate =
- updateLocalProjectVersion(projectNameKey, newVersion);
-
- if (newProjectVersionRefUpdate.isPresent()) {
- verLogger.log(projectNameKey, newVersion, 0L);
-
- if (updateSharedProjectVersion(
- projectNameKey, newProjectVersionRefUpdate.get().getNewObjectId(), newVersion)) {
- gitReferenceUpdated.fire(projectNameKey, newProjectVersionRefUpdate.get(), null);
- }
- } else {
- logger.atWarning().log(
- "Ref %s not found on projet %s: skipping project version update",
- refUpdatedEvent.getRefName(), projectNameKey);
- }
- } catch (LocalProjectVersionUpdateException | SharedProjectVersionUpdateException e) {
- logger.atSevere().withCause(e).log(
- "Issue encountered when updating version for project "
- + refUpdatedEvent.getProjectNameKey());
- }
- }
-
- private RefUpdate getProjectVersionRefUpdate(Repository repository, Long version)
- throws IOException {
- RefUpdate refUpdate = repository.getRefDatabase().newUpdate(MULTI_SITE_VERSIONING_REF, false);
- refUpdate.setNewObjectId(getNewId(repository, version));
- refUpdate.setForceUpdate(true);
- return refUpdate;
- }
-
- private ObjectId getNewId(Repository repository, Long version) throws IOException {
- ObjectInserter ins = repository.newObjectInserter();
- ObjectId newId = ins.insert(OBJ_BLOB, Long.toString(version).getBytes(UTF_8));
- ins.flush();
- return newId;
- }
-
- private boolean updateSharedProjectVersion(
- Project.NameKey projectNameKey, ObjectId newObjectId, Long newVersion)
- throws SharedProjectVersionUpdateException {
-
- Ref sharedRef =
- sharedRefDb
- .get(projectNameKey, MULTI_SITE_VERSIONING_REF, String.class)
- .map(
- (String objectId) ->
- new ObjectIdRef.Unpeeled(
- Ref.Storage.NEW, MULTI_SITE_VERSIONING_REF, ObjectId.fromString(objectId)))
- .orElse(
- new ObjectIdRef.Unpeeled(
- Ref.Storage.NEW, MULTI_SITE_VERSIONING_REF, ObjectId.zeroId()));
- Optional<Long> sharedVersion =
- sharedRefDb
- .get(projectNameKey, MULTI_SITE_VERSIONING_VALUE_REF, String.class)
- .map(Long::parseLong);
-
- try {
- if (sharedVersion.isPresent() && sharedVersion.get() >= newVersion) {
- logger.atWarning().log(
- String.format(
- "NOT Updating project %s version %s (value=%d) in shared ref-db because is more recent than the local one %s (value=%d) ",
- projectNameKey.get(),
- newObjectId,
- newVersion,
- sharedRef.getObjectId().getName(),
- sharedVersion.get()));
- return false;
- }
-
- logger.atFine().log(
- String.format(
- "Updating shared project %s version to %s (value=%d)",
- projectNameKey.get(), newObjectId, newVersion));
-
- boolean success = sharedRefDb.compareAndPut(projectNameKey, sharedRef, newObjectId);
- if (!success) {
- String message =
- String.format(
- "Project version blob update failed for %s. Current value %s, new value: %s",
- projectNameKey.get(), safeGetObjectId(sharedRef), newObjectId);
- logger.atSevere().log(message);
- throw new SharedProjectVersionUpdateException(message);
- }
-
- success =
- sharedRefDb.compareAndPut(
- projectNameKey,
- MULTI_SITE_VERSIONING_VALUE_REF,
- sharedVersion.map(Object::toString).orElse(null),
- newVersion.toString());
- if (!success) {
- String message =
- String.format(
- "Project version update failed for %s. Current value %s, new value: %s",
- projectNameKey.get(), safeGetObjectId(sharedRef), newObjectId);
- logger.atSevere().log(message);
- throw new SharedProjectVersionUpdateException(message);
- }
-
- return true;
- } catch (GlobalRefDbSystemError refDbSystemError) {
- String message =
- String.format(
- "Error while updating shared project version for %s. Current value %s, new value: %s. Error: %s",
- projectNameKey.get(),
- sharedRef.getObjectId(),
- newObjectId,
- refDbSystemError.getMessage());
- logger.atSevere().withCause(refDbSystemError).log(message);
- throw new SharedProjectVersionUpdateException(message);
- }
- }
-
- public Optional<Long> getProjectLocalVersion(String projectName) {
- try (Repository repository =
- gitRepositoryManager.openRepository(Project.NameKey.parse(projectName))) {
- Optional<IntBlob> blob = IntBlob.parse(repository, MULTI_SITE_VERSIONING_REF);
- if (blob.isPresent()) {
- Long repoVersion = Integer.toUnsignedLong(blob.get().value());
- logger.atFine().log("Local project '%s' has version %d", projectName, repoVersion);
- return Optional.of(repoVersion);
- }
- } catch (RepositoryNotFoundException re) {
- logger.atFine().log("Project '%s' not found", projectName);
- } catch (IOException e) {
- logger.atSevere().withCause(e).log("Cannot read local project '%s' version", projectName);
- }
- return Optional.empty();
- }
-
- public Optional<Long> getProjectRemoteVersion(String projectName) {
- Optional<String> globalVersion =
- sharedRefDb.get(
- Project.NameKey.parse(projectName), MULTI_SITE_VERSIONING_VALUE_REF, String.class);
- return globalVersion.flatMap(longString -> getLongValueOf(longString));
- }
-
- private Object safeGetObjectId(Ref currentRef) {
- return currentRef == null ? "null" : currentRef.getObjectId();
- }
-
- private Optional<Long> getLongValueOf(String longString) {
- try {
- return Optional.ofNullable(Long.parseLong(longString));
- } catch (NumberFormatException e) {
- logger.atSevere().withCause(e).log(
- "Unable to parse timestamp value %s into Long", longString);
- return Optional.empty();
- }
- }
-
- private Optional<RefUpdate> updateLocalProjectVersion(
- Project.NameKey projectNameKey, long newVersionNumber)
- throws LocalProjectVersionUpdateException {
- logger.atFine().log(
- "Updating local version for project %s with version %d",
- projectNameKey.get(), newVersionNumber);
- try (Repository repository = gitRepositoryManager.openRepository(projectNameKey)) {
- RefUpdate refUpdate = getProjectVersionRefUpdate(repository, newVersionNumber);
- RefUpdate.Result result = refUpdate.update();
- if (!isSuccessful(result)) {
- String message =
- String.format(
- "RefUpdate failed with result %s for: project=%s, version=%d",
- result.name(), projectNameKey.get(), newVersionNumber);
- logger.atSevere().log(message);
- throw new LocalProjectVersionUpdateException(message);
- }
-
- return Optional.of(refUpdate);
- } catch (IOException e) {
- String message = "Cannot create versioning command for " + projectNameKey.get();
- logger.atSevere().withCause(e).log(message);
- throw new LocalProjectVersionUpdateException(message);
- }
- }
-
- private long getCurrentGlobalVersionNumber() {
- return System.currentTimeMillis() / 1000;
- }
-
- private Boolean isSuccessful(RefUpdate.Result result) {
- return SUCCESSFUL_RESULTS.contains(result);
- }
-
- public static class LocalProjectVersionUpdateException extends Exception {
- private static final long serialVersionUID = 7649956232401457023L;
-
- public LocalProjectVersionUpdateException(String projectName) {
- super("Cannot update local project version of " + projectName);
- }
- }
-
- public static class SharedProjectVersionUpdateException extends Exception {
- private static final long serialVersionUID = -9153858177700286314L;
-
- public SharedProjectVersionUpdateException(String projectName) {
- super("Cannot update shared project version of " + projectName);
- }
- }
+ Optional<Long> getProjectRemoteVersion(String projectName);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateImpl.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateImpl.java
new file mode 100644
index 0000000..e9a84ac
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateImpl.java
@@ -0,0 +1,308 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.eclipse.jgit.lib.Constants.OBJ_BLOB;
+
+import com.gerritforge.gerrit.globalrefdb.GlobalRefDbSystemError;
+import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventListener;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.gerrit.server.notedb.IntBlob;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Set;
+import org.eclipse.jgit.errors.RepositoryNotFoundException;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectIdRef;
+import org.eclipse.jgit.lib.ObjectInserter;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.lib.Repository;
+
+public class ProjectVersionRefUpdateImpl implements EventListener, ProjectVersionRefUpdate {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+ private static final Set<RefUpdate.Result> SUCCESSFUL_RESULTS =
+ ImmutableSet.of(RefUpdate.Result.NEW, RefUpdate.Result.FORCED, RefUpdate.Result.NO_CHANGE);
+
+ private final GitRepositoryManager gitRepositoryManager;
+ private final GitReferenceUpdated gitReferenceUpdated;
+ private final ProjectVersionLogger verLogger;
+ private final String nodeInstanceId;
+
+ protected final SharedRefDatabaseWrapper sharedRefDb;
+
+ @Inject
+ public ProjectVersionRefUpdateImpl(
+ GitRepositoryManager gitRepositoryManager,
+ SharedRefDatabaseWrapper sharedRefDb,
+ GitReferenceUpdated gitReferenceUpdated,
+ ProjectVersionLogger verLogger,
+ @GerritInstanceId String nodeInstanceId) {
+ this.gitRepositoryManager = gitRepositoryManager;
+ this.sharedRefDb = sharedRefDb;
+ this.gitReferenceUpdated = gitReferenceUpdated;
+ this.verLogger = verLogger;
+ this.nodeInstanceId = nodeInstanceId;
+ }
+
+ @Override
+ public void onEvent(Event event) {
+ logger.atFine().log("Processing event type: " + event.type);
+ // Producer of the Event use RefUpdatedEvent to trigger the version update
+ if (nodeInstanceId.equals(event.instanceId) && event instanceof RefUpdatedEvent) {
+ updateProducerProjectVersionUpdate((RefUpdatedEvent) event);
+ }
+ }
+
+ private boolean isSpecialRefName(String refName) {
+ return refName.startsWith(RefNames.REFS_SEQUENCES)
+ || refName.startsWith(RefNames.REFS_STARRED_CHANGES)
+ || refName.equals(MULTI_SITE_VERSIONING_REF);
+ }
+
+ private void updateProducerProjectVersionUpdate(RefUpdatedEvent refUpdatedEvent) {
+ String refName = refUpdatedEvent.getRefName();
+
+ if (isSpecialRefName(refName)) {
+ logger.atFine().log(
+ "Found a special ref name %s, skipping update for %s",
+ refName, refUpdatedEvent.getProjectNameKey().get());
+ return;
+ }
+ try {
+ Project.NameKey projectNameKey = refUpdatedEvent.getProjectNameKey();
+ long newVersion = getCurrentGlobalVersionNumber();
+
+ Optional<RefUpdate> newProjectVersionRefUpdate =
+ updateLocalProjectVersion(projectNameKey, newVersion);
+
+ if (newProjectVersionRefUpdate.isPresent()) {
+ verLogger.log(projectNameKey, newVersion, 0L);
+
+ if (updateSharedProjectVersion(
+ projectNameKey, newProjectVersionRefUpdate.get().getNewObjectId(), newVersion)) {
+ gitReferenceUpdated.fire(projectNameKey, newProjectVersionRefUpdate.get(), null);
+ }
+ } else {
+ logger.atWarning().log(
+ "Ref %s not found on projet %s: skipping project version update",
+ refUpdatedEvent.getRefName(), projectNameKey);
+ }
+ } catch (LocalProjectVersionUpdateException | SharedProjectVersionUpdateException e) {
+ logger.atSevere().withCause(e).log(
+ "Issue encountered when updating version for project "
+ + refUpdatedEvent.getProjectNameKey());
+ }
+ }
+
+ private RefUpdate getProjectVersionRefUpdate(Repository repository, Long version)
+ throws IOException {
+ RefUpdate refUpdate = repository.getRefDatabase().newUpdate(MULTI_SITE_VERSIONING_REF, false);
+ refUpdate.setNewObjectId(getNewId(repository, version));
+ refUpdate.setForceUpdate(true);
+ return refUpdate;
+ }
+
+ private ObjectId getNewId(Repository repository, Long version) throws IOException {
+ ObjectInserter ins = repository.newObjectInserter();
+ ObjectId newId = ins.insert(OBJ_BLOB, Long.toString(version).getBytes(UTF_8));
+ ins.flush();
+ return newId;
+ }
+
+ private boolean updateSharedProjectVersion(
+ Project.NameKey projectNameKey, ObjectId newObjectId, Long newVersion)
+ throws SharedProjectVersionUpdateException {
+
+ Ref sharedRef =
+ sharedRefDb
+ .get(projectNameKey, MULTI_SITE_VERSIONING_REF, String.class)
+ .map(
+ (String objectId) ->
+ new ObjectIdRef.Unpeeled(
+ Ref.Storage.NEW, MULTI_SITE_VERSIONING_REF, ObjectId.fromString(objectId)))
+ .orElse(
+ new ObjectIdRef.Unpeeled(
+ Ref.Storage.NEW, MULTI_SITE_VERSIONING_REF, ObjectId.zeroId()));
+ Optional<Long> sharedVersion =
+ sharedRefDb
+ .get(projectNameKey, MULTI_SITE_VERSIONING_VALUE_REF, String.class)
+ .map(Long::parseLong);
+
+ try {
+ if (sharedVersion.isPresent() && sharedVersion.get() >= newVersion) {
+ logger.atWarning().log(
+ String.format(
+ "NOT Updating project %s version %s (value=%d) in shared ref-db because is more recent than the local one %s (value=%d) ",
+ projectNameKey.get(),
+ newObjectId,
+ newVersion,
+ sharedRef.getObjectId().getName(),
+ sharedVersion.get()));
+ return false;
+ }
+
+ logger.atFine().log(
+ String.format(
+ "Updating shared project %s version to %s (value=%d)",
+ projectNameKey.get(), newObjectId, newVersion));
+
+ boolean success = sharedRefDb.compareAndPut(projectNameKey, sharedRef, newObjectId);
+ if (!success) {
+ String message =
+ String.format(
+ "Project version blob update failed for %s. Current value %s, new value: %s",
+ projectNameKey.get(), safeGetObjectId(sharedRef), newObjectId);
+ logger.atSevere().log(message);
+ throw new SharedProjectVersionUpdateException(message);
+ }
+
+ success =
+ sharedRefDb.compareAndPut(
+ projectNameKey,
+ MULTI_SITE_VERSIONING_VALUE_REF,
+ sharedVersion.map(Object::toString).orElse(null),
+ newVersion.toString());
+ if (!success) {
+ String message =
+ String.format(
+ "Project version update failed for %s. Current value %s, new value: %s",
+ projectNameKey.get(), safeGetObjectId(sharedRef), newObjectId);
+ logger.atSevere().log(message);
+ throw new SharedProjectVersionUpdateException(message);
+ }
+
+ return true;
+ } catch (GlobalRefDbSystemError refDbSystemError) {
+ String message =
+ String.format(
+ "Error while updating shared project version for %s. Current value %s, new value: %s. Error: %s",
+ projectNameKey.get(),
+ sharedRef.getObjectId(),
+ newObjectId,
+ refDbSystemError.getMessage());
+ logger.atSevere().withCause(refDbSystemError).log(message);
+ throw new SharedProjectVersionUpdateException(message);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate#getProjectLocalVersion(java.lang.String)
+ */
+ @Override
+ public Optional<Long> getProjectLocalVersion(String projectName) {
+ try (Repository repository =
+ gitRepositoryManager.openRepository(Project.NameKey.parse(projectName))) {
+ Optional<IntBlob> blob = IntBlob.parse(repository, MULTI_SITE_VERSIONING_REF);
+ if (blob.isPresent()) {
+ Long repoVersion = Integer.toUnsignedLong(blob.get().value());
+ logger.atFine().log("Local project '%s' has version %d", projectName, repoVersion);
+ return Optional.of(repoVersion);
+ }
+ } catch (RepositoryNotFoundException re) {
+ logger.atFine().log("Project '%s' not found", projectName);
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log("Cannot read local project '%s' version", projectName);
+ }
+ return Optional.empty();
+ }
+
+ /* (non-Javadoc)
+ * @see com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate#getProjectRemoteVersion(java.lang.String)
+ */
+ @Override
+ public Optional<Long> getProjectRemoteVersion(String projectName) {
+ Optional<String> globalVersion =
+ sharedRefDb.get(
+ Project.NameKey.parse(projectName), MULTI_SITE_VERSIONING_VALUE_REF, String.class);
+ return globalVersion.flatMap(longString -> getLongValueOf(longString));
+ }
+
+ private Object safeGetObjectId(Ref currentRef) {
+ return currentRef == null ? "null" : currentRef.getObjectId();
+ }
+
+ private Optional<Long> getLongValueOf(String longString) {
+ try {
+ return Optional.ofNullable(Long.parseLong(longString));
+ } catch (NumberFormatException e) {
+ logger.atSevere().withCause(e).log(
+ "Unable to parse timestamp value %s into Long", longString);
+ return Optional.empty();
+ }
+ }
+
+ private Optional<RefUpdate> updateLocalProjectVersion(
+ Project.NameKey projectNameKey, long newVersionNumber)
+ throws LocalProjectVersionUpdateException {
+ logger.atFine().log(
+ "Updating local version for project %s with version %d",
+ projectNameKey.get(), newVersionNumber);
+ try (Repository repository = gitRepositoryManager.openRepository(projectNameKey)) {
+ RefUpdate refUpdate = getProjectVersionRefUpdate(repository, newVersionNumber);
+ RefUpdate.Result result = refUpdate.update();
+ if (!isSuccessful(result)) {
+ String message =
+ String.format(
+ "RefUpdate failed with result %s for: project=%s, version=%d",
+ result.name(), projectNameKey.get(), newVersionNumber);
+ logger.atSevere().log(message);
+ throw new LocalProjectVersionUpdateException(message);
+ }
+
+ return Optional.of(refUpdate);
+ } catch (IOException e) {
+ String message = "Cannot create versioning command for " + projectNameKey.get();
+ logger.atSevere().withCause(e).log(message);
+ throw new LocalProjectVersionUpdateException(message);
+ }
+ }
+
+ private long getCurrentGlobalVersionNumber() {
+ return System.currentTimeMillis() / 1000;
+ }
+
+ private Boolean isSuccessful(RefUpdate.Result result) {
+ return SUCCESSFUL_RESULTS.contains(result);
+ }
+
+ public static class LocalProjectVersionUpdateException extends Exception {
+ private static final long serialVersionUID = 7649956232401457023L;
+
+ public LocalProjectVersionUpdateException(String projectName) {
+ super("Cannot update local project version of " + projectName);
+ }
+ }
+
+ public static class SharedProjectVersionUpdateException extends Exception {
+ private static final long serialVersionUID = -9153858177700286314L;
+
+ public SharedProjectVersionUpdateException(String projectName) {
+ super("Cannot update shared project version of " + projectName);
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
index fe224ae..f66bf37 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
@@ -20,7 +20,6 @@
import com.gerritforge.gerrit.globalrefdb.validation.RefUpdateValidator;
import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDbBatchRefUpdate;
-import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDbConfiguration;
import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDbGitRepositoryManager;
import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDbRefDatabase;
import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDbRefUpdate;
@@ -37,8 +36,6 @@
import com.google.inject.TypeLiteral;
import com.google.inject.name.Names;
import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.Log4jProjectVersionLogger;
-import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
import com.googlesource.gerrit.plugins.replication.ReplicationExtensionPointModule;
import com.googlesource.gerrit.plugins.replication.ReplicationPushFilter;
@@ -55,7 +52,6 @@
bind(SharedRefDatabaseWrapper.class).in(Scopes.SINGLETON);
bind(SharedRefLogger.class).to(Log4jSharedRefLogger.class);
- bind(ProjectVersionLogger.class).to(Log4jProjectVersionLogger.class);
factory(LockWrapper.Factory.class);
factory(SharedRefDbRepository.Factory.class);
@@ -65,7 +61,6 @@
factory(RefUpdateValidator.Factory.class);
factory(BatchRefUpdateValidator.Factory.class);
- bind(SharedRefDbConfiguration.class).toInstance(cfg.getSharedRefDbConfiguration());
bind(new TypeLiteral<ImmutableSet<String>>() {})
.annotatedWith(Names.named(SharedRefDbGitRepositoryManager.IGNORED_REFS))
.toInstance(
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index e217994..8117ba9 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -78,6 +78,12 @@
: Enable the use of a shared ref-database
Defaults: true
+```ref-database.replicationLagRefreshInterval```
+: Enable the auto-refresh of the metrics to trace the auto-replication
+ lag by polling on a regular basis. Set to zero for disabling the polling
+ mechanism.
+ Defaults: 60 min
+
```ref-database.enforcementRules.<policy>```
: Level of consistency enforcement across sites on a project:refs basis.
Supports two values for enforcing the policy on multiple projects or refs.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java
index 1e11bf9..cb1f78e 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java
@@ -104,8 +104,7 @@
}
@Test
- public void shouldUpdateReplicationMetricsWithLocalEvents()
- throws IOException, PermissionBackendException, CacheNotFoundException {
+ public void shouldUpdateReplicationMetricsWithLocalEvents() {
for (Event event : events()) {
event.instanceId = NODE_INSTANCE_ID;
when(projectsFilter.matches(any(String.class))).thenReturn(true);
@@ -118,8 +117,7 @@
}
@Test
- public void shouldUpdateReplicationMetricsWithNonLocalEvents()
- throws IOException, PermissionBackendException, CacheNotFoundException {
+ public void shouldUpdateReplicationMetricsWithNonLocalEvents() {
for (Event event : events()) {
event.instanceId = INSTANCE_ID;
when(projectsFilter.matches(any(String.class))).thenReturn(true);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java
index a2a5e73..e1cd127 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java
@@ -15,7 +15,7 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -25,10 +25,12 @@
import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.server.project.ProjectCache;
-import com.google.inject.Provider;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
import java.util.Optional;
+import java.util.concurrent.Executors;
+import org.eclipse.jgit.lib.Config;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -40,7 +42,6 @@
@Mock private ProjectVersionLogger verLogger;
@Mock private ProjectCache projectCache;
- @Mock private Provider<ProjectVersionRefUpdate> projectVersionRefUpdateProvider;
@Mock private ProjectVersionRefUpdate projectVersionRefUpdate;
private ReplicationStatus objectUnderTest;
private Cache<String, Long> replicationStatusCache;
@@ -53,8 +54,12 @@
replicationStatusCache = CacheBuilder.newBuilder().build();
objectUnderTest =
new ReplicationStatus(
- replicationStatusCache, projectVersionRefUpdateProvider, verLogger, projectCache);
- lenient().when(projectVersionRefUpdateProvider.get()).thenReturn(projectVersionRefUpdate);
+ replicationStatusCache,
+ Optional.of(projectVersionRefUpdate),
+ verLogger,
+ projectCache,
+ Executors.newScheduledThreadPool(1),
+ new Configuration(new Config(), new Config()));
}
@Test
@@ -146,6 +151,44 @@
assertThat(replicationStatusCache.getIfPresent(projectName)).isEqualTo(lag);
}
+ @Test
+ public void shouldUpdateReplicationLagForProject() {
+ String projectName = "projectA";
+ long projectLocalVersion = 10L;
+ long projectRemoteVersion = 20L;
+
+ when(projectVersionRefUpdate.getProjectLocalVersion(eq(projectName)))
+ .thenReturn(Optional.of(projectLocalVersion));
+ when(projectVersionRefUpdate.getProjectRemoteVersion(eq(projectName)))
+ .thenReturn(Optional.of(projectRemoteVersion));
+
+ objectUnderTest.updateReplicationLag(Project.nameKey(projectName));
+
+ assertThat(replicationStatusCache.getIfPresent(projectName))
+ .isEqualTo(projectRemoteVersion - projectLocalVersion);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldAutoRefreshReplicationLagForProject() {
+ String projectName = "projectA";
+ long projectLocalVersion = 10L;
+ long projectRemoteVersion = 20L;
+
+ when(projectVersionRefUpdate.getProjectLocalVersion(eq(projectName)))
+ .thenReturn(Optional.of(projectLocalVersion), Optional.of(projectRemoteVersion));
+ when(projectVersionRefUpdate.getProjectRemoteVersion(eq(projectName)))
+ .thenReturn(Optional.of(projectRemoteVersion));
+
+ objectUnderTest.updateReplicationLag(Project.nameKey(projectName));
+
+ assertThat(replicationStatusCache.getIfPresent(projectName))
+ .isEqualTo(projectRemoteVersion - projectLocalVersion);
+ objectUnderTest.refreshProjectsWithLag();
+
+ assertThat(replicationStatusCache.getIfPresent(projectName)).isEqualTo(0);
+ }
+
private void setupReplicationLag(String projectName, long lag) {
long currentVersion = System.currentTimeMillis();
long newVersion = currentVersion + lag;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriberTest.java
index a0e97a8..71fd828 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriberTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriberTest.java
@@ -14,15 +14,19 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
+import com.google.gerrit.entities.Project;
import com.google.gerrit.entities.Project.NameKey;
import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.RefEvent;
import com.google.gerrit.server.events.RefUpdatedEvent;
import com.google.gerrit.server.permissions.PermissionBackendException;
import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
@@ -79,6 +83,31 @@
verify(droppedEventListeners, never()).onEventDropped(event);
}
+ @Test
+ public void shouldUpdateReplicationMetricsWithRemoteRefEvent() {
+ Event event =
+ new RefEvent("ref-replicated") {
+
+ @Override
+ public String getRefName() {
+ return "foo-ref";
+ }
+
+ @Override
+ public NameKey getProjectNameKey() {
+ return Project.nameKey(PROJECT_NAME);
+ }
+ };
+
+ event.instanceId = INSTANCE_ID;
+ when(projectsFilter.matches(eq(PROJECT_NAME))).thenReturn(true);
+
+ objectUnderTest.getConsumer().accept(event);
+
+ verify(subscriberMetrics, times(1)).updateReplicationStatusMetrics(event);
+ reset(projectsFilter, eventRouter, droppedEventListeners, subscriberMetrics);
+ }
+
@Override
protected AbstractSubcriber objectUnderTest() {
return new StreamEventSubscriber(
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
index 8ed4a14..6d660bb 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
@@ -27,12 +27,13 @@
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.RefUpdatedEvent;
import com.google.gerrit.server.project.ProjectCache;
-import com.google.inject.Provider;
import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationSucceededEvent;
import java.net.URISyntaxException;
import java.util.Optional;
+import java.util.concurrent.Executors;
+import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.transport.URIish;
import org.junit.Before;
import org.junit.Test;
@@ -49,7 +50,6 @@
@Mock private MetricMaker metricMaker;
@Mock private ProjectVersionLogger verLogger;
@Mock private ProjectCache projectCache;
- @Mock private Provider<ProjectVersionRefUpdate> projectVersionRefUpdateProvider;
@Mock private ProjectVersionRefUpdate projectVersionRefUpdate;
private SubscriberMetrics metrics;
private ReplicationStatus replicationStatus;
@@ -59,11 +59,13 @@
replicationStatus =
new ReplicationStatus(
CacheBuilder.newBuilder().build(),
- projectVersionRefUpdateProvider,
+ Optional.of(projectVersionRefUpdate),
verLogger,
- projectCache);
+ projectCache,
+ Executors.newScheduledThreadPool(1),
+ new com.googlesource.gerrit.plugins.multisite.Configuration(
+ new Config(), new Config()));
metrics = new SubscriberMetrics(metricMaker, replicationStatus);
- when(projectVersionRefUpdateProvider.get()).thenReturn(projectVersionRefUpdate);
}
@Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
index 452a0d2..b827cdf 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
@@ -17,6 +17,7 @@
import static com.google.common.truth.Truth.assertThat;
import static com.google.gerrit.testing.GerritJUnit.assertThrows;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
@@ -49,6 +50,7 @@
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.OngoingStubbing;
@RunWith(MockitoJUnitRunner.class)
public class ForwardedIndexChangeHandlerTest {
@@ -62,6 +64,8 @@
private static final boolean THROW_STORAGE_EXCEPTION = true;
private static final boolean CHANGE_UP_TO_DATE = true;
private static final boolean CHANGE_OUTDATED = false;
+ private static final boolean CHANGE_CONSISTENT = true;
+ private static final boolean CHANGE_INCONSISTENT = false;
@Rule public ExpectedException exception = ExpectedException.none();
@Mock private ChangeIndexer indexerMock;
@@ -88,6 +92,7 @@
when(changeCheckerFactoryMock.create(any())).thenReturn(changeCheckerAbsentMock);
when(configurationMock.index()).thenReturn(index);
when(index.numStripedLocks()).thenReturn(10);
+ when(index.maxTries()).thenReturn(1);
handler =
new ForwardedIndexChangeHandler(
indexerMock, configurationMock, indexExecutorMock, ctxMock, changeCheckerFactoryMock);
@@ -95,14 +100,36 @@
@Test
public void changeIsIndexedWhenUpToDate() throws Exception {
- setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_UP_TO_DATE);
+ setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_UP_TO_DATE, CHANGE_CONSISTENT);
handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty());
verify(indexerMock, times(1)).index(any(Change.class));
}
@Test
public void changeIsStillIndexedEvenWhenOutdated() throws Exception {
- setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_OUTDATED);
+ setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_OUTDATED, CHANGE_CONSISTENT);
+ handler.index(
+ TEST_CHANGE_ID,
+ Operation.INDEX,
+ Optional.of(new ChangeIndexEvent("foo", 1, false, "instance-id")));
+ verify(indexerMock, times(1)).index(any(Change.class));
+ }
+
+ @Test
+ public void changeIsIndexeAtFirstRetryWhenInitiallyInconsistent() throws Exception {
+ setupChangeAccessRelatedMocks(
+ CHANGE_EXISTS,
+ DO_NOT_THROW_STORAGE_EXCEPTION,
+ CHANGE_UP_TO_DATE,
+ CHANGE_INCONSISTENT,
+ CHANGE_CONSISTENT);
+ handler.index(
+ TEST_CHANGE_ID,
+ Operation.INDEX,
+ Optional.of(new ChangeIndexEvent("foo", 1, false, "instance-id")));
+ verify(indexerMock, never()).index(any(Change.class));
+ verify(indexExecutorMock, times(1)).schedule(any(Runnable.class), anyLong(), any());
+
handler.index(
TEST_CHANGE_ID,
Operation.INDEX,
@@ -126,7 +153,8 @@
@Test
public void indexerThrowsStorageExceptionTryingToIndexChange() throws Exception {
- setupChangeAccessRelatedMocks(CHANGE_EXISTS, THROW_STORAGE_EXCEPTION, CHANGE_UP_TO_DATE);
+ setupChangeAccessRelatedMocks(
+ CHANGE_EXISTS, THROW_STORAGE_EXCEPTION, CHANGE_UP_TO_DATE, CHANGE_CONSISTENT);
assertThrows(
StorageException.class,
() -> handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty()));
@@ -178,11 +206,21 @@
private void setupChangeAccessRelatedMocks(boolean changeExist, boolean changeUpToDate)
throws Exception {
- setupChangeAccessRelatedMocks(changeExist, DO_NOT_THROW_STORAGE_EXCEPTION, changeUpToDate);
+ setupChangeAccessRelatedMocks(
+ changeExist, DO_NOT_THROW_STORAGE_EXCEPTION, changeUpToDate, CHANGE_CONSISTENT);
}
private void setupChangeAccessRelatedMocks(
- boolean changeExists, boolean storageException, boolean changeIsUpToDate)
+ boolean changeExist, boolean changeUpToDate, boolean changeConsistent) throws Exception {
+ setupChangeAccessRelatedMocks(
+ changeExist, DO_NOT_THROW_STORAGE_EXCEPTION, changeUpToDate, changeConsistent);
+ }
+
+ private void setupChangeAccessRelatedMocks(
+ boolean changeExists,
+ boolean storageException,
+ boolean changeIsUpToDate,
+ boolean... changeConsistentReturnValues)
throws StorageException {
if (changeExists) {
when(changeCheckerFactoryMock.create(TEST_CHANGE_ID)).thenReturn(changeCheckerPresentMock);
@@ -193,5 +231,11 @@
}
when(changeCheckerPresentMock.isUpToDate(any())).thenReturn(changeIsUpToDate);
+
+ OngoingStubbing<Boolean> changeConsistentCall =
+ when(changeCheckerPresentMock.isChangeConsistent());
+ for (boolean changeConsistent : changeConsistentReturnValues) {
+ changeConsistentCall = changeConsistentCall.thenReturn(changeConsistent);
+ }
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServletIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServletIT.java
index f95668e..f940fc4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServletIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServletIT.java
@@ -27,7 +27,11 @@
import com.google.gerrit.acceptance.config.GerritConfig;
import com.google.gerrit.entities.Project;
import com.google.gerrit.httpd.restapi.RestApiServlet;
+import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Scopes;
+import com.google.inject.multibindings.OptionalBinder;
import com.googlesource.gerrit.plugins.multisite.Log4jProjectVersionLogger;
import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
@@ -36,6 +40,8 @@
import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.RouterModule;
import com.googlesource.gerrit.plugins.multisite.index.IndexModule;
+import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
+import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdateImpl;
import java.io.IOException;
import org.eclipse.jgit.lib.Config;
import org.junit.Before;
@@ -53,18 +59,24 @@
private ReplicationStatus replicationStatus;
public static class TestModule extends AbstractModule {
+ @Inject WorkQueue workQueue;
+
@Override
protected void configure() {
install(new ForwarderModule());
install(new CacheModule());
install(new RouterModule());
install(new IndexModule());
- install(new ReplicationStatusModule());
+ install(new ReplicationStatusModule(workQueue));
SharedRefDbConfiguration sharedRefDbConfig =
new SharedRefDbConfiguration(new Config(), "multi-site");
bind(SharedRefDbConfiguration.class).toInstance(sharedRefDbConfig);
bind(ProjectVersionLogger.class).to(Log4jProjectVersionLogger.class);
bind(SharedRefLogger.class).to(Log4jSharedRefLogger.class);
+ OptionalBinder.newOptionalBinder(binder(), ProjectVersionRefUpdate.class)
+ .setBinding()
+ .to(ProjectVersionRefUpdateImpl.class)
+ .in(Scopes.SINGLETON);
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/DisabledSharedRefLogger.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/DisabledSharedRefLogger.java
new file mode 100644
index 0000000..b7ad98c
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/DisabledSharedRefLogger.java
@@ -0,0 +1,39 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation;
+
+import com.gerritforge.gerrit.globalrefdb.validation.SharedRefLogger;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+import org.junit.Ignore;
+
+@Ignore
+public class DisabledSharedRefLogger implements SharedRefLogger {
+
+ @Override
+ public void logRefUpdate(String project, Ref currRef, ObjectId newRefValue) {}
+
+ @Override
+ public void logProjectDelete(String project) {}
+
+ @Override
+ public void logLockAcquisition(String project, String refName) {}
+
+ @Override
+ public void logLockRelease(String project, String refName) {}
+
+ @Override
+ public <T> void logRefUpdate(String project, String refName, T currRef, T newRefValue) {}
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilterTest.java
new file mode 100644
index 0000000..b45e1fb
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilterTest.java
@@ -0,0 +1,211 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.gerritforge.gerrit.globalrefdb.GlobalRefDatabase;
+import com.gerritforge.gerrit.globalrefdb.GlobalRefDbLockException;
+import com.gerritforge.gerrit.globalrefdb.GlobalRefDbSystemError;
+import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.testing.InMemoryRepositoryManager;
+import com.google.gerrit.testing.InMemoryTestEnvironment;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.RefFixture;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import org.eclipse.jgit.internal.storage.dfs.InMemoryRepository;
+import org.eclipse.jgit.junit.LocalDiskRepositoryTestCase;
+import org.eclipse.jgit.junit.TestRepository;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectIdRef;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class MultisiteReplicationPushFilterTest extends LocalDiskRepositoryTestCase
+ implements RefFixture {
+
+ @Rule public InMemoryTestEnvironment testEnvironment = new InMemoryTestEnvironment();
+
+ @Mock SharedRefDatabaseWrapper sharedRefDatabaseMock;
+
+ @Inject private InMemoryRepositoryManager gitRepositoryManager;
+
+ String project = A_TEST_PROJECT_NAME;
+ Project.NameKey projectName = A_TEST_PROJECT_NAME_KEY;
+
+ private TestRepository<InMemoryRepository> repo;
+
+ @Before
+ public void setupTestRepo() throws Exception {
+ InMemoryRepository inMemoryRepo =
+ gitRepositoryManager.createRepository(A_TEST_PROJECT_NAME_KEY);
+ repo = new TestRepository<>(inMemoryRepo);
+ }
+
+ @Test
+ public void shouldReturnAllRefUpdatesWhenAllUpToDate() throws Exception {
+ List<RemoteRefUpdate> refUpdates =
+ Arrays.asList(refUpdate("refs/heads/foo"), refUpdate("refs/heads/bar"));
+ doReturn(true).when(sharedRefDatabaseMock).isUpToDate(eq(projectName), any());
+
+ MultisiteReplicationPushFilter pushFilter =
+ new MultisiteReplicationPushFilter(sharedRefDatabaseMock, gitRepositoryManager);
+ List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
+
+ assertThat(filteredRefUpdates).containsExactlyElementsIn(refUpdates);
+ }
+
+ @Test
+ public void shouldFilterOutOneOutdatedRef() throws Exception {
+ RemoteRefUpdate refUpToDate = refUpdate("refs/heads/uptodate");
+ RemoteRefUpdate outdatedRef = refUpdate("refs/heads/outdated");
+ List<RemoteRefUpdate> refUpdates = Arrays.asList(refUpToDate, outdatedRef);
+ SharedRefDatabaseWrapper sharedRefDatabase = newSharedRefDatabase(outdatedRef.getSrcRef());
+
+ MultisiteReplicationPushFilter pushFilter =
+ new MultisiteReplicationPushFilter(sharedRefDatabase, gitRepositoryManager);
+ List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
+
+ assertThat(filteredRefUpdates).containsExactly(refUpToDate);
+ }
+
+ @Test
+ public void shouldLoadLocalVersionAndNotFilter() throws Exception {
+ String refName = "refs/heads/temporaryOutdated";
+ RemoteRefUpdate temporaryOutdated = refUpdate(refName);
+ ObjectId latestObjectId = repo.getRepository().exactRef(refName).getObjectId();
+
+ List<RemoteRefUpdate> refUpdates = Collections.singletonList(temporaryOutdated);
+ doReturn(false).doReturn(true).when(sharedRefDatabaseMock).isUpToDate(eq(projectName), any());
+
+ MultisiteReplicationPushFilter pushFilter =
+ new MultisiteReplicationPushFilter(sharedRefDatabaseMock, gitRepositoryManager);
+ List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
+
+ assertThat(filteredRefUpdates).hasSize(1);
+ assertThat(filteredRefUpdates.get(0).getNewObjectId()).isEqualTo(latestObjectId);
+
+ verify(sharedRefDatabaseMock, times(2)).isUpToDate(any(), any());
+ }
+
+ @Test
+ public void shouldLoadLocalVersionAndFilter() throws Exception {
+ RemoteRefUpdate temporaryOutdated = refUpdate("refs/heads/temporaryOutdated");
+ repo.branch("refs/heads/temporaryOutdated").commit().create();
+ List<RemoteRefUpdate> refUpdates = Collections.singletonList(temporaryOutdated);
+ doReturn(false).doReturn(false).when(sharedRefDatabaseMock).isUpToDate(eq(projectName), any());
+
+ MultisiteReplicationPushFilter pushFilter =
+ new MultisiteReplicationPushFilter(sharedRefDatabaseMock, gitRepositoryManager);
+ List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
+
+ assertThat(filteredRefUpdates).isEmpty();
+ verify(sharedRefDatabaseMock, times(2)).isUpToDate(any(), any());
+ }
+
+ @Test
+ public void shouldFilterOutAllOutdatedChangesRef() throws Exception {
+ RemoteRefUpdate refUpToDate = refUpdate("refs/heads/uptodate");
+ RemoteRefUpdate refChangeUpToDate = refUpdate("refs/changes/25/1225/2");
+ RemoteRefUpdate changeMetaRef = refUpdate("refs/changes/12/4512/meta");
+ RemoteRefUpdate changeRef = refUpdate("refs/changes/12/4512/1");
+ List<RemoteRefUpdate> refUpdates =
+ Arrays.asList(refUpToDate, refChangeUpToDate, changeMetaRef, changeRef);
+ SharedRefDatabaseWrapper sharedRefDatabase = newSharedRefDatabase(changeMetaRef.getSrcRef());
+
+ MultisiteReplicationPushFilter pushFilter =
+ new MultisiteReplicationPushFilter(sharedRefDatabase, gitRepositoryManager);
+ List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
+
+ assertThat(filteredRefUpdates).containsExactly(refUpToDate, refChangeUpToDate);
+ }
+
+ private SharedRefDatabaseWrapper newSharedRefDatabase(String... rejectedRefs) {
+ Set<String> rejectedSet = new HashSet<>();
+ rejectedSet.addAll(Arrays.asList(rejectedRefs));
+
+ GlobalRefDatabase sharedRefDatabase =
+ new GlobalRefDatabase() {
+
+ @Override
+ public boolean isUpToDate(Project.NameKey project, Ref ref)
+ throws GlobalRefDbLockException {
+ return !rejectedSet.contains(ref.getName());
+ }
+
+ @Override
+ public boolean exists(Project.NameKey project, String refName) {
+ return true;
+ }
+
+ @Override
+ public boolean compareAndPut(Project.NameKey project, Ref currRef, ObjectId newRefValue)
+ throws GlobalRefDbSystemError {
+ return false;
+ }
+
+ @Override
+ public <T> boolean compareAndPut(
+ Project.NameKey project, String refName, T currValue, T newValue)
+ throws GlobalRefDbSystemError {
+ return false;
+ }
+
+ @Override
+ public AutoCloseable lockRef(Project.NameKey project, String refName)
+ throws GlobalRefDbLockException {
+ return null;
+ }
+
+ @Override
+ public void remove(Project.NameKey project) throws GlobalRefDbSystemError {}
+
+ @Override
+ public <T> Optional<T> get(Project.NameKey project, String refName, Class<T> clazz)
+ throws GlobalRefDbSystemError {
+ return Optional.empty();
+ }
+ };
+ return new SharedRefDatabaseWrapper(
+ DynamicItem.itemOf(GlobalRefDatabase.class, sharedRefDatabase),
+ new DisabledSharedRefLogger());
+ }
+
+ private RemoteRefUpdate refUpdate(String refName) throws Exception {
+ ObjectId srcObjId = ObjectId.fromString("0000000000000000000000000000000000000001");
+ Ref srcRef = new ObjectIdRef.Unpeeled(Ref.Storage.NEW, refName, srcObjId);
+ repo.branch(refName).commit().create();
+ return new RemoteRefUpdate(null, srcRef, "origin", false, "origin", srcObjId);
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
index 384db84..cce996b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
@@ -100,7 +100,7 @@
when(refUpdatedEvent.getProjectNameKey()).thenReturn(A_TEST_PROJECT_NAME_KEY);
when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
- new ProjectVersionRefUpdate(
+ new ProjectVersionRefUpdateImpl(
repoManager, sharedRefDb, gitReferenceUpdated, verLogger, DEFAULT_INSTANCE_ID)
.onEvent(refUpdatedEvent);
@@ -143,7 +143,7 @@
when(refUpdatedEvent.getProjectNameKey()).thenReturn(A_TEST_PROJECT_NAME_KEY);
when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
- new ProjectVersionRefUpdate(
+ new ProjectVersionRefUpdateImpl(
repoManager, sharedRefDb, gitReferenceUpdated, verLogger, DEFAULT_INSTANCE_ID)
.onEvent(refUpdatedEvent);
@@ -182,7 +182,7 @@
when(refUpdatedEvent.getProjectNameKey()).thenReturn(A_TEST_PROJECT_NAME_KEY);
when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
- new ProjectVersionRefUpdate(
+ new ProjectVersionRefUpdateImpl(
repoManager, sharedRefDb, gitReferenceUpdated, verLogger, DEFAULT_INSTANCE_ID)
.onEvent(refUpdatedEvent);
@@ -224,7 +224,7 @@
when(refUpdatedEvent.getRefName()).thenReturn(magicRefName);
repo.branch(magicRefName).commit().create();
- new ProjectVersionRefUpdate(
+ new ProjectVersionRefUpdateImpl(
repoManager, sharedRefDb, gitReferenceUpdated, verLogger, DEFAULT_INSTANCE_ID)
.onEvent(refUpdatedEvent);
@@ -239,7 +239,7 @@
throws IOException {
refUpdatedEvent.instanceId = "instance-id-2";
- new ProjectVersionRefUpdate(
+ new ProjectVersionRefUpdateImpl(
repoManager, sharedRefDb, gitReferenceUpdated, verLogger, DEFAULT_INSTANCE_ID)
.onEvent(refUpdatedEvent);
@@ -254,7 +254,7 @@
when(refUpdatedEvent.getProjectNameKey()).thenReturn(Project.nameKey("aNonExistentProject"));
when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
- new ProjectVersionRefUpdate(
+ new ProjectVersionRefUpdateImpl(
repoManager, sharedRefDb, gitReferenceUpdated, verLogger, DEFAULT_INSTANCE_ID)
.onEvent(refUpdatedEvent);
@@ -270,7 +270,7 @@
.thenReturn(Optional.of("123"));
Optional<Long> version =
- new ProjectVersionRefUpdate(
+ new ProjectVersionRefUpdateImpl(
repoManager, sharedRefDb, gitReferenceUpdated, verLogger, DEFAULT_INSTANCE_ID)
.getProjectRemoteVersion(A_TEST_PROJECT_NAME);