Merge "Merge branch 'stable-3.3' into stable-3.4" into stable-3.6
diff --git a/DESIGN.md b/DESIGN.md
index 1bbe78f..b114eb3 100644
--- a/DESIGN.md
+++ b/DESIGN.md
@@ -2,7 +2,7 @@
This document collects and organizes thoughts about
the design of the Gerrit multi-site plugin, supporting the definition of the
-[implementation roadmap](#next-steps-in-the-road-map).
+[implementation roadmap](#next-steps-in-the-roadmap).
It first presents background for the problems the plugin will address and
the tools currently available in the Gerrit ecosystem that support the
diff --git a/e2e-tests/test.sh b/e2e-tests/test.sh
index 3ecb9fd..b069565 100755
--- a/e2e-tests/test.sh
+++ b/e2e-tests/test.sh
@@ -16,8 +16,8 @@
LOCATION="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
LOCAL_ENV="$( cd "${LOCATION}/../setup_local_env" >/dev/null 2>&1 && pwd )"
-GERRIT_BRANCH=stable-3.4
-GERRIT_CI=https://archive-ci.gerritforge.com/view/Plugins-$GERRIT_BRANCH/job
+GERRIT_BRANCH=stable-3.6
+GERRIT_CI=https://gerrit-ci.gerritforge.com/view/Plugins-$GERRIT_BRANCH/job
LAST_BUILD=lastSuccessfulBuild/artifact/bazel-bin/plugins
DEF_MULTISITE_LOCATION=${LOCATION}/../../../bazel-bin/plugins/multi-site/multi-site.jar
DEF_GERRIT_IMAGE=3.4.0-centos8
@@ -25,6 +25,7 @@
DEF_GERRIT_HEALTHCHECK_INTERVAL=5s
DEF_GERRIT_HEALTHCHECK_TIMEOUT=5s
DEF_GERRIT_HEALTHCHECK_RETRIES=5
+COMMON_PLUGINS_LIST="websession-broker healthcheck zookeeper-refdb"
function check_application_requirements {
type java >/dev/null 2>&1 || { echo >&2 "Require java but it's not installed. Aborting."; exit 1; }
@@ -112,6 +113,19 @@
return 0
}
+function download_plugin {
+ local PLUGIN_NAME=$1
+
+ echo "Downloading $PLUGIN_NAME plugin $GERRIT_BRANCH onto $TARGET_DIR"
+ wget $GERRIT_CI/plugin-$PLUGIN_NAME-bazel-$GERRIT_BRANCH/$LAST_BUILD/$PLUGIN_NAME/$PLUGIN_NAME.jar \
+ -O $COMMON_PLUGINS/$PLUGIN_NAME.jar || \
+ wget $GERRIT_CI/plugin-$PLUGIN_NAME-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/$PLUGIN_NAME/$PLUGIN_NAME.jar \
+ -O $COMMON_PLUGINS/$PLUGIN_NAME.jar || \
+ { echo >&2 "Cannot download $PLUGIN_NAME plugin: Check internet connection. Aborting"; exit 1; }
+
+ return 0
+}
+
# Check application requirements
check_application_requirements
@@ -220,27 +234,14 @@
echo "Downloading common plugins"
COMMON_PLUGINS=${DEPLOYMENT_LOCATION}/common_plugins
mkdir -p ${COMMON_PLUGINS}
+for plugin in $COMMON_PLUGINS_LIST; do download_plugin $plugin; done
echo "plugin location[${MULTISITE_LIB_LOCATION}]"
cp -f $MULTISITE_LIB_LOCATION $COMMON_PLUGINS/multi-site.jar >/dev/null 2>&1 || \
{ echo >&2 "$MULTISITE_LIB_LOCATION: Not able to copy the file. Aborting"; exit 1; }
-echo "Downloading websession-broker plugin $GERRIT_BRANCH"
-wget $GERRIT_CI/plugin-websession-broker-bazel-$GERRIT_BRANCH/$LAST_BUILD/websession-broker/websession-broker.jar \
- -O $COMMON_PLUGINS/websession-broker.jar || { echo >&2 "Cannot download websession-broker plugin: Check internet connection. Aborting"; exit 1; }
-
-echo "Downloading healthcheck plugin $GERRIT_BRANCH"
-wget $GERRIT_CI/plugin-healthcheck-bazel-$GERRIT_BRANCH/$LAST_BUILD/healthcheck/healthcheck.jar \
- -O $COMMON_PLUGINS/healthcheck.jar || { echo >&2 "Cannot download healthcheck plugin: Check internet connection. Aborting"; exit 1; }
-
-echo "Downloading zookeeper plugin $GERRIT_BRANCH"
-wget $GERRIT_CI/plugin-zookeeper-refdb-bazel-$GERRIT_BRANCH/$LAST_BUILD/zookeeper-refdb/zookeeper-refdb.jar \
- -O $COMMON_PLUGINS/zookeeper-refdb.jar || { echo >&2 "Cannot download zookeeper plugin: Check internet connection. Aborting"; exit 1; }
-
if [ "$BROKER_TYPE" = "kafka" ]; then
- echo "Downloading events-kafka plugin $GERRIT_BRANCH"
- wget $GERRIT_CI/plugin-events-kafka-bazel-$GERRIT_BRANCH/$LAST_BUILD/events-kafka/events-kafka.jar \
- -O $COMMON_PLUGINS/events-kafka.jar || { echo >&2 "Cannot download events-kafka plugin: Check internet connection. Aborting"; exit 1; }
+ download_plugin events-kafka $COMMON_PLUGINS
BROKER_PORT=9092
BROKER_HOST=kafka
BROKER_PLUGIN=events-kafka
diff --git a/setup_local_env/configs/gerrit.config b/setup_local_env/configs/gerrit.config
index 8a176f2..20075d7 100644
--- a/setup_local_env/configs/gerrit.config
+++ b/setup_local_env/configs/gerrit.config
@@ -39,8 +39,6 @@
directory = cache
[plugins]
allowRemoteAdmin = true
-[plugin "websession-flatfile"]
- directory = $FAKE_NFS
[plugin "events-kafka"]
sendAsync = true
bootstrapServers = $BROKER_HOST:$BROKER_PORT
diff --git a/setup_local_env/setup.sh b/setup_local_env/setup.sh
index 247a721..7697f24 100755
--- a/setup_local_env/setup.sh
+++ b/setup_local_env/setup.sh
@@ -16,8 +16,8 @@
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
-GERRIT_BRANCH=stable-3.4
-GERRIT_CI=https://archive-ci.gerritforge.com/view/Plugins-$GERRIT_BRANCH/job
+GERRIT_BRANCH=stable-3.6
+GERRIT_CI=https://gerrit-ci.gerritforge.com/view/Plugins-$GERRIT_BRANCH/job
LAST_BUILD=lastSuccessfulBuild/artifact/bazel-bin/plugins
function check_application_requirements {
@@ -224,6 +224,16 @@
fi
}
+function download_artifact_from_ci {
+ local artifact_name=$1
+ local prefix=${2:-plugin}
+ wget $GERRIT_CI/$prefix-$artifact_name-bazel-$GERRIT_BRANCH/$LAST_BUILD/$artifact_name/$artifact_name.jar \
+ -O $DEPLOYMENT_LOCATION/$artifact_name.jar || \
+ wget $GERRIT_CI/$prefix-$artifact_name-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/$artifact_name/$artifact_name.jar \
+ -O $DEPLOYMENT_LOCATION/$artifact_name.jar || \
+ { echo >&2 "Cannot download $artifact_name $prefix: Check internet connection. Aborting"; exit 1; }
+}
+
while [ $# -ne 0 ]
do
case "$1" in
@@ -429,70 +439,37 @@
if [ $DOWNLOAD_WEBSESSION_PLUGIN = "true" ];then
echo "Downloading websession-broker plugin $GERRIT_BRANCH"
- wget $GERRIT_CI/plugin-websession-broker-bazel-$GERRIT_BRANCH/$LAST_BUILD/websession-broker/websession-broker.jar \
- -O $DEPLOYMENT_LOCATION/websession-broker.jar || \
- wget $GERRIT_CI/plugin-websession-broker-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/websession-broker/websession-broker.jar \
- -O $DEPLOYMENT_LOCATION/websession-broker.jar || \
- { echo >&2 "Cannot download websession-broker plugin: Check internet connection. Abort\
-ing"; exit 1; }
- wget $GERRIT_CI/plugin-healthcheck-bazel-$GERRIT_BRANCH/$LAST_BUILD/healthcheck/healthcheck.jar \
- -O $DEPLOYMENT_LOCATION/healthcheck.jar || { echo >&2 "Cannot download healthcheck plugin: Check internet connection. Abort\
-ing"; exit 1; }
+ download_artifact_from_ci websession-broker
+ download_artifact_from_ci healthcheck
+
else
echo "Without the websession-broker; user login via haproxy will fail."
fi
echo "Downloading zookeeper plugin $GERRIT_BRANCH"
- wget $GERRIT_CI/plugin-zookeeper-refdb-bazel-$GERRIT_BRANCH/$LAST_BUILD/zookeeper-refdb/zookeeper-refdb.jar \
- -O $DEPLOYMENT_LOCATION/zookeeper-refdb.jar || \
- wget $GERRIT_CI/plugin-zookeeper-refdb-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/zookeeper-refdb/zookeeper-refdb.jar \
- -O $DEPLOYMENT_LOCATION/zookeeper-refdb.jar || \
- { echo >&2 "Cannot download zookeeper plugin: Check internet connection. Abort\
-ing"; exit 1; }
+ download_artifact_from_ci zookeeper-refdb
if [ "$BROKER_TYPE" = "kafka" ]; then
echo "Downloading events-kafka plugin $GERRIT_BRANCH"
- wget $GERRIT_CI/plugin-events-kafka-bazel-$GERRIT_BRANCH/$LAST_BUILD/events-kafka/events-kafka.jar \
- -O $DEPLOYMENT_LOCATION/events-kafka.jar || \
- wget $GERRIT_CI/plugin-events-kafka-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/events-kafka/events-kafka.jar \
- -O $DEPLOYMENT_LOCATION/events-kafka.jar || \
- { echo >&2 "Cannot download events-kafka plugin: Check internet connection. Abort\
-ing"; exit 1; }
+ download_artifact_from_ci events-kafka
fi
if [ "$BROKER_TYPE" = "kinesis" ]; then
echo "Downloading events-aws-kinesis plugin $GERRIT_BRANCH"
- wget $GERRIT_CI/plugin-events-aws-kinesis-bazel-$GERRIT_BRANCH/$LAST_BUILD/events-aws-kinesis/events-aws-kinesis.jar \
- -O $DEPLOYMENT_LOCATION/events-aws-kinesis.jar || \
- wget $GERRIT_CI/plugin-events-aws-kinesis-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/events-aws-kinesis/events-aws-kinesis.jar \
- -O $DEPLOYMENT_LOCATION/events-aws-kinesis.jar || \
- { echo >&2 "Cannot download events-aws-kinesis plugin: Check internet connection. Abort\
-ing"; exit 1; }
+ download_artifact_from_ci events-aws-kinesis
fi
if [ "$BROKER_TYPE" = "gcloud-pubsub" ]; then
echo "Downloading events-gcloud-pubsub plugin $GERRIT_BRANCH"
- wget $GERRIT_CI/plugin-events-gcloud-pubsub-bazel-$GERRIT_BRANCH/$LAST_BUILD/events-gcloud-pubsub/events-gcloud-pubsub.jar \
- -O $DEPLOYMENT_LOCATION/events-gcloud-pubsub.jar || \
- wget $GERRIT_CI/plugin-events-gcloud-pubsub-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/events-gcloud-pubsub/events-gcloud-pubsub.jar \
- -O $DEPLOYMENT_LOCATION/events-gcloud-pubsub.jar || \
- { echo >&2 "Cannot download events-gcloud-pubsub plugin: Check internet connection. Abort\
-ing"; exit 1; }
+ download_artifact_from_ci events-gcloud-pubsub
fi
echo "Downloading metrics-reporter-prometheus plugin $GERRIT_BRANCH"
- wget $GERRIT_CI/plugin-metrics-reporter-prometheus-bazel-$GERRIT_BRANCH/$LAST_BUILD/metrics-reporter-prometheus/metrics-reporter-prometheus.jar \
- -O $DEPLOYMENT_LOCATION/metrics-reporter-prometheus.jar || \
- wget $GERRIT_CI/plugin-metrics-reporter-prometheus-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/metrics-reporter-prometheus/metrics-reporter-prometheus.jar \
- -O $DEPLOYMENT_LOCATION/metrics-reporter-prometheus.jar || \
- { echo >&2 "Cannot download metrics-reporter-prometheus plugin: Check internet connection. Abort\
-ing"; exit 1; }
+ download_artifact_from_ci metrics-reporter-prometheus
echo "Downloading pull-replication plugin $GERRIT_BRANCH"
- wget $GERRIT_CI/plugin-pull-replication-bazel-$GERRIT_BRANCH/$LAST_BUILD/pull-replication/pull-replication.jar \
- -O $DEPLOYMENT_LOCATION/pull-replication.jar || { echo >&2 "Cannot download pull-replication plugin: Check internet connection. Abort\
-ing"; exit 1; }
+ download_artifact_from_ci pull-replication
if [ "$HTTPS_ENABLED" = "true" ];then
export HTTP_PROTOCOL="https"
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index 2d67eab..e3f53ab 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -65,6 +65,8 @@
private static final String NUM_STRIPED_LOCKS = "numStripedLocks";
private static final int DEFAULT_NUM_STRIPED_LOCKS = 10;
+ private static final long DEFALULT_STREAM_EVENT_PUBLISH_TIMEOUT = 30000;
+
private final Supplier<Cache> cache;
private final Supplier<Event> event;
private final Supplier<Index> index;
@@ -227,6 +229,17 @@
}
}
+ /**
+ * Reads a long setting from the supplied configuration.
+ *
+ * <p>If the lookup throws {@link IllegalArgumentException}, the problem is logged (error level
+ * for the setting name, debug level with the exception) and {@code defaultValue} is returned.
+ *
+ * @param cfg supplier of the configuration to read from
+ * @param section configuration section
+ * @param subSection configuration subsection, passed through to the lookup as-is
+ * @param name setting name
+ * @param defaultValue value handed to the lookup and returned when the lookup throws
+ * @return the value produced by the lookup, or {@code defaultValue} on failure
+ */
+ private static long getLong(
+ Supplier<Config> cfg, String section, String subSection, String name, long defaultValue) {
+ long value;
+ try {
+ value = cfg.get().getLong(section, subSection, name, defaultValue);
+ } catch (IllegalArgumentException invalidValue) {
+ log.error("invalid value for {}; using default value {}", name, defaultValue);
+ log.debug("Failed to retrieve long value: {}", invalidValue.getMessage(), invalidValue);
+ value = defaultValue;
+ }
+ return value;
+ }
+
public static class Projects {
public static final String SECTION = "projects";
public static final String PATTERN_KEY = "pattern";
@@ -293,12 +306,15 @@
static final String INDEX_SECTION = "index";
static final String MAX_TRIES_KEY = "maxTries";
static final String RETRY_INTERVAL_KEY = "retryInterval";
+ static final String SYNCHRONIZE_FORCED_KEY = "synchronizeForced";
+ static final boolean DEFAULT_SYNCHRONIZE_FORCED = true;
private final int threadPoolSize;
private final int retryInterval;
private final int maxTries;
private final int numStripedLocks;
+ private final boolean synchronizeForced;
private Index(Supplier<Config> cfg) {
super(cfg, INDEX_SECTION);
@@ -309,6 +325,8 @@
maxTries = getInt(cfg, INDEX_SECTION, null, MAX_TRIES_KEY, DEFAULT_INDEX_MAX_TRIES);
numStripedLocks =
getInt(cfg, INDEX_SECTION, null, NUM_STRIPED_LOCKS, DEFAULT_NUM_STRIPED_LOCKS);
+ synchronizeForced =
+ getBoolean(cfg, INDEX_SECTION, null, SYNCHRONIZE_FORCED_KEY, DEFAULT_SYNCHRONIZE_FORCED);
}
public int threadPoolSize() {
@@ -326,19 +344,36 @@
public int numStripedLocks() {
return numStripedLocks;
}
+
+ public boolean synchronizeForced() {
+ return synchronizeForced;
+ }
}
public static class Broker {
static final String BROKER_SECTION = "broker";
+ static final String STREAM_EVENT_PUBLISH_TIMEOUT = "streamEventPublishTimeoutMs";
private final Config cfg;
+ private long streamEventPublishTimeout;
Broker(Supplier<Config> cfgSupplier) {
cfg = cfgSupplier.get();
+ streamEventPublishTimeout =
+ getLong(
+ cfgSupplier,
+ BROKER_SECTION,
+ null,
+ STREAM_EVENT_PUBLISH_TIMEOUT,
+ DEFALULT_STREAM_EVENT_PUBLISH_TIMEOUT);
}
public String getTopic(String topicKey, String defValue) {
return MoreObjects.firstNonNull(cfg.getString(BROKER_SECTION, null, topicKey), defValue);
}
+
+ public long getStreamEventPublishTimeout() {
+ return streamEventPublishTimeout;
+ }
}
public static class ReplicationFilter {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/LibModuleLogFile.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/LibModuleLogFile.java
index 106bcde..7c88185 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/LibModuleLogFile.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/LibModuleLogFile.java
@@ -23,10 +23,11 @@
public abstract class LibModuleLogFile {
public LibModuleLogFile(SystemLog systemLog, String logName, Layout layout) {
- AsyncAppender asyncAppender = systemLog.createAsyncAppender(logName, layout, true, true);
Logger logger = LogManager.getLogger(logName);
- logger.removeAppender(logName);
- logger.addAppender(asyncAppender);
- logger.setAdditivity(false);
+ if (logger.getAppender(logName) == null) {
+ AsyncAppender asyncAppender = systemLog.createAsyncAppender(logName, layout, true, true);
+ logger.addAppender(asyncAppender);
+ logger.setAdditivity(false);
+ }
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
index f58efa7..88422e1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
@@ -22,11 +22,11 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.server.config.GerritInstanceId;
import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
@@ -39,17 +39,20 @@
private final DynamicItem<BrokerApi> apiDelegate;
private final BrokerMetrics metrics;
private final MessageLogger msgLog;
+ private final String nodeInstanceId;
@Inject
public BrokerApiWrapper(
@BrokerExecutor Executor executor,
DynamicItem<BrokerApi> apiDelegate,
BrokerMetrics metrics,
- MessageLogger msgLog) {
+ MessageLogger msgLog,
+ @GerritInstanceId String instanceId) {
this.apiDelegate = apiDelegate;
this.executor = executor;
this.metrics = metrics;
this.msgLog = msgLog;
+ this.nodeInstanceId = instanceId;
}
public boolean sendSync(String topic, Event event) {
@@ -70,7 +73,7 @@
@Override
public ListenableFuture<Boolean> send(String topic, Event message) {
SettableFuture<Boolean> resultFuture = SettableFuture.create();
- if (Context.isForwardedEvent()) {
+ if (!nodeInstanceId.equals(message.instanceId)) {
resultFuture.set(true);
return resultFuture;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheModule.java
index 6373f58..5e54951 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheModule.java
@@ -14,18 +14,31 @@
package com.googlesource.gerrit.plugins.multisite.cache;
+import com.google.common.annotations.VisibleForTesting;
import com.google.gerrit.extensions.events.NewProjectCreatedListener;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gerrit.server.cache.CacheRemovalListener;
+import com.googlesource.gerrit.plugins.multisite.ExecutorProvider;
import java.util.concurrent.Executor;
public class CacheModule extends LifecycleModule {
+ private final Class<? extends ExecutorProvider> cacheExecutorProviderClass;
+
+ public CacheModule() {
+ this(CacheExecutorProvider.class);
+ }
+
+ @VisibleForTesting
+ public CacheModule(Class<? extends ExecutorProvider> cacheExecutorProviderClass) {
+ this.cacheExecutorProviderClass = cacheExecutorProviderClass;
+ }
+
@Override
protected void configure() {
- bind(Executor.class).annotatedWith(CacheExecutor.class).toProvider(CacheExecutorProvider.class);
+ bind(Executor.class).annotatedWith(CacheExecutor.class).toProvider(cacheExecutorProviderClass);
listener().to(CacheExecutorProvider.class);
DynamicSet.bind(binder(), CacheRemovalListener.class).to(CacheEvictionHandler.class);
DynamicSet.bind(binder(), NewProjectCreatedListener.class).to(ProjectListUpdateHandler.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
index 44f8417..6a2c7a5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
@@ -14,7 +14,6 @@
package com.googlesource.gerrit.plugins.multisite.cache;
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
import com.google.gerrit.extensions.events.NewProjectCreatedListener;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.events.ProjectEvent;
@@ -33,18 +32,15 @@
private final DynamicSet<ProjectListUpdateForwarder> forwarders;
private final Executor executor;
- private final ProjectsFilter projectsFilter;
private final String instanceId;
@Inject
public ProjectListUpdateHandler(
DynamicSet<ProjectListUpdateForwarder> forwarders,
@CacheExecutor Executor executor,
- ProjectsFilter filter,
@GerritInstanceId String instanceId) {
this.forwarders = forwarders;
this.executor = executor;
- this.projectsFilter = filter;
this.instanceId = instanceId;
}
@@ -61,7 +57,7 @@
}
private void process(ProjectEvent event, boolean delete) {
- if (!Context.isForwardedEvent() && projectsFilter.matches(event.getProjectName())) {
+ if (!Context.isForwardedEvent()) {
executor.execute(
new ProjectListUpdateTask(
new ProjectListUpdateEvent(event.getProjectName(), delete, instanceId)));
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index 7107b87..c2b9e9c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -70,13 +70,9 @@
if ((Strings.isNullOrEmpty(sourceInstanceId) || instanceId.equals(sourceInstanceId))
|| !shouldConsumeEvent(event)) {
if (Strings.isNullOrEmpty(sourceInstanceId)) {
- logger.atWarning().log(
- String.format(
- "Dropping event %s because sourceInstanceId cannot be null", event.toString()));
+ logger.atWarning().log("Dropping event %s because sourceInstanceId cannot be null", event);
} else if (instanceId.equals(sourceInstanceId)) {
- logger.atFiner().log(
- String.format(
- "Dropping event %s produced by our instanceId %s", event.toString(), instanceId));
+ logger.atFiner().log("Dropping event %s produced by our instanceId %s", event, instanceId);
}
droppedEventListeners.forEach(l -> l.onEventDropped(event));
} else {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutor.java
deleted file mode 100644
index 07bbbf7..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutor.java
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright (C) 2015 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.event;
-
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-import com.google.inject.BindingAnnotation;
-import java.lang.annotation.Retention;
-
-@Retention(RUNTIME)
-@BindingAnnotation
-@interface EventExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutorProvider.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutorProvider.java
deleted file mode 100644
index e4979f9..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutorProvider.java
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright (C) 2015 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.event;
-
-import com.google.gerrit.server.git.WorkQueue;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.ExecutorProvider;
-
-@Singleton
-class EventExecutorProvider extends ExecutorProvider {
-
- @Inject
- EventExecutorProvider(WorkQueue workQueue) {
- super(workQueue, 1, "Forward-Stream-Event");
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventHandler.java
deleted file mode 100644
index b2efb80..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventHandler.java
+++ /dev/null
@@ -1,68 +0,0 @@
-// Copyright (C) 2015 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.event;
-
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.EventListener;
-import com.google.gerrit.server.events.ProjectEvent;
-import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import java.util.concurrent.Executor;
-
-class EventHandler implements EventListener {
- private final Executor executor;
- private final DynamicSet<StreamEventForwarder> forwarders;
- private final ProjectsFilter projectsFilter;
-
- @Inject
- EventHandler(
- DynamicSet<StreamEventForwarder> forwarders,
- @EventExecutor Executor executor,
- ProjectsFilter projectsFilter) {
- this.forwarders = forwarders;
- this.executor = executor;
- this.projectsFilter = projectsFilter;
- }
-
- @Override
- public void onEvent(Event event) {
- if (!Context.isForwardedEvent() && event instanceof ProjectEvent) {
- if (projectsFilter.matches(event)) {
- executor.execute(new EventTask(event));
- }
- }
- }
-
- class EventTask implements Runnable {
- private final Event event;
-
- EventTask(Event event) {
- this.event = event;
- }
-
- @Override
- public void run() {
- forwarders.forEach(f -> f.send(event));
- }
-
- @Override
- public String toString() {
- return String.format("Send event '%s' to target instance", event.type);
- }
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java
index 5f16210..8c67823 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/event/EventModule.java
@@ -14,37 +14,42 @@
package com.googlesource.gerrit.plugins.multisite.event;
+import com.gerritforge.gerrit.eventbroker.publisher.StreamEventPublisherConfig;
+import com.gerritforge.gerrit.eventbroker.publisher.StreamEventPublisherModule;
+import com.google.gerrit.extensions.events.GitBatchRefUpdateListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.gerrit.server.events.EventListener;
-import com.google.inject.Scopes;
+import com.google.inject.Inject;
import com.google.inject.multibindings.OptionalBinder;
import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdateImpl;
-import java.util.concurrent.Executor;
public class EventModule extends LifecycleModule {
+ private final Configuration configuration;
- private final Configuration config;
-
- public EventModule(Configuration config) {
- this.config = config;
+ @Inject
+ public EventModule(Configuration configuration) {
+ this.configuration = configuration;
}
@Override
protected void configure() {
- bind(Executor.class).annotatedWith(EventExecutor.class).toProvider(EventExecutorProvider.class);
- listener().to(EventExecutorProvider.class);
- DynamicSet.bind(binder(), EventListener.class).to(EventHandler.class);
+ bind(StreamEventPublisherConfig.class)
+ .toInstance(
+ new StreamEventPublisherConfig(
+ EventTopic.STREAM_EVENT_TOPIC.topic(configuration),
+ configuration.broker().getStreamEventPublishTimeout()));
+
+ install(new StreamEventPublisherModule());
+
OptionalBinder<ProjectVersionRefUpdate> projectVersionRefUpdateBinder =
OptionalBinder.newOptionalBinder(binder(), ProjectVersionRefUpdate.class);
- if (config.getSharedRefDbConfiguration().getSharedRefDb().isEnabled()) {
- DynamicSet.bind(binder(), EventListener.class).to(ProjectVersionRefUpdateImpl.class);
- projectVersionRefUpdateBinder
- .setBinding()
- .to(ProjectVersionRefUpdateImpl.class)
- .in(Scopes.SINGLETON);
+ if (configuration.getSharedRefDbConfiguration().getSharedRefDb().isEnabled()) {
+ DynamicSet.bind(binder(), GitBatchRefUpdateListener.class)
+ .to(ProjectVersionRefUpdateImpl.class);
+ projectVersionRefUpdateBinder.setBinding().to(ProjectVersionRefUpdateImpl.class);
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandler.java
index dcfd1b0..e2e3954 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandler.java
@@ -51,11 +51,8 @@
*/
public void dispatch(Event event) throws PermissionBackendException {
try (ManualRequestContext ctx = oneOffCtx.open()) {
- Context.setForwardedEvent(true);
log.debug("dispatching event {}", event.getType());
dispatcher.get().postEvent(event);
- } finally {
- Context.unsetForwardedEvent();
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderModule.java
index ca17004..1d36e23 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwarderModule.java
@@ -24,6 +24,5 @@
DynamicSet.setOf(binder(), CacheEvictionForwarder.class);
DynamicSet.setOf(binder(), IndexEventForwarder.class);
DynamicSet.setOf(binder(), ProjectListUpdateForwarder.class);
- DynamicSet.setOf(binder(), StreamEventForwarder.class);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/StreamEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/StreamEventForwarder.java
deleted file mode 100644
index 79a9af2..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/StreamEventForwarder.java
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.forwarder;
-
-import com.google.gerrit.server.events.Event;
-
-public interface StreamEventForwarder {
- /**
- * Forward a stream event to the other master.
- *
- * @param event the event to forward.
- * @return true if successful, otherwise false.
- */
- boolean send(Event event);
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
index 6bd6437..d6c5658 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
@@ -19,7 +19,6 @@
import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
public class BrokerForwarderModule extends LifecycleModule {
@Override
@@ -28,6 +27,5 @@
DynamicSet.bind(binder(), CacheEvictionForwarder.class).to(BrokerCacheEvictionForwarder.class);
DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
.to(BrokerProjectListUpdateForwarder.class);
- DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
deleted file mode 100644
index 8a020fc..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
+++ /dev/null
@@ -1,40 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
-
-import com.google.gerrit.server.events.Event;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-
-@Singleton
-public class BrokerStreamEventForwarder implements StreamEventForwarder {
- private final BrokerApiWrapper broker;
- private final Configuration cfg;
-
- @Inject
- BrokerStreamEventForwarder(BrokerApiWrapper broker, Configuration cfg) {
- this.broker = broker;
- this.cfg = cfg;
- }
-
- @Override
- public boolean send(Event event) {
- return broker.sendSync(EventTopic.STREAM_EVENT_TOPIC.topic(cfg), event);
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
index 4e7a781..3255bcf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
@@ -21,7 +21,7 @@
BATCH_INDEX_TOPIC("GERRIT.EVENT.BATCH.INDEX", "batchIndexEvent"),
CACHE_TOPIC("GERRIT.EVENT.CACHE", "cacheEvent"),
PROJECT_LIST_TOPIC("GERRIT.EVENT.PROJECT.LIST", "projectListEvent"),
- STREAM_EVENT_TOPIC("GERRIT.EVENT.STREAM", "streamEvent");
+ STREAM_EVENT_TOPIC("gerrit", "streamEvent");
private final String topic;
private final String aliasKey;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
index 4ef5b7f..063d7bf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
@@ -14,10 +14,7 @@
package com.googlesource.gerrit.plugins.multisite.index;
-import com.google.gerrit.entities.Change;
-import com.google.gerrit.entities.HumanComment;
import com.google.gerrit.exceptions.StorageException;
-import com.google.gerrit.server.CommentsUtil;
import com.google.gerrit.server.change.ChangeFinder;
import com.google.gerrit.server.config.GerritInstanceId;
import com.google.gerrit.server.git.GitRepositoryManager;
@@ -42,7 +39,6 @@
public class ChangeCheckerImpl implements ChangeChecker {
private static final Logger log = LoggerFactory.getLogger(ChangeCheckerImpl.class);
private final GitRepositoryManager gitRepoMgr;
- private final CommentsUtil commentsUtil;
private final OneOffRequestContext oneOffReqCtx;
private final String changeId;
private final ChangeFinder changeFinder;
@@ -57,14 +53,12 @@
@Inject
public ChangeCheckerImpl(
GitRepositoryManager gitRepoMgr,
- CommentsUtil commentsUtil,
ChangeFinder changeFinder,
OneOffRequestContext oneOffReqCtx,
@GerritInstanceId String instanceId,
@Assisted String changeId) {
this.changeFinder = changeFinder;
this.gitRepoMgr = gitRepoMgr;
- this.commentsUtil = commentsUtil;
this.oneOffReqCtx = oneOffReqCtx;
this.changeId = changeId;
this.instanceId = instanceId;
@@ -177,20 +171,7 @@
}
private Optional<Long> computeLastChangeTs() {
- return getChangeNotes().map(notes -> getTsFromChangeAndDraftComments(notes));
- }
-
- private long getTsFromChangeAndDraftComments(ChangeNotes notes) {
- Change change = notes.getChange();
- Timestamp changeTs = change.getLastUpdatedOn();
- try {
- for (HumanComment comment : commentsUtil.draftByChange(changeNotes.get())) {
- Timestamp commentTs = comment.writtenOn;
- changeTs = commentTs.after(changeTs) ? commentTs : changeTs;
- }
- } catch (StorageException e) {
- log.warn("Unable to access draft comments for change {}", change, e);
- }
- return changeTs.getTime() / 1000;
+ return getChangeNotes()
+ .map(notes -> Timestamp.from(notes.getChange().getLastUpdatedOn()).getTime() / 1000);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/CurrentRequestContext.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/CurrentRequestContext.java
new file mode 100644
index 0000000..d6598b9
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/CurrentRequestContext.java
@@ -0,0 +1,58 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.index;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.gerrit.server.util.RequestContext;
+import com.google.gerrit.server.util.ThreadLocalRequestContext;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import java.util.function.Consumer;
+
+@Singleton
+public class CurrentRequestContext {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ private ThreadLocalRequestContext threadLocalCtx;
+ private Configuration cfg;
+ private OneOffRequestContext oneOffCtx;
+
+ @Inject
+ public CurrentRequestContext(
+ ThreadLocalRequestContext threadLocalCtx, Configuration cfg, OneOffRequestContext oneOffCtx) {
+ this.threadLocalCtx = threadLocalCtx;
+ this.cfg = cfg;
+ this.oneOffCtx = oneOffCtx;
+ }
+
+ public void onlyWithContext(Consumer<RequestContext> body) {
+ RequestContext ctx = threadLocalCtx.getContext();
+ if (ctx == null && !cfg.index().synchronizeForced()) {
+ logger.atFine().log("No context, skipping event (index.synchronizeForced is false)");
+ return;
+ }
+
+ if (ctx == null) {
+ try (ManualRequestContext manualCtx = oneOffCtx.open()) {
+ body.accept(manualCtx);
+ }
+ } else {
+ body.accept(ctx);
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
index 02f1b1c..9aee56e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
@@ -14,7 +14,6 @@
package com.googlesource.gerrit.plugins.multisite.index;
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
import com.google.common.base.Objects;
import com.google.gerrit.extensions.events.AccountIndexedListener;
import com.google.gerrit.extensions.events.ChangeIndexedListener;
@@ -47,39 +46,42 @@
private final DynamicSet<IndexEventForwarder> forwarders;
private final Set<IndexTask> queuedTasks = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final ChangeCheckerImpl.Factory changeChecker;
- private final ProjectsFilter projectsFilter;
private final GroupChecker groupChecker;
private final String instanceId;
+ private final CurrentRequestContext currCtx;
@Inject
IndexEventHandler(
@IndexExecutor Executor executor,
DynamicSet<IndexEventForwarder> forwarders,
ChangeCheckerImpl.Factory changeChecker,
- ProjectsFilter projectsFilter,
GroupChecker groupChecker,
- @GerritInstanceId String instanceId) {
+ @GerritInstanceId String instanceId,
+ CurrentRequestContext currCtx) {
this.forwarders = forwarders;
this.executor = executor;
this.changeChecker = changeChecker;
- this.projectsFilter = projectsFilter;
this.groupChecker = groupChecker;
this.instanceId = instanceId;
+ this.currCtx = currCtx;
}
@Override
public void onAccountIndexed(int id) {
- if (!Context.isForwardedEvent()) {
- IndexAccountTask task = new IndexAccountTask(new AccountIndexEvent(id, instanceId));
- if (queuedTasks.add(task)) {
- executor.execute(task);
- }
- }
+ currCtx.onlyWithContext(
+ (ctx) -> {
+ if (!Context.isForwardedEvent()) {
+ IndexAccountTask task = new IndexAccountTask(new AccountIndexEvent(id, instanceId));
+ if (queuedTasks.add(task)) {
+ executor.execute(task);
+ }
+ }
+ });
}
@Override
public void onChangeIndexed(String projectName, int id) {
- executeIndexChangeTask(projectName, id);
+ currCtx.onlyWithContext((ctx) -> executeIndexChangeTask(projectName, id));
}
@Override
@@ -101,7 +103,7 @@
@Override
public void onProjectIndexed(String projectName) {
- if (!Context.isForwardedEvent() && projectsFilter.matches(projectName)) {
+ if (!Context.isForwardedEvent()) {
IndexProjectTask task = new IndexProjectTask(new ProjectIndexEvent(projectName, instanceId));
if (queuedTasks.add(task)) {
executor.execute(task);
@@ -110,7 +112,7 @@
}
private void executeIndexChangeTask(String projectName, int id) {
- if (!Context.isForwardedEvent() && projectsFilter.matches(projectName)) {
+ if (!Context.isForwardedEvent()) {
ChangeChecker checker = changeChecker.create(projectName + "~" + id);
try {
@@ -182,7 +184,9 @@
@Override
public String toString() {
- return String.format("Index change %s in target instance", changeIndexEvent.changeId);
+ return String.format(
+ "Index change %s for project %s produced by instance %s",
+ changeIndexEvent.changeId, changeIndexEvent.projectName, changeIndexEvent.instanceId);
}
}
@@ -213,7 +217,9 @@
@Override
public String toString() {
- return String.format("Index change %s in target instance", changeIndexEvent.changeId);
+ return String.format(
+ "Index change %s for project %s produced by instance %s",
+ changeIndexEvent.changeId, changeIndexEvent.projectName, changeIndexEvent.instanceId);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationFetchFilter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationFetchFilter.java
index d851420..3fc57ae 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationFetchFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationFetchFilter.java
@@ -86,7 +86,7 @@
} catch (IOException ioe) {
String message = String.format("Error while opening project: '%s'", projectName);
repLog.error(message);
- logger.atSevere().withCause(ioe).log(message);
+ logger.atSevere().withCause(ioe).log("%s", message);
return Collections.emptySet();
}
}
@@ -157,13 +157,13 @@
} catch (GlobalRefDbLockException gle) {
String message = String.format("%s is locked on shared-refdb", ref);
repLog.error(message);
- logger.atSevere().withCause(gle).log(message);
+ logger.atSevere().withCause(gle).log("%s", message);
return Optional.empty();
} catch (IOException ioe) {
String message =
String.format("Error while extracting ref '%s' for project '%s'", ref, projectName);
repLog.error(message);
- logger.atSevere().withCause(ioe).log(message);
+ logger.atSevere().withCause(ioe).log("%s", message);
return Optional.empty();
}
}
@@ -194,7 +194,7 @@
String message =
String.format("Error while waiting for next check for '%s', ref '%s'", projectName, ref);
repLog.error(message);
- logger.atWarning().withCause(ie).log(message);
+ logger.atWarning().withCause(ie).log("%s", message);
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
index 6f9c2e7..0aba056 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
@@ -102,9 +102,9 @@
.collect(Collectors.toList());
} catch (IOException ioe) {
- String message = String.format("Error while opening project: '%s'", projectName);
- repLog.error(message);
- logger.atSevere().withCause(ioe).log(message);
+ final String messageFmt = "Error while opening project: '%s'";
+ repLog.error(messageFmt, projectName);
+ logger.atSevere().withCause(ioe).log(messageFmt, projectName);
return Collections.emptyList();
}
}
@@ -132,16 +132,14 @@
? Optional.of(refUpdateReloaded)
: Optional.empty();
} catch (GlobalRefDbLockException gle) {
- String message =
- String.format("%s is locked on shared-refdb and thus will NOT BE replicated", ref);
- repLog.error(message);
- logger.atSevere().withCause(gle).log(message);
+ final String messageFmt = "%s is locked on shared-refdb and thus will NOT BE replicated";
+ repLog.error(messageFmt, ref);
+ logger.atSevere().withCause(gle).log(messageFmt, ref);
return Optional.empty();
} catch (IOException ioe) {
- String message =
- String.format("Error while extracting ref '%s' for project '%s'", ref, projectName);
- repLog.error(message);
- logger.atSevere().withCause(ioe).log(message);
+ final String messageFmt = "Error while extracting ref '%s' for project '%s'";
+ repLog.error(messageFmt, ref, projectName);
+ logger.atSevere().withCause(ioe).log(messageFmt, ref, projectName);
return Optional.empty();
}
}
@@ -174,10 +172,9 @@
try {
Thread.sleep(randomSleepTimeMsec);
} catch (InterruptedException ie) {
- String message =
- String.format("Error while waiting for next check for '%s', ref '%s'", projectName, ref);
- repLog.error(message);
- logger.atWarning().withCause(ie).log(message);
+ final String messageFmt = "Error while waiting for next check for '%s', ref '%s'";
+ repLog.error(messageFmt, projectName, ref);
+ logger.atWarning().withCause(ie).log(messageFmt, projectName, ref);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateImpl.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateImpl.java
index e07fef0..150ea52 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateImpl.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateImpl.java
@@ -18,22 +18,18 @@
import static org.eclipse.jgit.lib.Constants.OBJ_BLOB;
import com.gerritforge.gerrit.globalrefdb.GlobalRefDbSystemError;
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.entities.Project;
import com.google.gerrit.entities.Project.NameKey;
-import com.google.gerrit.entities.RefNames;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.EventListener;
-import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.extensions.events.GitBatchRefUpdateListener;
import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.inject.Inject;
+import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
@@ -44,7 +40,9 @@
import org.eclipse.jgit.lib.RefUpdate;
import org.eclipse.jgit.lib.Repository;
-public class ProjectVersionRefUpdateImpl implements EventListener, ProjectVersionRefUpdate {
+@Singleton
+public class ProjectVersionRefUpdateImpl
+ implements GitBatchRefUpdateListener, ProjectVersionRefUpdate {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final Set<RefUpdate.Result> SUCCESSFUL_RESULTS =
ImmutableSet.of(RefUpdate.Result.NEW, RefUpdate.Result.FORCED, RefUpdate.Result.NO_CHANGE);
@@ -52,7 +50,6 @@
private final GitRepositoryManager gitRepositoryManager;
private final GitReferenceUpdated gitReferenceUpdated;
private final ProjectVersionLogger verLogger;
- private final ProjectsFilter projectsFilter;
protected final SharedRefDatabaseWrapper sharedRefDb;
@@ -61,63 +58,43 @@
GitRepositoryManager gitRepositoryManager,
SharedRefDatabaseWrapper sharedRefDb,
GitReferenceUpdated gitReferenceUpdated,
- ProjectVersionLogger verLogger,
- ProjectsFilter projectsFilter) {
+ ProjectVersionLogger verLogger) {
this.gitRepositoryManager = gitRepositoryManager;
this.sharedRefDb = sharedRefDb;
this.gitReferenceUpdated = gitReferenceUpdated;
this.verLogger = verLogger;
- this.projectsFilter = projectsFilter;
}
@Override
- public void onEvent(Event event) {
- logger.atFine().log("Processing event type: " + event.type);
+ public void onGitBatchRefUpdate(Event event) {
// Producer of the Event use RefUpdatedEvent to trigger the version update
- if (!Context.isForwardedEvent() && event instanceof RefUpdatedEvent) {
- if (projectsFilter.matches(event)) {
- updateProducerProjectVersionUpdate((RefUpdatedEvent) event);
- }
- }
+ updateProducerProjectVersionUpdate(event);
}
- private boolean isSpecialRefName(String refName) {
- return refName.startsWith(RefNames.REFS_SEQUENCES)
- || refName.startsWith(RefNames.REFS_STARRED_CHANGES)
- || refName.equals(MULTI_SITE_VERSIONING_REF);
- }
-
- private void updateProducerProjectVersionUpdate(RefUpdatedEvent refUpdatedEvent) {
- String refName = refUpdatedEvent.getRefName();
-
- if (isSpecialRefName(refName)) {
+ private void updateProducerProjectVersionUpdate(Event refUpdatedEvent) {
+ if (refUpdatedEvent.getRefNames().stream()
+ .allMatch(refName -> refName.equals(MULTI_SITE_VERSIONING_REF))) {
logger.atFine().log(
"Found a special ref name %s, skipping update for %s",
- refName, refUpdatedEvent.getProjectNameKey().get());
+ MULTI_SITE_VERSIONING_REF, refUpdatedEvent.getProjectName());
return;
}
+
try {
- Project.NameKey projectNameKey = refUpdatedEvent.getProjectNameKey();
+ Project.NameKey projectNameKey = Project.nameKey(refUpdatedEvent.getProjectName());
long newVersion = getCurrentGlobalVersionNumber();
- Optional<RefUpdate> newProjectVersionRefUpdate =
- updateLocalProjectVersion(projectNameKey, newVersion);
+ RefUpdate newProjectVersionRefUpdate = updateLocalProjectVersion(projectNameKey, newVersion);
- if (newProjectVersionRefUpdate.isPresent()) {
- verLogger.log(projectNameKey, newVersion, 0L);
+ verLogger.log(projectNameKey, newVersion, 0L);
- if (updateSharedProjectVersion(projectNameKey, newVersion)) {
- gitReferenceUpdated.fire(projectNameKey, newProjectVersionRefUpdate.get(), null);
- }
- } else {
- logger.atWarning().log(
- "Ref %s not found on projet %s: skipping project version update",
- refUpdatedEvent.getRefName(), projectNameKey);
+ if (updateSharedProjectVersion(projectNameKey, newVersion)) {
+ gitReferenceUpdated.fire(projectNameKey, newProjectVersionRefUpdate, null);
}
} catch (LocalProjectVersionUpdateException | SharedProjectVersionUpdateException e) {
logger.atSevere().withCause(e).log(
- "Issue encountered when updating version for project "
- + refUpdatedEvent.getProjectNameKey());
+ "Issue encountered when updating version for project %s",
+ refUpdatedEvent.getProjectName());
}
}
@@ -136,6 +113,7 @@
return newId;
}
+ @SuppressWarnings("FloggerLogString")
private boolean updateSharedProjectVersion(Project.NameKey projectNameKey, Long newVersion)
throws SharedProjectVersionUpdateException {
@@ -147,15 +125,13 @@
try {
if (sharedVersion.isPresent() && sharedVersion.get() >= newVersion) {
logger.atWarning().log(
- String.format(
- "NOT Updating project %s value=%d in shared ref-db because is more recent than the local value=%d",
- projectNameKey.get(), newVersion, sharedVersion.get()));
+ "NOT Updating project %s value=%d in shared ref-db because is more recent than the local value=%d",
+ projectNameKey.get(), newVersion, sharedVersion.get());
return false;
}
logger.atFine().log(
- String.format(
- "Updating shared project %s value to %d", projectNameKey.get(), newVersion));
+ "Updating shared project %s value to %d", projectNameKey.get(), newVersion);
updateProjectVersionValue(projectNameKey, newVersion, sharedVersion);
return true;
@@ -250,8 +226,8 @@
}
}
- private Optional<RefUpdate> updateLocalProjectVersion(
- Project.NameKey projectNameKey, long newVersionNumber)
+ @SuppressWarnings("FloggerLogString")
+ private RefUpdate updateLocalProjectVersion(Project.NameKey projectNameKey, long newVersionNumber)
throws LocalProjectVersionUpdateException {
logger.atFine().log(
"Updating local version for project %s with version %d",
@@ -268,7 +244,7 @@
throw new LocalProjectVersionUpdateException(message);
}
- return Optional.of(refUpdate);
+ return refUpdate;
} catch (IOException e) {
String message = "Cannot create versioning command for " + projectNameKey.get();
logger.atSevere().withCause(e).log(message);
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index e451cb3..0485d5b 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -47,6 +47,11 @@
service such as ElasticSearch.
Defaults to true.
+```index.synchronizeForced```
+: Whether to synchronize forced index events. E.g. on-line reindex
+automatically triggered upon version upgrades.
+Defaults to true.
+
```index.threadPoolSize```
: Maximum number of threads used to send index events to the target instance.
Defaults to 4.
@@ -67,9 +72,9 @@
: Name of the topic to use for publishing indexing events
Defaults to GERRIT.EVENT.INDEX
-```broker.streamEventTopic```
-: Name of the topic to use for publishing stream events
- Defaults to GERRIT.EVENT.STREAM
+`broker.streamEventTopic`
+: Name of the topic to use for publishing all stream events.
+ Default: gerrit
```broker.cacheEventTopic```
: Name of the topic to use for publishing cache eviction events
@@ -79,6 +84,10 @@
: Name of the topic to use for publishing cache eviction events
Defaults to GERRIT.EVENT.PROJECT.LIST
+```broker.streamEventPublishTimeoutMs```
+: The timeout in milliseconds for publishing stream events.
+ Defaults to 30000 (30 seconds).
+
**NOTE**: All broker settings are ignored when all of the `cache`,
`index` or `event` synchronization is disabled.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
index 0863f55..3a24773 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ConfigurationTest.java
@@ -21,7 +21,9 @@
import static com.googlesource.gerrit.plugins.multisite.Configuration.Event.EVENT_SECTION;
import static com.googlesource.gerrit.plugins.multisite.Configuration.Forwarding.DEFAULT_SYNCHRONIZE;
import static com.googlesource.gerrit.plugins.multisite.Configuration.Forwarding.SYNCHRONIZE_KEY;
+import static com.googlesource.gerrit.plugins.multisite.Configuration.Index.DEFAULT_SYNCHRONIZE_FORCED;
import static com.googlesource.gerrit.plugins.multisite.Configuration.Index.INDEX_SECTION;
+import static com.googlesource.gerrit.plugins.multisite.Configuration.Index.SYNCHRONIZE_FORCED_KEY;
import static com.googlesource.gerrit.plugins.multisite.Configuration.THREAD_POOL_SIZE_KEY;
import com.google.common.collect.ImmutableList;
@@ -129,4 +131,13 @@
replicationConfig.setBoolean("gerrit", null, "replicateOnStartup", true);
assertThat(new Configuration(globalPluginConfig, replicationConfig).validate()).isNotEmpty();
}
+
+ @Test
+ public void testGetIndexSynchronizeForced() throws Exception {
+ assertThat(getConfiguration().index().synchronizeForced())
+ .isEqualTo(DEFAULT_SYNCHRONIZE_FORCED);
+
+ globalPluginConfig.setBoolean(INDEX_SECTION, null, SYNCHRONIZE_FORCED_KEY, false);
+ assertThat(getConfiguration().index().synchronizeForced()).isFalse();
+ }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
index 7d1751c..50f55b2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
@@ -22,6 +22,7 @@
@RunWith(MockitoJUnitRunner.class)
public class BrokerApiWrapperTest {
+ private static final String DEFAULT_INSTANCE_ID = "instance-id";
@Mock private BrokerMetrics brokerMetrics;
@Mock private BrokerApi brokerApi;
@Mock Event event;
@@ -32,13 +33,14 @@
@Before
public void setUp() {
- event.instanceId = "instance-id";
+ event.instanceId = DEFAULT_INSTANCE_ID;
objectUnderTest =
new BrokerApiWrapper(
MoreExecutors.directExecutor(),
DynamicItem.itemOf(BrokerApi.class, brokerApi),
brokerMetrics,
- msgLog);
+ msgLog,
+ DEFAULT_INSTANCE_ID);
}
@Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java
index ce222d0..ed51e1b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java
@@ -14,7 +14,7 @@
package com.googlesource.gerrit.plugins.multisite.cache;
-import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoInteractions;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalNotification;
@@ -44,6 +44,6 @@
handler.onRemoval(
"test", "accounts", RemovalNotification.create("test", "accounts", RemovalCause.EXPLICIT));
- verifyZeroInteractions(executorMock);
+ verifyNoInteractions(executorMock);
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
index c8216bd..3a4242c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
@@ -20,10 +20,9 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.extensions.events.NewProjectCreatedListener;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
@@ -46,14 +45,12 @@
private ProjectListUpdateHandler handler;
@Mock private ProjectListUpdateForwarder forwarder;
- @Mock private ProjectsFilter projectsFilter;
@Before
public void setUp() {
- when(projectsFilter.matches(any(String.class))).thenReturn(true);
handler =
new ProjectListUpdateHandler(
- asDynamicSet(forwarder), MoreExecutors.directExecutor(), projectsFilter, INSTANCE_ID);
+ asDynamicSet(forwarder), MoreExecutors.directExecutor(), INSTANCE_ID);
}
private DynamicSet<ProjectListUpdateForwarder> asDynamicSet(
@@ -93,12 +90,11 @@
handler.onNewProjectCreated(mock(NewProjectCreatedListener.Event.class));
handler.onProjectDeleted(mock(ProjectDeletedListener.Event.class));
Context.unsetForwardedEvent();
- verifyZeroInteractions(forwarder);
+ verifyNoInteractions(forwarder);
}
@Test
public void shouldNotForwardIfFilteredOutByProjectName() throws Exception {
- when(projectsFilter.matches(any(String.class))).thenReturn(false);
String projectName = "projectToAdd";
NewProjectCreatedListener.Event event = mock(NewProjectCreatedListener.Event.class);
when(event.getProjectName()).thenReturn(projectName);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java
index 0e63528..fd4a0aa 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java
@@ -128,6 +128,6 @@
Change.id(CHANGE_ID),
Account.id(9999),
BranchNameKey.create(Project.nameKey(PROJECT_NAME), "refs/heads/master"),
- TimeUtil.nowTs());
+ TimeUtil.now());
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
index 6413b18..6a03a6c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
@@ -16,7 +16,7 @@
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import com.google.common.base.Suppliers;
@@ -140,7 +140,7 @@
metrics.updateReplicationStatusMetrics(eventMessage);
- verifyZeroInteractions(verLogger);
+ verifyNoInteractions(verLogger);
}
@Test
@@ -152,7 +152,7 @@
metrics.updateReplicationStatusMetrics(eventMessage);
- verifyZeroInteractions(verLogger);
+ verifyNoInteractions(verLogger);
}
@Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutorProviderTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutorProviderTest.java
deleted file mode 100644
index a025dd0..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventExecutorProviderTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-// Copyright (C) 2016 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.event;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.gerrit.server.git.WorkQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class EventExecutorProviderTest {
- @Mock private ScheduledThreadPoolExecutor executorMock;
- private EventExecutorProvider eventsExecutorProvider;
-
- @Before
- public void setUp() throws Exception {
- WorkQueue workQueueMock = mock(WorkQueue.class);
- when(workQueueMock.createQueue(1, "Forward-Stream-Event")).thenReturn(executorMock);
- eventsExecutorProvider = new EventExecutorProvider(workQueueMock);
- }
-
- @Test
- public void shouldReturnExecutor() throws Exception {
- assertThat(eventsExecutorProvider.get()).isEqualTo(executorMock);
- }
-
- @Test
- public void testStop() throws Exception {
- eventsExecutorProvider.start();
- assertThat(eventsExecutorProvider.get()).isEqualTo(executorMock);
- eventsExecutorProvider.stop();
- verify(executorMock).shutdown();
- assertThat(eventsExecutorProvider.get()).isNull();
- }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventHandlerTest.java
deleted file mode 100644
index b9b416d..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/EventHandlerTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-// Copyright (C) 2015 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.event;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.ProjectEvent;
-import com.google.gerrit.server.events.RefUpdatedEvent;
-import com.googlesource.gerrit.plugins.multisite.event.EventHandler.EventTask;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class EventHandlerTest {
-
- private EventHandler eventHandler;
-
- @Mock private StreamEventForwarder forwarder;
- @Mock private ProjectsFilter projectsFilter;
-
- @Before
- public void setUp() {
- when(projectsFilter.matches(any(ProjectEvent.class))).thenReturn(true);
- eventHandler =
- new EventHandler(asDynamicSet(forwarder), MoreExecutors.directExecutor(), projectsFilter);
- }
-
- private DynamicSet<StreamEventForwarder> asDynamicSet(StreamEventForwarder forwarder) {
- DynamicSet<StreamEventForwarder> result = new DynamicSet<>();
- result.add("multi-site", forwarder);
- return result;
- }
-
- @Test
- public void shouldForwardAnyProjectEvent() throws Exception {
- ProjectEvent event = mock(ProjectEvent.class);
- eventHandler.onEvent(event);
- verify(forwarder).send(event);
- }
-
- @Test
- public void shouldNotForwardNonProjectEvent() throws Exception {
- eventHandler.onEvent(mock(Event.class));
- verifyZeroInteractions(forwarder);
- }
-
- @Test
- public void shouldNotForwardIfAlreadyForwardedEvent() throws Exception {
- Context.setForwardedEvent(true);
- eventHandler.onEvent(mock(ProjectEvent.class));
- Context.unsetForwardedEvent();
- verifyZeroInteractions(forwarder);
- }
-
- @Test
- public void shouldNotForwardIfFilteredOutByProjectName() throws Exception {
- when(projectsFilter.matches(any(ProjectEvent.class))).thenReturn(false);
-
- ProjectEvent event = mock(ProjectEvent.class);
-
- eventHandler.onEvent(event);
- verify(forwarder, never()).send(event);
- }
-
- @Test
- public void tesEventTaskToString() throws Exception {
- Event event = new RefUpdatedEvent();
- EventTask task = eventHandler.new EventTask(event);
- assertThat(task.toString())
- .isEqualTo(String.format("Send event '%s' to target instance", event.type));
- }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
index 413b6c1..4ffdbbc 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
@@ -16,7 +16,7 @@
import static com.google.gerrit.testing.GerritJUnit.assertThrows;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoInteractions;
import com.google.gerrit.entities.Account;
import com.google.gerrit.server.config.AllUsersName;
@@ -73,7 +73,7 @@
verify(indexAccountHandler)
.indexAsync(Account.id(event.accountId), ForwardedIndexingHandler.Operation.INDEX);
- verifyZeroInteractions(indexChangeHandler, indexGroupHandler, indexProjectHandler);
+ verifyNoInteractions(indexChangeHandler, indexGroupHandler, indexProjectHandler);
}
@Test
@@ -87,7 +87,7 @@
verify(indexAccountHandler)
.indexAsync(Account.id(event.accountId), ForwardedIndexingHandler.Operation.INDEX);
- verifyZeroInteractions(indexChangeHandler, indexGroupHandler, indexProjectHandler);
+ verifyNoInteractions(indexChangeHandler, indexGroupHandler, indexProjectHandler);
streamEventRouter.route(new RefReplicationDoneEvent(allUsersName.get(), "refs/any", 1));
@@ -103,7 +103,7 @@
verify(indexGroupHandler)
.index(groupId, ForwardedIndexingHandler.Operation.INDEX, Optional.of(event));
- verifyZeroInteractions(indexAccountHandler, indexChangeHandler, indexProjectHandler);
+ verifyNoInteractions(indexAccountHandler, indexChangeHandler, indexProjectHandler);
}
@Test
@@ -115,7 +115,7 @@
verify(indexProjectHandler)
.index(projectName, ForwardedIndexingHandler.Operation.INDEX, Optional.of(event));
- verifyZeroInteractions(indexAccountHandler, indexChangeHandler, indexGroupHandler);
+ verifyNoInteractions(indexAccountHandler, indexChangeHandler, indexGroupHandler);
}
@Test
@@ -129,7 +129,7 @@
ForwardedIndexingHandler.Operation.INDEX,
Optional.of(event));
- verifyZeroInteractions(indexAccountHandler, indexGroupHandler, indexProjectHandler);
+ verifyNoInteractions(indexAccountHandler, indexGroupHandler, indexProjectHandler);
}
@Test
@@ -143,7 +143,7 @@
ForwardedIndexingHandler.Operation.DELETE,
Optional.of(event));
- verifyZeroInteractions(indexAccountHandler, indexGroupHandler, indexProjectHandler);
+ verifyNoInteractions(indexAccountHandler, indexGroupHandler, indexProjectHandler);
}
@Test
@@ -151,7 +151,7 @@
final IndexEvent newEventType = new IndexEvent("new-type", INSTANCE_ID) {};
assertThrows(UnsupportedOperationException.class, () -> router.route(newEventType));
- verifyZeroInteractions(
+ verifyNoInteractions(
indexAccountHandler, indexChangeHandler, indexGroupHandler, indexProjectHandler);
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java
index 3b4bec5..ac90436 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java
@@ -55,6 +55,6 @@
Change.id(1),
Account.id(1),
BranchNameKey.create("proj", "refs/heads/master"),
- TimeUtil.nowTs());
+ TimeUtil.now());
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
index 28bf56d..c583455 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
@@ -16,7 +16,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoInteractions;
import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
@@ -99,21 +99,21 @@
@Test
public void shouldSkipEventFromHighAvailabilityPluginThread() {
brokerForwarder.send(newForwarderTask(HIGH_AVAILABILITY_PLUGIN), testTopic, testEvent);
- verifyZeroInteractions(brokerMock);
+ verifyNoInteractions(brokerMock);
}
@Test
public void shouldSkipEventFromHighAvailabilityPluginForwardedThread() {
brokerForwarder.send(newForwarderTask(HIGH_AVAILABILITY_FORWARDED), testTopic, testEvent);
- verifyZeroInteractions(brokerMock);
+ verifyNoInteractions(brokerMock);
}
@Test
public void shouldSkipEventFromHighAvailabilityPluginBatchForwardedThread() {
brokerForwarder.send(newForwarderTask(HIGH_AVAILABILITY_BATCH_FORWARDED), testTopic, testEvent);
- verifyZeroInteractions(brokerMock);
+ verifyNoInteractions(brokerMock);
}
private ForwarderTask newForwarderTask(String threadName) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java
index 75596ed..6ca1a7a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java
@@ -29,10 +29,13 @@
import com.google.gerrit.extensions.registration.RegistrationHandle;
import com.google.gerrit.server.cache.CacheRemovalListener;
import com.google.gerrit.server.events.EventGson;
+import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.project.ProjectCacheImpl;
import com.google.gson.Gson;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.ExecutorProvider;
import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.CacheEvictionEventRouter;
@@ -43,7 +46,10 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jgit.lib.Config;
import org.junit.After;
import org.junit.Before;
@@ -69,7 +75,7 @@
@Override
protected void configure() {
install(new ForwarderModule());
- install(new CacheModule());
+ install(new CacheModule(TestForwardingExecutorProvider.class));
install(new RouterModule());
install(new IndexModule());
SharedRefDbConfiguration sharedRefDbConfig =
@@ -78,11 +84,44 @@
}
}
+ @Singleton
+ public static class TestForwardingExecutorProvider extends ExecutorProvider {
+ private final ScheduledThreadPoolExecutor executor;
+ private final AtomicInteger executionsCounter;
+
+ @Inject
+ protected TestForwardingExecutorProvider(WorkQueue workQueue) {
+ super(workQueue, 1, "test");
+ executionsCounter = new AtomicInteger();
+ executor =
+ new ScheduledThreadPoolExecutor(1) {
+
+ @Override
+ public void execute(Runnable command) {
+ @SuppressWarnings("unused")
+ int ignored = executionsCounter.incrementAndGet();
+ super.execute(command);
+ }
+ };
+ }
+
+ @Override
+ public ScheduledExecutorService get() {
+ return executor;
+ }
+
+ public int executions() {
+ return executionsCounter.get();
+ }
+ }
+
public static class CacheEvictionsTracker<K, V> implements CacheRemovalListener<K, V> {
private final Map<String, Set<Object>> trackedEvictions;
private final CountDownLatch allExpectedEvictionsArrived;
+ private final String trackedCacheName;
- public CacheEvictionsTracker(int numExpectedEvictions) {
+ public CacheEvictionsTracker(String cacheName, int numExpectedEvictions) {
+ this.trackedCacheName = cacheName;
allExpectedEvictionsArrived = new CountDownLatch(numExpectedEvictions);
trackedEvictions = Maps.newHashMap();
}
@@ -99,22 +138,24 @@
@Override
public void onRemoval(
String pluginName, String cacheName, RemovalNotification<K, V> notification) {
- trackedEvictions.compute(
- cacheName,
- (k, v) -> {
- if (v == null) {
- return Sets.newHashSet(notification.getKey());
- }
- v.add(notification.getKey());
- return v;
- });
- allExpectedEvictionsArrived.countDown();
+ if (cacheName.equals(trackedCacheName)) {
+ trackedEvictions.compute(
+ cacheName,
+ (k, v) -> {
+ if (v == null) {
+ return Sets.newHashSet(notification.getKey());
+ }
+ v.add(notification.getKey());
+ return v;
+ });
+ allExpectedEvictionsArrived.countDown();
+ }
}
}
@Before
public void startTrackingCacheEvictions() {
- evictionsCacheTracker = new CacheEvictionsTracker<>(1);
+ evictionsCacheTracker = new CacheEvictionsTracker<>(ProjectCacheImpl.CACHE_NAME, 1);
cacheEvictionRegistrationHandle = cacheRemovalListeners.add("gerrit", evictionsCacheTracker);
}
@@ -135,6 +176,36 @@
}
@Test
+ @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
+ @GerritConfig(name = "cache.threads", value = "0")
+ public void shouldNotForwardProjectCacheEvictionsWhenEventIsForwarded() throws Exception {
+ TestForwardingExecutorProvider cacheForwarder =
+ plugin.getSysInjector().getInstance(TestForwardingExecutorProvider.class);
+ Context.setForwardedEvent(true);
+ projectCache.evict(allProjects);
+
+ evictionsCacheTracker.waitForExpectedEvictions();
+ assertThat(evictionsCacheTracker.trackedEvictionsFor(ProjectCacheImpl.CACHE_NAME))
+ .contains(allProjects);
+
+ assertThat(cacheForwarder.executions()).isEqualTo(0);
+ }
+
+ @Test
+ @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
+ public void shouldForwardProjectCacheEvictions() throws Exception {
+ TestForwardingExecutorProvider cacheForwarder =
+ plugin.getSysInjector().getInstance(TestForwardingExecutorProvider.class);
+ projectCache.evict(allProjects);
+
+ evictionsCacheTracker.waitForExpectedEvictions();
+ assertThat(evictionsCacheTracker.trackedEvictionsFor(ProjectCacheImpl.CACHE_NAME))
+ .contains(allProjects);
+
+ assertThat(cacheForwarder.executions()).isEqualTo(1);
+ }
+
+ @Test
@GerritConfig(name = "gerrit.instanceId", value = "instance-id")
public void shouldEvictProjectCacheWithSlash() throws Exception {
ProjectInput in = new ProjectInput();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandlerTest.java
index 704401b..73ef353 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedEventHandlerTest.java
@@ -14,13 +14,9 @@
package com.googlesource.gerrit.plugins.multisite.forwarder;
-import static com.google.common.truth.Truth.assertThat;
-import static com.google.gerrit.testing.GerritJUnit.assertThrows;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import com.google.gerrit.exceptions.StorageException;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventDispatcher;
@@ -33,7 +29,6 @@
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
@RunWith(MockitoJUnitRunner.class)
public class ForwardedEventHandlerTest {
@@ -56,45 +51,4 @@
handler.dispatch(event);
verify(dispatcherMock).postEvent(event);
}
-
- @Test
- public void shouldSetAndUnsetForwardedContext() throws Exception {
- Event event = new ProjectCreatedEvent();
- // this doAnswer is to allow to assert that context is set to forwarded
- // while cache eviction is called.
- doAnswer(
- (Answer<Void>)
- invocation -> {
- assertThat(Context.isForwardedEvent()).isTrue();
- return null;
- })
- .when(dispatcherMock)
- .postEvent(event);
-
- assertThat(Context.isForwardedEvent()).isFalse();
- handler.dispatch(event);
- assertThat(Context.isForwardedEvent()).isFalse();
-
- verify(dispatcherMock).postEvent(event);
- }
-
- @Test
- public void shouldSetAndUnsetForwardedContextEvenIfExceptionIsThrown() throws Exception {
- Event event = new ProjectCreatedEvent();
- doAnswer(
- (Answer<Void>)
- invocation -> {
- assertThat(Context.isForwardedEvent()).isTrue();
- throw new StorageException("someMessage");
- })
- .when(dispatcherMock)
- .postEvent(event);
-
- assertThat(Context.isForwardedEvent()).isFalse();
- StorageException thrown = assertThrows(StorageException.class, () -> handler.dispatch(event));
- assertThat(thrown).hasMessageThat().isEqualTo("someMessage");
- assertThat(Context.isForwardedEvent()).isFalse();
-
- verify(dispatcherMock).postEvent(event);
- }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
index b827cdf..2c4b157 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
@@ -87,7 +87,7 @@
public void setUp() throws Exception {
when(ctxMock.open()).thenReturn(manualRequestContextMock);
id = Change.id(TEST_CHANGE_NUMBER);
- change = new Change(null, id, null, null, TimeUtil.nowTs());
+ change = new Change(null, id, null, null, TimeUtil.now());
when(changeNotes.getChange()).thenReturn(change);
when(changeCheckerFactoryMock.create(any())).thenReturn(changeCheckerAbsentMock);
when(configurationMock.index()).thenReturn(index);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
index 8cf6ea9..7e27eeb 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
@@ -15,23 +15,26 @@
package com.googlesource.gerrit.plugins.multisite.index;
import static com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationEndpoints.APPLY_OBJECT_API_ENDPOINT;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.lenient;
-import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
-import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.gerrit.server.util.RequestContext;
+import com.google.gerrit.server.util.ThreadLocalRequestContext;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
-import com.googlesource.gerrit.plugins.multisite.index.IndexEventHandler.IndexProjectTask;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -42,23 +45,34 @@
public class IndexEventHandlerTest {
private static final String INSTANCE_ID = "instance-id";
+ private static final String PROJECT_NAME = "test_project";
+ private static final int CHANGE_ID = 1;
private IndexEventHandler eventHandler;
- @Mock private ProjectsFilter projectsFilter;
@Mock private IndexEventForwarder forwarder;
@Mock private ChangeCheckerImpl.Factory changeChecker;
+ @Mock private ChangeChecker changeCheckerMock;
+ @Mock private RequestContext mockCtx;
+
+ private CurrentRequestContext currCtx =
+ new CurrentRequestContext(null, null, null) {
+ @Override
+ public void onlyWithContext(Consumer<RequestContext> body) {
+ body.accept(mockCtx);
+ }
+ };
@Before
- public void setUp() {
+ public void setUp() throws IOException {
eventHandler =
new IndexEventHandler(
MoreExecutors.directExecutor(),
asDynamicSet(forwarder),
changeChecker,
- projectsFilter,
new TestGroupChecker(true),
- INSTANCE_ID);
+ INSTANCE_ID,
+ currCtx);
}
private DynamicSet<IndexEventForwarder> asDynamicSet(IndexEventForwarder forwarder) {
@@ -68,41 +82,59 @@
}
@Test
- public void shouldNotForwardProjectIndexedIfFilteredOutByProjectName() throws Exception {
- when(projectsFilter.matches(any(String.class))).thenReturn(false);
-
- eventHandler.onProjectIndexed("test_project");
- verify(forwarder, never())
- .index(any(IndexProjectTask.class), eq(new ProjectIndexEvent("test_project", INSTANCE_ID)));
- }
-
- @Test
- public void shouldNotForwardIndexChangeIfFilteredOutByProjectName() throws Exception {
- int changeId = 1;
- when(projectsFilter.matches(any(String.class))).thenReturn(false);
-
- eventHandler.onChangeIndexed("test_project", changeId);
- verifyZeroInteractions(changeChecker);
- }
-
- @Test
public void shouldNotForwardIndexChangeIfCurrentThreadIsPullReplicationApplyObject()
throws Exception {
String currentThreadName = Thread.currentThread().getName();
try {
Thread.currentThread().setName("pull-replication~" + APPLY_OBJECT_API_ENDPOINT);
- int changeId = 1;
Context.setForwardedEvent(false);
- lenient().when(projectsFilter.matches(any(String.class))).thenReturn(true);
lenient()
.when(changeChecker.create(anyString()))
.thenThrow(
new IllegalStateException("Change indexing event should have not been triggered"));
- eventHandler.onChangeIndexed("test_project", changeId);
+ eventHandler.onChangeIndexed(PROJECT_NAME, CHANGE_ID);
verifyNoInteractions(changeChecker);
} finally {
Thread.currentThread().setName(currentThreadName);
}
}
+
+ @Test
+ public void shouldNotForwardIndexChangeWhenContextIsMissingAndForcedIndexingDisabled()
+ throws Exception {
+ eventHandler = createIndexEventHandler(changeChecker, false);
+ eventHandler.onChangeIndexed(PROJECT_NAME, CHANGE_ID);
+ verifyNoInteractions(changeChecker);
+ verifyNoInteractions(forwarder);
+ }
+
+ @Test
+ public void shouldForwardIndexChangeWhenContextIsMissingAndForcedIndexingEnabled()
+ throws Exception {
+ when(changeChecker.create(any())).thenReturn(changeCheckerMock);
+ when(changeCheckerMock.newIndexEvent(PROJECT_NAME, CHANGE_ID, false))
+ .thenReturn(Optional.of(new ChangeIndexEvent(PROJECT_NAME, CHANGE_ID, false, INSTANCE_ID)));
+ eventHandler = createIndexEventHandler(changeChecker, true);
+ eventHandler.onChangeIndexed(PROJECT_NAME, CHANGE_ID);
+ verify(changeCheckerMock).newIndexEvent(PROJECT_NAME, CHANGE_ID, false);
+ verify(forwarder).index(any(), any());
+ }
+
+ private IndexEventHandler createIndexEventHandler(
+ ChangeCheckerImpl.Factory changeChecker, boolean synchronizeForced) {
+ ThreadLocalRequestContext threadLocalCtxMock = mock(ThreadLocalRequestContext.class);
+ OneOffRequestContext oneOffCtxMock = mock(OneOffRequestContext.class);
+ Configuration cfgMock = mock(Configuration.class);
+ Configuration.Index cfgIndex = mock(Configuration.Index.class);
+ when(cfgMock.index()).thenReturn(cfgIndex);
+ when(cfgIndex.synchronizeForced()).thenReturn(synchronizeForced);
+ return new IndexEventHandler(
+ MoreExecutors.directExecutor(),
+ asDynamicSet(forwarder),
+ changeChecker,
+ new TestGroupChecker(true),
+ INSTANCE_ID,
+ new CurrentRequestContext(threadLocalCtxMock, cfgMock, oneOffCtxMock));
+ }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
index d99fb4b..e2e3113 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
@@ -24,29 +24,25 @@
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
-import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
-import com.google.common.base.Suppliers;
import com.google.gerrit.entities.Project;
-import com.google.gerrit.entities.RefNames;
-import com.google.gerrit.server.data.RefUpdateAttribute;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.extensions.common.AccountInfo;
+import com.google.gerrit.extensions.events.GitBatchRefUpdateListener;
import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
import com.google.gerrit.server.project.ProjectConfig;
import com.google.gerrit.testing.InMemoryRepositoryManager;
import com.google.gerrit.testing.InMemoryTestEnvironment;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.RefFixture;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
+import java.util.Set;
import org.eclipse.jgit.errors.LargeObjectException;
import org.eclipse.jgit.internal.storage.dfs.InMemoryRepository;
import org.eclipse.jgit.junit.TestRepository;
@@ -54,7 +50,7 @@
import org.eclipse.jgit.lib.ObjectLoader;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.revwalk.RevCommit;
-import org.junit.After;
+import org.eclipse.jgit.transport.ReceiveCommand;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -65,24 +61,26 @@
@RunWith(MockitoJUnitRunner.class)
public class ProjectVersionRefUpdateTest implements RefFixture {
+ private static final int A_TEST_ACCOUNT_ID = 1;
+ private static final GitReferenceUpdated.UpdatedRef A_TEST_UPDATED_REF =
+ new GitReferenceUpdated.UpdatedRef(
+ A_TEST_REF_NAME, ObjectId.zeroId(), ObjectId.zeroId(), ReceiveCommand.Type.UPDATE);
+
@Rule public InMemoryTestEnvironment testEnvironment = new InMemoryTestEnvironment();
- @Mock RefUpdatedEvent refUpdatedEvent;
+ @Mock GitBatchRefUpdateListener.Event refUpdatedEvent;
@Mock SharedRefDatabaseWrapper sharedRefDb;
@Mock GitReferenceUpdated gitReferenceUpdated;
@Mock ProjectVersionLogger verLogger;
- @Mock ProjectsFilter projectsFilter;
@Inject private ProjectConfig.Factory projectConfigFactory;
@Inject private InMemoryRepositoryManager repoManager;
-
private TestRepository<InMemoryRepository> repo;
private ProjectConfig project;
private RevCommit masterCommit;
@Before
public void setUp() throws Exception {
- when(projectsFilter.matches(any(Event.class))).thenReturn(true);
InMemoryRepository inMemoryRepo = repoManager.createRepository(A_TEST_PROJECT_NAME_KEY);
project = projectConfigFactory.create(A_TEST_PROJECT_NAME_KEY);
project.load(inMemoryRepo);
@@ -90,14 +88,8 @@
masterCommit = repo.branch("master").commit().create();
}
- @After
- public void tearDown() {
- Context.unsetForwardedEvent();
- }
-
@Test
public void producerShouldUpdateProjectVersionUponRefUpdatedEvent() throws IOException {
- Context.setForwardedEvent(false);
when(sharedRefDb.get(
A_TEST_PROJECT_NAME_KEY,
ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_VALUE_REF,
@@ -105,12 +97,11 @@
.thenReturn(Optional.of("" + (masterCommit.getCommitTime() - 1)));
when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(String.class), any(), any()))
.thenReturn(true);
- when(refUpdatedEvent.getProjectNameKey()).thenReturn(A_TEST_PROJECT_NAME_KEY);
- when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
+ when(refUpdatedEvent.getProjectName()).thenReturn(A_TEST_PROJECT_NAME);
+ when(refUpdatedEvent.getRefNames()).thenReturn(Set.of(A_TEST_REF_NAME));
- new ProjectVersionRefUpdateImpl(
- repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
- .onEvent(refUpdatedEvent);
+ new ProjectVersionRefUpdateImpl(repoManager, sharedRefDb, gitReferenceUpdated, verLogger)
+ .onGitBatchRefUpdate(refUpdatedEvent);
Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
@@ -127,18 +118,47 @@
}
@Test
+ public void producerShouldUpdateProjectVersionOnceUponMultipleRefUpdatedEvent() {
+ when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(String.class), any(), any()))
+ .thenReturn(true);
+ when(refUpdatedEvent.getProjectName()).thenReturn(A_TEST_PROJECT_NAME);
+ when(refUpdatedEvent.getRefNames())
+ .thenReturn(Set.of(A_TEST_REF_NAME, A_REF_NAME_OF_A_PATCHSET));
+
+ new ProjectVersionRefUpdateImpl(repoManager, sharedRefDb, gitReferenceUpdated, verLogger)
+ .onGitBatchRefUpdate(refUpdatedEvent);
+
+ verify(sharedRefDb, atMost(1))
+ .compareAndPut(any(Project.NameKey.class), any(Ref.class), any(ObjectId.class));
+ }
+
+ @Test
+ public void producerShouldUpdateProjectVersionOnceUponMultipleRefWithOneMetaUpdatedEvent() {
+ when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(String.class), any(), any()))
+ .thenReturn(true);
+ when(refUpdatedEvent.getProjectName()).thenReturn(A_TEST_PROJECT_NAME);
+ when(refUpdatedEvent.getRefNames())
+ .thenReturn(Set.of(A_TEST_REF_NAME, MULTI_SITE_VERSIONING_REF));
+
+ new ProjectVersionRefUpdateImpl(repoManager, sharedRefDb, gitReferenceUpdated, verLogger)
+ .onGitBatchRefUpdate(refUpdatedEvent);
+
+ verify(sharedRefDb, atMost(1))
+ .compareAndPut(any(Project.NameKey.class), any(Ref.class), any(ObjectId.class));
+ }
+
+ @Test
public void producerShouldUsePutInsteadOfCompareAndPutWhenExtendedGlobalRefDb()
throws IOException {
when(sharedRefDb.isSetOperationSupported()).thenReturn(true);
- RefUpdatedEvent refUpdatedEvent = new RefUpdatedEvent();
- RefUpdateAttribute refUpdatedAttribute = new RefUpdateAttribute();
- refUpdatedAttribute.project = A_TEST_PROJECT_NAME_KEY.get();
- refUpdatedAttribute.refName = A_TEST_REF_NAME;
- refUpdatedEvent.refUpdate = Suppliers.memoize(() -> refUpdatedAttribute);
+ GitBatchRefUpdateListener.Event refUpdatedEvent =
+ new GitReferenceUpdated.GitBatchRefUpdateEvent(
+ A_TEST_PROJECT_NAME_KEY,
+ Set.of(A_TEST_UPDATED_REF),
+ new AccountInfo(A_TEST_ACCOUNT_ID));
- new ProjectVersionRefUpdateImpl(
- repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
- .onEvent(refUpdatedEvent);
+ new ProjectVersionRefUpdateImpl(repoManager, sharedRefDb, gitReferenceUpdated, verLogger)
+ .onGitBatchRefUpdate(refUpdatedEvent);
Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
@@ -157,8 +177,6 @@
@Test
public void producerShouldUpdateProjectVersionUponForcedPushRefUpdatedEvent() throws Exception {
- Context.setForwardedEvent(false);
-
Thread.sleep(1000L);
RevCommit masterPlusOneCommit = repo.branch("master").commit().create();
@@ -171,13 +189,12 @@
.thenReturn(Optional.of("" + (masterCommit.getCommitTime() - 1)));
when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(String.class), any(), any()))
.thenReturn(true);
- when(refUpdatedEvent.getProjectNameKey()).thenReturn(A_TEST_PROJECT_NAME_KEY);
- when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
+ when(refUpdatedEvent.getProjectName()).thenReturn(A_TEST_PROJECT_NAME);
+ when(refUpdatedEvent.getRefNames()).thenReturn(Set.of(A_TEST_REF_NAME));
ProjectVersionRefUpdateImpl projectVersion =
- new ProjectVersionRefUpdateImpl(
- repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter);
- projectVersion.onEvent(refUpdatedEvent);
+ new ProjectVersionRefUpdateImpl(repoManager, sharedRefDb, gitReferenceUpdated, verLogger);
+ projectVersion.onGitBatchRefUpdate(refUpdatedEvent);
Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
@@ -200,7 +217,6 @@
@Test
public void producerShouldCreateNewProjectVersionWhenMissingUponRefUpdatedEvent()
throws IOException {
- Context.setForwardedEvent(false);
when(sharedRefDb.get(
A_TEST_PROJECT_NAME_KEY,
ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_VALUE_REF,
@@ -209,12 +225,11 @@
when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(String.class), any(), any()))
.thenReturn(true);
- when(refUpdatedEvent.getProjectNameKey()).thenReturn(A_TEST_PROJECT_NAME_KEY);
- when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
+ when(refUpdatedEvent.getProjectName()).thenReturn(A_TEST_PROJECT_NAME);
+ when(refUpdatedEvent.getRefNames()).thenReturn(Set.of(A_TEST_REF_NAME));
- new ProjectVersionRefUpdateImpl(
- repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
- .onEvent(refUpdatedEvent);
+ new ProjectVersionRefUpdateImpl(repoManager, sharedRefDb, gitReferenceUpdated, verLogger)
+ .onGitBatchRefUpdate(refUpdatedEvent);
Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
@@ -230,55 +245,24 @@
verify(verLogger).log(A_TEST_PROJECT_NAME_KEY, storedVersion, 0);
}
- @Test
- public void producerShouldNotUpdateProjectVersionUponSequenceRefUpdatedEvent() throws Exception {
- producerShouldNotUpdateProjectVersionUponMagicRefUpdatedEvent(RefNames.REFS_SEQUENCES);
- }
-
- @Test
- public void producerShouldNotUpdateProjectVersionUponStarredChangesRefUpdatedEvent()
- throws Exception {
- producerShouldNotUpdateProjectVersionUponMagicRefUpdatedEvent(RefNames.REFS_STARRED_CHANGES);
- }
-
private long readLongObject(ObjectLoader loader)
throws LargeObjectException, UnsupportedEncodingException {
String boutString = new String(loader.getBytes(), StandardCharsets.UTF_8.name());
return Long.parseLong(boutString);
}
- private void producerShouldNotUpdateProjectVersionUponMagicRefUpdatedEvent(String magicRefPrefix)
- throws Exception {
- String magicRefName = magicRefPrefix + "/foo";
- Context.setForwardedEvent(false);
- when(refUpdatedEvent.getProjectNameKey()).thenReturn(A_TEST_PROJECT_NAME_KEY);
- when(refUpdatedEvent.getRefName()).thenReturn(magicRefName);
- repo.branch(magicRefName).commit().create();
-
- new ProjectVersionRefUpdateImpl(
- repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
- .onEvent(refUpdatedEvent);
-
- Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
- assertThat(ref).isNull();
-
- verifyZeroInteractions(verLogger);
- }
-
@Test
public void shouldNotUpdateProjectVersionWhenProjectDoesntExist() throws IOException {
- Context.setForwardedEvent(false);
- when(refUpdatedEvent.getProjectNameKey()).thenReturn(Project.nameKey("aNonExistentProject"));
- when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
+ when(refUpdatedEvent.getProjectName()).thenReturn("aNonExistentProject");
+ when(refUpdatedEvent.getRefNames()).thenReturn(Set.of(A_TEST_REF_NAME));
- new ProjectVersionRefUpdateImpl(
- repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
- .onEvent(refUpdatedEvent);
+ new ProjectVersionRefUpdateImpl(repoManager, sharedRefDb, gitReferenceUpdated, verLogger)
+ .onGitBatchRefUpdate(refUpdatedEvent);
Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
assertThat(ref).isNull();
- verifyZeroInteractions(verLogger);
+ verifyNoInteractions(verLogger);
}
@Test
@@ -287,8 +271,7 @@
.thenReturn(Optional.of("123"));
Optional<Long> version =
- new ProjectVersionRefUpdateImpl(
- repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
+ new ProjectVersionRefUpdateImpl(repoManager, sharedRefDb, gitReferenceUpdated, verLogger)
.getProjectRemoteVersion(A_TEST_PROJECT_NAME);
assertThat(version.isPresent()).isTrue();