Merge branch 'stable-3.3' into stable-3.4
* stable-3.3:
Specify cache.threads = 0 in the multi-site config documentation
Change-Id: If053c76ec9f77029103d30c91888b309fa3ef221
diff --git a/BUILD b/BUILD
index d7bd3d8..d95dc33 100644
--- a/BUILD
+++ b/BUILD
@@ -18,9 +18,10 @@
],
resources = glob(["src/main/resources/**/*"]),
deps = [
+ ":events-broker-neverlink",
+ ":global-refdb-neverlink",
+ ":pull-replication-neverlink",
":replication-neverlink",
- "@events-broker//jar",
- "@global-refdb//jar",
],
)
@@ -30,6 +31,24 @@
exports = ["//plugins/replication"],
)
+java_library(
+ name = "pull-replication-neverlink",
+ neverlink = 1,
+ exports = ["//plugins/pull-replication"],
+)
+
+java_library(
+ name = "events-broker-neverlink",
+ neverlink = 1,
+ exports = ["//plugins/events-broker"],
+)
+
+java_library(
+ name = "global-refdb-neverlink",
+ neverlink = 1,
+ exports = ["//plugins/global-refdb"],
+)
+
junit_tests(
name = "multi_site_tests",
srcs = glob(["src/test/java/**/*.java"]),
@@ -49,8 +68,9 @@
visibility = ["//visibility:public"],
exports = PLUGIN_DEPS + PLUGIN_TEST_DEPS + [
":multi-site__plugin",
- "@global-refdb//jar",
- "@events-broker//jar",
+ "//plugins/events-broker",
+ "//plugins/global-refdb",
+ "//plugins/pull-replication",
"//plugins/replication",
],
)
@@ -74,12 +94,6 @@
srcs = [
"e2e-tests/test.sh",
],
- data = [
- "//plugins/multi-site",
- "//plugins/multi-site:e2e_multi_site_test_dir",
- "//plugins/multi-site:e2e_multi_site_setup_local_env_dir",
- "external_plugin_deps.bzl",
- ] + glob(["setup_local_env/**/*"]) + glob(["e2e-tests/**/*"]),
args = [
"--multisite-lib-file $(location //plugins/multi-site)",
"--healthcheck-interval 5s",
@@ -88,6 +102,11 @@
"--location '$(location //plugins/multi-site:e2e_multi_site_test_dir)'",
"--local-env '$(location //plugins/multi-site:e2e_multi_site_setup_local_env_dir)'",
],
+ data = [
+ "//plugins/multi-site",
+ "//plugins/multi-site:e2e_multi_site_test_dir",
+ "//plugins/multi-site:e2e_multi_site_setup_local_env_dir",
+ ] + glob(["setup_local_env/**/*"]) + glob(["e2e-tests/**/*"]),
tags = [
"e2e-multi-site",
],
diff --git a/Jenkinsfile b/Jenkinsfile
index f008749..6ef43af 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -1,2 +1,4 @@
pluginPipeline(formatCheckId: 'gerritforge:multi-site-format-47168e90078b0b3f11401610930e82830e76bff7',
- buildCheckId: 'gerritforge:multi-site-47168e90078b0b3f11401610930e82830e76bff7')
+ buildCheckId: 'gerritforge:multi-site-47168e90078b0b3f11401610930e82830e76bff7',
+ extraPlugins: [ 'pull-replication' ],
+ extraModules: [ 'events-broker', 'global-refdb' ])
diff --git a/README.md b/README.md
index 79094d3..f683f4e 100644
--- a/README.md
+++ b/README.md
@@ -40,8 +40,6 @@
cd gerrit/plugins
ln -s ../../multi-site .
-rm external_plugin_deps.bzl
-ln -s multi-site/external_plugin_deps.bzl .
```
Example of building the multi-site plugin:
@@ -65,6 +63,21 @@
daemon running (/var/run/docker.sock accessible) or a DOCKER_HOST pointing to
a Docker server.
+## Pre-requisites
+
+Each Gerrit server of the cluster must be identified with a globally unique
+[instance-id](https://gerrit-documentation.storage.googleapis.com/Documentation/3.4.5/config-gerrit.html#gerrit.instanceId)
+defined in `$GERRIT_SITE/etc/gerrit.config`.
+When migrating from a multi-site configuration with Gerrit v3.3 or earlier,
+you must reuse the instance-id value stored under `$GERRIT_SITE/data/multi-site`.
+
+Example:
+
+```
+[gerrit]
+ instanceId = 758fe5b7-1869-46e6-942a-3ae0ae7e3bd2
+```
+
## How to configure
Install the multi-site plugin into the `$GERRIT_SITE/lib` directory of all
diff --git a/e2e-tests/test.sh b/e2e-tests/test.sh
index 19c8b04..3ecb9fd 100755
--- a/e2e-tests/test.sh
+++ b/e2e-tests/test.sh
@@ -16,11 +16,11 @@
LOCATION="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
LOCAL_ENV="$( cd "${LOCATION}/../setup_local_env" >/dev/null 2>&1 && pwd )"
-GERRIT_BRANCH=stable-3.3
+GERRIT_BRANCH=stable-3.4
GERRIT_CI=https://archive-ci.gerritforge.com/view/Plugins-$GERRIT_BRANCH/job
LAST_BUILD=lastSuccessfulBuild/artifact/bazel-bin/plugins
DEF_MULTISITE_LOCATION=${LOCATION}/../../../bazel-bin/plugins/multi-site/multi-site.jar
-DEF_GERRIT_IMAGE=3.3.6-centos8
+DEF_GERRIT_IMAGE=3.4.0-centos8
DEF_GERRIT_HEALTHCHECK_START_PERIOD=60s
DEF_GERRIT_HEALTHCHECK_INTERVAL=5s
DEF_GERRIT_HEALTHCHECK_TIMEOUT=5s
@@ -64,7 +64,7 @@
export BROKER_HOST=$2
export BROKER_PORT=$3
- export GROUP_ID=$4
+ export INSTANCE_ID=$4
export SSH_ADVERTISED_PORT=$5
export LOCATION_TEST_SITE=/var/gerrit
export REMOTE_DEBUG_PORT=5005
@@ -196,8 +196,6 @@
done
# Defaults
-EVENTS_BROKER_VER=`grep 'com.gerritforge:events-broker' ${LOCATION}/../external_plugin_deps.bzl | cut -d '"' -f 2 | cut -d ':' -f 3`
-GLOBAL_REFDB_VER=`grep 'com.gerritforge:global-refdb' ${LOCATION}/../external_plugin_deps.bzl | cut -d '"' -f 2 | cut -d ':' -f 3`
DEPLOYMENT_LOCATION=$(mktemp -d || $(echo >&2 "Could not create temp dir" && exit 1))
MULTISITE_LIB_LOCATION=${MULTISITE_LIB_LOCATION:-${DEF_MULTISITE_LOCATION}}
BROKER_TYPE=${BROKER_TYPE:-"kafka"}
@@ -261,13 +259,11 @@
docker cp ${CONTAINER_NAME}:/var/gerrit/plugins/replication.jar $COMMON_LIBS/
docker rm -fv ${CONTAINER_NAME}
-echo "Downloading global-refdb library $GERRIT_BRANCH"
-wget https://repo1.maven.org/maven2/com/gerritforge/global-refdb/$GLOBAL_REFDB_VER/global-refdb-$GLOBAL_REFDB_VER.jar \
- -O $COMMON_LIBS/global-refdb.jar || { echo >&2 "Cannot download global-refdb library: Check internet connection. Aborting"; exit 1; }
+echo "Copying global-refdb library $GERRIT_BRANCH"
+cp bazel-bin/plugins/global-refdb/global-refdb.jar $COMMON_LIBS/global-refdb.jar
echo "Downloading events-broker library $GERRIT_BRANCH"
-wget https://repo1.maven.org/maven2/com/gerritforge/events-broker/$EVENTS_BROKER_VER/events-broker-$EVENTS_BROKER_VER.jar \
- -O $COMMON_LIBS/events-broker.jar || { echo >&2 "Cannot download events-broker library: Check internet connection. Aborting"; exit 1; }
+cp bazel-bin/plugins/events-broker/events-broker.jar $COMMON_LIBS/events-broker.jar
echo "Setting up directories"
mkdir -p ${GERRIT_1_ETC} ${GERRIT_1_PLUGINS} ${GERRIT_1_LIBS} ${GERRIT_2_ETC} ${GERRIT_2_PLUGINS} ${GERRIT_2_LIBS}
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
deleted file mode 100644
index 223778e..0000000
--- a/external_plugin_deps.bzl
+++ /dev/null
@@ -1,14 +0,0 @@
-load("//tools/bzl:maven_jar.bzl", "maven_jar")
-
-def external_plugin_deps():
- maven_jar(
- name = "global-refdb",
- artifact = "com.gerritforge:global-refdb:3.3.1",
- sha1 = "5df9dddad2fc67c922406f41549186b210cd957e",
- )
-
- maven_jar(
- name = "events-broker",
- artifact = "com.gerritforge:events-broker:3.3.2",
- sha1 = "d8bcb77047cc12dd7c623b5b4de70a25499d3d6c",
- )
diff --git a/setup_local_env/README.md b/setup_local_env/README.md
index 88b40d5..e6ec9e7 100644
--- a/setup_local_env/README.md
+++ b/setup_local_env/README.md
@@ -74,8 +74,6 @@
[--gerrit2-httpd-port] Gerrit Instance 2 http port; default 18081
[--gerrit2-sshd-port] Gerrit Instance 2 sshd port; default 49418
-[--replication-type] Options [file,ssh]; default file
-[--replication-ssh-user] SSH user for the replication plugin; default $(whoami)
[--replication-delay] Replication delay across the two instances in seconds
[--just-cleanup-env] Cleans up previous deployment; default false
diff --git a/setup_local_env/configs/gerrit.config b/setup_local_env/configs/gerrit.config
index 2cd8c9d..8a176f2 100644
--- a/setup_local_env/configs/gerrit.config
+++ b/setup_local_env/configs/gerrit.config
@@ -5,6 +5,7 @@
installModule = com.gerritforge.gerrit.eventbroker.BrokerApiModule # events-broker module to setup BrokerApi dynamic item
installModule = com.googlesource.gerrit.plugins.multisite.Module # multi-site needs to be a gerrit lib
installDbModule = com.googlesource.gerrit.plugins.multisite.GitModule
+ instanceId = $INSTANCE_ID
[database]
type = h2
database = $LOCATION_TEST_SITE/db/ReviewDB
@@ -43,7 +44,7 @@
[plugin "events-kafka"]
sendAsync = true
bootstrapServers = $BROKER_HOST:$BROKER_PORT
- groupId = $GROUP_ID
+ groupId = $INSTANCE_ID
numberOfSubscribers = 6
securityProtocol = PLAINTEXT
pollingIntervalMs = 1000
@@ -55,12 +56,12 @@
pollingIntervalMs = 1000
region = us-east-1
endpoint = http://localhost:$BROKER_PORT
- applicationName = $GROUP_ID
+ applicationName = $INSTANCE_ID
initialPosition = trim_horizon
[plugin "events-gcloud-pubsub"]
numberOfSubscribers = 6
gcloudProject="test-project"
- subscriptionId=$GROUP_ID
+ subscriptionId=$INSTANCE_ID
privateKeyLocation="not used in local mode"
[plugin "metrics-reporter-prometheus"]
prometheusBearerToken = token
diff --git a/setup_local_env/configs/replication.config b/setup_local_env/configs/replication.config
index ece1b3e..1c2aac4 100644
--- a/setup_local_env/configs/replication.config
+++ b/setup_local_env/configs/replication.config
@@ -1,6 +1,7 @@
-[remote "Replication"]
- $REPLICATION_URL
- push = +refs/*:refs/*
+[remote "$REPLICA_INSTANCE_ID"]
+ $PULL_REPLICATION_URL
+ $PULL_REPLICATION_API_URL
+ fetch = +refs/*:refs/*
mirror = true
timeout = 600
rescheduleDelay = 15
@@ -13,4 +14,8 @@
replicateOnStartup = false
[replication]
lockErrorMaxRetries = 5
- maxRetries = 5
\ No newline at end of file
+ maxRetries = 5
+ useCGitClient = false
+ consumeStreamEvents = false
+ syncRefs="ALL REFS ASYNC"
+ maxApiPayloadSize=40000
diff --git a/setup_local_env/configs/secure.config b/setup_local_env/configs/secure.config
new file mode 100644
index 0000000..faeb659
--- /dev/null
+++ b/setup_local_env/configs/secure.config
@@ -0,0 +1,3 @@
+[auth]
+ registerEmailPrivateKey = A3D00Ny0bLW2z7xdInvIf35Q5mDCi5u9o/E=
+ bearertoken = some-bearer-token
diff --git a/setup_local_env/setup.sh b/setup_local_env/setup.sh
index 11ae04f..247a721 100755
--- a/setup_local_env/setup.sh
+++ b/setup_local_env/setup.sh
@@ -16,17 +16,15 @@
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
-GERRIT_BRANCH=stable-3.3
+GERRIT_BRANCH=stable-3.4
GERRIT_CI=https://archive-ci.gerritforge.com/view/Plugins-$GERRIT_BRANCH/job
LAST_BUILD=lastSuccessfulBuild/artifact/bazel-bin/plugins
-EVENTS_BROKER_VER=`grep 'com.gerritforge:events-broker' $(dirname $0)/../external_plugin_deps.bzl | cut -d '"' -f 2 | cut -d ':' -f 3`
-GLOBAL_REFDB_VER=`grep 'com.gerritforge:global-refdb' $(dirname $0)/../external_plugin_deps.bzl | cut -d '"' -f 2 | cut -d ':' -f 3`
function check_application_requirements {
type haproxy >/dev/null 2>&1 || { echo >&2 "Require haproxy but it's not installed. Aborting."; exit 1; }
type java >/dev/null 2>&1 || { echo >&2 "Require java but it's not installed. Aborting."; exit 1; }
type docker >/dev/null 2>&1 || { echo >&2 "Require docker but it's not installed. Aborting."; exit 1; }
- type docker-compose >/dev/null 2>&1 || { echo >&2 "Require docker-compose but it's not installed. Aborting."; exit 1; }
+ [ $($SUDO docker --version | awk '{print $3}' | cut -d '.' -f 1) -ge 20 ] || { echo >&2 "Require docker v20 or later. Aborting."; exit 1; }
type wget >/dev/null 2>&1 || { echo >&2 "Require wget but it's not installed. Aborting."; exit 1; }
type envsubst >/dev/null 2>&1 || { echo >&2 "Require envsubst but it's not installed. Aborting."; exit 1; }
type openssl >/dev/null 2>&1 || { echo >&2 "Require openssl but it's not installed. Aborting."; exit 1; }
@@ -35,16 +33,16 @@
fi
}
-function get_replication_url {
- REPLICATION_LOCATION_TEST_SITE=$1
- REPLICATION_HOSTNAME=$2
- USER=$REPLICATION_SSH_USER
+function get_pull_replication_api_url {
+ REPLICATION_HOSTNAME=$1
- if [ "$REPLICATION_TYPE" = "file" ];then
- echo "url = file://$REPLICATION_LOCATION_TEST_SITE/git/#{name}#.git"
- elif [ "$REPLICATION_TYPE" = "ssh" ];then
- echo "url = ssh://$USER@$REPLICATION_HOSTNAME:$REPLICATION_LOCATION_TEST_SITE/git/#{name}#.git"
- fi
+ echo "apiUrl = http://$REPLICATION_HOSTNAME:$REPLICATION_HTTPD_PORT"
+}
+
+function get_pull_replication_url {
+ REPLICATION_HOSTNAME=$1
+
+ echo "url = http://$REPLICATION_HOSTNAME:$REPLICATION_HTTPD_PORT/#{name}#.git"
}
function deploy_tls_certificates {
@@ -66,12 +64,13 @@
export LOCATION_TEST_SITE=$3
export GERRIT_SSHD_PORT=$4
export REPLICATION_HTTPD_PORT=$5
- export REPLICATION_LOCATION_TEST_SITE=$6
- export GERRIT_HOSTNAME=$7
- export REPLICATION_HOSTNAME=$8
- export REMOTE_DEBUG_PORT=$9
- export GROUP_ID=${10}
- export REPLICATION_URL=$(get_replication_url $REPLICATION_LOCATION_TEST_SITE $REPLICATION_HOSTNAME)
+ export GERRIT_HOSTNAME=$6
+ export REPLICATION_HOSTNAME=$7
+ export REMOTE_DEBUG_PORT=$8
+ export INSTANCE_ID=${9}
+ export REPLICA_INSTANCE_ID=${10}
+ export PULL_REPLICATION_URL=$(get_pull_replication_url $REPLICATION_HOSTNAME)
+ export PULL_REPLICATION_API_URL=$(get_pull_replication_api_url $REPLICATION_HOSTNAME)
echo "Replacing variables for file $file and copying to $CONFIG_TEST_SITE/$file_name"
@@ -126,25 +125,25 @@
GERRIT_SITE1_SSHD_PORT=$3
CONFIG_TEST_SITE_1=$LOCATION_TEST_SITE_1/etc
GERRIT_SITE1_REMOTE_DEBUG_PORT="5005"
- GERRIT_SITE1_GROUP_ID="instance-1"
+ GERRIT_SITE1_INSTANCE_ID="instance-1"
# SITE 2
GERRIT_SITE2_HOSTNAME=$4
GERRIT_SITE2_HTTPD_PORT=$5
GERRIT_SITE2_SSHD_PORT=$6
CONFIG_TEST_SITE_2=$LOCATION_TEST_SITE_2/etc
GERRIT_SITE2_REMOTE_DEBUG_PORT="5006"
- GERRIT_SITE2_GROUP_ID="instance-2"
+ GERRIT_SITE2_INSTANCE_ID="instance-2"
# Set config SITE1
- copy_config_files $CONFIG_TEST_SITE_1 $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_SSHD_PORT $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE1_REMOTE_DEBUG_PORT $GERRIT_SITE1_GROUP_ID
+ copy_config_files $CONFIG_TEST_SITE_1 $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_SSHD_PORT $GERRIT_SITE2_HTTPD_PORT $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE1_REMOTE_DEBUG_PORT $GERRIT_SITE1_INSTANCE_ID $GERRIT_SITE2_INSTANCE_ID
# Set config SITE2
- copy_config_files $CONFIG_TEST_SITE_2 $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE2_SSHD_PORT $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE2_REMOTE_DEBUG_PORT $GERRIT_SITE2_GROUP_ID
+ copy_config_files $CONFIG_TEST_SITE_2 $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE2_SSHD_PORT $GERRIT_SITE1_HTTPD_PORT $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE2_REMOTE_DEBUG_PORT $GERRIT_SITE2_INSTANCE_ID $GERRIT_SITE1_INSTANCE_ID
}
function is_docker_desktop {
- echo $(docker info | grep "Operating System: Docker Desktop" | wc -l)
+ echo $($SUDO docker info | grep "Operating System: Docker Desktop" | wc -l)
}
function docker_host_env {
@@ -183,9 +182,10 @@
kill $(ps -ax | grep haproxy | grep "gerrit_setup/ha-proxy-config" | awk '{print $1}') 2> /dev/null
echo "Stopping $BROKER_TYPE docker container"
- docker-compose -f "${SCRIPT_DIR}/docker-compose-${BROKER_TYPE}.yaml" down 2> /dev/null
+ printenv > "${SCRIPT_DIR}/docker-compose-${BROKER_TYPE}.env"
+ $SUDO docker compose -f "${SCRIPT_DIR}/docker-compose-${BROKER_TYPE}.yaml" --env-file "${SCRIPT_DIR}/docker-compose-${BROKER_TYPE}.env" down 2> /dev/null
echo "Stopping core docker containers"
- docker-compose -f "${SCRIPT_DIR}/docker-compose-core.yaml" down 2> /dev/null
+ $SUDO docker compose -f "${SCRIPT_DIR}/docker-compose-core.yaml" --env-file "${SCRIPT_DIR}/docker-compose-${BROKER_TYPE}.env" down 2> /dev/null
echo "Stopping GERRIT instances"
$1/bin/gerrit.sh stop 2> /dev/null
@@ -197,7 +197,7 @@
function check_if_container_is_running {
local container=$1;
- echo $(docker inspect "$container" 2> /dev/null | grep '"Running": true' | wc -l)
+ echo $($SUDO docker inspect "$container" 2> /dev/null | grep '"Running": true' | wc -l)
}
function ensure_docker_compose_is_up_and_running {
@@ -207,8 +207,9 @@
local is_container_running=$(check_if_container_is_running "$container_name")
if [ "$is_container_running" -lt 1 ];then
+ printenv > "${SCRIPT_DIR}/${docker_compose_file}.env"
echo "[$log_label] Starting docker containers"
- docker-compose -f "${SCRIPT_DIR}/${docker_compose_file}" up -d
+ $SUDO docker compose -f "${SCRIPT_DIR}/${docker_compose_file}" --env-file "${SCRIPT_DIR}/${docker_compose_file}.env" up -d
echo "[$log_label] Waiting for docker containers to start..."
while [[ $(check_if_container_is_running "$container_name") -lt 1 ]];do sleep 10s; done
@@ -231,6 +232,8 @@
echo
echo "[--release-war-file] Location to release.war file"
echo "[--multisite-lib-file] Location to lib multi-site.jar file"
+ echo "[--eventsbroker-lib-file] Location to lib events-broker.jar file"
+ echo "[--globalrefdb-lib-file] Location to lib global-refdb.jar file"
echo
echo "[--new-deployment] Cleans up previous gerrit deployment and re-installs it. default true"
echo "[--get-websession-plugin] Download websession-broker plugin from CI lastSuccessfulBuild; default true"
@@ -247,8 +250,6 @@
echo "[--gerrit2-httpd-port] Gerrit Instance 2 http port; default 18081"
echo "[--gerrit2-sshd-port] Gerrit Instance 2 sshd port; default 49418"
echo
- echo "[--replication-type] Options [file,ssh]; default ssh"
- echo "[--replication-ssh-user] SSH user for the replication plugin; default $(whoami)"
echo "[--replication-delay] Replication delay across the two instances in seconds"
echo
echo "[--just-cleanup-env] Cleans up previous deployment; default false"
@@ -257,6 +258,8 @@
echo
echo "[--broker-type] events broker type; 'kafka', 'kinesis' or 'gcloud-pubsub'. Default 'kafka'"
echo
+ echo "[--sudo] run docker commands with sudo"
+ echo
exit 0
;;
"--new-deployment")
@@ -284,6 +287,16 @@
shift
shift
;;
+ "--eventsbroker-lib-file" )
+ EVENTS_BROKER_LIB_LOCATION=$2
+ shift
+ shift
+ ;;
+ "--globalrefdb-lib-file" )
+ GLOBAL_REFDB_LIB_LOCATION=$2
+ shift
+ shift
+ ;;
"--gerrit-canonical-host" )
export GERRIT_CANONICAL_HOSTNAME=$2
shift
@@ -319,16 +332,6 @@
shift
shift
;;
- "--replication-ssh-user" )
- export REPLICATION_SSH_USER=$2
- shift
- shift
- ;;
- "--replication-type")
- export REPLICATION_TYPE=$2
- shift
- shift
- ;;
"--replication-delay")
export REPLICATION_DELAY_SEC=$2
shift
@@ -353,6 +356,11 @@
exit 1
fi
;;
+ "--sudo" )
+ SUDO=sudo
+ shift
+ shift
+ ;;
* )
echo "Unknown option argument: $1"
shift
@@ -376,8 +384,6 @@
GERRIT_2_HTTPD_PORT=${GERRIT_2_HTTPD_PORT:-"18081"}
GERRIT_1_SSHD_PORT=${GERRIT_1_SSHD_PORT:-"39418"}
GERRIT_2_SSHD_PORT=${GERRIT_2_SSHD_PORT:-"49418"}
-REPLICATION_TYPE=${REPLICATION_TYPE:-"file"}
-REPLICATION_SSH_USER=${REPLICATION_SSH_USER:-$(whoami)}
export REPLICATION_DELAY_SEC=${REPLICATION_DELAY_SEC:-"5"}
export SSH_ADVERTISED_PORT=${SSH_ADVERTISED_PORT:-"29418"}
HTTPS_ENABLED=${HTTPS_ENABLED:-"false"}
@@ -392,7 +398,8 @@
RELEASE_WAR_FILE_LOCATION=${RELEASE_WAR_FILE_LOCATION:-bazel-bin/release.war}
MULTISITE_LIB_LOCATION=${MULTISITE_LIB_LOCATION:-bazel-bin/plugins/multi-site/multi-site.jar}
-
+EVENTS_BROKER_LIB_LOCATION=${EVENTS_BROKER_LIB_LOCATION:-bazel-bin/plugins/events-broker/events-broker.jar}
+GLOBAL_REFDB_LIB_LOCATION=${GLOBAL_REFDB_LIB_LOCATION:-bazel-bin/plugins/global-refdb/global-refdb.jar}
export FAKE_NFS=$COMMON_LOCATION/fake_nfs
@@ -413,10 +420,20 @@
else
cp -f $MULTISITE_LIB_LOCATION $DEPLOYMENT_LOCATION/multi-site.jar >/dev/null 2>&1 || { echo >&2 "$MULTISITE_LIB_LOCATION: Not able to copy the file. Aborting"; exit 1; }
fi
+
+echo "Copying events-broker library"
+ cp -f $EVENTS_BROKER_LIB_LOCATION $DEPLOYMENT_LOCATION/events-broker.jar >/dev/null 2>&1 || { echo >&2 "$EVENTS_BROKER_LIB_LOCATION: Not able to copy the file. Aborting"; exit 1; }
+
+echo "Copying global-refdb library"
+ cp -f $GLOBAL_REFDB_LIB_LOCATION $DEPLOYMENT_LOCATION/global-refdb.jar >/dev/null 2>&1 || { echo >&2 "$GLOBAL_REFDB_LIB_LOCATION: Not able to copy the file. Aborting"; exit 1; }
+
if [ $DOWNLOAD_WEBSESSION_PLUGIN = "true" ];then
echo "Downloading websession-broker plugin $GERRIT_BRANCH"
wget $GERRIT_CI/plugin-websession-broker-bazel-$GERRIT_BRANCH/$LAST_BUILD/websession-broker/websession-broker.jar \
- -O $DEPLOYMENT_LOCATION/websession-broker.jar || { echo >&2 "Cannot download websession-broker plugin: Check internet connection. Abort\
+ -O $DEPLOYMENT_LOCATION/websession-broker.jar || \
+ wget $GERRIT_CI/plugin-websession-broker-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/websession-broker/websession-broker.jar \
+ -O $DEPLOYMENT_LOCATION/websession-broker.jar || \
+ { echo >&2 "Cannot download websession-broker plugin: Check internet connection. Abort\
ing"; exit 1; }
wget $GERRIT_CI/plugin-healthcheck-bazel-$GERRIT_BRANCH/$LAST_BUILD/healthcheck/healthcheck.jar \
-O $DEPLOYMENT_LOCATION/healthcheck.jar || { echo >&2 "Cannot download healthcheck plugin: Check internet connection. Abort\
@@ -427,30 +444,29 @@
echo "Downloading zookeeper plugin $GERRIT_BRANCH"
wget $GERRIT_CI/plugin-zookeeper-refdb-bazel-$GERRIT_BRANCH/$LAST_BUILD/zookeeper-refdb/zookeeper-refdb.jar \
- -O $DEPLOYMENT_LOCATION/zookeeper-refdb.jar || { echo >&2 "Cannot download zookeeper plugin: Check internet connection. Abort\
-ing"; exit 1; }
-
-echo "Downloading global-refdb library $GERRIT_BRANCH"
- wget https://repo1.maven.org/maven2/com/gerritforge/global-refdb/$GLOBAL_REFDB_VER/global-refdb-$GLOBAL_REFDB_VER.jar \
- -O $DEPLOYMENT_LOCATION/global-refdb.jar || { echo >&2 "Cannot download global-refdb library: Check internet connection. Abort\
-ing"; exit 1; }
-
-echo "Downloading events-broker library $GERRIT_BRANCH"
- wget https://repo1.maven.org/maven2/com/gerritforge/events-broker/$EVENTS_BROKER_VER/events-broker-$EVENTS_BROKER_VER.jar \
- -O $DEPLOYMENT_LOCATION/events-broker.jar || { echo >&2 "Cannot download events-broker library: Check internet connection. Abort\
+ -O $DEPLOYMENT_LOCATION/zookeeper-refdb.jar || \
+ wget $GERRIT_CI/plugin-zookeeper-refdb-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/zookeeper-refdb/zookeeper-refdb.jar \
+ -O $DEPLOYMENT_LOCATION/zookeeper-refdb.jar || \
+ { echo >&2 "Cannot download zookeeper plugin: Check internet connection. Abort\
ing"; exit 1; }
if [ "$BROKER_TYPE" = "kafka" ]; then
echo "Downloading events-kafka plugin $GERRIT_BRANCH"
wget $GERRIT_CI/plugin-events-kafka-bazel-$GERRIT_BRANCH/$LAST_BUILD/events-kafka/events-kafka.jar \
- -O $DEPLOYMENT_LOCATION/events-kafka.jar || { echo >&2 "Cannot download events-kafka plugin: Check internet connection. Abort\
+ -O $DEPLOYMENT_LOCATION/events-kafka.jar || \
+ wget $GERRIT_CI/plugin-events-kafka-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/events-kafka/events-kafka.jar \
+ -O $DEPLOYMENT_LOCATION/events-kafka.jar || \
+ { echo >&2 "Cannot download events-kafka plugin: Check internet connection. Abort\
ing"; exit 1; }
fi
if [ "$BROKER_TYPE" = "kinesis" ]; then
echo "Downloading events-aws-kinesis plugin $GERRIT_BRANCH"
wget $GERRIT_CI/plugin-events-aws-kinesis-bazel-$GERRIT_BRANCH/$LAST_BUILD/events-aws-kinesis/events-aws-kinesis.jar \
- -O $DEPLOYMENT_LOCATION/events-aws-kinesis.jar || { echo >&2 "Cannot download events-aws-kinesis plugin: Check internet connection. Abort\
+ -O $DEPLOYMENT_LOCATION/events-aws-kinesis.jar || \
+ wget $GERRIT_CI/plugin-events-aws-kinesis-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/events-aws-kinesis/events-aws-kinesis.jar \
+ -O $DEPLOYMENT_LOCATION/events-aws-kinesis.jar || \
+ { echo >&2 "Cannot download events-aws-kinesis plugin: Check internet connection. Abort\
ing"; exit 1; }
fi
@@ -458,19 +474,25 @@
if [ "$BROKER_TYPE" = "gcloud-pubsub" ]; then
echo "Downloading events-gcloud-pubsub plugin $GERRIT_BRANCH"
wget $GERRIT_CI/plugin-events-gcloud-pubsub-bazel-$GERRIT_BRANCH/$LAST_BUILD/events-gcloud-pubsub/events-gcloud-pubsub.jar \
- -O $DEPLOYMENT_LOCATION/events-gcloud-pubsub.jar || { echo >&2 "Cannot download events-gcloud-pubsub plugin: Check internet connection. Abort\
+ -O $DEPLOYMENT_LOCATION/events-gcloud-pubsub.jar || \
+ wget $GERRIT_CI/plugin-events-gcloud-pubsub-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/events-gcloud-pubsub/events-gcloud-pubsub.jar \
+ -O $DEPLOYMENT_LOCATION/events-gcloud-pubsub.jar || \
+ { echo >&2 "Cannot download events-gcloud-pubsub plugin: Check internet connection. Abort\
ing"; exit 1; }
fi
echo "Downloading metrics-reporter-prometheus plugin $GERRIT_BRANCH"
+ wget $GERRIT_CI/plugin-metrics-reporter-prometheus-bazel-$GERRIT_BRANCH/$LAST_BUILD/metrics-reporter-prometheus/metrics-reporter-prometheus.jar \
+ -O $DEPLOYMENT_LOCATION/metrics-reporter-prometheus.jar || \
wget $GERRIT_CI/plugin-metrics-reporter-prometheus-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/metrics-reporter-prometheus/metrics-reporter-prometheus.jar \
- -O $DEPLOYMENT_LOCATION/metrics-reporter-prometheus.jar || { echo >&2 "Cannot download metrics-reporter-prometheus plugin: Check internet connection. Abort\
+ -O $DEPLOYMENT_LOCATION/metrics-reporter-prometheus.jar || \
+ { echo >&2 "Cannot download metrics-reporter-prometheus plugin: Check internet connection. Abort\
ing"; exit 1; }
-if [ "$REPLICATION_TYPE" = "ssh" ];then
- echo "Using 'SSH' replication type"
- echo "Make sure ~/.ssh/authorized_keys and ~/.ssh/known_hosts are configured correctly"
-fi
+echo "Downloading pull-replication plugin $GERRIT_BRANCH"
+ wget $GERRIT_CI/plugin-pull-replication-bazel-$GERRIT_BRANCH/$LAST_BUILD/pull-replication/pull-replication.jar \
+ -O $DEPLOYMENT_LOCATION/pull-replication.jar || { echo >&2 "Cannot download pull-replication plugin: Check internet connection. Abort\
+ing"; exit 1; }
if [ "$HTTPS_ENABLED" = "true" ];then
export HTTP_PROTOCOL="https"
@@ -530,13 +552,18 @@
echo "Replicating environment"
cp -fR $LOCATION_TEST_SITE_1/* $LOCATION_TEST_SITE_2
- echo "Link replication plugin"
- ln -s $LOCATION_TEST_SITE_1/plugins/replication.jar $LOCATION_TEST_SITE_1/lib/replication.jar
- ln -s $LOCATION_TEST_SITE_2/plugins/replication.jar $LOCATION_TEST_SITE_2/lib/replication.jar
+ echo "Link pullreplication plugin"
+ ln -s $LOCATION_TEST_SITE_1/plugins/pull-replication.jar $LOCATION_TEST_SITE_1/lib/pull-replication.jar
+ ln -s $LOCATION_TEST_SITE_2/plugins/pull-replication.jar $LOCATION_TEST_SITE_2/lib/pull-replication.jar
echo "Link multi-site library to plugin directory"
ln -s $LOCATION_TEST_SITE_1/lib/multi-site.jar $LOCATION_TEST_SITE_1/plugins/multi-site.jar
ln -s $LOCATION_TEST_SITE_2/lib/multi-site.jar $LOCATION_TEST_SITE_2/plugins/multi-site.jar
+
+ echo "Copy pull-replication plugin"
+ cp -f $DEPLOYMENT_LOCATION/pull-replication.jar $LOCATION_TEST_SITE_1/plugins/pull-replication.jar
+ cp -f $DEPLOYMENT_LOCATION/pull-replication.jar $LOCATION_TEST_SITE_2/plugins/pull-replication.jar
+
fi
DOCKER_HOST_ENV=$(docker_host_env)
@@ -558,8 +585,12 @@
echo "Re-deploying configuration files"
deploy_config_files $GERRIT_1_HOSTNAME $GERRIT_1_HTTPD_PORT $GERRIT_1_SSHD_PORT $GERRIT_2_HOSTNAME $GERRIT_2_HTTPD_PORT $GERRIT_2_SSHD_PORT
+echo "Remove replication plugin from gerrit site 1"
+rm $LOCATION_TEST_SITE_1/plugins/replication.jar
echo "Starting gerrit site 1"
$LOCATION_TEST_SITE_1/bin/gerrit.sh restart
+echo "Remove replication plugin from gerrit site 2"
+rm $LOCATION_TEST_SITE_2/plugins/replication.jar
echo "Starting gerrit site 2"
$LOCATION_TEST_SITE_2/bin/gerrit.sh restart
@@ -574,8 +605,6 @@
echo "==============================="
echo "The admin password is 'secret'"
echo "deployment-location=$DEPLOYMENT_LOCATION"
-echo "replication-type=$REPLICATION_TYPE"
-echo "replication-ssh-user=$REPLICATION_SSH_USER"
echo "replication-delay=$REPLICATION_DELAY_SEC"
echo "enable-https=$HTTPS_ENABLED"
echo
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index 6b77713..2d67eab 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -23,15 +23,19 @@
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.config.ConfigUtil;
import com.google.gerrit.server.config.SitePaths;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.spi.Message;
import java.io.IOException;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.storage.file.FileBasedConfig;
@@ -50,6 +54,10 @@
static final String THREAD_POOL_SIZE_KEY = "threadPoolSize";
static final int DEFAULT_THREAD_POOL_SIZE = 4;
+ private static final String REF_DATABASE = "ref-database";
+ private static final String REPLICATION_LAG_REFRESH_INTERVAL = "replicationLagRefreshInterval";
+ private static final Duration REPLICATION_LAG_REFRESH_INTERVAL_DEFAULT = Duration.ofSeconds(60);
+
private static final String REPLICATION_CONFIG = "replication.config";
// common parameters to cache and index sections
private static final int DEFAULT_INDEX_MAX_TRIES = 2;
@@ -64,7 +72,9 @@
private final Supplier<SharedRefDbConfiguration> sharedRefDb;
private final Supplier<Collection<Message>> replicationConfigValidation;
private final Supplier<Broker> broker;
+ private final Supplier<ReplicationFilter> replicationFilter;
private final Config multiSiteConfig;
+ private final Supplier<Duration> replicationLagRefreshInterval;
@Inject
Configuration(SitePaths sitePaths) {
@@ -86,6 +96,18 @@
new SharedRefDbConfiguration(
enableSharedRefDbByDefault(lazyMultiSiteCfg.get()), PLUGIN_NAME));
broker = memoize(() -> new Broker(lazyMultiSiteCfg));
+ replicationFilter = memoize(() -> new ReplicationFilter(lazyMultiSiteCfg));
+ replicationLagRefreshInterval =
+ memoize(
+ () ->
+ Duration.ofMillis(
+ ConfigUtil.getTimeUnit(
+ lazyMultiSiteCfg.get(),
+ REF_DATABASE,
+ null,
+ REPLICATION_LAG_REFRESH_INTERVAL,
+ REPLICATION_LAG_REFRESH_INTERVAL_DEFAULT.toMillis(),
+ TimeUnit.MILLISECONDS)));
}
public Config getMultiSiteConfig() {
@@ -116,6 +138,14 @@
return projects.get();
}
+ public ReplicationFilter replicationFilter() {
+ return replicationFilter.get();
+ }
+
+ public Duration replicationLagRefreshInterval() {
+ return replicationLagRefreshInterval.get();
+ }
+
public Collection<Message> validate() {
return replicationConfigValidation.get();
}
@@ -311,6 +341,81 @@
}
}
+ public static class ReplicationFilter {
+ static final String REPLICATION_FILTER_SECTION = "replication";
+ static final String REPLICATION_PUSH_FILTER_SUBSECTION = "push-filter";
+ static final String REPLICATION_FETCH_FILTER_SUBSECTION = "fetch-filter";
+
+ private static final String MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS =
+ "minWaitBeforeReloadLocalVersionMs";
+ private static final String RANDOM_WAIT_MAX_BOUND_BEFORE_RELOAD_LOCAL_VERSION_MS =
+ "maxRandomWaitBeforeReloadLocalVersionMs";
+
+ private static final int MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS_DEFAULT = 1000;
+ private static final int RANDOM_WAIT_MAX_BOUND_BEFORE_RELOAD_LOCAL_VERSION_MS_DEFAULT = 1000;
+ private final Supplier<Integer> fetchMinWaitBeforeReloadLocalVersionMs;
+ private final Supplier<Integer> fetchWaitBeforeReloadLocalVersionMs;
+ private final Supplier<Integer> pushMinWaitBeforeReloadLocalVersionMs;
+ private final Supplier<Integer> pushWaitBeforeReloadLocalVersionMs;
+
+ public ReplicationFilter(Supplier<Config> cfg) {
+ fetchMinWaitBeforeReloadLocalVersionMs =
+ memoize(
+ () ->
+ cfg.get()
+ .getInt(
+ REPLICATION_FILTER_SECTION,
+ REPLICATION_FETCH_FILTER_SUBSECTION,
+ MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS,
+ MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS_DEFAULT));
+ fetchWaitBeforeReloadLocalVersionMs =
+ memoize(
+ () ->
+ cfg.get()
+ .getInt(
+ REPLICATION_FILTER_SECTION,
+ REPLICATION_FETCH_FILTER_SUBSECTION,
+ RANDOM_WAIT_MAX_BOUND_BEFORE_RELOAD_LOCAL_VERSION_MS,
+ RANDOM_WAIT_MAX_BOUND_BEFORE_RELOAD_LOCAL_VERSION_MS_DEFAULT));
+ pushMinWaitBeforeReloadLocalVersionMs =
+ memoize(
+ () ->
+ cfg.get()
+ .getInt(
+ REPLICATION_FILTER_SECTION,
+ REPLICATION_PUSH_FILTER_SUBSECTION,
+ MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS,
+ MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS_DEFAULT));
+ pushWaitBeforeReloadLocalVersionMs =
+ memoize(
+ () ->
+ cfg.get()
+ .getInt(
+ REPLICATION_FILTER_SECTION,
+ REPLICATION_PUSH_FILTER_SUBSECTION,
+ RANDOM_WAIT_MAX_BOUND_BEFORE_RELOAD_LOCAL_VERSION_MS,
+ RANDOM_WAIT_MAX_BOUND_BEFORE_RELOAD_LOCAL_VERSION_MS_DEFAULT));
+ }
+
+ public boolean isFetchFilterRandomSleepEnabled() {
+ return fetchWaitBeforeReloadLocalVersionMs.get() != 0;
+ }
+
+ public Integer fetchFilterRandomSleepTimeMs() {
+ return fetchMinWaitBeforeReloadLocalVersionMs.get()
+ + new Random().nextInt(fetchWaitBeforeReloadLocalVersionMs.get());
+ }
+
+ public boolean isPushFilterRandomSleepEnabled() {
+ return pushWaitBeforeReloadLocalVersionMs.get() != 0;
+ }
+
+ public Integer pushFilterRandomSleepTimeMs() {
+ return pushMinWaitBeforeReloadLocalVersionMs.get()
+ + new Random().nextInt(pushWaitBeforeReloadLocalVersionMs.get());
+ }
+ }
+
static boolean getBoolean(
Supplier<Config> cfg, String section, String subsection, String name, boolean defaultValue) {
try {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
index 7c88655..95c417f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
@@ -14,9 +14,9 @@
package com.googlesource.gerrit.plugins.multisite;
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.google.gerrit.extensions.systemstatus.ServerInformation;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGsonProvider;
import com.google.gerrit.server.util.PluginLogFile;
import com.google.gerrit.server.util.SystemLog;
import com.google.gson.Gson;
@@ -41,7 +41,7 @@
}
@Override
- public void log(Direction direction, String topic, EventMessage event) {
+ public void log(Direction direction, String topic, Event event) {
msgLog.info("{} {} {}", direction, topic, gson.toJson(event));
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jProjectVersionLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jProjectVersionLogger.java
index c2c4b46..23c720a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jProjectVersionLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jProjectVersionLogger.java
@@ -45,4 +45,9 @@
verLog.info("{ \"project\":\"{}\", \"version\":{} }", projectName, currentVersion);
}
}
+
+ @Override
+ public void logDeleted(Project.NameKey projectName) {
+ verLog.info("{ \"project\":\"{}\", \"status\":\"DELETED\" }", projectName);
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
index b1f3e79..cc64b02 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
@@ -14,7 +14,7 @@
package com.googlesource.gerrit.plugins.multisite;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gerrit.server.events.Event;
public interface MessageLogger {
@@ -23,5 +23,5 @@
CONSUME;
}
- public void log(Direction direction, String topic, EventMessage event);
+ public void log(Direction direction, String topic, Event event);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index 91a927b..9392077 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -16,30 +16,17 @@
import com.gerritforge.gerrit.globalrefdb.validation.LibModule;
import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.gerrit.server.config.SitePaths;
import com.google.inject.CreationException;
import com.google.inject.Inject;
-import com.google.inject.Provides;
-import com.google.inject.Singleton;
import com.google.inject.spi.Message;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
-import com.googlesource.gerrit.plugins.multisite.event.EventModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.RouterModule;
import com.googlesource.gerrit.plugins.multisite.index.IndexModule;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.Collection;
-import java.util.UUID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class Module extends LifecycleModule {
- private static final Logger log = LoggerFactory.getLogger(Module.class);
private Configuration config;
@Inject
@@ -49,6 +36,7 @@
@Override
protected void configure() {
+ boolean brokerRouterNeeded = false;
Collection<Message> validationErrors = config.validate();
if (!validationErrors.isEmpty()) {
@@ -64,65 +52,19 @@
if (config.cache().synchronize()) {
install(new CacheModule());
+ brokerRouterNeeded = true;
}
if (config.event().synchronize()) {
- install(new EventModule(config));
+ brokerRouterNeeded = true;
}
if (config.index().synchronize()) {
install(new IndexModule());
+ brokerRouterNeeded = true;
}
- install(new RouterModule());
- }
-
- @Provides
- @Singleton
- @InstanceId
- UUID getInstanceId(SitePaths sitePaths) throws IOException {
- UUID instanceId = null;
- Path dataDir = sitePaths.data_dir.resolve(Configuration.PLUGIN_NAME);
- if (!dataDir.toFile().exists()) {
- dataDir.toFile().mkdirs();
+ if (brokerRouterNeeded) {
+ install(new BrokerModule());
+ install(new RouterModule());
}
- String serverIdFile =
- dataDir.toAbsolutePath().toString() + "/" + Configuration.INSTANCE_ID_FILE;
-
- instanceId = tryToLoadSavedInstanceId(serverIdFile);
-
- if (instanceId == null) {
- instanceId = UUID.randomUUID();
- Files.createFile(Paths.get(serverIdFile));
- try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(serverIdFile))) {
- writer.write(instanceId.toString());
- } catch (IOException e) {
- log.warn(
- String.format(
- "Cannot write instance ID, a new one will be generated at instance restart. (%s)",
- e.getMessage()));
- }
- }
- return instanceId;
- }
-
- private UUID tryToLoadSavedInstanceId(String serverIdFile) {
- if (Files.exists(Paths.get(serverIdFile))) {
- try (BufferedReader br = Files.newBufferedReader(Paths.get(serverIdFile))) {
- return UUID.fromString(br.readLine());
- } catch (IOException e) {
- log.warn(
- String.format(
- "Cannot read instance ID from path '%s', deleting the old file and generating a new ID: (%s)",
- serverIdFile, e.getMessage()));
- try {
- Files.delete(Paths.get(serverIdFile));
- } catch (IOException e1) {
- log.warn(
- String.format(
- "Cannot delete old instance ID file at path '%s' with instance ID while generating a new one: (%s)",
- serverIdFile, e1.getMessage()));
- }
- }
- }
- return null;
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
index 20aaef6..db06c9f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
@@ -15,38 +15,99 @@
package com.googlesource.gerrit.plugins.multisite;
import com.gerritforge.gerrit.globalrefdb.validation.ProjectDeletedSharedDbCleanup;
+import com.google.common.collect.ImmutableList;
+import com.google.common.flogger.FluentLogger;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.AbstractModule;
import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.ProvisionException;
import com.google.inject.Scopes;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.consumer.MultiSiteConsumerRunner;
import com.googlesource.gerrit.plugins.multisite.consumer.ReplicationStatusModule;
import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
+import com.googlesource.gerrit.plugins.multisite.event.EventModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerForwarderModule;
public class PluginModule extends LifecycleModule {
- private Configuration config;
+ private static final FluentLogger log = FluentLogger.forEnclosingClass();
+ private static final String[] FILTER_MODULES_CLASS_NAMES =
+ new String[] {
+ /* Class names are defined as String for avoiding this class failing to load
+ * if either replication or pull-replication plugins are missing.
+ */
+ "com.googlesource.gerrit.plugins.multisite.validation.PullReplicationFilterModule",
+ "com.googlesource.gerrit.plugins.multisite.validation.PushReplicationFilterModule"
+ };
+
+ private final Configuration config;
+ private final WorkQueue workQueue;
+ private final Injector parentInjector;
@Inject
- public PluginModule(Configuration config) {
+ public PluginModule(Configuration config, WorkQueue workQueue, Injector parentInjector) {
this.config = config;
+ this.workQueue = workQueue;
+ this.parentInjector = parentInjector;
}
@Override
protected void configure() {
- bind(BrokerApiWrapper.class).in(Scopes.SINGLETON);
- install(new SubscriberModule());
+ if (config.index().synchronize()
+ || config.cache().synchronize()
+ || config.event().synchronize()) {
+ install(new EventModule(config));
+ bind(BrokerApiWrapper.class).in(Scopes.SINGLETON);
+ install(new SubscriberModule());
- install(new BrokerForwarderModule());
- listener().to(MultiSiteConsumerRunner.class);
+ install(new BrokerForwarderModule());
+ listener().to(MultiSiteConsumerRunner.class);
- install(new ReplicationStatusModule());
+ install(new ReplicationStatusModule(workQueue));
+ }
+
if (config.getSharedRefDbConfiguration().getSharedRefDb().isEnabled()) {
listener().to(PluginStartup.class);
DynamicSet.bind(binder(), ProjectDeletedListener.class)
.to(ProjectDeletedSharedDbCleanup.class);
}
+
+ detectFilterModules()
+ .forEach(
+ mod -> {
+ install(mod);
+ log.atInfo().log(
+ "Replication filter module %s installed successfully",
+ mod.getClass().getSimpleName());
+ });
+ }
+
+ private Iterable<AbstractModule> detectFilterModules() {
+ ImmutableList.Builder<AbstractModule> filterModulesBuilder = ImmutableList.builder();
+
+ for (String filterClassName : FILTER_MODULES_CLASS_NAMES) {
+ try {
+ @SuppressWarnings("unchecked")
+ Class<AbstractModule> filterClass = (Class<AbstractModule>) Class.forName(filterClassName);
+
+ AbstractModule filterModule = parentInjector.getInstance(filterClass);
+ // Check if the filterModule would be valid for creating a child Guice Injector
+ parentInjector.createChildInjector(filterModule);
+
+ filterModulesBuilder.add(filterModule);
+ } catch (NoClassDefFoundError | ClassNotFoundException e) {
+ log.atFine().withCause(e).log(
+ "Not loading %s because of missing the associated replication plugin", filterClassName);
+ } catch (Exception e) {
+ throw new ProvisionException(
+ "Unable to instantiate replication filter " + filterClassName, e);
+ }
+ }
+
+ return filterModulesBuilder.build();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/ProjectVersionLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/ProjectVersionLogger.java
index 6ababb6..2ee2c13 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/ProjectVersionLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/ProjectVersionLogger.java
@@ -19,4 +19,6 @@
public interface ProjectVersionLogger {
public void log(Project.NameKey projectName, long currentVersion, long replicationLag);
+
+ public void logDeleted(Project.NameKey projectName);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
index 71be5e6..f58efa7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
@@ -15,62 +15,101 @@
package com.googlesource.gerrit.plugins.multisite.broker;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
import java.util.Set;
-import java.util.UUID;
+import java.util.concurrent.Executor;
import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class BrokerApiWrapper implements BrokerApi {
+ private static final Logger log = LoggerFactory.getLogger(BrokerApiWrapper.class);
+ private final Executor executor;
private final DynamicItem<BrokerApi> apiDelegate;
private final BrokerMetrics metrics;
private final MessageLogger msgLog;
- private final UUID instanceId;
@Inject
public BrokerApiWrapper(
+ @BrokerExecutor Executor executor,
DynamicItem<BrokerApi> apiDelegate,
BrokerMetrics metrics,
- MessageLogger msgLog,
- @InstanceId UUID instanceId) {
+ MessageLogger msgLog) {
this.apiDelegate = apiDelegate;
+ this.executor = executor;
this.metrics = metrics;
this.msgLog = msgLog;
- this.instanceId = instanceId;
}
- public boolean send(String topic, Event event) {
- return send(topic, apiDelegate.get().newMessage(instanceId, event));
- }
-
- @Override
- public boolean send(String topic, EventMessage message) {
- if (Context.isForwardedEvent()) {
- return true;
- }
- boolean succeeded = false;
+ public boolean sendSync(String topic, Event event) {
try {
- succeeded = apiDelegate.get().send(topic, message);
- } finally {
- if (succeeded) {
- msgLog.log(Direction.PUBLISH, topic, message);
- metrics.incrementBrokerPublishedMessage();
- } else {
- metrics.incrementBrokerFailedToPublishMessage();
- }
+ return send(topic, event).get();
+ } catch (Throwable e) {
+ log.error(
+ "Failed to publish event '{}' to topic '{}' - error: {} - stack trace: {}",
+ event,
+ topic,
+ e.getMessage(),
+ e.getStackTrace());
+ metrics.incrementBrokerFailedToPublishMessage();
+ return false;
}
- return succeeded;
}
@Override
- public void receiveAsync(String topic, Consumer<EventMessage> messageConsumer) {
+ public ListenableFuture<Boolean> send(String topic, Event message) {
+ SettableFuture<Boolean> resultFuture = SettableFuture.create();
+ if (Context.isForwardedEvent()) {
+ resultFuture.set(true);
+ return resultFuture;
+ }
+
+ if (Strings.isNullOrEmpty(message.instanceId)) {
+ log.warn(
+ "Dropping event '{}' because event instance id cannot be null or empty",
+ message.toString());
+ resultFuture.set(true);
+ return resultFuture;
+ }
+
+ ListenableFuture<Boolean> resfultF = apiDelegate.get().send(topic, message);
+ Futures.addCallback(
+ resfultF,
+ new FutureCallback<Boolean>() {
+ @Override
+ public void onSuccess(Boolean result) {
+ msgLog.log(Direction.PUBLISH, topic, message);
+ metrics.incrementBrokerPublishedMessage();
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ log.error(
+ "Failed to publish message '{}' to topic '{}' - error: {}",
+ message.toString(),
+ topic,
+ throwable.getMessage());
+ metrics.incrementBrokerFailedToPublishMessage();
+ }
+ },
+ executor);
+
+ return resfultF;
+ }
+
+ @Override
+ public void receiveAsync(String topic, Consumer<Event> messageConsumer) {
apiDelegate.get().receiveAsync(topic, messageConsumer);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/InstanceId.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutor.java
similarity index 83%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/InstanceId.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutor.java
index 87306a2..aa24eb1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/InstanceId.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutor.java
@@ -1,4 +1,4 @@
-// Copyright (C) 2019 The Android Open Source Project
+// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite;
+package com.googlesource.gerrit.plugins.multisite.broker;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
@@ -21,4 +21,4 @@
@Retention(RUNTIME)
@BindingAnnotation
-public @interface InstanceId {}
+@interface BrokerExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutorProvider.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutorProvider.java
new file mode 100644
index 0000000..e843263
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutorProvider.java
@@ -0,0 +1,29 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.ExecutorProvider;
+
+@Singleton
+class BrokerExecutorProvider extends ExecutorProvider {
+
+ @Inject
+ BrokerExecutorProvider(WorkQueue workQueue) {
+ super(workQueue, 1, "Multi-Site-Broker");
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
new file mode 100644
index 0000000..a5dac4a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
@@ -0,0 +1,28 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import com.google.gerrit.lifecycle.LifecycleModule;
+import java.util.concurrent.Executor;
+
+public class BrokerModule extends LifecycleModule {
+
+ @Override
+ protected void configure() {
+ bind(Executor.class)
+ .annotatedWith(BrokerExecutor.class)
+ .toProvider(BrokerExecutorProvider.class);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java
index 2990264..e418da5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java
@@ -17,6 +17,7 @@
import com.google.common.cache.RemovalNotification;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.cache.CacheRemovalListener;
+import com.google.gerrit.server.config.GerritInstanceId;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
@@ -28,21 +29,25 @@
private final Executor executor;
private final DynamicSet<CacheEvictionForwarder> forwarders;
private final CachePatternMatcher matcher;
+ private final String instanceId;
@Inject
CacheEvictionHandler(
DynamicSet<CacheEvictionForwarder> forwarders,
@CacheExecutor Executor executor,
- CachePatternMatcher matcher) {
+ CachePatternMatcher matcher,
+ @GerritInstanceId String instanceId) {
this.forwarders = forwarders;
this.executor = executor;
this.matcher = matcher;
+ this.instanceId = instanceId;
}
@Override
public void onRemoval(String plugin, String cache, RemovalNotification<K, V> notification) {
if (!Context.isForwardedEvent() && !notification.wasEvicted() && matcher.matches(cache)) {
- executor.execute(new CacheEvictionTask(new CacheEvictionEvent(cache, notification.getKey())));
+ executor.execute(
+ new CacheEvictionTask(new CacheEvictionEvent(cache, notification.getKey(), instanceId)));
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CachePatternMatcher.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CachePatternMatcher.java
index b8521a3..2dcb09a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CachePatternMatcher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CachePatternMatcher.java
@@ -25,7 +25,7 @@
@Singleton
class CachePatternMatcher {
- private static final List<String> DEFAULT_PATTERNS =
+ private static final ImmutableList<String> DEFAULT_PATTERNS =
ImmutableList.of("^groups.*", "ldap_groups", "ldap_usernames", "projects", "sshkeys");
private final Pattern pattern;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
index fdc6fc3..44f8417 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
@@ -19,6 +19,7 @@
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.events.ProjectEvent;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
@@ -33,15 +34,18 @@
private final DynamicSet<ProjectListUpdateForwarder> forwarders;
private final Executor executor;
private final ProjectsFilter projectsFilter;
+ private final String instanceId;
@Inject
public ProjectListUpdateHandler(
DynamicSet<ProjectListUpdateForwarder> forwarders,
@CacheExecutor Executor executor,
- ProjectsFilter filter) {
+ ProjectsFilter filter,
+ @GerritInstanceId String instanceId) {
this.forwarders = forwarders;
this.executor = executor;
this.projectsFilter = filter;
+ this.instanceId = instanceId;
}
@Override
@@ -59,7 +63,8 @@
private void process(ProjectEvent event, boolean delete) {
if (!Context.isForwardedEvent() && projectsFilter.matches(event.getProjectName())) {
executor.execute(
- new ProjectListUpdateTask(new ProjectListUpdateEvent(event.getProjectName(), delete)));
+ new ProjectListUpdateTask(
+ new ProjectListUpdateEvent(event.getProjectName(), delete, instanceId)));
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index b37f434..7107b87 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -14,20 +14,19 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.google.common.base.Strings;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.permissions.PermissionBackendException;
import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
import java.io.IOException;
-import java.util.UUID;
import java.util.function.Consumer;
public abstract class AbstractSubcriber {
@@ -44,13 +43,13 @@
public AbstractSubcriber(
ForwardedEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
- @InstanceId UUID instanceId,
+ @GerritInstanceId String gerritInstanceId,
MessageLogger msgLog,
SubscriberMetrics subscriberMetrics,
Configuration cfg) {
this.eventRouter = eventRouter;
this.droppedEventListeners = droppedEventListeners;
- this.instanceId = instanceId.toString();
+ this.instanceId = gerritInstanceId;
this.msgLog = msgLog;
this.subscriberMetrics = subscriberMetrics;
this.cfg = cfg;
@@ -59,19 +58,22 @@
protected abstract EventTopic getTopic();
- public Consumer<EventMessage> getConsumer() {
+ protected abstract Boolean shouldConsumeEvent(Event event);
+
+ public Consumer<Event> getConsumer() {
return this::processRecord;
}
- private void processRecord(EventMessage event) {
- String sourceInstanceId = event.getHeader().sourceInstanceId;
+ private void processRecord(Event event) {
+ String sourceInstanceId = event.instanceId;
- if (Strings.isNullOrEmpty(sourceInstanceId) || instanceId.equals(sourceInstanceId)) {
+ if ((Strings.isNullOrEmpty(sourceInstanceId) || instanceId.equals(sourceInstanceId))
+ || !shouldConsumeEvent(event)) {
if (Strings.isNullOrEmpty(sourceInstanceId)) {
logger.atWarning().log(
String.format(
"Dropping event %s because sourceInstanceId cannot be null", event.toString()));
- } else {
+ } else if (instanceId.equals(sourceInstanceId)) {
logger.atFiner().log(
String.format(
"Dropping event %s produced by our instanceId %s", event.toString(), instanceId));
@@ -80,16 +82,16 @@
} else {
try {
msgLog.log(Direction.CONSUME, topic, event);
- eventRouter.route(event.getEvent());
+ eventRouter.route(event);
subscriberMetrics.incrementSubscriberConsumedMessage();
- subscriberMetrics.updateReplicationStatusMetrics(event);
} catch (IOException e) {
- logger.atSevere().withCause(e).log("Malformed event '%s'", event.getHeader());
+ logger.atSevere().withCause(e).log("Malformed event '%s'", event);
subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
} catch (PermissionBackendException | CacheNotFoundException e) {
- logger.atSevere().withCause(e).log("Cannot handle message '%s'", event.getHeader());
+ logger.atSevere().withCause(e).log("Cannot handle message '%s'", event);
subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
}
}
+ subscriberMetrics.updateReplicationStatusMetrics(event);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
index 5bbaee0..cdbf220 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
@@ -14,31 +14,49 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
-import java.util.UUID;
@Singleton
public class BatchIndexEventSubscriber extends AbstractSubcriber {
+ private final ProjectsFilter projectsFilter;
+
@Inject
public BatchIndexEventSubscriber(
IndexEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
- @InstanceId UUID instanceId,
+ @GerritInstanceId String instanceId,
MessageLogger msgLog,
SubscriberMetrics subscriberMetrics,
- Configuration cfg) {
+ Configuration cfg,
+ ProjectsFilter projectsFilter) {
super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+ this.projectsFilter = projectsFilter;
}
@Override
protected EventTopic getTopic() {
return EventTopic.BATCH_INDEX_TOPIC;
}
+
+ @Override
+ protected Boolean shouldConsumeEvent(Event event) {
+ if (event instanceof ChangeIndexEvent) {
+ return projectsFilter.matches(((ChangeIndexEvent) event).projectName);
+ }
+ if (event instanceof ProjectIndexEvent) {
+ return projectsFilter.matches(((ProjectIndexEvent) event).projectName);
+ }
+ return true;
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
index eae66b4..d8342b0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
@@ -15,14 +15,14 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.CacheEvictionEventRouter;
-import java.util.UUID;
@Singleton
public class CacheEvictionEventSubscriber extends AbstractSubcriber {
@@ -30,7 +30,7 @@
public CacheEvictionEventSubscriber(
CacheEvictionEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
- @InstanceId UUID instanceId,
+ @GerritInstanceId String instanceId,
MessageLogger msgLog,
SubscriberMetrics subscriberMetrics,
Configuration cfg) {
@@ -42,4 +42,9 @@
protected EventTopic getTopic() {
return EventTopic.CACHE_TOPIC;
}
+
+ @Override
+ protected Boolean shouldConsumeEvent(Event event) {
+ return true;
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
index 6f4680c..b34e02c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
@@ -14,7 +14,7 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gerrit.server.events.Event;
public interface DroppedEventListener {
/**
@@ -22,5 +22,5 @@
*
* @param event information about the event.
*/
- void onEventDropped(EventMessage event);
+ void onEventDropped(Event event);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
index 49d470a..480fc50 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
@@ -14,31 +14,68 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
+import com.google.gerrit.entities.Change;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.change.ChangeFinder;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
-import java.util.UUID;
+import java.util.Optional;
@Singleton
public class IndexEventSubscriber extends AbstractSubcriber {
+ private final ProjectsFilter projectsFilter;
+ private final ChangeFinder changeFinder;
+
@Inject
public IndexEventSubscriber(
IndexEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
- @InstanceId UUID instanceId,
+ @GerritInstanceId String instanceId,
MessageLogger msgLog,
SubscriberMetrics subscriberMetrics,
- Configuration cfg) {
+ Configuration cfg,
+ ProjectsFilter projectsFilter,
+ ChangeFinder changeFinder) {
super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+ this.projectsFilter = projectsFilter;
+ this.changeFinder = changeFinder;
}
@Override
protected EventTopic getTopic() {
return EventTopic.INDEX_TOPIC;
}
+
+ @Override
+ protected Boolean shouldConsumeEvent(Event event) {
+ if (event instanceof ChangeIndexEvent) {
+ ChangeIndexEvent changeIndexEvent = (ChangeIndexEvent) event;
+ String projectName = changeIndexEvent.projectName;
+ if (isDeletedChangeWithEmptyProject(changeIndexEvent)) {
+ projectName = findProjectFromChangeId(changeIndexEvent.changeId).orElse(projectName);
+ }
+ return projectsFilter.matches(projectName);
+ }
+ if (event instanceof ProjectIndexEvent) {
+ return projectsFilter.matches(((ProjectIndexEvent) event).projectName);
+ }
+ return true;
+ }
+
+ private boolean isDeletedChangeWithEmptyProject(ChangeIndexEvent changeIndexEvent) {
+ return changeIndexEvent.deleted && changeIndexEvent.projectName.isEmpty();
+ }
+
+ private Optional<String> findProjectFromChangeId(int changeId) {
+ return changeFinder.findOne(Change.id(changeId)).map(c -> c.getChange().getProject().get());
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
index 6ff0969..069a516 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
@@ -14,31 +14,42 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
-import java.util.UUID;
@Singleton
public class ProjectUpdateEventSubscriber extends AbstractSubcriber {
+ private final ProjectsFilter projectsFilter;
+
@Inject
public ProjectUpdateEventSubscriber(
ProjectListUpdateRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
- @InstanceId UUID instanceId,
+ @GerritInstanceId String instanceId,
MessageLogger msgLog,
SubscriberMetrics subscriberMetrics,
- Configuration cfg) {
+ Configuration cfg,
+ ProjectsFilter projectsFilter) {
super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+ this.projectsFilter = projectsFilter;
}
@Override
protected EventTopic getTopic() {
return EventTopic.PROJECT_LIST_TOPIC;
}
+
+ @Override
+ protected Boolean shouldConsumeEvent(Event event) {
+ return projectsFilter.matches(((ProjectListUpdateEvent) event).projectName);
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java
index 7360b8f..0253588 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java
@@ -19,14 +19,20 @@
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.metrics.CallbackMetric1;
+import com.google.gerrit.metrics.MetricMaker;
import com.google.gerrit.server.cache.CacheModule;
import com.google.gerrit.server.cache.serialize.JavaCacheSerializer;
import com.google.gerrit.server.cache.serialize.StringCacheSerializer;
+import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.project.ProjectCache;
import com.google.inject.Inject;
import com.google.inject.Module;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
+import com.google.inject.name.Names;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
import java.util.Collection;
@@ -36,23 +42,29 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Singleton
-public class ReplicationStatus implements LifecycleListener {
+public class ReplicationStatus implements LifecycleListener, ProjectDeletedListener {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final Map<String, Long> replicationStatusPerProject = new HashMap<>();
- static final String REPLICATION_STATUS_CACHE = "replication_status";
+ static final String REPLICATION_STATUS = "replication_status";
- public static Module cacheModule() {
+ public static Module cacheModule(WorkQueue queue) {
return new CacheModule() {
@Override
protected void configure() {
- persist(REPLICATION_STATUS_CACHE, String.class, Long.class)
+ persist(REPLICATION_STATUS, String.class, Long.class)
.version(1)
.keySerializer(StringCacheSerializer.INSTANCE)
.valueSerializer(new JavaCacheSerializer<>());
+
+ bind(ScheduledExecutorService.class)
+ .annotatedWith(Names.named(REPLICATION_STATUS))
+ .toInstance(queue.createQueue(0, REPLICATION_STATUS));
}
};
}
@@ -62,20 +74,35 @@
private final Optional<ProjectVersionRefUpdate> projectVersionRefUpdate;
private final ProjectVersionLogger verLogger;
private final ProjectCache projectCache;
+ private final ScheduledExecutorService statusScheduler;
+
+ private final Configuration config;
+
+ private final MetricMaker metricMaker;
@Inject
public ReplicationStatus(
- @Named(REPLICATION_STATUS_CACHE) Cache<String, Long> cache,
+ @Named(REPLICATION_STATUS) Cache<String, Long> cache,
Optional<ProjectVersionRefUpdate> projectVersionRefUpdate,
ProjectVersionLogger verLogger,
- ProjectCache projectCache) {
+ ProjectCache projectCache,
+ @Named(REPLICATION_STATUS) ScheduledExecutorService statusScheduler,
+ Configuration config,
+ MetricMaker metricMaker) {
this.cache = cache;
this.projectVersionRefUpdate = projectVersionRefUpdate;
this.verLogger = verLogger;
this.projectCache = projectCache;
+ this.statusScheduler = statusScheduler;
+ this.config = config;
+ this.metricMaker = metricMaker;
}
public Long getMaxLag() {
+ return getMaxLagMillis() / 1000;
+ }
+
+ public Long getMaxLagMillis() {
Collection<Long> lags = replicationStatusPerProject.values();
if (lags.isEmpty()) {
return 0L;
@@ -121,15 +148,75 @@
}
}
+ void removeProjectFromReplicationLagMetrics(Project.NameKey projectName) {
+ Optional<Long> localVersion =
+ projectVersionRefUpdate.get().getProjectLocalVersion(projectName.get());
+
+ if (!localVersion.isPresent() && replicationStatusPerProject.containsKey(projectName.get())) {
+ cache.invalidate(projectName.get());
+ replicationStatusPerProject.remove(projectName.get());
+ localVersionPerProject.remove(projectName.get());
+ verLogger.logDeleted(projectName);
+ logger.atFine().log("Removed project '%s' from replication lag metrics", projectName);
+ }
+ }
+
+ @VisibleForTesting
+ Runnable replicationLagMetricPerProject(CallbackMetric1<String, Long> metricCallback) {
+ return () -> {
+ if (replicationStatusPerProject.isEmpty()) {
+ metricCallback.forceCreate("");
+ } else {
+ replicationStatusPerProject.entrySet().stream()
+ .filter(e -> e.getValue() > 0)
+ .forEach(
+ e ->
+ metricCallback.set(
+ SubscriberMetrics.sanitizeProjectName(e.getKey()), e.getValue()));
+ metricCallback.prune();
+ }
+ };
+ }
+
@VisibleForTesting
public void doUpdateLag(Project.NameKey projectName, Long lag) {
cache.put(projectName.get(), lag);
replicationStatusPerProject.put(projectName.get(), lag);
}
+ @VisibleForTesting
+ Long getReplicationStatus(String projectName) {
+ return replicationStatusPerProject.get(projectName);
+ }
+
+ @VisibleForTesting
+ Long getLocalVersion(String projectName) {
+ return localVersionPerProject.get(projectName);
+ }
+
@Override
public void start() {
loadAllFromCache();
+
+ long replicationLagPollingInterval = config.replicationLagRefreshInterval().toMillis();
+
+ if (replicationLagPollingInterval > 0) {
+ statusScheduler.scheduleAtFixedRate(
+ this::refreshProjectsWithLag,
+ replicationLagPollingInterval,
+ replicationLagPollingInterval,
+ TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @VisibleForTesting
+ public void refreshProjectsWithLag() {
+ logger.atFine().log("Refreshing projects version lags triggered ...");
+ replicationStatusPerProject.entrySet().stream()
+ .filter(entry -> entry.getValue() > 0)
+ .map(Map.Entry::getKey)
+ .map(Project::nameKey)
+ .forEach(this::updateReplicationLag);
}
@Override
@@ -140,4 +227,9 @@
projectCache.all().stream().map(Project.NameKey::get).collect(Collectors.toSet());
replicationStatusPerProject.putAll(cache.getAllPresent(cachedProjects));
}
+
+ @Override
+ public void onProjectDeleted(Event event) {
+ removeProjectFromReplicationLagMetrics(Project.nameKey(event.getProjectName()));
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java
index e954bc4..605e119 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java
@@ -14,14 +14,27 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
+import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
import com.google.inject.Scopes;
public class ReplicationStatusModule extends LifecycleModule {
+
+ private final WorkQueue workQueue;
+
+ @Inject
+ public ReplicationStatusModule(WorkQueue workQueue) {
+ this.workQueue = workQueue;
+ }
+
@Override
protected void configure() {
bind(ReplicationStatus.class).in(Scopes.SINGLETON);
- install(ReplicationStatus.cacheModule());
+ install(ReplicationStatus.cacheModule(workQueue));
listener().to(ReplicationStatus.class);
+ DynamicSet.bind(binder(), ProjectDeletedListener.class).to(ReplicationStatus.class);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
index 20c355e..ab64651 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
@@ -14,31 +14,45 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectEvent;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
-import java.util.UUID;
@Singleton
public class StreamEventSubscriber extends AbstractSubcriber {
+ private final ProjectsFilter projectsFilter;
+
@Inject
public StreamEventSubscriber(
StreamEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
- @InstanceId UUID instanceId,
+ @GerritInstanceId String instanceId,
MessageLogger msgLog,
SubscriberMetrics subscriberMetrics,
- Configuration cfg) {
+ Configuration cfg,
+ ProjectsFilter projectsFilter) {
super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+ this.projectsFilter = projectsFilter;
}
@Override
protected EventTopic getTopic() {
return EventTopic.STREAM_EVENT_TOPIC;
}
+
+ @Override
+ protected Boolean shouldConsumeEvent(Event event) {
+ if (event instanceof ProjectEvent) {
+ return projectsFilter.matches(((ProjectEvent) event).getProjectNameKey().get());
+ }
+ return true;
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
index 899233e..bb971ff 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
@@ -14,32 +14,43 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
-import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.metrics.CallbackMetric1;
import com.google.gerrit.metrics.Counter1;
import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Field;
import com.google.gerrit.metrics.MetricMaker;
import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectEvent;
import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.logging.Metadata;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.MultiSiteMetrics;
-import com.googlesource.gerrit.plugins.replication.RefReplicatedEvent;
-import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
-import com.googlesource.gerrit.plugins.replication.ReplicationScheduledEvent;
+import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationSucceededEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
+import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
@Singleton
public class SubscriberMetrics extends MultiSiteMetrics {
- private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final String SUBSCRIBER_SUCCESS_COUNTER = "subscriber_msg_consumer_counter";
private static final String SUBSCRIBER_FAILURE_COUNTER =
"subscriber_msg_consumer_failure_counter";
- private static final String REPLICATION_LAG_SEC =
+ public static final String REPLICATION_LAG_SEC =
"multi_site/subscriber/subscriber_replication_status/sec_behind";
+ private static final String REPLICATION_LAG_MSEC =
+ "multi_site/subscriber/subscriber_replication_status/msec_behind";
+ private static final String REPLICATION_LAG_MSEC_PROJECT =
+ "multi_site/subscriber/subscriber_replication_status/msec_behind/per_project";
private final Counter1<String> subscriberSuccessCounter;
private final Counter1<String> subscriberFailureCounter;
private final ReplicationStatus replicationStatus;
+ private static final Pattern isValidMetricNamePattern = Pattern.compile("[a-zA-Z0-9_-]");
+ private static final Field<String> PROJECT_NAME =
+ Field.ofString("project_name", Metadata.Builder::cacheName).build();
@Inject
public SubscriberMetrics(MetricMaker metricMaker, ReplicationStatus replicationStatus) {
@@ -64,6 +75,51 @@
Long.class,
new Description("Replication lag (sec)").setGauge().setUnit(Description.Units.SECONDS),
replicationStatus::getMaxLag);
+ metricMaker.newCallbackMetric(
+ REPLICATION_LAG_MSEC,
+ Long.class,
+ new Description("Replication lag (msec)")
+ .setGauge()
+ .setUnit(Description.Units.MILLISECONDS),
+ replicationStatus::getMaxLagMillis);
+
+ CallbackMetric1<String, Long> metrics =
+ metricMaker.newCallbackMetric(
+ SubscriberMetrics.REPLICATION_LAG_MSEC_PROJECT,
+ Long.class,
+ new Description("Per-project replication lag (msec)")
+ .setGauge()
+ .setUnit(Description.Units.MILLISECONDS),
+ PROJECT_NAME);
+ metricMaker.newTrigger(metrics, replicationStatus.replicationLagMetricPerProject(metrics));
+ }
+
+ /**
+ * Ensures that the generated metric is compatible with prometheus metric names, as the set of
+ * values that represent a valid metric name are different from the ones that represent a valid
+ * project name. Main differences: - All _ are replaced with __ - All characters that aren't a
+ * letter(uppercase or lowercase) or a hyphen(-) are replaced with `_<hex_code>` This is to avoid
+ * all chances of name clashes when modifying a project name.
+ *
+ * @param name name of the metric to sanitize
+ * @return sanitized metric name
+ */
+ public static String sanitizeProjectName(String name) {
+ StringBuilder sanitizedName = new StringBuilder();
+ for (int i = 0; i < name.length(); i++) {
+ Character c = name.charAt(i);
+ Matcher matcher = isValidMetricNamePattern.matcher(String.valueOf(c));
+ if (matcher.find()) {
+ if (c == '_') {
+ sanitizedName.append("__");
+ } else {
+ sanitizedName.append(c);
+ }
+ } else {
+ sanitizedName.append("_").append(Integer.toHexString((int) c));
+ }
+ }
+ return sanitizedName.toString();
}
public void incrementSubscriberConsumedMessage() {
@@ -74,20 +130,18 @@
subscriberFailureCounter.increment(SUBSCRIBER_FAILURE_COUNTER);
}
- public void updateReplicationStatusMetrics(EventMessage eventMessage) {
- Event event = eventMessage.getEvent();
- if (event instanceof RefReplicationDoneEvent) {
- RefReplicationDoneEvent replicationDone = (RefReplicationDoneEvent) event;
- replicationStatus.updateReplicationLag(replicationDone.getProjectNameKey());
- } else if (event instanceof RefReplicatedEvent) {
- RefReplicatedEvent replicated = (RefReplicatedEvent) event;
- replicationStatus.updateReplicationLag(replicated.getProjectNameKey());
- } else if (event instanceof ReplicationScheduledEvent) {
- ReplicationScheduledEvent updated = (ReplicationScheduledEvent) event;
- replicationStatus.updateReplicationLag(updated.getProjectNameKey());
- } else if (event instanceof RefUpdatedEvent) {
- RefUpdatedEvent updated = (RefUpdatedEvent) event;
- replicationStatus.updateReplicationLag(updated.getProjectNameKey());
+ public void updateReplicationStatusMetrics(Event event) {
+
+ if (event instanceof RefReplicationDoneEvent
+ || event instanceof RefReplicatedEvent
+ || event instanceof ReplicationScheduledEvent
+ || event instanceof RefUpdatedEvent) {
+ ProjectEvent projectEvent = (ProjectEvent) event;
+ replicationStatus.updateReplicationLag(projectEvent.getProjectNameKey());
+ } else if (event instanceof ProjectDeletionReplicationSucceededEvent) {
+ ProjectDeletionReplicationSucceededEvent projectDeletion =
+ (ProjectDeletionReplicationSucceededEvent) event;
+ replicationStatus.removeProjectFromReplicationLagMetrics(projectDeletion.getProjectNameKey());
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/Context.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/Context.java
index 1c29d75..269388c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/Context.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/Context.java
@@ -14,14 +14,22 @@
package com.googlesource.gerrit.plugins.multisite.forwarder;
+import static com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationEndpoints.APPLY_OBJECTS_API_ENDPOINT;
+import static com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationEndpoints.APPLY_OBJECT_API_ENDPOINT;
+
/** Allows to tag a forwarded event to avoid infinitely looping events. */
public class Context {
+ public static final String PULL_REPLICATION_PLUGIN_NAME = "pull-replication";
private static final ThreadLocal<Boolean> forwardedEvent = ThreadLocal.withInitial(() -> false);
private Context() {}
public static Boolean isForwardedEvent() {
- return forwardedEvent.get();
+ return forwardedEvent.get()
+ ||
+ // When the event is a result of pull-replication event, is considered as
+ // "forwarded" action because did not happen on this node.
+ isPullReplicationApplyObjectIndexing();
}
public static void setForwardedEvent(Boolean b) {
@@ -31,4 +39,10 @@
public static void unsetForwardedEvent() {
forwardedEvent.remove();
}
+
+ public static boolean isPullReplicationApplyObjectIndexing() {
+ String threadName = Thread.currentThread().getName();
+ return threadName.contains(PULL_REPLICATION_PLUGIN_NAME + "~" + APPLY_OBJECT_API_ENDPOINT)
+ || threadName.contains(PULL_REPLICATION_PLUGIN_NAME + "~" + APPLY_OBJECTS_API_ENDPOINT);
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
index 8d41500..522a78f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
@@ -63,17 +63,27 @@
protected void attemptToIndex(String id, Optional<ChangeIndexEvent> indexEvent, int retryCount) {
ChangeChecker checker = changeCheckerFactory.create(id);
Optional<ChangeNotes> changeNotes = checker.getChangeNotes();
- if (changeNotes.isPresent()) {
+ boolean changeIsPresent = changeNotes.isPresent();
+ boolean changeIsConsistent = checker.isChangeConsistent();
+ if (changeIsPresent && changeIsConsistent) {
reindexAndCheckIsUpToDate(id, indexEvent, checker, retryCount);
} else {
log.warn(
- "Change {} not present yet in local Git repository (event={}) after {} attempt(s)",
+ "Change {} {} in local Git repository (event={}) after {} attempt(s)",
id,
+ !changeIsPresent
+ ? "not present yet"
+ : (changeIsConsistent ? "is" : "is not") + " consistent",
indexEvent,
retryCount);
if (!rescheduleIndex(id, indexEvent, retryCount + 1)) {
log.error(
- "Change {} could not be found in the local Git repository (event={})", id, indexEvent);
+ "Change {} {} in the local Git repository (event={})",
+ id,
+ !changeIsPresent
+ ? "could not be found"
+ : (changeIsConsistent ? "was" : "was not") + " consistent",
+ indexEvent);
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
index 975916a..19936ef 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
@@ -49,6 +49,6 @@
return true;
}
- return broker.send(eventTopic.topic(cfg), event);
+ return broker.sendSync(eventTopic.topic(cfg), event);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
index 9ff4688..8a020fc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
@@ -35,6 +35,6 @@
@Override
public boolean send(Event event) {
- return broker.send(EventTopic.STREAM_EVENT_TOPIC.topic(cfg), event);
+ return broker.sendSync(EventTopic.STREAM_EVENT_TOPIC.topic(cfg), event);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/AccountIndexEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/AccountIndexEvent.java
index 317eb76..065ef7e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/AccountIndexEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/AccountIndexEvent.java
@@ -21,8 +21,8 @@
public int accountId;
- public AccountIndexEvent(int accountId) {
- super(TYPE);
+ public AccountIndexEvent(int accountId, String instanceId) {
+ super(TYPE, instanceId);
this.accountId = accountId;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/CacheEvictionEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/CacheEvictionEvent.java
index 1c2185f..4444756 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/CacheEvictionEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/CacheEvictionEvent.java
@@ -22,8 +22,8 @@
public String cacheName;
public Object key;
- public CacheEvictionEvent(String cacheName, Object key) {
- super(TYPE);
+ public CacheEvictionEvent(String cacheName, Object key, String instanceId) {
+ super(TYPE, instanceId);
this.cacheName = cacheName;
this.key = key;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ChangeIndexEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ChangeIndexEvent.java
index ab4ddf4..64fbdfb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ChangeIndexEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ChangeIndexEvent.java
@@ -28,8 +28,8 @@
public String targetSha;
public boolean deleted;
- public ChangeIndexEvent(String projectName, int changeId, boolean deleted) {
- super(TYPE);
+ public ChangeIndexEvent(String projectName, int changeId, boolean deleted, String instanceId) {
+ super(TYPE, instanceId);
this.projectName = projectName;
this.changeId = changeId;
this.deleted = deleted;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/GroupIndexEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/GroupIndexEvent.java
index 4981b2f..05ac198 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/GroupIndexEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/GroupIndexEvent.java
@@ -24,8 +24,8 @@
public final String groupUUID;
public final ObjectId sha1;
- public GroupIndexEvent(String groupUUID, @Nullable ObjectId sha1) {
- super(TYPE);
+ public GroupIndexEvent(String groupUUID, @Nullable ObjectId sha1, String instanceId) {
+ super(TYPE, instanceId);
this.groupUUID = groupUUID;
this.sha1 = sha1;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/IndexEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/IndexEvent.java
index ea2c3fb..2fdda72 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/IndexEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/IndexEvent.java
@@ -15,7 +15,7 @@
package com.googlesource.gerrit.plugins.multisite.forwarder.events;
public abstract class IndexEvent extends MultiSiteEvent {
- protected IndexEvent(String type) {
- super(type);
+ protected IndexEvent(String type, String instanceId) {
+ super(type, instanceId);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/MultiSiteEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/MultiSiteEvent.java
index 404d168..f29204b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/MultiSiteEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/MultiSiteEvent.java
@@ -29,7 +29,8 @@
register(ProjectListUpdateEvent.TYPE, ProjectListUpdateEvent.class);
}
- protected MultiSiteEvent(String type) {
+ protected MultiSiteEvent(String type, String instanceId) {
super(type);
+ this.instanceId = instanceId;
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectIndexEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectIndexEvent.java
index 8bdb7b5..954befb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectIndexEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectIndexEvent.java
@@ -21,8 +21,8 @@
public String projectName;
- public ProjectIndexEvent(String projectName) {
- super(TYPE);
+ public ProjectIndexEvent(String projectName, String instanceId) {
+ super(TYPE, instanceId);
this.projectName = projectName;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectListUpdateEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectListUpdateEvent.java
index d030e5b..0e18b27 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectListUpdateEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectListUpdateEvent.java
@@ -22,8 +22,8 @@
public String projectName;
public boolean remove;
- public ProjectListUpdateEvent(String projectName, boolean remove) {
- super(TYPE);
+ public ProjectListUpdateEvent(String projectName, boolean remove, String instanceId) {
+ super(TYPE, instanceId);
this.projectName = projectName;
this.remove = remove;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
index 202fb42..a647bce 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
@@ -21,6 +21,10 @@
import com.google.gerrit.entities.Account;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.server.config.AllUsersName;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventListener;
+import com.google.gerrit.server.events.RefEvent;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexAccountHandler;
import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexChangeHandler;
@@ -32,12 +36,12 @@
import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
-import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
import java.io.IOException;
import java.util.Optional;
import java.util.Set;
-public class IndexEventRouter implements ForwardedEventRouter<IndexEvent>, LifecycleListener {
+public class IndexEventRouter
+ implements ForwardedEventRouter<IndexEvent>, EventListener, LifecycleListener {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final ForwardedIndexAccountHandler indexAccountHandler;
@@ -45,6 +49,7 @@
private final ForwardedIndexGroupHandler indexGroupHandler;
private final ForwardedIndexProjectHandler indexProjectHandler;
private final AllUsersName allUsersName;
+ private final String gerritInstanceId;
@Inject
public IndexEventRouter(
@@ -52,12 +57,14 @@
ForwardedIndexChangeHandler indexChangeHandler,
ForwardedIndexGroupHandler indexGroupHandler,
ForwardedIndexProjectHandler indexProjectHandler,
- AllUsersName allUsersName) {
+ AllUsersName allUsersName,
+ @GerritInstanceId String gerritInstanceId) {
this.indexAccountHandler = indexAccountHandler;
this.indexChangeHandler = indexChangeHandler;
this.indexGroupHandler = indexGroupHandler;
this.indexProjectHandler = indexProjectHandler;
this.allUsersName = allUsersName;
+ this.gerritInstanceId = gerritInstanceId;
}
@Override
@@ -85,7 +92,7 @@
}
}
- public void onRefReplicated(RefReplicationDoneEvent replicationEvent) throws IOException {
+ public void onRefReplicated(RefEvent replicationEvent) throws IOException {
if (replicationEvent.getProjectNameKey().equals(allUsersName)) {
Account.Id accountId = Account.Id.fromRef(replicationEvent.getRefName());
if (accountId != null) {
@@ -97,6 +104,20 @@
}
@Override
+ public void onEvent(Event event) {
+ if (event instanceof RefEvent
+ && (event.getType().contains("fetch-ref-replicated")
+ || event.getType().contains("fetch-ref-replication-done"))
+ && gerritInstanceId.equals(event.instanceId)) {
+ try {
+ onRefReplicated((RefEvent) event);
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log("Error while processing event %s", event);
+ }
+ }
+ }
+
+ @Override
public void start() {}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/RouterModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/RouterModule.java
index bac907e..2193034 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/RouterModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/RouterModule.java
@@ -14,7 +14,9 @@
package com.googlesource.gerrit.plugins.multisite.forwarder.router;
+import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.events.EventListener;
import com.google.inject.Scopes;
public class RouterModule extends LifecycleModule {
@@ -22,6 +24,8 @@
protected void configure() {
bind(IndexEventRouter.class).in(Scopes.SINGLETON);
listener().to(IndexEventRouter.class).in(Scopes.SINGLETON);
+ DynamicSet.bind(binder(), EventListener.class).to(IndexEventRouter.class);
+
bind(CacheEvictionEventRouter.class).in(Scopes.SINGLETON);
bind(ProjectListUpdateRouter.class).in(Scopes.SINGLETON);
bind(StreamEventRouter.class).in(Scopes.SINGLETON);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
index 4ef3426..95a3e66 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
@@ -18,7 +18,7 @@
import com.google.gerrit.server.permissions.PermissionBackendException;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedEventHandler;
-import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
import java.io.IOException;
public class StreamEventRouter implements ForwardedEventRouter<Event> {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/http/HttpModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/http/HttpModule.java
index 2ae2d9b..26bfb53 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/http/HttpModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/http/HttpModule.java
@@ -14,14 +14,25 @@
package com.googlesource.gerrit.plugins.multisite.http;
+import com.google.inject.Inject;
import com.google.inject.servlet.ServletModule;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
public class HttpModule extends ServletModule {
public static final String LAG_ENDPOINT_SEGMENT = "replication-lag";
+ private final Configuration config;
+
+ @Inject
+ public HttpModule(Configuration config) {
+ this.config = config;
+ }
+
@Override
protected void configureServlets() {
- serve(String.format("/%s", LAG_ENDPOINT_SEGMENT)).with(ReplicationStatusServlet.class);
+ if (config.event().synchronize()) {
+ serve(String.format("/%s", LAG_ENDPOINT_SEGMENT)).with(ReplicationStatusServlet.class);
+ }
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeChecker.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeChecker.java
index 9ee59eb..3277150 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeChecker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeChecker.java
@@ -59,4 +59,11 @@
* @throws IOException if an I/O error occurred while reading the local Change
*/
public Optional<Long> getComputedChangeTs() throws IOException;
+
+ /**
+ * Check if the local Change contains current patchset refs
+ *
+ * @return true if local change contains meta and current patchset refs
+ */
+ public boolean isChangeConsistent();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
index 08b26f7..4ef5b7f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
@@ -19,6 +19,7 @@
import com.google.gerrit.exceptions.StorageException;
import com.google.gerrit.server.CommentsUtil;
import com.google.gerrit.server.change.ChangeFinder;
+import com.google.gerrit.server.config.GerritInstanceId;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.notedb.ChangeNotes;
import com.google.gerrit.server.util.ManualRequestContext;
@@ -30,8 +31,11 @@
import java.sql.Timestamp;
import java.util.Objects;
import java.util.Optional;
+import org.eclipse.jgit.errors.MissingObjectException;
+import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevWalk;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +46,7 @@
private final OneOffRequestContext oneOffReqCtx;
private final String changeId;
private final ChangeFinder changeFinder;
+ private final String instanceId;
private Optional<Long> computedChangeTs = Optional.empty();
private Optional<ChangeNotes> changeNotes = Optional.empty();
@@ -55,12 +60,14 @@
CommentsUtil commentsUtil,
ChangeFinder changeFinder,
OneOffRequestContext oneOffReqCtx,
+ @GerritInstanceId String instanceId,
@Assisted String changeId) {
this.changeFinder = changeFinder;
this.gitRepoMgr = gitRepoMgr;
this.commentsUtil = commentsUtil;
this.oneOffReqCtx = oneOffReqCtx;
this.changeId = changeId;
+ this.instanceId = instanceId;
}
@Override
@@ -69,7 +76,8 @@
return getComputedChangeTs()
.map(
ts -> {
- ChangeIndexEvent event = new ChangeIndexEvent(projectName, changeId, deleted);
+ ChangeIndexEvent event =
+ new ChangeIndexEvent(projectName, changeId, deleted, instanceId);
event.eventCreatedOn = ts;
event.targetSha = getBranchTargetSha();
return event;
@@ -140,6 +148,34 @@
}
}
+ @Override
+ public boolean isChangeConsistent() {
+ Optional<ChangeNotes> notes = getChangeNotes();
+ if (!notes.isPresent()) {
+ log.warn("Unable to compute change notes for change {}", changeId);
+ return true;
+ }
+ ObjectId currentPatchSetCommitId = notes.get().getCurrentPatchSet().commitId();
+ try (Repository repo = gitRepoMgr.openRepository(changeNotes.get().getProjectName());
+ RevWalk walk = new RevWalk(repo)) {
+ walk.parseCommit(currentPatchSetCommitId);
+ } catch (StorageException | MissingObjectException e) {
+ log.warn(
+ String.format(
+ "Consistency check failed for change %s, missing current patchset commit %s",
+ changeId, currentPatchSetCommitId.getName()),
+ e);
+ return false;
+ } catch (IOException e) {
+ log.warn(
+ String.format(
+ "Cannot check consistency for change %s, current patchset commit %s. Assuming change is consistent",
+ changeId, currentPatchSetCommitId.getName()),
+ e);
+ }
+ return true;
+ }
+
private Optional<Long> computeLastChangeTs() {
return getChangeNotes().map(notes -> getTsFromChangeAndDraftComments(notes));
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
index eef3e4b..02f1b1c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
@@ -21,6 +21,7 @@
import com.google.gerrit.extensions.events.GroupIndexedListener;
import com.google.gerrit.extensions.events.ProjectIndexedListener;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderTask;
@@ -48,6 +49,7 @@
private final ChangeCheckerImpl.Factory changeChecker;
private final ProjectsFilter projectsFilter;
private final GroupChecker groupChecker;
+ private final String instanceId;
@Inject
IndexEventHandler(
@@ -55,18 +57,20 @@
DynamicSet<IndexEventForwarder> forwarders,
ChangeCheckerImpl.Factory changeChecker,
ProjectsFilter projectsFilter,
- GroupChecker groupChecker) {
+ GroupChecker groupChecker,
+ @GerritInstanceId String instanceId) {
this.forwarders = forwarders;
this.executor = executor;
this.changeChecker = changeChecker;
this.projectsFilter = projectsFilter;
this.groupChecker = groupChecker;
+ this.instanceId = instanceId;
}
@Override
public void onAccountIndexed(int id) {
if (!Context.isForwardedEvent()) {
- IndexAccountTask task = new IndexAccountTask(new AccountIndexEvent(id));
+ IndexAccountTask task = new IndexAccountTask(new AccountIndexEvent(id, instanceId));
if (queuedTasks.add(task)) {
executor.execute(task);
}
@@ -87,7 +91,8 @@
public void onGroupIndexed(String groupUUID) {
if (!Context.isForwardedEvent()) {
IndexGroupTask task =
- new IndexGroupTask(new GroupIndexEvent(groupUUID, groupChecker.getGroupHead(groupUUID)));
+ new IndexGroupTask(
+ new GroupIndexEvent(groupUUID, groupChecker.getGroupHead(groupUUID), instanceId));
if (queuedTasks.add(task)) {
executor.execute(task);
}
@@ -97,7 +102,7 @@
@Override
public void onProjectIndexed(String projectName) {
if (!Context.isForwardedEvent() && projectsFilter.matches(projectName)) {
- IndexProjectTask task = new IndexProjectTask(new ProjectIndexEvent(projectName));
+ IndexProjectTask task = new IndexProjectTask(new ProjectIndexEvent(projectName, instanceId));
if (queuedTasks.add(task)) {
executor.execute(task);
}
@@ -132,7 +137,7 @@
private void executeDeleteChangeTask(int id) {
if (!Context.isForwardedEvent()) {
- IndexChangeTask task = new IndexChangeTask(new ChangeIndexEvent("", id, true));
+ IndexChangeTask task = new IndexChangeTask(new ChangeIndexEvent("", id, true, instanceId));
if (queuedTasks.add(task)) {
executor.execute(task);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationFetchFilter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationFetchFilter.java
new file mode 100644
index 0000000..d851420
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationFetchFilter.java
@@ -0,0 +1,200 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation;
+
+import static com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF;
+import static com.googlesource.gerrit.plugins.replication.pull.PullReplicationLogger.repLog;
+
+import com.gerritforge.gerrit.globalrefdb.GlobalRefDbLockException;
+import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.Project.NameKey;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.replication.pull.ReplicationFetchFilter;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefDatabase;
+import org.eclipse.jgit.lib.Repository;
+
+@Singleton
+public class MultisiteReplicationFetchFilter implements ReplicationFetchFilter {
+ private static final String ZERO_ID_NAME = ObjectId.zeroId().name();
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ private final SharedRefDatabaseWrapper sharedRefDb;
+ private final GitRepositoryManager gitRepositoryManager;
+ private Configuration config;
+
+ @Inject
+ public MultisiteReplicationFetchFilter(
+ SharedRefDatabaseWrapper sharedRefDb,
+ GitRepositoryManager gitRepositoryManager,
+ Configuration config) {
+ this.sharedRefDb = sharedRefDb;
+ this.gitRepositoryManager = gitRepositoryManager;
+ this.config = config;
+ }
+
+ @Override
+ public Set<String> filter(String projectName, Set<String> refs) {
+ try (Repository repository =
+ gitRepositoryManager.openRepository(Project.nameKey(projectName))) {
+ RefDatabase refDb = repository.getRefDatabase();
+ return refs.stream()
+ .filter(ref -> !hasBeenRemovedFromGlobalRefDb(projectName, ref))
+ .filter(
+ ref -> {
+ if (shouldNotBeTrackedAnymoreOnGlobalRefDb(ref)) {
+ return true;
+ }
+ Optional<ObjectId> localRefOid =
+ getLocalSha1IfEqualsToExistingGlobalRefDb(
+ repository, projectName, refDb, ref, true);
+ localRefOid.ifPresent(
+ oid ->
+ repLog.info(
+ "{}:{}={} is already up-to-date with the shared-refdb and thus will NOT BE"
+ + " fetched",
+ projectName,
+ ref,
+ oid.getName()));
+
+ return !localRefOid.isPresent();
+ })
+ .collect(Collectors.toSet());
+ } catch (IOException ioe) {
+ String message = String.format("Error while opening project: '%s'", projectName);
+ repLog.error(message);
+ logger.atSevere().withCause(ioe).log(message);
+ return Collections.emptySet();
+ }
+ }
+
+ /*
+ * Since ac43a5f94c773c9db7a73d44035961d69d13fa53 the 'refs/multi-site/version' is
+ * not updated anymore on the global-refdb; however, the values stored already
+ * on the global-refdb could get in the way and prevent replication from happening
+ * as expected.
+ *
+ * Exclude the 'refs/multi-site/version' from local vs. global refdb checking
+ * pretending that the global-refdb for that ref did not exist.
+ */
+ private boolean shouldNotBeTrackedAnymoreOnGlobalRefDb(String ref) {
+ return MULTI_SITE_VERSIONING_REF.equals(ref);
+ }
+
+ /* If the ref to fetch has been set to all zeros on the global-refdb, it means
+ * that whatever is the situation locally, we do not need to fetch it:
+ * - If the remote still has it, fetching it will be useless because the global
+ * state is that the ref should be removed.
+ * - If the remote doesn't have it anymore, trying to fetch the ref won't do
+ * anything because you can't just remove local refs by fetching.
+ */
+ private boolean hasBeenRemovedFromGlobalRefDb(String projectName, String ref) {
+ if (foundAsZeroInSharedRefDb(Project.nameKey(projectName), ref)) {
+ repLog.info(
+ "{}:{} is found as zeros (removed) in shared-refdb thus will NOT BE fetched",
+ projectName,
+ ref);
+ return true;
+ }
+ return false;
+ }
+
+ private boolean foundAsZeroInSharedRefDb(NameKey projectName, String ref) {
+ return sharedRefDb
+ .get(projectName, ref, String.class)
+ .map(r -> ZERO_ID_NAME.equals(r))
+ .orElse(false);
+ }
+
+ private Optional<ObjectId> getLocalSha1IfEqualsToExistingGlobalRefDb(
+ Repository repository,
+ String projectName,
+ RefDatabase refDb,
+ String ref,
+ boolean retryWithRandomSleep) {
+ try {
+ Optional<ObjectId> localRefObjectId =
+ Optional.ofNullable(refDb.exactRef(ref))
+ .filter(
+ r ->
+ sharedRefDb
+ .get(Project.nameKey(projectName), r.getName(), String.class)
+ .map(sharedRefObjId -> r.getObjectId().getName().equals(sharedRefObjId))
+ .orElse(false))
+ .map(Ref::getObjectId);
+
+ if (!localRefObjectId.isPresent() && retryWithRandomSleep) {
+ randomSleepForMitigatingConditionWhereLocalRefHaveJustBeenChanged(
+ projectName, localRefObjectId, ref);
+ localRefObjectId =
+ getLocalSha1IfEqualsToExistingGlobalRefDb(repository, projectName, refDb, ref, false);
+ }
+
+ return localRefObjectId;
+ } catch (GlobalRefDbLockException gle) {
+ String message = String.format("%s is locked on shared-refdb", ref);
+ repLog.error(message);
+ logger.atSevere().withCause(gle).log(message);
+ return Optional.empty();
+ } catch (IOException ioe) {
+ String message =
+ String.format("Error while extracting ref '%s' for project '%s'", ref, projectName);
+ repLog.error(message);
+ logger.atSevere().withCause(ioe).log(message);
+ return Optional.empty();
+ }
+ }
+
+ private void randomSleepForMitigatingConditionWhereLocalRefHaveJustBeenChanged(
+ String projectName, Optional<ObjectId> refObjectId, String ref) {
+ if (!config.replicationFilter().isFetchFilterRandomSleepEnabled()) {
+ repLog.debug(
+ "'{}' is not up-to-date for project '{}' [local='{}']. Random sleep is disabled,"
+ + " reload local ref without delay and re-check",
+ ref,
+ projectName,
+ refObjectId);
+ return;
+ }
+
+ int randomSleepTimeMsec = config.replicationFilter().fetchFilterRandomSleepTimeMs();
+ repLog.debug(
+ "'{}' is not up-to-date for project '{}' [local='{}']. Reload local ref in '{} ms' and"
+ + " re-check",
+ ref,
+ projectName,
+ refObjectId,
+ randomSleepTimeMsec);
+ try {
+ Thread.sleep(randomSleepTimeMsec);
+ } catch (InterruptedException ie) {
+ String message =
+ String.format("Error while waiting for next check for '%s', ref '%s'", projectName, ref);
+ repLog.error(message);
+ logger.atWarning().withCause(ie).log(message);
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
index 98f4897..6f9c2e7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
@@ -22,12 +22,13 @@
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.replication.ReplicationPushFilter;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
-import java.util.Random;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.eclipse.jgit.lib.ObjectId;
@@ -42,20 +43,22 @@
public class MultisiteReplicationPushFilter implements ReplicationPushFilter {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final String REF_META_SUFFIX = "/meta";
- public static final int MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS = 1000;
- public static final int RANDOM_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS = 1000;
static final String REPLICATION_LOG_NAME = "replication_log";
static final Logger repLog = LoggerFactory.getLogger(REPLICATION_LOG_NAME);
private final SharedRefDatabaseWrapper sharedRefDb;
private final GitRepositoryManager gitRepositoryManager;
+ private Configuration config;
@Inject
public MultisiteReplicationPushFilter(
- SharedRefDatabaseWrapper sharedRefDb, GitRepositoryManager gitRepositoryManager) {
+ SharedRefDatabaseWrapper sharedRefDb,
+ GitRepositoryManager gitRepositoryManager,
+ Configuration config) {
this.sharedRefDb = sharedRefDb;
this.gitRepositoryManager = gitRepositoryManager;
+ this.config = config;
}
@Override
@@ -66,19 +69,23 @@
gitRepositoryManager.openRepository(Project.nameKey(projectName))) {
List<RemoteRefUpdate> filteredRefUpdates =
remoteUpdatesList.stream()
- .filter(
+ .map(
refUpdate -> {
- boolean refUpToDate = isUpToDateWithRetry(projectName, repository, refUpdate);
- if (!refUpToDate) {
+ Optional<RemoteRefUpdate> updatedRefUpdate =
+ isUpToDateWithRetry(projectName, repository, refUpdate);
+ if (!updatedRefUpdate.isPresent()) {
repLog.warn(
- "{} is not up-to-date with the shared-refdb and thus will NOT BE replicated",
+ "{} is not up-to-date with the shared-refdb and thus will NOT BE"
+ + " replicated",
refUpdate);
if (refUpdate.getSrcRef().endsWith(REF_META_SUFFIX)) {
outdatedChanges.add(getRootChangeRefPrefix(refUpdate.getSrcRef()));
}
}
- return refUpToDate;
+ return updatedRefUpdate;
})
+ .filter(Optional::isPresent)
+ .map(Optional::get)
.collect(Collectors.toList());
return filteredRefUpdates.stream()
@@ -102,45 +109,67 @@
}
}
- private boolean isUpToDateWithRetry(
+ private Optional<RemoteRefUpdate> isUpToDateWithRetry(
String projectName, Repository repository, RemoteRefUpdate refUpdate) {
String ref = refUpdate.getSrcRef();
try {
if (sharedRefDb.isUpToDate(
Project.nameKey(projectName),
new ObjectIdRef.Unpeeled(Ref.Storage.NETWORK, ref, refUpdate.getNewObjectId()))) {
- return true;
+ return Optional.of(refUpdate);
}
randomSleepForMitigatingConditionWhereLocalRefHaveJustBeenChanged(
projectName, refUpdate, ref);
+ ObjectId reloadedNewObjectId = getNotNullExactRef(repository, ref);
+ RemoteRefUpdate refUpdateReloaded =
+ newRemoteRefUpdateWithObjectId(repository, refUpdate, reloadedNewObjectId);
return sharedRefDb.isUpToDate(
- Project.nameKey(projectName),
- new ObjectIdRef.Unpeeled(Ref.Storage.NETWORK, ref, getNotNullExactRef(repository, ref)));
+ Project.nameKey(projectName),
+ new ObjectIdRef.Unpeeled(
+ Ref.Storage.NETWORK, ref, refUpdateReloaded.getNewObjectId()))
+ ? Optional.of(refUpdateReloaded)
+ : Optional.empty();
} catch (GlobalRefDbLockException gle) {
String message =
String.format("%s is locked on shared-refdb and thus will NOT BE replicated", ref);
repLog.error(message);
logger.atSevere().withCause(gle).log(message);
- return false;
+ return Optional.empty();
} catch (IOException ioe) {
String message =
String.format("Error while extracting ref '%s' for project '%s'", ref, projectName);
repLog.error(message);
logger.atSevere().withCause(ioe).log(message);
- return false;
+ return Optional.empty();
}
}
+ private RemoteRefUpdate newRemoteRefUpdateWithObjectId(
+ Repository localDb, RemoteRefUpdate refUpdate, ObjectId reloadedNewObjectId)
+ throws IOException {
+ return new RemoteRefUpdate(
+ localDb,
+ refUpdate.getSrcRef(),
+ reloadedNewObjectId,
+ refUpdate.getRemoteName(),
+ refUpdate.isForceUpdate(),
+ null,
+ refUpdate.getExpectedOldObjectId());
+ }
+
private void randomSleepForMitigatingConditionWhereLocalRefHaveJustBeenChanged(
String projectName, RemoteRefUpdate refUpdate, String ref) {
- int randomSleepTimeMsec =
- MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS
- + new Random().nextInt(RANDOM_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS);
+ if (!config.replicationFilter().isPushFilterRandomSleepEnabled()) {
+ return;
+ }
+
+ int randomSleepTimeMsec = config.replicationFilter().pushFilterRandomSleepTimeMs();
repLog.debug(
String.format(
- "'%s' is not up-to-date for project '%s' [local='%s']. Reload local ref in '%d ms' and re-check",
+ "'%s' is not up-to-date for project '%s' [local='%s']. Reload local ref in '%d ms' and"
+ + " re-check",
ref, projectName, refUpdate.getNewObjectId(), randomSleepTimeMsec));
try {
Thread.sleep(randomSleepTimeMsec);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateImpl.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateImpl.java
index 469122b..e07fef0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateImpl.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateImpl.java
@@ -20,27 +20,27 @@
import com.gerritforge.gerrit.globalrefdb.GlobalRefDbSystemError;
import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
+import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.Project.NameKey;
import com.google.gerrit.entities.RefNames;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventListener;
import com.google.gerrit.server.events.RefUpdatedEvent;
import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
import com.google.gerrit.server.git.GitRepositoryManager;
-import com.google.gerrit.server.notedb.IntBlob;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.Set;
import org.eclipse.jgit.errors.RepositoryNotFoundException;
import org.eclipse.jgit.lib.ObjectId;
-import org.eclipse.jgit.lib.ObjectIdRef;
import org.eclipse.jgit.lib.ObjectInserter;
-import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.RefUpdate;
import org.eclipse.jgit.lib.Repository;
@@ -106,8 +106,7 @@
if (newProjectVersionRefUpdate.isPresent()) {
verLogger.log(projectNameKey, newVersion, 0L);
- if (updateSharedProjectVersion(
- projectNameKey, newProjectVersionRefUpdate.get().getNewObjectId(), newVersion)) {
+ if (updateSharedProjectVersion(projectNameKey, newVersion)) {
gitReferenceUpdated.fire(projectNameKey, newProjectVersionRefUpdate.get(), null);
}
} else {
@@ -137,20 +136,9 @@
return newId;
}
- private boolean updateSharedProjectVersion(
- Project.NameKey projectNameKey, ObjectId newObjectId, Long newVersion)
+ private boolean updateSharedProjectVersion(Project.NameKey projectNameKey, Long newVersion)
throws SharedProjectVersionUpdateException {
- Ref sharedRef =
- sharedRefDb
- .get(projectNameKey, MULTI_SITE_VERSIONING_REF, String.class)
- .map(
- (String objectId) ->
- new ObjectIdRef.Unpeeled(
- Ref.Storage.NEW, MULTI_SITE_VERSIONING_REF, ObjectId.fromString(objectId)))
- .orElse(
- new ObjectIdRef.Unpeeled(
- Ref.Storage.NEW, MULTI_SITE_VERSIONING_REF, ObjectId.zeroId()));
Optional<Long> sharedVersion =
sharedRefDb
.get(projectNameKey, MULTI_SITE_VERSIONING_VALUE_REF, String.class)
@@ -160,59 +148,50 @@
if (sharedVersion.isPresent() && sharedVersion.get() >= newVersion) {
logger.atWarning().log(
String.format(
- "NOT Updating project %s version %s (value=%d) in shared ref-db because is more recent than the local one %s (value=%d) ",
- projectNameKey.get(),
- newObjectId,
- newVersion,
- sharedRef.getObjectId().getName(),
- sharedVersion.get()));
+ "NOT Updating project %s value=%d in shared ref-db because is more recent than the local value=%d",
+ projectNameKey.get(), newVersion, sharedVersion.get()));
return false;
}
logger.atFine().log(
String.format(
- "Updating shared project %s version to %s (value=%d)",
- projectNameKey.get(), newObjectId, newVersion));
+ "Updating shared project %s value to %d", projectNameKey.get(), newVersion));
- boolean success = sharedRefDb.compareAndPut(projectNameKey, sharedRef, newObjectId);
- if (!success) {
- String message =
- String.format(
- "Project version blob update failed for %s. Current value %s, new value: %s",
- projectNameKey.get(), safeGetObjectId(sharedRef), newObjectId);
- logger.atSevere().log(message);
- throw new SharedProjectVersionUpdateException(message);
- }
-
- success =
- sharedRefDb.compareAndPut(
- projectNameKey,
- MULTI_SITE_VERSIONING_VALUE_REF,
- sharedVersion.map(Object::toString).orElse(null),
- newVersion.toString());
- if (!success) {
- String message =
- String.format(
- "Project version update failed for %s. Current value %s, new value: %s",
- projectNameKey.get(), safeGetObjectId(sharedRef), newObjectId);
- logger.atSevere().log(message);
- throw new SharedProjectVersionUpdateException(message);
- }
-
+ updateProjectVersionValue(projectNameKey, newVersion, sharedVersion);
return true;
} catch (GlobalRefDbSystemError refDbSystemError) {
String message =
String.format(
- "Error while updating shared project version for %s. Current value %s, new value: %s. Error: %s",
+ "Error while updating shared project value for %s. Current value %s, new value: %s. Error: %s",
projectNameKey.get(),
- sharedRef.getObjectId(),
- newObjectId,
+ sharedVersion.map(Object::toString).orElse(null),
+ newVersion,
refDbSystemError.getMessage());
logger.atSevere().withCause(refDbSystemError).log(message);
throw new SharedProjectVersionUpdateException(message);
}
}
+ private void updateProjectVersionValue(
+ NameKey projectNameKey, Long newVersion, Optional<Long> sharedVersion) {
+ try {
+ if (sharedRefDb.isSetOperationSupported()) {
+ sharedRefDb.put(projectNameKey, MULTI_SITE_VERSIONING_VALUE_REF, newVersion.toString());
+ return;
+ }
+ } catch (NoSuchMethodError e) {
+ logger.atSevere().log(
+ "Global-refdb library is outdated and is not supporting "
+ + "'put' method, update global-refdb to the newest version. Falling back to 'compareAndPut'");
+ }
+
+ sharedRefDb.compareAndPut(
+ projectNameKey,
+ MULTI_SITE_VERSIONING_VALUE_REF,
+ sharedVersion.map(Object::toString).orElse(null),
+ newVersion.toString());
+ }
+
/* (non-Javadoc)
* @see com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate#getProjectLocalVersion(java.lang.String)
*/
@@ -220,9 +199,9 @@
public Optional<Long> getProjectLocalVersion(String projectName) {
try (Repository repository =
gitRepositoryManager.openRepository(Project.NameKey.parse(projectName))) {
- Optional<IntBlob> blob = IntBlob.parse(repository, MULTI_SITE_VERSIONING_REF);
+ Optional<Long> blob = longBlobParse(repository, MULTI_SITE_VERSIONING_REF);
if (blob.isPresent()) {
- Long repoVersion = Integer.toUnsignedLong(blob.get().value());
+ Long repoVersion = blob.get();
logger.atFine().log("Local project '%s' has version %d", projectName, repoVersion);
return Optional.of(repoVersion);
}
@@ -234,6 +213,22 @@
return Optional.empty();
}
+ private Optional<Long> longBlobParse(Repository repo, String refName) throws IOException {
+ return Optional.ofNullable(repo.exactRef(refName))
+ .map(
+ (ref) -> {
+ try {
+ return Long.parseLong(
+ new String(repo.open(ref.getObjectId()).getBytes(), StandardCharsets.UTF_8));
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log(
+ "Unable to extract long BLOB from %s:%s", repo.getDirectory(), ref);
+ return null;
+ }
+ })
+ .filter(Predicates.notNull());
+ }
+
/* (non-Javadoc)
* @see com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate#getProjectRemoteVersion(java.lang.String)
*/
@@ -245,10 +240,6 @@
return globalVersion.flatMap(longString -> getLongValueOf(longString));
}
- private Object safeGetObjectId(Ref currentRef) {
- return currentRef == null ? "null" : currentRef.getObjectId();
- }
-
private Optional<Long> getLongValueOf(String longString) {
try {
return Optional.ofNullable(Long.parseLong(longString));
@@ -286,7 +277,7 @@
}
private long getCurrentGlobalVersionNumber() {
- return System.currentTimeMillis() / 1000;
+ return System.currentTimeMillis();
}
private Boolean isSuccessful(RefUpdate.Result result) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/PullReplicationFilterModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/PullReplicationFilterModule.java
new file mode 100644
index 0000000..b8ad99d
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/PullReplicationFilterModule.java
@@ -0,0 +1,28 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation;
+
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.inject.AbstractModule;
+import com.googlesource.gerrit.plugins.replication.pull.ReplicationFetchFilter;
+
+public class PullReplicationFilterModule extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ DynamicItem.bind(binder(), ReplicationFetchFilter.class)
+ .to(MultisiteReplicationFetchFilter.class);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/PushReplicationFilterModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/PushReplicationFilterModule.java
new file mode 100644
index 0000000..8a8e826
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/PushReplicationFilterModule.java
@@ -0,0 +1,28 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation;
+
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.inject.AbstractModule;
+import com.googlesource.gerrit.plugins.replication.ReplicationPushFilter;
+
+public class PushReplicationFilterModule extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ DynamicItem.bind(binder(), ReplicationPushFilter.class)
+ .to(MultisiteReplicationPushFilter.class);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
index 8601cf2..fc4d505 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
@@ -30,14 +30,11 @@
import com.gerritforge.gerrit.globalrefdb.validation.dfsrefdb.SharedRefEnforcement;
import com.google.common.collect.ImmutableSet;
import com.google.gerrit.extensions.config.FactoryModule;
-import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.server.config.RepositoryConfig;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import com.google.inject.name.Names;
import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.replication.ReplicationExtensionPointModule;
-import com.googlesource.gerrit.plugins.replication.ReplicationPushFilter;
public class ValidationModule extends FactoryModule {
private final Configuration cfg;
@@ -50,8 +47,6 @@
@Override
protected void configure() {
- install(new ReplicationExtensionPointModule());
-
bind(SharedRefDatabaseWrapper.class).in(Scopes.SINGLETON);
bind(SharedRefLogger.class).to(Log4jSharedRefLogger.class);
factory(LockWrapper.Factory.class);
@@ -70,8 +65,6 @@
ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF,
ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_VALUE_REF));
install(new RepositoryManagerModule(repoConfig));
- DynamicItem.bind(binder(), ReplicationPushFilter.class)
- .to(MultisiteReplicationPushFilter.class);
if (cfg.getSharedRefDbConfiguration().getSharedRefDb().getEnforcementRules().isEmpty()) {
bind(SharedRefEnforcement.class).to(DefaultSharedRefEnforcement.class).in(Scopes.SINGLETON);
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index 623871b..d7c5648 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -7,7 +7,8 @@
message broker for aligning with the other masters over different sites.
The masters must be:
-
+* Gerrit instance id is mandatory for @PLUGIN@ plugin. All the master
+ must have [gerrit.instanceId](https://gerrit-review.googlesource.com/Documentation/config-gerrit.html#gerrit) populated.
* events-broker library must be installed as a library module in the
`$GERRIT_SITE/lib` directory of all the masters
* global-refdb library must be installed as a library module in the
@@ -98,3 +99,7 @@
* Subscriber replication lag (sec behind the producer)
`metric=site/multi_site/subscriber/subscriber_replication_status/sec_behind, type=com.google.gerrit.metrics.dropwizard.CallbackMetricImpl`
+
+* Subscriber replication lag (millisec behind the producer)
+
+`metric=site/multi_site/subscriber/subscriber_replication_status/msec_behind, type=com.google.gerrit.metrics.dropwizard.CallbackMetricImpl`
diff --git a/src/main/resources/Documentation/build.md b/src/main/resources/Documentation/build.md
index bb38288..146341e 100644
--- a/src/main/resources/Documentation/build.md
+++ b/src/main/resources/Documentation/build.md
@@ -3,15 +3,29 @@
This plugin can be built with Bazel in the Gerrit tree.
Clone or link this plugin to the plugins directory of Gerrit's
-source tree. Put the external dependency Bazel build file into
-the Gerrit /plugins directory, replacing the existing empty one.
+source tree.
+
+Clone the [pull-replication](https://gerrit.googlesource.com/plugins/pull-replication) on
+the same branch of the @PLUGIN@ plugin and link it to the `gerrit/plugins` directory.
```
+ export BRANCH=$(git --git-dir=@PLUGIN@ branch)
+ git clone https://gerrit.googlesource.com/plugins/pull-replication
cd gerrit/plugins
rm external_plugin_deps.bzl
ln -s @PLUGIN@/external_plugin_deps.bzl .
```
+Clone the [global-refdb](git clone "https://gerrit.googlesource.com/modules/global-refdb") on
+the same branch of the @PLUGIN@ plugin and link it to the `gerrit/plugins` directory.
+
+```
+ export BRANCH=$(git --git-dir=@PLUGIN@ branch)
+ git clone "https://gerrit.googlesource.com/modules/global-refdb"
+ cd gerrit/plugins
+ ln -s ../../global-refdb .
+```
+
From the Gerrit source tree issue the command:
```
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 02806a1..e451cb3 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -3,15 +3,19 @@
=========================
The @PLUGIN@ plugin must be installed as a library module in the
-`$GERRIT_SITE/lib` folder of all the instances. Configuration should
-be specified in the `$site_path/etc/@PLUGIN@.config` file.
+`$GERRIT_SITE/lib` folder of all the instances and linked to
+`$GERRIT_SITE/plugins`, which enable to use it as both libModule
+and plugin.
+Configuration should be specified in the `$site_path/etc/@PLUGIN@.config` file.
## Configuration parameters
```cache.synchronize```
-: Whether to run cache evictions synchronously. It requires disabling the
- background cache evictions notifications in `gerrit.config` by setting
- `cache.threads = 0`.
+: Whether to synchronize cache evictions. Set to false when relying on
+ low cache TTLs and therefore cache eviction is not strictly needed.
+ It requires disabling the background cache evictions notifications in
+ `gerrit.config` by setting `cache.threads = 0`.
+
Defaults to true.
```cache.threadPoolSize```
@@ -29,7 +33,8 @@
forwarded.
```event.synchronize```
-: Whether to synchronize stream events.
+: Whether to synchronize stream events. Set to false when not using the SSH
+ stream events.
Defaults to true.
```index.numStripedLocks```
@@ -37,7 +42,9 @@
Defaults to 10
```index.synchronize```
-: Whether to synchronize secondary indexes.
+: Whether to synchronize secondary indexes. Set to false when using multi-site
+ on Gerrit replicas that do not have an index, or when using an external
+ service such as ElasticSearch.
Defaults to true.
```index.threadPoolSize```
@@ -72,10 +79,19 @@
: Name of the topic to use for publishing cache eviction events
Defaults to GERRIT.EVENT.PROJECT.LIST
+**NOTE**: All broker settings are ignored when all of the `cache`,
+`index` or `event` synchronization is disabled.
+
```ref-database.enabled```
: Enable the use of a shared ref-database
Defaults: true
+```ref-database.replicationLagRefreshInterval```
+: Enable the auto-refresh of the metrics to trace the auto-replication
+ lag by polling on a regular basis. Set to zero for disabling the polling
+ mechanism.
+ Defaults: 60 min
+
```ref-database.enforcementRules.<policy>```
: Level of consistency enforcement across sites on a project:refs basis.
Supports two values for enforcing the policy on multiple projects or refs.
@@ -123,3 +139,80 @@
the project `foo/bar`, but no other project.
By default, all projects are matched.
+
+```replication.push-filter.minWaitBeforeReloadLocalVersionMs```
+: Specifies the minimum amount of time in milliseconds replication plugin filter will
+ wait before retrying check for ref which is not up to date with global-refdb.
+
+ By default: 1000 milliseconds
+
+```replication.push-filter.maxRandomWaitBeforeReloadLocalVersionMs```
+: Specifies the additional amount of time in milliseconds replication filter will
+ wait before retrying check for ref which is not up to date with global-refdb.
+
+ If maxRandomWaitBeforeReloadLocalVersionMs is set to zero random sleep for not in sync
+ refs is disabled.
+
+ By default: 1000 milliseconds
+
+```replication.fetch-filter.minWaitBeforeReloadLocalVersionMs```
+: Specifies the minimum amount of time in milliseconds pull-replication filter wait
+ before retrying check for ref which is not up to date with global-refdb.
+
+ By default: 1000 milliseconds
+
+```replication.fetch-filter.maxRandomWaitBeforeReloadLocalVersionMs```
+: Specifies the additional amount of time in milliseconds pull-replication filter will
+ wait before retrying check for ref which is not up to date with global-refdb.
+
+ If maxRandomWaitBeforeReloadLocalVersionMs is set to zero random sleep for not in sync
+ refs is disabled.
+
+ By default: 1000 milliseconds
+
+## Replication filters
+
+The @PLUGIN@ plugin is also responsible for filtering out replication events that may
+risk to create a split-brain situation.
+It integrates the push and pull replication filtering extension points for validating
+the refs to be replicated and dropping some of them.
+
+**Replication plugin**
+
+When using the Gerrit core replication plugin, also known as push-replication, link the
+`replication.jar` to the `$GERRIT_SITE/lib` directory and add the following libModule
+to `gerrit.config`:
+
+```
+[gerrit]
+ installModule = com.googlesource.gerrit.plugins.replication.ReplicationExtensionPointModule
+```
+
+The above configuration would be automatically detected by the @PLUGIN@ plugin which would then
+install the PushReplicationFilterModule for filtering outgoing replication refs based
+on their global-refdb status:
+
+- Outgoing replication of refs that are NOT up-to-date with the global-refdb will be
+ discarded, because they may cause split-brain on the remote replication endpoints.
+
+- All other refs will be pushed as normal to the remote replication ends.
+
+**Pull-replication plugin**
+
+When using the [pull-replication](https://gerrit.googlesource.com/plugins/pull-replication)
+plugin, link the `pull-replication.jar` to the `$GERRIT_SITE/lib` directory and add the following
+two libModules to `gerrit.config`:
+
+```
+[gerrit]
+ installModule = com.googlesource.gerrit.plugins.replication.pull.ReplicationExtensionPointModule
+```
+
+The above configuration would be automatically detected by the @PLUGIN@ plugin which would then
+install the PullReplicationFilterModule for filtering incoming fetch replication refs based
+on their global-refdb status.
+
+- Incoming replication of refs that locally are already up-to-date with the global-refdb will be
+ discarded, because they would not add anything more to the current status of the local refs.
+
+- All other refs will be fetched as normal from the replication sources.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
deleted file mode 100644
index 2df60dd..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-// Copyright (C) 2017 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite;
-
-import static com.google.common.truth.Truth.assertThat;
-
-import com.google.gerrit.server.config.SitePaths;
-import java.io.File;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.UUID;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.mockito.Answers;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class ModuleTest {
-
- @Mock(answer = Answers.RETURNS_DEEP_STUBS)
- private Configuration configMock;
-
- @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
-
- private Module module;
-
- @Before
- public void setup() {
- module = new Module(configMock);
- }
-
- @Test
- public void shouldGetInstanceId() throws Exception {
- File tmpSitePath = tempFolder.newFolder();
- File tmpPluginDataPath =
- Paths.get(tmpSitePath.getPath(), "data", Configuration.PLUGIN_NAME).toFile();
- tmpPluginDataPath.mkdirs();
- Path path = Paths.get(tmpPluginDataPath.getPath(), Configuration.INSTANCE_ID_FILE);
- SitePaths sitePaths = new SitePaths(Paths.get(tmpSitePath.getPath()));
- assertThat(path.toFile().exists()).isFalse();
-
- UUID gotUUID1 = module.getInstanceId(sitePaths);
- assertThat(gotUUID1).isNotNull();
- assertThat(path.toFile().exists()).isTrue();
-
- UUID gotUUID2 = module.getInstanceId(sitePaths);
- assertThat(gotUUID1).isEqualTo(gotUUID2);
- }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
index 92fa101..7d1751c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
@@ -1,15 +1,19 @@
package com.googlesource.gerrit.plugins.multisite.broker;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.only;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import java.util.UUID;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -22,28 +26,35 @@
@Mock private BrokerApi brokerApi;
@Mock Event event;
@Mock MessageLogger msgLog;
- private UUID instanceId = UUID.randomUUID();
private String topic = "index";
private BrokerApiWrapper objectUnderTest;
@Before
public void setUp() {
+ event.instanceId = "instance-id";
objectUnderTest =
new BrokerApiWrapper(
- DynamicItem.itemOf(BrokerApi.class, brokerApi), brokerMetrics, msgLog, instanceId);
+ MoreExecutors.directExecutor(),
+ DynamicItem.itemOf(BrokerApi.class, brokerApi),
+ brokerMetrics,
+ msgLog);
}
@Test
public void shouldIncrementBrokerMetricCounterWhenMessagePublished() {
- when(brokerApi.send(any(), any())).thenReturn(true);
+ SettableFuture<Boolean> resultF = SettableFuture.create();
+ resultF.set(true);
+ when(brokerApi.send(any(), any())).thenReturn(resultF);
objectUnderTest.send(topic, event);
verify(brokerMetrics, only()).incrementBrokerPublishedMessage();
}
@Test
public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailed() {
- when(brokerApi.send(any(), any())).thenReturn(false);
+ SettableFuture<Boolean> resultF = SettableFuture.create();
+ resultF.setException(new Exception("Force Future failure"));
+ when(brokerApi.send(any(), any())).thenReturn(resultF);
objectUnderTest.send(topic, event);
verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
}
@@ -53,10 +64,26 @@
when(brokerApi.send(any(), any()))
.thenThrow(new RuntimeException("Unexpected runtime exception"));
try {
- objectUnderTest.send(topic, event);
+ objectUnderTest.sendSync(topic, event);
} catch (RuntimeException e) {
// expected
}
verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
}
+
+ @Test
+ public void shouldSkipMessageSendingWhenInstanceIdIsNull() {
+ ProjectCreatedEvent event = new ProjectCreatedEvent();
+ event.instanceId = null;
+ objectUnderTest.send(topic, event);
+ verify(brokerApi, never()).send(any(), eq(event));
+ }
+
+ @Test
+ public void shouldSkipMessageSendingWhenInstanceIdIsEmpty() {
+ ProjectCreatedEvent event = new ProjectCreatedEvent();
+ event.instanceId = "";
+ objectUnderTest.send(topic, event);
+ verify(brokerApi, never()).send(any(), eq(event));
+ }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java
index 67be583..ce222d0 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java
@@ -36,9 +36,10 @@
@Test
public void shouldNotPublishAccountsCacheEvictions() {
-
+ String instanceId = "instance-id";
final CacheEvictionHandler<String, String> handler =
- new CacheEvictionHandler<>(DynamicSet.emptySet(), executorMock, defaultCacheMatcher);
+ new CacheEvictionHandler<>(
+ DynamicSet.emptySet(), executorMock, defaultCacheMatcher, instanceId);
handler.onRemoval(
"test", "accounts", RemovalNotification.create("test", "accounts", RemovalCause.EXPLICIT));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
index 4263ddb..c8216bd 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
@@ -41,6 +41,8 @@
@RunWith(MockitoJUnitRunner.class)
public class ProjectListUpdateHandlerTest {
+ private static final String INSTANCE_ID = "instance-id";
+
private ProjectListUpdateHandler handler;
@Mock private ProjectListUpdateForwarder forwarder;
@@ -51,7 +53,7 @@
when(projectsFilter.matches(any(String.class))).thenReturn(true);
handler =
new ProjectListUpdateHandler(
- asDynamicSet(forwarder), MoreExecutors.directExecutor(), projectsFilter);
+ asDynamicSet(forwarder), MoreExecutors.directExecutor(), projectsFilter, INSTANCE_ID);
}
private DynamicSet<ProjectListUpdateForwarder> asDynamicSet(
@@ -69,7 +71,8 @@
handler.onNewProjectCreated(event);
verify(forwarder)
.updateProjectList(
- any(ProjectListUpdateTask.class), eq(new ProjectListUpdateEvent(projectName, false)));
+ any(ProjectListUpdateTask.class),
+ eq(new ProjectListUpdateEvent(projectName, false, INSTANCE_ID)));
}
@Test
@@ -80,7 +83,8 @@
handler.onProjectDeleted(event);
verify(forwarder)
.updateProjectList(
- any(ProjectListUpdateTask.class), eq(new ProjectListUpdateEvent(projectName, true)));
+ any(ProjectListUpdateTask.class),
+ eq(new ProjectListUpdateEvent(projectName, true, INSTANCE_ID)));
}
@Test
@@ -101,18 +105,22 @@
handler.onNewProjectCreated(event);
verify(forwarder, never())
.updateProjectList(
- any(ProjectListUpdateTask.class), eq(new ProjectListUpdateEvent(projectName, true)));
+ any(ProjectListUpdateTask.class),
+ eq(new ProjectListUpdateEvent(projectName, true, INSTANCE_ID)));
}
@Test
public void testProjectUpdateTaskToString() throws Exception {
String projectName = "someProjectName";
ProjectListUpdateTask task =
- handler.new ProjectListUpdateTask(new ProjectListUpdateEvent(projectName, false));
+ handler
+ .new ProjectListUpdateTask(new ProjectListUpdateEvent(projectName, false, INSTANCE_ID));
assertThat(task.toString())
.isEqualTo(String.format("Update project list in target instance: add '%s'", projectName));
- task = handler.new ProjectListUpdateTask(new ProjectListUpdateEvent(projectName, true));
+ task =
+ handler
+ .new ProjectListUpdateTask(new ProjectListUpdateEvent(projectName, true, INSTANCE_ID));
assertThat(task.toString())
.isEqualTo(
String.format("Update project list in target instance: remove '%s'", projectName));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java
new file mode 100644
index 0000000..cb1f78e
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java
@@ -0,0 +1,162 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.Configuration.Broker;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import java.io.IOException;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+@Ignore
+public abstract class AbstractSubscriberTestBase {
+ protected static final String NODE_INSTANCE_ID = "node-instance-id";
+ protected static final String INSTANCE_ID = "other-node-instance-id";
+ protected static final String PROJECT_NAME = "project-name";
+
+ @Mock protected DroppedEventListener droppedEventListeners;
+ @Mock protected MessageLogger msgLog;
+ @Mock protected SubscriberMetrics subscriberMetrics;
+ @Mock protected Configuration cfg;
+ @Mock protected Broker brokerCfg;
+ @Mock protected ProjectsFilter projectsFilter;
+
+ @SuppressWarnings("rawtypes")
+ protected ForwardedEventRouter eventRouter;
+
+ protected AbstractSubcriber objectUnderTest;
+
+ @Before
+ public void setup() {
+ when(cfg.broker()).thenReturn(brokerCfg);
+ when(brokerCfg.getTopic(any(), any())).thenReturn("test-topic");
+ eventRouter = eventRouter();
+ objectUnderTest = objectUnderTest();
+ }
+
+ @Test
+ public void shouldConsumeEventsWhenNotFilteredByProjectName()
+ throws IOException, PermissionBackendException, CacheNotFoundException {
+ for (Event event : events()) {
+ when(projectsFilter.matches(any(String.class))).thenReturn(true);
+ objectUnderTest.getConsumer().accept(event);
+ verifyConsumed(event);
+ }
+ }
+
+ @Test
+ public void shouldSkipEventsWhenFilteredByProjectName()
+ throws IOException, PermissionBackendException, CacheNotFoundException {
+ for (Event event : events()) {
+ when(projectsFilter.matches(any(String.class))).thenReturn(false);
+ objectUnderTest.getConsumer().accept(event);
+ verifySkipped(event);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldSkipLocalEvents()
+ throws IOException, PermissionBackendException, CacheNotFoundException {
+ for (Event event : events()) {
+ event.instanceId = NODE_INSTANCE_ID;
+ when(projectsFilter.matches(any(String.class))).thenReturn(true);
+
+ objectUnderTest.getConsumer().accept(event);
+
+ verify(projectsFilter, never()).matches(PROJECT_NAME);
+ verify(eventRouter, never()).route(event);
+ verify(droppedEventListeners, times(1)).onEventDropped(event);
+ reset(projectsFilter, eventRouter, droppedEventListeners);
+ }
+ }
+
+ @Test
+ public void shouldUpdateReplicationMetricsWithLocalEvents() {
+ for (Event event : events()) {
+ event.instanceId = NODE_INSTANCE_ID;
+ when(projectsFilter.matches(any(String.class))).thenReturn(true);
+
+ objectUnderTest.getConsumer().accept(event);
+
+ verify(subscriberMetrics, times(1)).updateReplicationStatusMetrics(event);
+ reset(projectsFilter, eventRouter, droppedEventListeners, subscriberMetrics);
+ }
+ }
+
+ @Test
+ public void shouldUpdateReplicationMetricsWithNonLocalEvents() {
+ for (Event event : events()) {
+ event.instanceId = INSTANCE_ID;
+ when(projectsFilter.matches(any(String.class))).thenReturn(true);
+
+ objectUnderTest.getConsumer().accept(event);
+
+ verify(subscriberMetrics, times(1)).updateReplicationStatusMetrics(event);
+ reset(projectsFilter, eventRouter, droppedEventListeners, subscriberMetrics);
+ }
+ }
+
+ protected abstract AbstractSubcriber objectUnderTest();
+
+ protected abstract List<Event> events();
+
+ @SuppressWarnings("rawtypes")
+ protected abstract ForwardedEventRouter eventRouter();
+
+ @SuppressWarnings("unchecked")
+ protected void verifySkipped(Event event)
+ throws IOException, PermissionBackendException, CacheNotFoundException {
+ verify(projectsFilter, times(1)).matches(PROJECT_NAME);
+ verify(eventRouter, never()).route(event);
+ verify(droppedEventListeners, times(1)).onEventDropped(event);
+ reset(projectsFilter, eventRouter, droppedEventListeners);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void verifyConsumed(Event event)
+ throws IOException, PermissionBackendException, CacheNotFoundException {
+ verify(projectsFilter, times(1)).matches(PROJECT_NAME);
+ verify(eventRouter, times(1)).route(event);
+ verify(droppedEventListeners, never()).onEventDropped(event);
+ reset(projectsFilter, eventRouter, droppedEventListeners);
+ }
+
+ protected DynamicSet<DroppedEventListener> asDynamicSet(DroppedEventListener listener) {
+ DynamicSet<DroppedEventListener> result = new DynamicSet<>();
+ result.add("multi-site", listener);
+ return result;
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriberTest.java
new file mode 100644
index 0000000..5031f36
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriberTest.java
@@ -0,0 +1,77 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import java.io.IOException;
+import java.util.List;
+import org.junit.Test;
+
+public class BatchIndexEventSubscriberTest extends AbstractSubscriberTestBase {
+ private static final boolean DELETED = false;
+ private static final int CHANGE_ID = 1;
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldConsumeNonProjectAndNonChangeIndexingEventsTypes()
+ throws IOException, PermissionBackendException, CacheNotFoundException {
+ IndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
+
+ objectUnderTest.getConsumer().accept(event);
+
+ verify(projectsFilter, never()).matches(PROJECT_NAME);
+ verify(eventRouter, times(1)).route(event);
+ verify(droppedEventListeners, never()).onEventDropped(event);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ protected ForwardedEventRouter eventRouter() {
+ return mock(IndexEventRouter.class);
+ }
+
+ @Override
+ protected List<Event> events() {
+ return ImmutableList.of(
+ new ProjectIndexEvent(PROJECT_NAME, INSTANCE_ID),
+ new ChangeIndexEvent(PROJECT_NAME, CHANGE_ID, DELETED, INSTANCE_ID));
+ }
+
+ @Override
+ protected AbstractSubcriber objectUnderTest() {
+ return new BatchIndexEventSubscriber(
+ (IndexEventRouter) eventRouter,
+ asDynamicSet(droppedEventListeners),
+ NODE_INSTANCE_ID,
+ msgLog,
+ subscriberMetrics,
+ cfg,
+ projectsFilter);
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/CallbackMetricMaker.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/CallbackMetricMaker.java
new file mode 100644
index 0000000..a197c91
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/CallbackMetricMaker.java
@@ -0,0 +1,26 @@
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import com.google.gerrit.metrics.Counter1;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.DisabledMetricMaker;
+import com.google.gerrit.metrics.Field;
+import org.junit.Ignore;
+
+@Ignore
+public class CallbackMetricMaker extends DisabledMetricMaker {
+ private int callbackMetricCounter = 0;
+
+ public int getCallbackMetricCounter() {
+ return callbackMetricCounter;
+ }
+
+ @Override
+ public <F1> Counter1<F1> newCounter(String name, Description desc, Field<F1> field1) {
+ callbackMetricCounter += 1;
+ return super.newCounter(name, desc, field1);
+ }
+
+ public void resetCallbackMetricCounter() {
+ callbackMetricCounter = 0;
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java
new file mode 100644
index 0000000..0e63528
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java
@@ -0,0 +1,133 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.entities.Account;
+import com.google.gerrit.entities.BranchNameKey;
+import com.google.gerrit.entities.Change;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.change.ChangeFinder;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.notedb.ChangeNotes;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.google.gerrit.server.util.time.TimeUtil;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import org.junit.Test;
+import org.mockito.Mock;
+
+public class IndexEventSubscriberTest extends AbstractSubscriberTestBase {
+ private static final boolean DELETED = false;
+ private static final int CHANGE_ID = 1;
+ private static final String EMPTY_PROJECT_NAME = "";
+
+ @Mock protected ChangeFinder changeFinderMock;
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldConsumeNonProjectAndNonChangeIndexingEventsTypes()
+ throws IOException, PermissionBackendException, CacheNotFoundException {
+ IndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
+
+ objectUnderTest.getConsumer().accept(event);
+
+ verify(projectsFilter, never()).matches(PROJECT_NAME);
+ verify(eventRouter, times(1)).route(event);
+ verify(droppedEventListeners, never()).onEventDropped(event);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldConsumeDeleteChangeIndexEventWithEmptyProjectNameWhenFound()
+ throws IOException, PermissionBackendException, CacheNotFoundException {
+ ChangeIndexEvent event = new ChangeIndexEvent(EMPTY_PROJECT_NAME, CHANGE_ID, true, INSTANCE_ID);
+
+ ChangeNotes changeNotesMock = mock(ChangeNotes.class);
+ when(changeNotesMock.getChange()).thenReturn(newChange());
+ when(changeFinderMock.findOne(any(Change.Id.class))).thenReturn(Optional.of(changeNotesMock));
+ when(projectsFilter.matches(PROJECT_NAME)).thenReturn(true);
+
+ objectUnderTest.getConsumer().accept(event);
+
+ verify(projectsFilter, times(1)).matches(PROJECT_NAME);
+ verify(eventRouter, times(1)).route(event);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldNOTConsumeDeleteChangeIndexEventWithEmptyProjectNameWhenNotFound()
+ throws IOException, PermissionBackendException, CacheNotFoundException {
+ ChangeIndexEvent event = new ChangeIndexEvent("", CHANGE_ID, true, INSTANCE_ID);
+
+ when(changeFinderMock.findOne(any(Change.Id.class))).thenReturn(Optional.empty());
+ when(projectsFilter.matches(EMPTY_PROJECT_NAME)).thenReturn(false);
+
+ objectUnderTest.getConsumer().accept(event);
+
+ verify(projectsFilter, times(1)).matches(EMPTY_PROJECT_NAME);
+ verify(eventRouter, never()).route(event);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ protected ForwardedEventRouter eventRouter() {
+ return mock(IndexEventRouter.class);
+ }
+
+ @Override
+ protected List<Event> events() {
+ return ImmutableList.of(
+ new ProjectIndexEvent(PROJECT_NAME, INSTANCE_ID),
+ new ChangeIndexEvent(PROJECT_NAME, CHANGE_ID, DELETED, INSTANCE_ID));
+ }
+
+ @Override
+ protected AbstractSubcriber objectUnderTest() {
+ return new IndexEventSubscriber(
+ (IndexEventRouter) eventRouter,
+ asDynamicSet(droppedEventListeners),
+ NODE_INSTANCE_ID,
+ msgLog,
+ subscriberMetrics,
+ cfg,
+ projectsFilter,
+ changeFinderMock);
+ }
+
+ private Change newChange() {
+ return new Change(
+ Change.key(Integer.toString(CHANGE_ID)),
+ Change.id(CHANGE_ID),
+ Account.id(9999),
+ BranchNameKey.create(Project.nameKey(PROJECT_NAME), "refs/heads/master"),
+ TimeUtil.nowTs());
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriberTest.java
new file mode 100644
index 0000000..201ee81
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriberTest.java
@@ -0,0 +1,50 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.Mockito.mock;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.events.Event;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
+import java.util.List;
+
+public class ProjectUpdateEventSubscriberTest extends AbstractSubscriberTestBase {
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ protected ForwardedEventRouter eventRouter() {
+ return mock(ProjectListUpdateRouter.class);
+ }
+
+ @Override
+ protected List<Event> events() {
+ return ImmutableList.of(new ProjectListUpdateEvent(PROJECT_NAME, false, INSTANCE_ID));
+ }
+
+ @Override
+ protected AbstractSubcriber objectUnderTest() {
+ return new ProjectUpdateEventSubscriber(
+ (ProjectListUpdateRouter) eventRouter,
+ asDynamicSet(droppedEventListeners),
+ NODE_INSTANCE_ID,
+ msgLog,
+ subscriberMetrics,
+ cfg,
+ projectsFilter);
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java
index ce82954..7828062 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java
@@ -15,16 +15,27 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableSortedSet;
import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.metrics.CallbackMetric1;
+import com.google.gerrit.metrics.DisabledMetricMaker;
import com.google.gerrit.server.project.ProjectCache;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
import java.util.Optional;
+import java.util.concurrent.Executors;
+import org.eclipse.jgit.lib.Config;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -37,6 +48,7 @@
@Mock private ProjectVersionLogger verLogger;
@Mock private ProjectCache projectCache;
@Mock private ProjectVersionRefUpdate projectVersionRefUpdate;
+ @Mock private CallbackMetric1<String, Long> perProjectReplicationLagMetricCallback;
private ReplicationStatus objectUnderTest;
private Cache<String, Long> replicationStatusCache;
@@ -48,7 +60,13 @@
replicationStatusCache = CacheBuilder.newBuilder().build();
objectUnderTest =
new ReplicationStatus(
- replicationStatusCache, Optional.of(projectVersionRefUpdate), verLogger, projectCache);
+ replicationStatusCache,
+ Optional.of(projectVersionRefUpdate),
+ verLogger,
+ projectCache,
+ Executors.newScheduledThreadPool(1),
+ new Configuration(new Config(), new Config()),
+ new DisabledMetricMaker());
}
@Test
@@ -57,6 +75,14 @@
replicationStatusCache.put("projectB", 3L);
objectUnderTest.start();
+ assertThat(objectUnderTest.getMaxLagMillis()).isEqualTo(10L);
+ }
+
+ @Test
+ public void shouldConvertMillisLagFromPersistedCacheOnStartToSecs() {
+ replicationStatusCache.put("projectA", 10000L);
+
+ objectUnderTest.start();
assertThat(objectUnderTest.getMaxLag()).isEqualTo(10L);
}
@@ -67,7 +93,7 @@
objectUnderTest.start();
objectUnderTest.doUpdateLag(Project.nameKey("projectA"), 20L);
- assertThat(objectUnderTest.getMaxLag()).isEqualTo(20L);
+ assertThat(objectUnderTest.getMaxLagMillis()).isEqualTo(20L);
}
@Test
@@ -87,4 +113,127 @@
assertThat(replicationStatusCache.getIfPresent("projectA")).isEqualTo(20L);
}
+
+ @Test
+ public void shouldRemoveProjectFromPersistedCache() {
+ String projectName = "projectA";
+ long lag = 100;
+ setupReplicationLag(projectName, lag);
+ when(projectVersionRefUpdate.getProjectLocalVersion(projectName)).thenReturn(Optional.empty());
+
+ objectUnderTest.onProjectDeleted(projectDeletedEvent(projectName));
+
+ assertThat(replicationStatusCache.getIfPresent(projectName)).isNull();
+ }
+
+ @Test
+ public void shouldRemoveProjectFromReplicationLags() {
+ String projectName = "projectA";
+ long lag = 100;
+ setupReplicationLag(projectName, lag);
+ when(projectVersionRefUpdate.getProjectLocalVersion(projectName)).thenReturn(Optional.empty());
+
+ assertThat(objectUnderTest.getReplicationLags(1).keySet()).containsExactly(projectName);
+
+ objectUnderTest.onProjectDeleted(projectDeletedEvent(projectName));
+
+ assertThat(objectUnderTest.getReplicationLags(1).keySet()).isEmpty();
+ }
+
+ @Test
+ public void shouldNotRemoveProjectFromReplicationLagsIfLocalVersionStillExists() {
+ String projectName = "projectA";
+ long lag = 100;
+ setupReplicationLag(projectName, lag);
+ when(projectVersionRefUpdate.getProjectLocalVersion(projectName))
+ .thenReturn(Optional.of(System.currentTimeMillis()));
+
+ objectUnderTest.onProjectDeleted(projectDeletedEvent(projectName));
+
+ assertThat(objectUnderTest.getReplicationLags(1).keySet()).containsExactly(projectName);
+ }
+
+ @Test
+ public void shouldNotEvictFromPersistentCacheIfLocalVersionStillExists() {
+ String projectName = "projectA";
+ long lag = 100;
+ setupReplicationLag(projectName, lag);
+ when(projectVersionRefUpdate.getProjectLocalVersion(projectName))
+ .thenReturn(Optional.of(System.currentTimeMillis()));
+
+ objectUnderTest.onProjectDeleted(projectDeletedEvent(projectName));
+
+ assertThat(replicationStatusCache.getIfPresent(projectName)).isEqualTo(lag);
+ }
+
+ @Test
+ public void shouldUpdateReplicationLagForProject() {
+ String projectName = "projectA";
+ long projectLocalVersion = 10L;
+ long projectRemoteVersion = 20L;
+
+ when(projectVersionRefUpdate.getProjectLocalVersion(eq(projectName)))
+ .thenReturn(Optional.of(projectLocalVersion));
+ when(projectVersionRefUpdate.getProjectRemoteVersion(eq(projectName)))
+ .thenReturn(Optional.of(projectRemoteVersion));
+
+ objectUnderTest.updateReplicationLag(Project.nameKey(projectName));
+ objectUnderTest.replicationLagMetricPerProject(perProjectReplicationLagMetricCallback).run();
+
+ assertThat(replicationStatusCache.getIfPresent(projectName))
+ .isEqualTo(projectRemoteVersion - projectLocalVersion);
+ verify(perProjectReplicationLagMetricCallback)
+ .set(eq(projectName), eq(projectRemoteVersion - projectLocalVersion));
+ }
+
+ @Test
+ public void shouldNotGenerateCallbackMetricIfNoReplicationLag() {
+ String projectName = "projectA";
+ long projectLatestVersion = 10L;
+ when(projectVersionRefUpdate.getProjectLocalVersion(eq(projectName)))
+ .thenReturn(Optional.of(projectLatestVersion));
+
+ objectUnderTest.updateReplicationLag(Project.nameKey(projectName));
+
+ assertThat(replicationStatusCache.getIfPresent(projectName)).isNull();
+ verify(perProjectReplicationLagMetricCallback, never()).set(any(), any());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldAutoRefreshReplicationLagForProject() {
+ String projectName = "projectA";
+ long projectLocalVersion = 10L;
+ long projectRemoteVersion = 20L;
+
+ when(projectVersionRefUpdate.getProjectLocalVersion(eq(projectName)))
+ .thenReturn(Optional.of(projectLocalVersion), Optional.of(projectRemoteVersion));
+ when(projectVersionRefUpdate.getProjectRemoteVersion(eq(projectName)))
+ .thenReturn(Optional.of(projectRemoteVersion));
+
+ objectUnderTest.updateReplicationLag(Project.nameKey(projectName));
+
+ assertThat(replicationStatusCache.getIfPresent(projectName))
+ .isEqualTo(projectRemoteVersion - projectLocalVersion);
+ objectUnderTest.refreshProjectsWithLag();
+
+ assertThat(replicationStatusCache.getIfPresent(projectName)).isEqualTo(0);
+ }
+
+ private void setupReplicationLag(String projectName, long lag) {
+ long currentVersion = System.currentTimeMillis();
+ long newVersion = currentVersion + lag;
+ replicationStatusCache.put(projectName, 3L);
+ when(projectVersionRefUpdate.getProjectRemoteVersion(projectName))
+ .thenReturn(Optional.of(newVersion));
+ when(projectVersionRefUpdate.getProjectLocalVersion(projectName))
+ .thenReturn(Optional.of(currentVersion));
+ objectUnderTest.updateReplicationLag(Project.nameKey(projectName));
+ }
+
+ private ProjectDeletedListener.Event projectDeletedEvent(String projectName) {
+ ProjectDeletedListener.Event event = mock(ProjectDeletedListener.Event.class);
+ when(event.getProjectName()).thenReturn(projectName);
+ return event;
+ }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriberTest.java
new file mode 100644
index 0000000..71fd828
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriberTest.java
@@ -0,0 +1,122 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.Project.NameKey;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.RefEvent;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
+import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
+import java.io.IOException;
+import java.util.List;
+import org.junit.Test;
+import org.mockito.Mock;
+
+public class StreamEventSubscriberTest extends AbstractSubscriberTestBase {
+ private static final NameKey PROJECT_NAME_KEY = NameKey.parse(PROJECT_NAME);
+ private @Mock RefUpdatedEvent refUpdatedEvent;
+ private @Mock RefReplicationDoneEvent refReplicationDoneEvent;
+ private @Mock ReplicationScheduledEvent replicationScheduledEvent;
+ private @Mock RefReplicatedEvent refReplicatedEvent;
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ protected ForwardedEventRouter eventRouter() {
+ return mock(StreamEventRouter.class);
+ }
+
+ @Override
+ protected List<Event> events() {
+ when(refUpdatedEvent.getProjectNameKey()).thenReturn(PROJECT_NAME_KEY);
+ refUpdatedEvent.instanceId = INSTANCE_ID;
+ when(refReplicationDoneEvent.getProjectNameKey()).thenReturn(PROJECT_NAME_KEY);
+ refReplicationDoneEvent.instanceId = INSTANCE_ID;
+ when(replicationScheduledEvent.getProjectNameKey()).thenReturn(PROJECT_NAME_KEY);
+ replicationScheduledEvent.instanceId = INSTANCE_ID;
+ when(refReplicatedEvent.getProjectNameKey()).thenReturn(PROJECT_NAME_KEY);
+ refReplicatedEvent.instanceId = INSTANCE_ID;
+
+ return ImmutableList.of(
+ refUpdatedEvent, refReplicationDoneEvent, replicationScheduledEvent, refReplicatedEvent);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldNotConsumeNonProjectEventTypeEvents()
+ throws IOException, PermissionBackendException, CacheNotFoundException {
+ IndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
+
+ objectUnderTest.getConsumer().accept(event);
+
+ verify(projectsFilter, never()).matches(PROJECT_NAME);
+ verify(eventRouter, times(1)).route(event);
+ verify(droppedEventListeners, never()).onEventDropped(event);
+ }
+
+ @Test
+ public void shouldUpdateReplicationMetricsWithRemoteRefEvent() {
+ Event event =
+ new RefEvent("ref-replicated") {
+
+ @Override
+ public String getRefName() {
+ return "foo-ref";
+ }
+
+ @Override
+ public NameKey getProjectNameKey() {
+ return Project.nameKey(PROJECT_NAME);
+ }
+ };
+
+ event.instanceId = INSTANCE_ID;
+ when(projectsFilter.matches(eq(PROJECT_NAME))).thenReturn(true);
+
+ objectUnderTest.getConsumer().accept(event);
+
+ verify(subscriberMetrics, times(1)).updateReplicationStatusMetrics(event);
+ reset(projectsFilter, eventRouter, droppedEventListeners, subscriberMetrics);
+ }
+
+ @Override
+ protected AbstractSubcriber objectUnderTest() {
+ return new StreamEventSubscriber(
+ (StreamEventRouter) eventRouter,
+ asDynamicSet(droppedEventListeners),
+ NODE_INSTANCE_ID,
+ msgLog,
+ subscriberMetrics,
+ cfg,
+ projectsFilter);
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
index 4c45e07..6413b18 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
@@ -14,23 +14,28 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
+import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
-import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
import com.google.common.base.Suppliers;
import com.google.common.cache.CacheBuilder;
import com.google.gerrit.entities.Project;
+import com.google.gerrit.metrics.DisabledMetricMaker;
import com.google.gerrit.metrics.MetricMaker;
import com.google.gerrit.server.data.RefUpdateAttribute;
+import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.RefUpdatedEvent;
-import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
import com.google.gerrit.server.project.ProjectCache;
import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
+import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationSucceededEvent;
+import java.net.URISyntaxException;
import java.util.Optional;
-import java.util.UUID;
+import java.util.concurrent.Executors;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.transport.URIish;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -43,37 +48,36 @@
private static final Project.NameKey A_TEST_PROJECT_NAME_KEY =
Project.nameKey(A_TEST_PROJECT_NAME);
- @Mock private SharedRefDatabaseWrapper sharedRefDb;
- @Mock private GitReferenceUpdated gitReferenceUpdated;
@Mock private MetricMaker metricMaker;
@Mock private ProjectVersionLogger verLogger;
@Mock private ProjectCache projectCache;
@Mock private ProjectVersionRefUpdate projectVersionRefUpdate;
private SubscriberMetrics metrics;
- private EventMessage.Header msgHeader;
+ private ReplicationStatus replicationStatus;
@Before
public void setup() throws Exception {
- msgHeader = new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID());
- metrics =
- new SubscriberMetrics(
- metricMaker,
- new ReplicationStatus(
- CacheBuilder.newBuilder().build(),
- Optional.of(projectVersionRefUpdate),
- verLogger,
- projectCache));
+ replicationStatus =
+ new ReplicationStatus(
+ CacheBuilder.newBuilder().build(),
+ Optional.of(projectVersionRefUpdate),
+ verLogger,
+ projectCache,
+ Executors.newScheduledThreadPool(1),
+ new com.googlesource.gerrit.plugins.multisite.Configuration(new Config(), new Config()),
+ new DisabledMetricMaker());
+ metrics = new SubscriberMetrics(metricMaker, replicationStatus);
}
@Test
public void shouldLogProjectVersionWhenReceivingRefUpdatedEventWithoutLag() {
- Optional<Long> globalRefDbVersion = Optional.of(System.currentTimeMillis() / 1000);
+ Optional<Long> globalRefDbVersion = Optional.of(System.currentTimeMillis());
when(projectVersionRefUpdate.getProjectRemoteVersion(A_TEST_PROJECT_NAME))
.thenReturn(globalRefDbVersion);
when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
.thenReturn(globalRefDbVersion);
- EventMessage eventMessage = new EventMessage(msgHeader, newRefUpdateEvent());
+ Event eventMessage = newRefUpdateEvent();
metrics.updateReplicationStatusMetrics(eventMessage);
@@ -82,20 +86,137 @@
@Test
public void shouldLogProjectVersionWhenReceivingRefUpdatedEventWithALag() {
- Optional<Long> globalRefDbVersion = Optional.of(System.currentTimeMillis() / 1000);
+ Optional<Long> globalRefDbVersion = Optional.of(System.currentTimeMillis());
long replicationLag = 60;
when(projectVersionRefUpdate.getProjectRemoteVersion(A_TEST_PROJECT_NAME))
.thenReturn(globalRefDbVersion.map(ts -> ts + replicationLag));
when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
.thenReturn(globalRefDbVersion);
- EventMessage eventMessage = new EventMessage(msgHeader, newRefUpdateEvent());
+ Event eventMessage = newRefUpdateEvent();
metrics.updateReplicationStatusMetrics(eventMessage);
verify(verLogger).log(A_TEST_PROJECT_NAME_KEY, globalRefDbVersion.get(), replicationLag);
}
+ @Test
+ public void
+ shouldLogUponProjectDeletionSuccessWhenLocalVersionDoesNotExistAndSubscriberMetricsExist()
+ throws Exception {
+ long nowSecs = System.currentTimeMillis();
+ long replicationLagSecs = 60;
+ Optional<Long> globalRefDbVersion = Optional.of(nowSecs);
+ when(projectVersionRefUpdate.getProjectRemoteVersion(A_TEST_PROJECT_NAME))
+ .thenReturn(globalRefDbVersion.map(ts -> ts + replicationLagSecs));
+ when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
+ .thenReturn(globalRefDbVersion);
+
+ Event refUpdateEventMessage = newRefUpdateEvent();
+ metrics.updateReplicationStatusMetrics(refUpdateEventMessage);
+
+ assertThat(replicationStatus.getReplicationStatus(A_TEST_PROJECT_NAME))
+ .isEqualTo(replicationLagSecs);
+ assertThat(replicationStatus.getLocalVersion(A_TEST_PROJECT_NAME)).isEqualTo(nowSecs);
+
+ when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
+ .thenReturn(Optional.empty());
+
+ Event projectDeleteEventMessage = projectDeletionSuccess();
+ metrics.updateReplicationStatusMetrics(projectDeleteEventMessage);
+
+ verify(verLogger).logDeleted(A_TEST_PROJECT_NAME_KEY);
+ }
+
+ @Test
+ public void shouldNotLogUponProjectDeletionSuccessWhenSubscriberMetricsDoNotExist()
+ throws Exception {
+ Event eventMessage = projectDeletionSuccess();
+ when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
+ .thenReturn(Optional.empty());
+
+ assertThat(replicationStatus.getReplicationStatus(A_TEST_PROJECT_NAME)).isNull();
+ assertThat(replicationStatus.getLocalVersion(A_TEST_PROJECT_NAME)).isNull();
+
+ metrics.updateReplicationStatusMetrics(eventMessage);
+
+ verifyZeroInteractions(verLogger);
+ }
+
+ @Test
+ public void shouldNotLogUponProjectDeletionSuccessWhenLocalVersionStillExists() throws Exception {
+ Event eventMessage = projectDeletionSuccess();
+ Optional<Long> anyRefVersionValue = Optional.of(System.currentTimeMillis());
+ when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
+ .thenReturn(anyRefVersionValue);
+
+ metrics.updateReplicationStatusMetrics(eventMessage);
+
+ verifyZeroInteractions(verLogger);
+ }
+
+ @Test
+ public void shouldRemoveProjectMetricsUponProjectDeletionSuccess() throws Exception {
+ long nowSecs = System.currentTimeMillis();
+ long replicationLagSecs = 60;
+ Optional<Long> globalRefDbVersion = Optional.of(nowSecs);
+ when(projectVersionRefUpdate.getProjectRemoteVersion(A_TEST_PROJECT_NAME))
+ .thenReturn(globalRefDbVersion.map(ts -> ts + replicationLagSecs));
+ when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
+ .thenReturn(globalRefDbVersion);
+
+ Event eventMessage = newRefUpdateEvent();
+
+ metrics.updateReplicationStatusMetrics(eventMessage);
+
+ assertThat(replicationStatus.getReplicationStatus(A_TEST_PROJECT_NAME))
+ .isEqualTo(replicationLagSecs);
+ assertThat(replicationStatus.getLocalVersion(A_TEST_PROJECT_NAME)).isEqualTo(nowSecs);
+
+ when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
+ .thenReturn(Optional.empty());
+ Event projectDeleteEvent = projectDeletionSuccess();
+
+ metrics.updateReplicationStatusMetrics(projectDeleteEvent);
+
+ assertThat(replicationStatus.getReplicationStatus(A_TEST_PROJECT_NAME)).isNull();
+ assertThat(replicationStatus.getLocalVersion(A_TEST_PROJECT_NAME)).isNull();
+ }
+
+ @Test
+ public void shouldDoNothingIfNameIsValid() {
+ String validProjectName = "aValidProject-Name123";
+
+ assertThat(metrics.sanitizeProjectName(validProjectName)).isEqualTo(validProjectName);
+ }
+
+ @Test
+ public void shouldSanitizeNameWithDot() {
+ String validProjectName = "nameWithA.InTheMiddle";
+
+ assertThat(metrics.sanitizeProjectName(validProjectName)).isEqualTo("nameWithA_2eInTheMiddle");
+ }
+
+ @Test
+ public void shouldSanitizeNameWithSlash() {
+ String validProjectName = "nameWithA/InTheMiddle";
+
+ assertThat(metrics.sanitizeProjectName(validProjectName)).isEqualTo("nameWithA_2fInTheMiddle");
+ }
+
+ @Test
+ public void shouldDoubleUnderscoresInName() {
+ String validProjectName = "nameWithA_InTheMiddle";
+
+ assertThat(metrics.sanitizeProjectName(validProjectName)).isEqualTo("nameWithA__InTheMiddle");
+ }
+
+ private ProjectDeletionReplicationSucceededEvent projectDeletionSuccess()
+ throws URISyntaxException {
+ return new ProjectDeletionReplicationSucceededEvent(
+ A_TEST_PROJECT_NAME, new URIish("git://target"));
+ }
+
private RefUpdatedEvent newRefUpdateEvent() {
RefUpdateAttribute refUpdate = new RefUpdateAttribute();
refUpdate.project = A_TEST_PROJECT_NAME;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java
index 76c001f..6762ba2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java
@@ -34,6 +34,7 @@
@RunWith(MockitoJUnitRunner.class)
public class CacheEvictionEventRouterTest {
+ private static final String INSTANCE_ID = "instance-id";
private static Gson gson = new EventGsonProvider().get();
private CacheEvictionEventRouter router;
@Mock private ForwardedCacheEvictionHandler cacheEvictionHandler;
@@ -45,7 +46,7 @@
@Test
public void routerShouldSendEventsToTheAppropriateHandler_CacheEviction() throws Exception {
- final CacheEvictionEvent event = new CacheEvictionEvent("cache", "key");
+ final CacheEvictionEvent event = new CacheEvictionEvent("cache", "key", INSTANCE_ID);
router.route(event);
verify(cacheEvictionHandler).evict(CacheEntry.from(event.cacheName, event.key));
@@ -54,7 +55,7 @@
@Test
public void routerShouldSendEventsToTheAppropriateHandler_CacheEvictionWithSlash()
throws Exception {
- final CacheEvictionEvent event = new CacheEvictionEvent("cache", "some/key");
+ final CacheEvictionEvent event = new CacheEvictionEvent("cache", "some/key", INSTANCE_ID);
router.route(event);
verify(cacheEvictionHandler).evict(CacheEntry.from(event.cacheName, event.key));
@@ -63,7 +64,8 @@
@Test
public void routerShouldSendEventsToTheAppropriateHandler_ProjectCacheEvictionWithSlash()
throws Exception {
- final CacheEvictionEvent event = new CacheEvictionEvent(Constants.PROJECTS, "some/project");
+ final CacheEvictionEvent event =
+ new CacheEvictionEvent(Constants.PROJECTS, "some/project", INSTANCE_ID);
router.route(event);
verify(cacheEvictionHandler)
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
index 1dc07bd..413b6c1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
@@ -33,7 +33,7 @@
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
-import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
import java.util.Optional;
import org.eclipse.jgit.lib.ObjectId;
import org.junit.Before;
@@ -44,7 +44,7 @@
@RunWith(MockitoJUnitRunner.class)
public class IndexEventRouterTest {
-
+ private static final String INSTANCE_ID = "instance-id";
private IndexEventRouter router;
@Mock private ForwardedIndexAccountHandler indexAccountHandler;
@Mock private ForwardedIndexChangeHandler indexChangeHandler;
@@ -61,12 +61,13 @@
indexChangeHandler,
indexGroupHandler,
indexProjectHandler,
- allUsersName);
+ allUsersName,
+ INSTANCE_ID);
}
@Test
public void routerShouldSendEventsToTheAppropriateHandler_AccountIndex() throws Exception {
- final AccountIndexEvent event = new AccountIndexEvent(1);
+ final AccountIndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
router.route(event);
verify(indexAccountHandler)
@@ -80,7 +81,7 @@
StreamEventRouter streamEventRouter = new StreamEventRouter(forwardedEventHandler, router);
- final AccountIndexEvent event = new AccountIndexEvent(1);
+ final AccountIndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
router.route(event);
verify(indexAccountHandler)
@@ -96,7 +97,7 @@
@Test
public void routerShouldSendEventsToTheAppropriateHandler_GroupIndex() throws Exception {
final String groupId = "12";
- final GroupIndexEvent event = new GroupIndexEvent(groupId, ObjectId.zeroId());
+ final GroupIndexEvent event = new GroupIndexEvent(groupId, ObjectId.zeroId(), INSTANCE_ID);
router.route(event);
verify(indexGroupHandler)
@@ -108,7 +109,7 @@
@Test
public void routerShouldSendEventsToTheAppropriateHandler_ProjectIndex() throws Exception {
final String projectName = "projectName";
- final ProjectIndexEvent event = new ProjectIndexEvent(projectName);
+ final ProjectIndexEvent event = new ProjectIndexEvent(projectName, INSTANCE_ID);
router.route(event);
verify(indexProjectHandler)
@@ -119,7 +120,7 @@
@Test
public void routerShouldSendEventsToTheAppropriateHandler_ChangeIndex() throws Exception {
- final ChangeIndexEvent event = new ChangeIndexEvent("projectName", 3, false);
+ final ChangeIndexEvent event = new ChangeIndexEvent("projectName", 3, false, INSTANCE_ID);
router.route(event);
verify(indexChangeHandler)
@@ -133,7 +134,7 @@
@Test
public void routerShouldSendEventsToTheAppropriateHandler_ChangeIndexDelete() throws Exception {
- final ChangeIndexEvent event = new ChangeIndexEvent("projectName", 3, true);
+ final ChangeIndexEvent event = new ChangeIndexEvent("projectName", 3, true, INSTANCE_ID);
router.route(event);
verify(indexChangeHandler)
@@ -147,7 +148,7 @@
@Test
public void routerShouldFailForNotRecognisedEvents() throws Exception {
- final IndexEvent newEventType = new IndexEvent("new-type") {};
+ final IndexEvent newEventType = new IndexEvent("new-type", INSTANCE_ID) {};
assertThrows(UnsupportedOperationException.class, () -> router.route(newEventType));
verifyZeroInteractions(
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java
index 93daf92..8a21c39 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java
@@ -38,7 +38,8 @@
@Test
public void routerShouldSendEventsToTheAppropriateHandler_ProjectListUpdate() throws Exception {
- final ProjectListUpdateEvent event = new ProjectListUpdateEvent("project", false);
+ String instanceId = "instance-id";
+ final ProjectListUpdateEvent event = new ProjectListUpdateEvent("project", false, instanceId);
router.route(event);
verify(projectListUpdateHandler).update(event);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
index e3d1ae0..28bf56d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
@@ -68,10 +68,10 @@
}
}
- public class TestEvent extends MultiSiteEvent {
+ public static class TestEvent extends MultiSiteEvent {
protected TestEvent() {
- super("test");
+ super("test", "instance-id");
}
}
@@ -93,7 +93,7 @@
@Test
public void shouldSendEventToBrokerFromGenericSourceThread() {
brokerForwarder.send(newForwarderTask(), testTopic, testEvent);
- verify(brokerMock).send(eq(testTopicName), eq(testEvent));
+ verify(brokerMock).sendSync(eq(testTopicName), eq(testEvent));
}
@Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java
index 826e154..75596ed 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java
@@ -22,6 +22,7 @@
import com.google.common.collect.Sets;
import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.config.GerritConfig;
import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.api.projects.ProjectInput;
import com.google.gerrit.extensions.registration.DynamicSet;
@@ -123,8 +124,10 @@
}
@Test
+ @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
public void shouldEvictProjectCache() throws Exception {
- objectUnderTest.route(new CacheEvictionEvent(ProjectCacheImpl.CACHE_NAME, project.get()));
+ objectUnderTest.route(
+ new CacheEvictionEvent(ProjectCacheImpl.CACHE_NAME, project.get(), "instance-id"));
evictionsCacheTracker.waitForExpectedEvictions();
assertThat(evictionsCacheTracker.trackedEvictionsFor(ProjectCacheImpl.CACHE_NAME))
@@ -132,6 +135,7 @@
}
@Test
+ @GerritConfig(name = "gerrit.instanceId", value = "instance-id")
public void shouldEvictProjectCacheWithSlash() throws Exception {
ProjectInput in = new ProjectInput();
in.name = name("my/project");
@@ -141,7 +145,7 @@
restartCacheEvictionsTracking();
objectUnderTest.route(
- new CacheEvictionEvent(ProjectCacheImpl.CACHE_NAME, projectNameKey.get()));
+ new CacheEvictionEvent(ProjectCacheImpl.CACHE_NAME, projectNameKey.get(), "instance-id"));
evictionsCacheTracker.waitForExpectedEvictions();
assertThat(evictionsCacheTracker.trackedEvictionsFor(ProjectCacheImpl.CACHE_NAME))
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerTest.java
index 90ae8da..dbd358d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerTest.java
@@ -15,6 +15,7 @@
package com.googlesource.gerrit.plugins.multisite.forwarder;
import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
import static org.mockito.Mockito.doReturn;
import com.google.common.cache.Cache;
@@ -48,10 +49,12 @@
public void shouldThrowAnExceptionWhenCacheNotFound() throws Exception {
CacheEntry entry = new CacheEntry("somePlugin", "unexistingCache", null);
- exception.expect(CacheNotFoundException.class);
- exception.expectMessage(
- String.format("cache %s.%s not found", entry.getPluginName(), entry.getCacheName()));
- handler.evict(entry);
+ CacheNotFoundException thrown =
+ assertThrows(CacheNotFoundException.class, () -> handler.evict(entry));
+ assertThat(thrown)
+ .hasMessageThat()
+ .isEqualTo(
+ String.format("cache %s.%s not found", entry.getPluginName(), entry.getCacheName()));
}
@Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandlerTest.java
index 32c6319..02efb77 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandlerTest.java
@@ -61,9 +61,11 @@
@Test
public void deleteIsNotSupported() throws Exception {
- exception.expect(UnsupportedOperationException.class);
- exception.expectMessage("Delete from account index not supported");
- handler.index(id, Operation.DELETE, Optional.empty());
+ UnsupportedOperationException thrown =
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> handler.index(id, Operation.DELETE, Optional.empty()));
+ assertThat(thrown).hasMessageThat().isEqualTo("Delete from account index not supported");
}
@Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
index 96470b6..b827cdf 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
@@ -17,6 +17,7 @@
import static com.google.common.truth.Truth.assertThat;
import static com.google.gerrit.testing.GerritJUnit.assertThrows;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
@@ -49,6 +50,7 @@
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.OngoingStubbing;
@RunWith(MockitoJUnitRunner.class)
public class ForwardedIndexChangeHandlerTest {
@@ -62,6 +64,8 @@
private static final boolean THROW_STORAGE_EXCEPTION = true;
private static final boolean CHANGE_UP_TO_DATE = true;
private static final boolean CHANGE_OUTDATED = false;
+ private static final boolean CHANGE_CONSISTENT = true;
+ private static final boolean CHANGE_INCONSISTENT = false;
@Rule public ExpectedException exception = ExpectedException.none();
@Mock private ChangeIndexer indexerMock;
@@ -88,6 +92,7 @@
when(changeCheckerFactoryMock.create(any())).thenReturn(changeCheckerAbsentMock);
when(configurationMock.index()).thenReturn(index);
when(index.numStripedLocks()).thenReturn(10);
+ when(index.maxTries()).thenReturn(1);
handler =
new ForwardedIndexChangeHandler(
indexerMock, configurationMock, indexExecutorMock, ctxMock, changeCheckerFactoryMock);
@@ -95,16 +100,40 @@
@Test
public void changeIsIndexedWhenUpToDate() throws Exception {
- setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_UP_TO_DATE);
+ setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_UP_TO_DATE, CHANGE_CONSISTENT);
handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty());
verify(indexerMock, times(1)).index(any(Change.class));
}
@Test
public void changeIsStillIndexedEvenWhenOutdated() throws Exception {
- setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_OUTDATED);
+ setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_OUTDATED, CHANGE_CONSISTENT);
handler.index(
- TEST_CHANGE_ID, Operation.INDEX, Optional.of(new ChangeIndexEvent("foo", 1, false)));
+ TEST_CHANGE_ID,
+ Operation.INDEX,
+ Optional.of(new ChangeIndexEvent("foo", 1, false, "instance-id")));
+ verify(indexerMock, times(1)).index(any(Change.class));
+ }
+
+ @Test
+ public void changeIsIndexeAtFirstRetryWhenInitiallyInconsistent() throws Exception {
+ setupChangeAccessRelatedMocks(
+ CHANGE_EXISTS,
+ DO_NOT_THROW_STORAGE_EXCEPTION,
+ CHANGE_UP_TO_DATE,
+ CHANGE_INCONSISTENT,
+ CHANGE_CONSISTENT);
+ handler.index(
+ TEST_CHANGE_ID,
+ Operation.INDEX,
+ Optional.of(new ChangeIndexEvent("foo", 1, false, "instance-id")));
+ verify(indexerMock, never()).index(any(Change.class));
+ verify(indexExecutorMock, times(1)).schedule(any(Runnable.class), anyLong(), any());
+
+ handler.index(
+ TEST_CHANGE_ID,
+ Operation.INDEX,
+ Optional.of(new ChangeIndexEvent("foo", 1, false, "instance-id")));
verify(indexerMock, times(1)).index(any(Change.class));
}
@@ -124,9 +153,11 @@
@Test
public void indexerThrowsStorageExceptionTryingToIndexChange() throws Exception {
- setupChangeAccessRelatedMocks(CHANGE_EXISTS, THROW_STORAGE_EXCEPTION, CHANGE_UP_TO_DATE);
- exception.expect(StorageException.class);
- handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty());
+ setupChangeAccessRelatedMocks(
+ CHANGE_EXISTS, THROW_STORAGE_EXCEPTION, CHANGE_UP_TO_DATE, CHANGE_CONSISTENT);
+ assertThrows(
+ StorageException.class,
+ () -> handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty()));
}
@Test
@@ -175,11 +206,21 @@
private void setupChangeAccessRelatedMocks(boolean changeExist, boolean changeUpToDate)
throws Exception {
- setupChangeAccessRelatedMocks(changeExist, DO_NOT_THROW_STORAGE_EXCEPTION, changeUpToDate);
+ setupChangeAccessRelatedMocks(
+ changeExist, DO_NOT_THROW_STORAGE_EXCEPTION, changeUpToDate, CHANGE_CONSISTENT);
}
private void setupChangeAccessRelatedMocks(
- boolean changeExists, boolean storageException, boolean changeIsUpToDate)
+ boolean changeExist, boolean changeUpToDate, boolean changeConsistent) throws Exception {
+ setupChangeAccessRelatedMocks(
+ changeExist, DO_NOT_THROW_STORAGE_EXCEPTION, changeUpToDate, changeConsistent);
+ }
+
+ private void setupChangeAccessRelatedMocks(
+ boolean changeExists,
+ boolean storageException,
+ boolean changeIsUpToDate,
+ boolean... changeConsistentReturnValues)
throws StorageException {
if (changeExists) {
when(changeCheckerFactoryMock.create(TEST_CHANGE_ID)).thenReturn(changeCheckerPresentMock);
@@ -190,5 +231,11 @@
}
when(changeCheckerPresentMock.isUpToDate(any())).thenReturn(changeIsUpToDate);
+
+ OngoingStubbing<Boolean> changeConsistentCall =
+ when(changeCheckerPresentMock.isChangeConsistent());
+ for (boolean changeConsistent : changeConsistentReturnValues) {
+ changeConsistentCall = changeConsistentCall.thenReturn(changeConsistent);
+ }
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java
index 982ac52..b49dbfa 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java
@@ -74,9 +74,11 @@
@Test
public void deleteIsNotSupported() throws Exception {
- exception.expect(UnsupportedOperationException.class);
- exception.expectMessage("Delete from group index not supported");
- handler.index(uuid, Operation.DELETE, Optional.empty());
+ UnsupportedOperationException thrown =
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> handler.index(uuid, Operation.DELETE, Optional.empty()));
+ assertThat(thrown).hasMessageThat().isEqualTo("Delete from group index not supported");
}
@Test
@@ -141,6 +143,6 @@
}
private Optional<GroupIndexEvent> groupIndexEvent(String uuid) {
- return Optional.of(new GroupIndexEvent(uuid, null));
+ return Optional.of(new GroupIndexEvent(uuid, null, "instance-id"));
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandlerTest.java
index 3ce5e14..72b9427 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandlerTest.java
@@ -73,9 +73,11 @@
@Test
public void deleteIsNotSupported() throws Exception {
- exception.expect(UnsupportedOperationException.class);
- exception.expectMessage("Delete from project index not supported");
- handler.index(nameKey, Operation.DELETE, Optional.empty());
+ UnsupportedOperationException thrown =
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> handler.index(nameKey, Operation.DELETE, Optional.empty()));
+ assertThat(thrown).hasMessageThat().isEqualTo("Delete from project index not supported");
}
@Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedProjectListUpdateHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedProjectListUpdateHandlerTest.java
index 9893ce7..2412c79 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedProjectListUpdateHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedProjectListUpdateHandlerTest.java
@@ -34,6 +34,7 @@
@RunWith(MockitoJUnitRunner.class)
public class ForwardedProjectListUpdateHandlerTest {
+ private static final String INSTANCE_ID = "instance-id";
private static final String PROJECT_NAME = "someProject";
private static final String SOME_MESSAGE = "someMessage";
private static final Project.NameKey PROJECT_KEY = Project.nameKey(PROJECT_NAME);
@@ -48,13 +49,13 @@
@Test
public void testSuccessfulAdd() throws Exception {
- handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false));
+ handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false, INSTANCE_ID));
verify(projectCacheMock).onCreateProject(PROJECT_KEY);
}
@Test
public void testSuccessfulRemove() throws Exception {
- handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true));
+ handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true, INSTANCE_ID));
verify(projectCacheMock).remove(PROJECT_KEY);
}
@@ -72,7 +73,7 @@
.onCreateProject(PROJECT_KEY);
assertThat(Context.isForwardedEvent()).isFalse();
- handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false));
+ handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false, INSTANCE_ID));
assertThat(Context.isForwardedEvent()).isFalse();
verify(projectCacheMock).onCreateProject(PROJECT_KEY);
@@ -92,7 +93,7 @@
.remove(PROJECT_KEY);
assertThat(Context.isForwardedEvent()).isFalse();
- handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true));
+ handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true, INSTANCE_ID));
assertThat(Context.isForwardedEvent()).isFalse();
verify(projectCacheMock).remove(PROJECT_KEY);
@@ -113,7 +114,7 @@
RuntimeException thrown =
assertThrows(
RuntimeException.class,
- () -> handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false)));
+ () -> handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false, INSTANCE_ID)));
assertThat(thrown).hasMessageThat().isEqualTo(SOME_MESSAGE);
assertThat(Context.isForwardedEvent()).isFalse();
@@ -135,7 +136,7 @@
RuntimeException thrown =
assertThrows(
RuntimeException.class,
- () -> handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true)));
+ () -> handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true, INSTANCE_ID)));
assertThat(thrown).hasMessageThat().isEqualTo(SOME_MESSAGE);
assertThat(Context.isForwardedEvent()).isFalse();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServletIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServletIT.java
index 0739eab..f940fc4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServletIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServletIT.java
@@ -24,9 +24,12 @@
import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
import com.google.gerrit.acceptance.RestResponse;
import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.config.GerritConfig;
import com.google.gerrit.entities.Project;
import com.google.gerrit.httpd.restapi.RestApiServlet;
+import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
import com.google.inject.Scopes;
import com.google.inject.multibindings.OptionalBinder;
import com.googlesource.gerrit.plugins.multisite.Log4jProjectVersionLogger;
@@ -56,13 +59,15 @@
private ReplicationStatus replicationStatus;
public static class TestModule extends AbstractModule {
+ @Inject WorkQueue workQueue;
+
@Override
protected void configure() {
install(new ForwarderModule());
install(new CacheModule());
install(new RouterModule());
install(new IndexModule());
- install(new ReplicationStatusModule());
+ install(new ReplicationStatusModule(workQueue));
SharedRefDbConfiguration sharedRefDbConfig =
new SharedRefDbConfiguration(new Config(), "multi-site");
bind(SharedRefDbConfiguration.class).toInstance(sharedRefDbConfig);
@@ -81,6 +86,7 @@
}
@Test
+ @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
public void shouldSucceedForAdminUsers() throws Exception {
RestResponse result = adminRestSession.get(LAG_ENDPOINT);
@@ -89,6 +95,7 @@
}
@Test
+ @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
public void shouldFailWhenUserHasNoAdminServerCapability() throws Exception {
RestResponse result = userRestSession.get(LAG_ENDPOINT);
result.assertForbidden();
@@ -96,6 +103,7 @@
}
@Test
+ @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
public void shouldReturnCurrentProjectLag() throws Exception {
replicationStatus.doUpdateLag(Project.nameKey("foo"), 123L);
@@ -106,6 +114,7 @@
}
@Test
+ @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
public void shouldReturnProjectsOrderedDescendinglyByLag() throws Exception {
replicationStatus.doUpdateLag(Project.nameKey("bar"), 123L);
replicationStatus.doUpdateLag(Project.nameKey("foo"), 3L);
@@ -118,6 +127,7 @@
}
@Test
+ @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
public void shouldHonourTheLimitParameter() throws Exception {
replicationStatus.doUpdateLag(Project.nameKey("bar"), 1L);
replicationStatus.doUpdateLag(Project.nameKey("foo"), 2L);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java
index 6403cce..09f8f59 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java
@@ -101,7 +101,7 @@
}
private Optional<GroupIndexEvent> groupIndexEvent(String uuid, @Nullable ObjectId sha1) {
- return Optional.of(new GroupIndexEvent(uuid, sha1));
+ return Optional.of(new GroupIndexEvent(uuid, sha1, "instance-id"));
}
private void setCommitExistsInRepo(boolean commitExists) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupEventIndexTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupEventIndexTest.java
index 93cec05..099f1dd 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupEventIndexTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupEventIndexTest.java
@@ -16,7 +16,7 @@
import static com.google.common.truth.Truth.assertThat;
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
+import com.google.gerrit.server.events.EventGsonProvider;
import com.google.gson.Gson;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
import java.util.UUID;
@@ -24,13 +24,14 @@
import org.junit.Test;
public class GroupEventIndexTest {
+ private static final String INSTANCE_ID = "instance-id";
private static final Gson gson = new EventGsonProvider().get();
@Test
public void groupEventIndexRoundTripWithSha1() {
String aGroupUUID = UUID.randomUUID().toString();
ObjectId anObjectId = ObjectId.fromString("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef");
- GroupIndexEvent original = new GroupIndexEvent(aGroupUUID, anObjectId);
+ GroupIndexEvent original = new GroupIndexEvent(aGroupUUID, anObjectId, INSTANCE_ID);
assertThat(gson.fromJson(gson.toJson(original), GroupIndexEvent.class)).isEqualTo(original);
}
@@ -38,7 +39,7 @@
@Test
public void groupEventIndexRoundTripWithoutSha1() {
String aGroupUUID = UUID.randomUUID().toString();
- GroupIndexEvent original = new GroupIndexEvent(aGroupUUID, null);
+ GroupIndexEvent original = new GroupIndexEvent(aGroupUUID, null, INSTANCE_ID);
assertThat(gson.fromJson(gson.toJson(original), GroupIndexEvent.class)).isEqualTo(original);
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
index 660a302..8cf6ea9 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
@@ -14,16 +14,21 @@
package com.googlesource.gerrit.plugins.multisite.index;
+import static com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationEndpoints.APPLY_OBJECT_API_ENDPOINT;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
import com.googlesource.gerrit.plugins.multisite.index.IndexEventHandler.IndexProjectTask;
@@ -36,6 +41,8 @@
@RunWith(MockitoJUnitRunner.class)
public class IndexEventHandlerTest {
+ private static final String INSTANCE_ID = "instance-id";
+
private IndexEventHandler eventHandler;
@Mock private ProjectsFilter projectsFilter;
@@ -50,7 +57,8 @@
asDynamicSet(forwarder),
changeChecker,
projectsFilter,
- new TestGroupChecker(true));
+ new TestGroupChecker(true),
+ INSTANCE_ID);
}
private DynamicSet<IndexEventForwarder> asDynamicSet(IndexEventForwarder forwarder) {
@@ -65,7 +73,7 @@
eventHandler.onProjectIndexed("test_project");
verify(forwarder, never())
- .index(any(IndexProjectTask.class), eq(new ProjectIndexEvent("test_project")));
+ .index(any(IndexProjectTask.class), eq(new ProjectIndexEvent("test_project", INSTANCE_ID)));
}
@Test
@@ -76,4 +84,25 @@
eventHandler.onChangeIndexed("test_project", changeId);
verifyZeroInteractions(changeChecker);
}
+
+ @Test
+ public void shouldNotForwardIndexChangeIfCurrentThreadIsPullReplicationApplyObject()
+ throws Exception {
+ String currentThreadName = Thread.currentThread().getName();
+ try {
+ Thread.currentThread().setName("pull-replication~" + APPLY_OBJECT_API_ENDPOINT);
+ int changeId = 1;
+ Context.setForwardedEvent(false);
+ lenient().when(projectsFilter.matches(any(String.class))).thenReturn(true);
+ lenient()
+ .when(changeChecker.create(anyString()))
+ .thenThrow(
+ new IllegalStateException("Change indexing event should have not been triggered"));
+
+ eventHandler.onChangeIndexed("test_project", changeId);
+ verifyNoInteractions(changeChecker);
+ } finally {
+ Thread.currentThread().setName(currentThreadName);
+ }
+ }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/DisabledSharedRefLogger.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/DisabledSharedRefLogger.java
new file mode 100644
index 0000000..fc2259f
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/DisabledSharedRefLogger.java
@@ -0,0 +1,42 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation;
+
+import com.gerritforge.gerrit.globalrefdb.validation.SharedRefLogger;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+import org.junit.Ignore;
+
+@Ignore
+public class DisabledSharedRefLogger implements SharedRefLogger {
+
+ @Override
+ public void logRefUpdate(String project, Ref currRef, ObjectId newRefValue) {}
+
+ @Override
+ public void logProjectDelete(String project) {}
+
+ @Override
+ public void logLockAcquisition(String project, String refName) {}
+
+ @Override
+ public void logLockRelease(String project, String refName) {}
+
+ @Override
+ public <T> void logRefUpdate(String project, String refName, T currRef, T newRefValue) {}
+
+ @Override
+ public <T> void logRefUpdate(String project, String refName, T newRefValue) {}
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/FakeSharedRefDatabaseWrapper.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/FakeSharedRefDatabaseWrapper.java
new file mode 100644
index 0000000..bf98a4b
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/FakeSharedRefDatabaseWrapper.java
@@ -0,0 +1,84 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation;
+
+import com.gerritforge.gerrit.globalrefdb.GlobalRefDatabase;
+import com.gerritforge.gerrit.globalrefdb.GlobalRefDbLockException;
+import com.gerritforge.gerrit.globalrefdb.GlobalRefDbSystemError;
+import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDBMetrics;
+import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.metrics.DisabledMetricMaker;
+import java.util.Arrays;
+import java.util.Optional;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+import org.junit.Ignore;
+
+@Ignore
+public class FakeSharedRefDatabaseWrapper extends SharedRefDatabaseWrapper {
+
+ public FakeSharedRefDatabaseWrapper(String... rejectedRefs) {
+
+ super(
+ DynamicItem.itemOf(
+ GlobalRefDatabase.class,
+ new GlobalRefDatabase() {
+
+ @Override
+ public boolean isUpToDate(Project.NameKey project, Ref ref)
+ throws GlobalRefDbLockException {
+ return !Arrays.stream(rejectedRefs).anyMatch(r -> r.equals(ref.getName()));
+ }
+
+ @Override
+ public boolean exists(Project.NameKey project, String refName) {
+ return true;
+ }
+
+ @Override
+ public boolean compareAndPut(
+ Project.NameKey project, Ref currRef, ObjectId newRefValue)
+ throws GlobalRefDbSystemError {
+ return false;
+ }
+
+ @Override
+ public <T> boolean compareAndPut(
+ Project.NameKey project, String refName, T currValue, T newValue)
+ throws GlobalRefDbSystemError {
+ return false;
+ }
+
+ @Override
+ public AutoCloseable lockRef(Project.NameKey project, String refName)
+ throws GlobalRefDbLockException {
+ return null;
+ }
+
+ @Override
+ public void remove(Project.NameKey project) throws GlobalRefDbSystemError {}
+
+ @Override
+ public <T> Optional<T> get(Project.NameKey project, String refName, Class<T> clazz)
+ throws GlobalRefDbSystemError {
+ return Optional.empty();
+ }
+ }),
+ new DisabledSharedRefLogger(),
+ new SharedRefDBMetrics(new DisabledMetricMaker()));
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationFetchFilterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationFetchFilterTest.java
new file mode 100644
index 0000000..e09f6f6
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationFetchFilterTest.java
@@ -0,0 +1,258 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.testing.InMemoryRepositoryManager;
+import com.google.gerrit.testing.InMemoryTestEnvironment;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.RefFixture;
+import java.util.Optional;
+import java.util.Set;
+import org.eclipse.jgit.internal.storage.dfs.InMemoryRepository;
+import org.eclipse.jgit.junit.LocalDiskRepositoryTestCase;
+import org.eclipse.jgit.junit.TestRepository;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class MultisiteReplicationFetchFilterTest extends LocalDiskRepositoryTestCase
+ implements RefFixture {
+
+ @Rule public InMemoryTestEnvironment testEnvironment = new InMemoryTestEnvironment();
+
+ @Mock SharedRefDatabaseWrapper sharedRefDatabaseMock;
+ @Mock Configuration config;
+ @Mock Configuration.ReplicationFilter replicationFilterConfig;
+
+ @Inject private InMemoryRepositoryManager gitRepositoryManager;
+
+ String project = A_TEST_PROJECT_NAME;
+ Project.NameKey projectName = A_TEST_PROJECT_NAME_KEY;
+
+ private TestRepository<InMemoryRepository> repo;
+
+ @Before
+ public void setupTestRepo() throws Exception {
+ InMemoryRepository inMemoryRepo =
+ gitRepositoryManager.createRepository(A_TEST_PROJECT_NAME_KEY);
+ repo = new TestRepository<>(inMemoryRepo);
+ doReturn(replicationFilterConfig).when(config).replicationFilter();
+ }
+
+ @Test
+ public void shouldReturnEmptyRefsWhenAllUpToDate() throws Exception {
+ String fooRefName = "refs/heads/foo";
+ ObjectId fooObjectId = newRef(fooRefName).getId();
+ String barRefName = "refs/heads/bar";
+ ObjectId barObjectId = newRef(barRefName).getId();
+ Set<String> refs = Set.of(fooRefName, barRefName);
+
+ doReturn(Optional.of(fooObjectId.getName()))
+ .when(sharedRefDatabaseMock)
+ .get(eq(projectName), eq(fooRefName), eq(String.class));
+ doReturn(Optional.of(barObjectId.getName()))
+ .when(sharedRefDatabaseMock)
+ .get(eq(projectName), eq(barRefName), eq(String.class));
+
+ MultisiteReplicationFetchFilter fetchFilter =
+ new MultisiteReplicationFetchFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+ Set<String> filteredRefs = fetchFilter.filter(project, refs);
+
+ assertThat(filteredRefs).isEmpty();
+ }
+
+ @Test
+ public void shouldFilterOutOneUpToDateRef() throws Exception {
+ String refUpToDate = "refs/heads/uptodate";
+ String outdatedRef = "refs/heads/outdated";
+ ObjectId upToDateObjectId = newRef(refUpToDate).getId();
+ newRef(outdatedRef).getId();
+ Set<String> refsToFetch = Set.of(refUpToDate, outdatedRef);
+
+ doReturn(Optional.of(upToDateObjectId.getName()))
+ .when(sharedRefDatabaseMock)
+ .get(eq(projectName), eq(refUpToDate), eq(String.class));
+ doReturn(Optional.of(AN_OUTDATED_OBJECT_ID.getName()))
+ .when(sharedRefDatabaseMock)
+ .get(eq(projectName), eq(outdatedRef), eq(String.class));
+
+ MultisiteReplicationFetchFilter fetchFilter =
+ new MultisiteReplicationFetchFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+ Set<String> filteredRefsToFetch = fetchFilter.filter(project, refsToFetch);
+
+ assertThat(filteredRefsToFetch).containsExactly(outdatedRef);
+ }
+
+ @Test
+ public void shouldLoadLocalVersionAndFilterOut() throws Exception {
+ String temporaryOutdated = "refs/heads/temporaryOutdated";
+ RevCommit localRef = newRef(temporaryOutdated);
+
+ Set<String> refsToFetch = Set.of(temporaryOutdated);
+ doReturn(Optional.empty())
+ .doReturn(Optional.of(localRef.getId().getName()))
+ .when(sharedRefDatabaseMock)
+ .get(eq(projectName), eq(temporaryOutdated), eq(String.class));
+
+ MultisiteReplicationFetchFilter fetchFilter =
+ new MultisiteReplicationFetchFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+ Set<String> filteredRefsToFetch = fetchFilter.filter(project, refsToFetch);
+
+ assertThat(filteredRefsToFetch).isEmpty();
+
+ verify(sharedRefDatabaseMock, times(2)).get(any(), any(), any());
+ }
+
+ @Test
+ public void shouldLoadLocalVersionAndNotFilter() throws Exception {
+ String temporaryOutdated = "refs/heads/temporaryOutdated";
+ newRef(temporaryOutdated);
+
+ Set<String> refsToFetch = Set.of(temporaryOutdated);
+ doReturn(Optional.of(AN_OUTDATED_OBJECT_ID.getName()))
+ .doReturn(Optional.of(AN_OUTDATED_OBJECT_ID.getName()))
+ .when(sharedRefDatabaseMock)
+ .get(eq(projectName), eq(temporaryOutdated), eq(String.class));
+
+ MultisiteReplicationFetchFilter fetchFilter =
+ new MultisiteReplicationFetchFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+ Set<String> filteredRefsToFetch = fetchFilter.filter(project, refsToFetch);
+
+ assertThat(filteredRefsToFetch).hasSize(1);
+ verify(sharedRefDatabaseMock, times(3))
+ .get(eq(projectName), eq(temporaryOutdated), eq(String.class));
+ }
+
+ @Test
+ public void shouldNotFilterOutWhenMissingInTheSharedRefDb() throws Exception {
+ String temporaryOutdated = "refs/heads/temporaryOutdated";
+ newRef(temporaryOutdated);
+
+ doReturn(Optional.empty())
+ .when(sharedRefDatabaseMock)
+ .get(eq(projectName), eq(temporaryOutdated), eq(String.class));
+
+ Set<String> refsToFetch = Set.of(temporaryOutdated);
+
+ MultisiteReplicationFetchFilter fetchFilter =
+ new MultisiteReplicationFetchFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+ Set<String> filteredRefsToFetch = fetchFilter.filter(project, refsToFetch);
+
+ assertThat(filteredRefsToFetch).hasSize(1);
+ }
+
+ @Test
+ public void shouldNotFilterOutWhenRefsMultisiteVersionIsPresentInSharedRefDb() throws Exception {
+ String refsMultisiteVersionRef = ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF;
+ RevCommit multiSiteVersionRef = newRef(refsMultisiteVersionRef);
+
+ doReturn(Optional.of(multiSiteVersionRef.getId().getName()))
+ .when(sharedRefDatabaseMock)
+ .get(eq(projectName), eq(refsMultisiteVersionRef), eq(String.class));
+
+ Set<String> refsToFetch = Set.of(refsMultisiteVersionRef);
+
+ MultisiteReplicationFetchFilter fetchFilter =
+ new MultisiteReplicationFetchFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+ Set<String> filteredRefsToFetch = fetchFilter.filter(project, refsToFetch);
+
+ assertThat(filteredRefsToFetch).hasSize(1);
+ }
+
+ @Test
+ public void shouldFilterOutWhenRefIsDeletedInTheSharedRefDb() throws Exception {
+ String temporaryOutdated = "refs/heads/temporaryOutdated";
+ newRef(temporaryOutdated);
+
+ Set<String> refsToFetch = Set.of(temporaryOutdated);
+ doReturn(Optional.of(ObjectId.zeroId().getName()))
+ .when(sharedRefDatabaseMock)
+ .get(eq(projectName), eq(temporaryOutdated), eq(String.class));
+
+ MultisiteReplicationFetchFilter fetchFilter =
+ new MultisiteReplicationFetchFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+ Set<String> filteredRefsToFetch = fetchFilter.filter(project, refsToFetch);
+
+ assertThat(filteredRefsToFetch).hasSize(0);
+ verify(sharedRefDatabaseMock).get(eq(projectName), any(), any());
+ }
+
+ @Test
+ public void shouldNotFilterOutWhenRefIsMissingOnlyInTheLocalRepository() throws Exception {
+ String refObjectId = "0000000000000000000000000000000000000001";
+ String nonExistingLocalRef = "refs/heads/temporaryOutdated";
+
+ Set<String> refsToFetch = Set.of(nonExistingLocalRef);
+ doReturn(Optional.of(refObjectId))
+ .when(sharedRefDatabaseMock)
+ .get(eq(projectName), any(), any());
+
+ MultisiteReplicationFetchFilter fetchFilter =
+ new MultisiteReplicationFetchFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+ Set<String> filteredRefsToFetch = fetchFilter.filter(project, refsToFetch);
+
+ assertThat(filteredRefsToFetch).hasSize(1);
+ }
+
+ @Test
+ public void shouldNotFilterOutRefThatDoesntExistLocallyOrInSharedRefDb() throws Exception {
+ String nonExisting = "refs/heads/non-existing-ref";
+
+ Set<String> refsToFetch = Set.of(nonExisting);
+
+ MultisiteReplicationFetchFilter fetchFilter =
+ new MultisiteReplicationFetchFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+ Set<String> filteredRefsToFetch = fetchFilter.filter(project, refsToFetch);
+
+ assertThat(filteredRefsToFetch).hasSize(1);
+ }
+
+ @Test
+ public void shouldFilterOutRefMissingInTheLocalRepositoryAndDeletedInSharedRefDb()
+ throws Exception {
+ String nonExistingLocalRef = "refs/heads/temporaryOutdated";
+
+ Set<String> refsToFetch = Set.of(nonExistingLocalRef);
+ doReturn(Optional.of(ObjectId.zeroId().getName()))
+ .when(sharedRefDatabaseMock)
+ .get(eq(projectName), any(), any());
+
+ MultisiteReplicationFetchFilter fetchFilter =
+ new MultisiteReplicationFetchFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+ Set<String> filteredRefsToFetch = fetchFilter.filter(project, refsToFetch);
+
+ assertThat(filteredRefsToFetch).hasSize(0);
+ }
+
+ private RevCommit newRef(String refName) throws Exception {
+ return repo.branch(refName).commit().create();
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilterTest.java
new file mode 100644
index 0000000..b4a5e5d
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilterTest.java
@@ -0,0 +1,159 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.testing.InMemoryRepositoryManager;
+import com.google.gerrit.testing.InMemoryTestEnvironment;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.RefFixture;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.eclipse.jgit.internal.storage.dfs.InMemoryRepository;
+import org.eclipse.jgit.junit.LocalDiskRepositoryTestCase;
+import org.eclipse.jgit.junit.TestRepository;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectIdRef;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class MultisiteReplicationPushFilterTest extends LocalDiskRepositoryTestCase
+ implements RefFixture {
+
+ @Rule public InMemoryTestEnvironment testEnvironment = new InMemoryTestEnvironment();
+
+ @Mock SharedRefDatabaseWrapper sharedRefDatabaseMock;
+ @Mock Configuration config;
+ @Mock Configuration.ReplicationFilter replicationFilterConfig;
+
+ @Inject private InMemoryRepositoryManager gitRepositoryManager;
+
+ String project = A_TEST_PROJECT_NAME;
+ Project.NameKey projectName = A_TEST_PROJECT_NAME_KEY;
+
+ private TestRepository<InMemoryRepository> repo;
+
+ @Before
+ public void setupTestRepo() throws Exception {
+ InMemoryRepository inMemoryRepo =
+ gitRepositoryManager.createRepository(A_TEST_PROJECT_NAME_KEY);
+ doReturn(replicationFilterConfig).when(config).replicationFilter();
+ repo = new TestRepository<>(inMemoryRepo);
+ }
+
+ @Test
+ public void shouldReturnAllRefUpdatesWhenAllUpToDate() throws Exception {
+ List<RemoteRefUpdate> refUpdates =
+ Arrays.asList(refUpdate("refs/heads/foo"), refUpdate("refs/heads/bar"));
+ doReturn(true).when(sharedRefDatabaseMock).isUpToDate(eq(projectName), any());
+
+ MultisiteReplicationPushFilter pushFilter =
+ new MultisiteReplicationPushFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+ List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
+
+ assertThat(filteredRefUpdates).containsExactlyElementsIn(refUpdates);
+ }
+
+ @Test
+ public void shouldFilterOutOneOutdatedRef() throws Exception {
+ RemoteRefUpdate refUpToDate = refUpdate("refs/heads/uptodate");
+ RemoteRefUpdate outdatedRef = refUpdate("refs/heads/outdated");
+ List<RemoteRefUpdate> refUpdates = Arrays.asList(refUpToDate, outdatedRef);
+ SharedRefDatabaseWrapper sharedRefDatabase =
+ new FakeSharedRefDatabaseWrapper(outdatedRef.getSrcRef());
+
+ MultisiteReplicationPushFilter pushFilter =
+ new MultisiteReplicationPushFilter(sharedRefDatabase, gitRepositoryManager, config);
+ List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
+
+ assertThat(filteredRefUpdates).containsExactly(refUpToDate);
+ }
+
+ @Test
+ public void shouldLoadLocalVersionAndNotFilter() throws Exception {
+ String refName = "refs/heads/temporaryOutdated";
+ RemoteRefUpdate temporaryOutdated = refUpdate(refName);
+ ObjectId latestObjectId = repo.getRepository().exactRef(refName).getObjectId();
+
+ List<RemoteRefUpdate> refUpdates = Collections.singletonList(temporaryOutdated);
+ doReturn(false).doReturn(true).when(sharedRefDatabaseMock).isUpToDate(eq(projectName), any());
+
+ MultisiteReplicationPushFilter pushFilter =
+ new MultisiteReplicationPushFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+ List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
+
+ assertThat(filteredRefUpdates).hasSize(1);
+ assertThat(filteredRefUpdates.get(0).getNewObjectId()).isEqualTo(latestObjectId);
+
+ verify(sharedRefDatabaseMock, times(2)).isUpToDate(any(), any());
+ }
+
+ @Test
+ public void shouldLoadLocalVersionAndFilter() throws Exception {
+ RemoteRefUpdate temporaryOutdated = refUpdate("refs/heads/temporaryOutdated");
+ repo.branch("refs/heads/temporaryOutdated").commit().create();
+ List<RemoteRefUpdate> refUpdates = Collections.singletonList(temporaryOutdated);
+ doReturn(false).doReturn(false).when(sharedRefDatabaseMock).isUpToDate(eq(projectName), any());
+
+ MultisiteReplicationPushFilter pushFilter =
+ new MultisiteReplicationPushFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+ List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
+
+ assertThat(filteredRefUpdates).isEmpty();
+ verify(sharedRefDatabaseMock, times(2)).isUpToDate(any(), any());
+ }
+
+ @Test
+ public void shouldFilterOutAllOutdatedChangesRef() throws Exception {
+ RemoteRefUpdate refUpToDate = refUpdate("refs/heads/uptodate");
+ RemoteRefUpdate refChangeUpToDate = refUpdate("refs/changes/25/1225/2");
+ RemoteRefUpdate changeMetaRef = refUpdate("refs/changes/12/4512/meta");
+ RemoteRefUpdate changeRef = refUpdate("refs/changes/12/4512/1");
+ List<RemoteRefUpdate> refUpdates =
+ Arrays.asList(refUpToDate, refChangeUpToDate, changeMetaRef, changeRef);
+ SharedRefDatabaseWrapper sharedRefDatabase =
+ new FakeSharedRefDatabaseWrapper(changeMetaRef.getSrcRef());
+
+ MultisiteReplicationPushFilter pushFilter =
+ new MultisiteReplicationPushFilter(sharedRefDatabase, gitRepositoryManager, config);
+ List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
+
+ assertThat(filteredRefUpdates).containsExactly(refUpToDate, refChangeUpToDate);
+ }
+
+ private RemoteRefUpdate refUpdate(String refName) throws Exception {
+ ObjectId srcObjId = ObjectId.fromString("0000000000000000000000000000000000000001");
+ Ref srcRef = new ObjectIdRef.Unpeeled(Ref.Storage.NEW, refName, srcObjId);
+ repo.branch(refName).commit().create();
+ return new RemoteRefUpdate(null, srcRef, "origin", false, "origin", srcObjId);
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
index 88d2270..d99fb4b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
@@ -18,16 +18,21 @@
import static com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF;
import static com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_VALUE_REF;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
+import com.google.common.base.Suppliers;
import com.google.gerrit.entities.Project;
import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.server.data.RefUpdateAttribute;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.RefUpdatedEvent;
import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
@@ -95,16 +100,9 @@
Context.setForwardedEvent(false);
when(sharedRefDb.get(
A_TEST_PROJECT_NAME_KEY,
- ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF,
- String.class))
- .thenReturn(Optional.of("26f7ee61bf0e470e8393c884526eec8a9b943a63"));
- when(sharedRefDb.get(
- A_TEST_PROJECT_NAME_KEY,
ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_VALUE_REF,
String.class))
.thenReturn(Optional.of("" + (masterCommit.getCommitTime() - 1)));
- when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(Ref.class), any(ObjectId.class)))
- .thenReturn(true);
when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(String.class), any(), any()))
.thenReturn(true);
when(refUpdatedEvent.getProjectNameKey()).thenReturn(A_TEST_PROJECT_NAME_KEY);
@@ -129,6 +127,35 @@
}
@Test
+ public void producerShouldUsePutInsteadOfCompareAndPutWhenExtendedGlobalRefDb()
+ throws IOException {
+ when(sharedRefDb.isSetOperationSupported()).thenReturn(true);
+ RefUpdatedEvent refUpdatedEvent = new RefUpdatedEvent();
+ RefUpdateAttribute refUpdatedAttribute = new RefUpdateAttribute();
+ refUpdatedAttribute.project = A_TEST_PROJECT_NAME_KEY.get();
+ refUpdatedAttribute.refName = A_TEST_REF_NAME;
+ refUpdatedEvent.refUpdate = Suppliers.memoize(() -> refUpdatedAttribute);
+
+ new ProjectVersionRefUpdateImpl(
+ repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
+ .onEvent(refUpdatedEvent);
+
+ Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
+
+ verify(sharedRefDb, never())
+ .compareAndPut(any(Project.NameKey.class), anyString(), anyLong(), anyLong());
+
+ verify(sharedRefDb).put(any(Project.NameKey.class), any(String.class), any(String.class));
+ assertThat(ref).isNotNull();
+
+ ObjectLoader loader = repo.getRepository().open(ref.getObjectId());
+ long storedVersion = readLongObject(loader);
+ assertThat(storedVersion).isGreaterThan((long) masterCommit.getCommitTime());
+
+ verify(verLogger).log(A_TEST_PROJECT_NAME_KEY, storedVersion, 0);
+ }
+
+ @Test
public void producerShouldUpdateProjectVersionUponForcedPushRefUpdatedEvent() throws Exception {
Context.setForwardedEvent(false);
@@ -137,27 +164,20 @@
Thread.sleep(1000L);
repo.branch("master").update(masterCommit);
-
- when(sharedRefDb.get(
- A_TEST_PROJECT_NAME_KEY,
- ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF,
- String.class))
- .thenReturn(Optional.of("26f7ee61bf0e470e8393c884526eec8a9b943a63"));
when(sharedRefDb.get(
A_TEST_PROJECT_NAME_KEY,
ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_VALUE_REF,
String.class))
.thenReturn(Optional.of("" + (masterCommit.getCommitTime() - 1)));
- when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(Ref.class), any(ObjectId.class)))
- .thenReturn(true);
when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(String.class), any(), any()))
.thenReturn(true);
when(refUpdatedEvent.getProjectNameKey()).thenReturn(A_TEST_PROJECT_NAME_KEY);
when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
- new ProjectVersionRefUpdateImpl(
- repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
- .onEvent(refUpdatedEvent);
+ ProjectVersionRefUpdateImpl projectVersion =
+ new ProjectVersionRefUpdateImpl(
+ repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter);
+ projectVersion.onEvent(refUpdatedEvent);
Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
@@ -168,6 +188,10 @@
ObjectLoader loader = repo.getRepository().open(ref.getObjectId());
long storedVersion = readLongObject(loader);
+
+ Optional<Long> localStoredVersion = projectVersion.getProjectLocalVersion(A_TEST_PROJECT_NAME);
+ assertThat(localStoredVersion).isEqualTo(Optional.of(storedVersion));
+
assertThat(storedVersion).isGreaterThan((long) masterPlusOneCommit.getCommitTime());
verify(verLogger).log(A_TEST_PROJECT_NAME_KEY, storedVersion, 0);
@@ -179,17 +203,10 @@
Context.setForwardedEvent(false);
when(sharedRefDb.get(
A_TEST_PROJECT_NAME_KEY,
- ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF,
- String.class))
- .thenReturn(Optional.empty());
- when(sharedRefDb.get(
- A_TEST_PROJECT_NAME_KEY,
ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_VALUE_REF,
String.class))
.thenReturn(Optional.empty());
- when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(Ref.class), any(ObjectId.class)))
- .thenReturn(true);
when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(String.class), any(), any()))
.thenReturn(true);
when(refUpdatedEvent.getProjectNameKey()).thenReturn(A_TEST_PROJECT_NAME_KEY);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/RefFixture.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/RefFixture.java
index baba94b..1885d27 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/RefFixture.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/RefFixture.java
@@ -33,6 +33,8 @@
static final ObjectId AN_OBJECT_ID_1 = new ObjectId(1, 2, 3, 4, 5);
static final ObjectId AN_OBJECT_ID_2 = new ObjectId(1, 2, 3, 4, 6);
static final ObjectId AN_OBJECT_ID_3 = new ObjectId(1, 2, 3, 4, 7);
+ static final ObjectId AN_OUTDATED_OBJECT_ID =
+ ObjectId.fromString("da37cb098dd7df4c8662ae8afc1acae5f7567775");
static final String A_TEST_REF_NAME = "refs/heads/master";
static final String A_REF_NAME_OF_A_PATCHSET = "refs/changes/01/1/1";