Merge branch 'stable-3.3' into stable-3.4

* stable-3.3:
  Specify cache.threads = 0 in the multi-site config documentation

Change-Id: If053c76ec9f77029103d30c91888b309fa3ef221
diff --git a/BUILD b/BUILD
index d7bd3d8..d95dc33 100644
--- a/BUILD
+++ b/BUILD
@@ -18,9 +18,10 @@
     ],
     resources = glob(["src/main/resources/**/*"]),
     deps = [
+        ":events-broker-neverlink",
+        ":global-refdb-neverlink",
+        ":pull-replication-neverlink",
         ":replication-neverlink",
-        "@events-broker//jar",
-        "@global-refdb//jar",
     ],
 )
 
@@ -30,6 +31,24 @@
     exports = ["//plugins/replication"],
 )
 
+java_library(
+    name = "pull-replication-neverlink",
+    neverlink = 1,
+    exports = ["//plugins/pull-replication"],
+)
+
+java_library(
+    name = "events-broker-neverlink",
+    neverlink = 1,
+    exports = ["//plugins/events-broker"],
+)
+
+java_library(
+    name = "global-refdb-neverlink",
+    neverlink = 1,
+    exports = ["//plugins/global-refdb"],
+)
+
 junit_tests(
     name = "multi_site_tests",
     srcs = glob(["src/test/java/**/*.java"]),
@@ -49,8 +68,9 @@
     visibility = ["//visibility:public"],
     exports = PLUGIN_DEPS + PLUGIN_TEST_DEPS + [
         ":multi-site__plugin",
-        "@global-refdb//jar",
-        "@events-broker//jar",
+        "//plugins/events-broker",
+        "//plugins/global-refdb",
+        "//plugins/pull-replication",
         "//plugins/replication",
     ],
 )
@@ -74,12 +94,6 @@
     srcs = [
         "e2e-tests/test.sh",
     ],
-    data = [
-        "//plugins/multi-site",
-        "//plugins/multi-site:e2e_multi_site_test_dir",
-        "//plugins/multi-site:e2e_multi_site_setup_local_env_dir",
-        "external_plugin_deps.bzl",
-    ] + glob(["setup_local_env/**/*"]) + glob(["e2e-tests/**/*"]),
     args = [
         "--multisite-lib-file $(location //plugins/multi-site)",
         "--healthcheck-interval 5s",
@@ -88,6 +102,11 @@
         "--location '$(location //plugins/multi-site:e2e_multi_site_test_dir)'",
         "--local-env '$(location //plugins/multi-site:e2e_multi_site_setup_local_env_dir)'",
     ],
+    data = [
+        "//plugins/multi-site",
+        "//plugins/multi-site:e2e_multi_site_test_dir",
+        "//plugins/multi-site:e2e_multi_site_setup_local_env_dir",
+    ] + glob(["setup_local_env/**/*"]) + glob(["e2e-tests/**/*"]),
     tags = [
         "e2e-multi-site",
     ],
diff --git a/Jenkinsfile b/Jenkinsfile
index f008749..6ef43af 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -1,2 +1,4 @@
 pluginPipeline(formatCheckId: 'gerritforge:multi-site-format-47168e90078b0b3f11401610930e82830e76bff7',
-               buildCheckId: 'gerritforge:multi-site-47168e90078b0b3f11401610930e82830e76bff7')
+               buildCheckId: 'gerritforge:multi-site-47168e90078b0b3f11401610930e82830e76bff7',
+               extraPlugins: [ 'pull-replication' ],
+               extraModules: [ 'events-broker', 'global-refdb' ])
diff --git a/README.md b/README.md
index 79094d3..f683f4e 100644
--- a/README.md
+++ b/README.md
@@ -40,8 +40,6 @@
 
 cd gerrit/plugins
 ln -s ../../multi-site .
-rm external_plugin_deps.bzl
-ln -s multi-site/external_plugin_deps.bzl .
 ```
 
 Example of building the multi-site plugin:
@@ -65,6 +63,21 @@
 daemon running (/var/run/docker.sock accessible) or a DOCKER_HOST pointing to
 a Docker server.
 
+## Pre-requisites
+
+Each Gerrit server of the cluster must be identified with a globally unique
+[instance-id](https://gerrit-documentation.storage.googleapis.com/Documentation/3.4.5/config-gerrit.html#gerrit.instanceId)
+defined in `$GERRIT_SITE/etc/gerrit.config`.
+When migrating from a multi-site configuration with Gerrit v3.3 or earlier,
+you must reuse the instance-id value stored under `$GERRIT_SITE/data/multi-site`.
+
+Example:
+
+```
+[gerrit]
+  instanceId = 758fe5b7-1869-46e6-942a-3ae0ae7e3bd2
+```
+
 ## How to configure
 
 Install the multi-site plugin into the `$GERRIT_SITE/lib` directory of all
diff --git a/e2e-tests/test.sh b/e2e-tests/test.sh
index 19c8b04..3ecb9fd 100755
--- a/e2e-tests/test.sh
+++ b/e2e-tests/test.sh
@@ -16,11 +16,11 @@
 
 LOCATION="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
 LOCAL_ENV="$( cd "${LOCATION}/../setup_local_env" >/dev/null 2>&1 && pwd )"
-GERRIT_BRANCH=stable-3.3
+GERRIT_BRANCH=stable-3.4
 GERRIT_CI=https://archive-ci.gerritforge.com/view/Plugins-$GERRIT_BRANCH/job
 LAST_BUILD=lastSuccessfulBuild/artifact/bazel-bin/plugins
 DEF_MULTISITE_LOCATION=${LOCATION}/../../../bazel-bin/plugins/multi-site/multi-site.jar
-DEF_GERRIT_IMAGE=3.3.6-centos8
+DEF_GERRIT_IMAGE=3.4.0-centos8
 DEF_GERRIT_HEALTHCHECK_START_PERIOD=60s
 DEF_GERRIT_HEALTHCHECK_INTERVAL=5s
 DEF_GERRIT_HEALTHCHECK_TIMEOUT=5s
@@ -64,7 +64,7 @@
 
   export BROKER_HOST=$2
   export BROKER_PORT=$3
-  export GROUP_ID=$4
+  export INSTANCE_ID=$4
   export SSH_ADVERTISED_PORT=$5
   export LOCATION_TEST_SITE=/var/gerrit
   export REMOTE_DEBUG_PORT=5005
@@ -196,8 +196,6 @@
 done
 
 # Defaults
-EVENTS_BROKER_VER=`grep 'com.gerritforge:events-broker' ${LOCATION}/../external_plugin_deps.bzl | cut -d '"' -f 2 | cut -d ':' -f 3`
-GLOBAL_REFDB_VER=`grep 'com.gerritforge:global-refdb' ${LOCATION}/../external_plugin_deps.bzl | cut -d '"' -f 2 | cut -d ':' -f 3`
 DEPLOYMENT_LOCATION=$(mktemp -d || $(echo >&2 "Could not create temp dir" && exit 1))
 MULTISITE_LIB_LOCATION=${MULTISITE_LIB_LOCATION:-${DEF_MULTISITE_LOCATION}}
 BROKER_TYPE=${BROKER_TYPE:-"kafka"}
@@ -261,13 +259,11 @@
 docker cp ${CONTAINER_NAME}:/var/gerrit/plugins/replication.jar $COMMON_LIBS/
 docker rm -fv ${CONTAINER_NAME}
 
-echo "Downloading global-refdb library $GERRIT_BRANCH"
-wget https://repo1.maven.org/maven2/com/gerritforge/global-refdb/$GLOBAL_REFDB_VER/global-refdb-$GLOBAL_REFDB_VER.jar \
-  -O $COMMON_LIBS/global-refdb.jar || { echo >&2 "Cannot download global-refdb library: Check internet connection. Aborting"; exit 1; }
+echo "Copying global-refdb library $GERRIT_BRANCH"
+cp bazel-bin/plugins/global-refdb/global-refdb.jar $COMMON_LIBS/global-refdb.jar || { echo >&2 "bazel-bin/plugins/global-refdb/global-refdb.jar: Not able to copy the file. Aborting"; exit 1; }
 
 echo "Downloading events-broker library $GERRIT_BRANCH"
-wget https://repo1.maven.org/maven2/com/gerritforge/events-broker/$EVENTS_BROKER_VER/events-broker-$EVENTS_BROKER_VER.jar \
-  -O $COMMON_LIBS/events-broker.jar || { echo >&2 "Cannot download events-broker library: Check internet connection. Aborting"; exit 1; }
+cp bazel-bin/plugins/events-broker/events-broker.jar $COMMON_LIBS/events-broker.jar
 
 echo "Setting up directories"
 mkdir -p ${GERRIT_1_ETC} ${GERRIT_1_PLUGINS} ${GERRIT_1_LIBS} ${GERRIT_2_ETC} ${GERRIT_2_PLUGINS} ${GERRIT_2_LIBS}
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
deleted file mode 100644
index 223778e..0000000
--- a/external_plugin_deps.bzl
+++ /dev/null
@@ -1,14 +0,0 @@
-load("//tools/bzl:maven_jar.bzl", "maven_jar")
-
-def external_plugin_deps():
-    maven_jar(
-        name = "global-refdb",
-        artifact = "com.gerritforge:global-refdb:3.3.1",
-        sha1 = "5df9dddad2fc67c922406f41549186b210cd957e",
-    )
-
-    maven_jar(
-        name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.3.2",
-        sha1 = "d8bcb77047cc12dd7c623b5b4de70a25499d3d6c",
-    )
diff --git a/setup_local_env/README.md b/setup_local_env/README.md
index 88b40d5..e6ec9e7 100644
--- a/setup_local_env/README.md
+++ b/setup_local_env/README.md
@@ -74,8 +74,6 @@
 [--gerrit2-httpd-port]          Gerrit Instance 2 http port; default 18081
 [--gerrit2-sshd-port]           Gerrit Instance 2 sshd port; default 49418
 
-[--replication-type]            Options [file,ssh]; default file
-[--replication-ssh-user]        SSH user for the replication plugin; default $(whoami)
 [--replication-delay]           Replication delay across the two instances in seconds
 
 [--just-cleanup-env]            Cleans up previous deployment; default false
diff --git a/setup_local_env/configs/gerrit.config b/setup_local_env/configs/gerrit.config
index 2cd8c9d..8a176f2 100644
--- a/setup_local_env/configs/gerrit.config
+++ b/setup_local_env/configs/gerrit.config
@@ -5,6 +5,7 @@
     installModule = com.gerritforge.gerrit.eventbroker.BrokerApiModule # events-broker module to setup BrokerApi dynamic item
     installModule = com.googlesource.gerrit.plugins.multisite.Module # multi-site needs to be a gerrit lib
     installDbModule = com.googlesource.gerrit.plugins.multisite.GitModule
+    instanceId = $INSTANCE_ID
 [database]
     type = h2
     database = $LOCATION_TEST_SITE/db/ReviewDB
@@ -43,7 +44,7 @@
 [plugin "events-kafka"]
     sendAsync = true
     bootstrapServers = $BROKER_HOST:$BROKER_PORT
-    groupId = $GROUP_ID
+    groupId = $INSTANCE_ID
     numberOfSubscribers = 6
     securityProtocol = PLAINTEXT
     pollingIntervalMs = 1000
@@ -55,12 +56,12 @@
     pollingIntervalMs = 1000
     region = us-east-1
     endpoint = http://localhost:$BROKER_PORT
-    applicationName = $GROUP_ID
+    applicationName = $INSTANCE_ID
     initialPosition = trim_horizon
 [plugin "events-gcloud-pubsub"]
     numberOfSubscribers = 6
     gcloudProject="test-project"
-    subscriptionId=$GROUP_ID
+    subscriptionId=$INSTANCE_ID
     privateKeyLocation="not used in local mode"
 [plugin "metrics-reporter-prometheus"]
     prometheusBearerToken = token
diff --git a/setup_local_env/configs/replication.config b/setup_local_env/configs/replication.config
index ece1b3e..1c2aac4 100644
--- a/setup_local_env/configs/replication.config
+++ b/setup_local_env/configs/replication.config
@@ -1,6 +1,7 @@
-[remote "Replication"]
-    $REPLICATION_URL
-    push = +refs/*:refs/*
+[remote "$REPLICA_INSTANCE_ID"]
+    $PULL_REPLICATION_URL
+    $PULL_REPLICATION_API_URL
+    fetch = +refs/*:refs/*
     mirror = true
     timeout = 600
     rescheduleDelay = 15
@@ -13,4 +14,8 @@
     replicateOnStartup = false
 [replication]
     lockErrorMaxRetries = 5
-    maxRetries = 5
\ No newline at end of file
+    maxRetries = 5
+    useCGitClient = false
+    consumeStreamEvents = false
+    syncRefs="ALL REFS ASYNC"
+    maxApiPayloadSize=40000
diff --git a/setup_local_env/configs/secure.config b/setup_local_env/configs/secure.config
new file mode 100644
index 0000000..faeb659
--- /dev/null
+++ b/setup_local_env/configs/secure.config
@@ -0,0 +1,3 @@
+[auth]
+	registerEmailPrivateKey = A3D00Ny0bLW2z7xdInvIf35Q5mDCi5u9o/E=
+	bearertoken = some-bearer-token
diff --git a/setup_local_env/setup.sh b/setup_local_env/setup.sh
index 11ae04f..247a721 100755
--- a/setup_local_env/setup.sh
+++ b/setup_local_env/setup.sh
@@ -16,17 +16,15 @@
 
 
 SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
-GERRIT_BRANCH=stable-3.3
+GERRIT_BRANCH=stable-3.4
 GERRIT_CI=https://archive-ci.gerritforge.com/view/Plugins-$GERRIT_BRANCH/job
 LAST_BUILD=lastSuccessfulBuild/artifact/bazel-bin/plugins
-EVENTS_BROKER_VER=`grep 'com.gerritforge:events-broker' $(dirname $0)/../external_plugin_deps.bzl | cut -d '"' -f 2 | cut -d ':' -f 3`
-GLOBAL_REFDB_VER=`grep 'com.gerritforge:global-refdb' $(dirname $0)/../external_plugin_deps.bzl | cut -d '"' -f 2 | cut -d ':' -f 3`
 
 function check_application_requirements {
   type haproxy >/dev/null 2>&1 || { echo >&2 "Require haproxy but it's not installed. Aborting."; exit 1; }
   type java >/dev/null 2>&1 || { echo >&2 "Require java but it's not installed. Aborting."; exit 1; }
   type docker >/dev/null 2>&1 || { echo >&2 "Require docker but it's not installed. Aborting."; exit 1; }
-  type docker-compose >/dev/null 2>&1 || { echo >&2 "Require docker-compose but it's not installed. Aborting."; exit 1; }
+  [ $($SUDO docker --version | awk '{print $3}' | cut -d '.' -f 1) -ge 20 ] || { echo >&2 "Require docker v20 or later. Aborting."; exit 1; }
   type wget >/dev/null 2>&1 || { echo >&2 "Require wget but it's not installed. Aborting."; exit 1; }
   type envsubst >/dev/null 2>&1 || { echo >&2 "Require envsubst but it's not installed. Aborting."; exit 1; }
   type openssl >/dev/null 2>&1 || { echo >&2 "Require openssl but it's not installed. Aborting."; exit 1; }
@@ -35,16 +33,16 @@
   fi
 }
 
-function get_replication_url {
-  REPLICATION_LOCATION_TEST_SITE=$1
-  REPLICATION_HOSTNAME=$2
-  USER=$REPLICATION_SSH_USER
+function get_pull_replication_api_url {
+  REPLICATION_HOSTNAME=$1
 
-  if [ "$REPLICATION_TYPE" = "file" ];then
-    echo "url = file://$REPLICATION_LOCATION_TEST_SITE/git/#{name}#.git"
-  elif [ "$REPLICATION_TYPE" = "ssh" ];then
-    echo "url = ssh://$USER@$REPLICATION_HOSTNAME:$REPLICATION_LOCATION_TEST_SITE/git/#{name}#.git"
-  fi
+  echo "apiUrl = http://$REPLICATION_HOSTNAME:$REPLICATION_HTTPD_PORT"
+}
+
+function get_pull_replication_url {
+  REPLICATION_HOSTNAME=$1
+
+  echo "url = http://$REPLICATION_HOSTNAME:$REPLICATION_HTTPD_PORT/#{name}#.git"
 }
 
 function deploy_tls_certificates {
@@ -66,12 +64,13 @@
     export LOCATION_TEST_SITE=$3
     export GERRIT_SSHD_PORT=$4
     export REPLICATION_HTTPD_PORT=$5
-    export REPLICATION_LOCATION_TEST_SITE=$6
-    export GERRIT_HOSTNAME=$7
-    export REPLICATION_HOSTNAME=$8
-    export REMOTE_DEBUG_PORT=$9
-    export GROUP_ID=${10}
-    export REPLICATION_URL=$(get_replication_url $REPLICATION_LOCATION_TEST_SITE $REPLICATION_HOSTNAME)
+    export GERRIT_HOSTNAME=$6
+    export REPLICATION_HOSTNAME=$7
+    export REMOTE_DEBUG_PORT=$8
+    export INSTANCE_ID=${9}
+    export REPLICA_INSTANCE_ID=${10}
+    export PULL_REPLICATION_URL=$(get_pull_replication_url $REPLICATION_HOSTNAME)
+    export PULL_REPLICATION_API_URL=$(get_pull_replication_api_url $REPLICATION_HOSTNAME)
 
     echo "Replacing variables for file $file and copying to $CONFIG_TEST_SITE/$file_name"
 
@@ -126,25 +125,25 @@
   GERRIT_SITE1_SSHD_PORT=$3
   CONFIG_TEST_SITE_1=$LOCATION_TEST_SITE_1/etc
   GERRIT_SITE1_REMOTE_DEBUG_PORT="5005"
-  GERRIT_SITE1_GROUP_ID="instance-1"
+  GERRIT_SITE1_INSTANCE_ID="instance-1"
   # SITE 2
   GERRIT_SITE2_HOSTNAME=$4
   GERRIT_SITE2_HTTPD_PORT=$5
   GERRIT_SITE2_SSHD_PORT=$6
   CONFIG_TEST_SITE_2=$LOCATION_TEST_SITE_2/etc
   GERRIT_SITE2_REMOTE_DEBUG_PORT="5006"
-  GERRIT_SITE2_GROUP_ID="instance-2"
+  GERRIT_SITE2_INSTANCE_ID="instance-2"
 
   # Set config SITE1
-  copy_config_files $CONFIG_TEST_SITE_1 $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_SSHD_PORT $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE1_REMOTE_DEBUG_PORT $GERRIT_SITE1_GROUP_ID
+  copy_config_files $CONFIG_TEST_SITE_1 $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_SSHD_PORT $GERRIT_SITE2_HTTPD_PORT $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE1_REMOTE_DEBUG_PORT $GERRIT_SITE1_INSTANCE_ID $GERRIT_SITE2_INSTANCE_ID
 
 
   # Set config SITE2
-  copy_config_files $CONFIG_TEST_SITE_2 $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE2_SSHD_PORT $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE2_REMOTE_DEBUG_PORT $GERRIT_SITE2_GROUP_ID
+  copy_config_files $CONFIG_TEST_SITE_2 $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE2_SSHD_PORT $GERRIT_SITE1_HTTPD_PORT $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE2_REMOTE_DEBUG_PORT $GERRIT_SITE2_INSTANCE_ID $GERRIT_SITE1_INSTANCE_ID
 }
 
 function is_docker_desktop {
-  echo $(docker info | grep "Operating System: Docker Desktop" | wc -l)
+  echo $($SUDO docker info | grep "Operating System: Docker Desktop" | wc -l)
 }
 
 function docker_host_env {
@@ -183,9 +182,10 @@
   kill $(ps -ax | grep haproxy | grep "gerrit_setup/ha-proxy-config" | awk '{print $1}') 2> /dev/null
 
   echo "Stopping $BROKER_TYPE docker container"
-  docker-compose -f "${SCRIPT_DIR}/docker-compose-${BROKER_TYPE}.yaml" down 2> /dev/null
+  printenv > "${SCRIPT_DIR}/docker-compose-${BROKER_TYPE}.env"
+  $SUDO docker compose -f "${SCRIPT_DIR}/docker-compose-${BROKER_TYPE}.yaml" --env-file "${SCRIPT_DIR}/docker-compose-${BROKER_TYPE}.env" down 2> /dev/null
   echo "Stopping core docker containers"
-  docker-compose -f "${SCRIPT_DIR}/docker-compose-core.yaml" down 2> /dev/null
+  $SUDO docker compose -f "${SCRIPT_DIR}/docker-compose-core.yaml" --env-file "${SCRIPT_DIR}/docker-compose-${BROKER_TYPE}.env" down 2> /dev/null
 
   echo "Stopping GERRIT instances"
   $1/bin/gerrit.sh stop 2> /dev/null
@@ -197,7 +197,7 @@
 
 function check_if_container_is_running {
   local container=$1;
-  echo $(docker inspect "$container" 2> /dev/null | grep '"Running": true' | wc -l)
+  echo $($SUDO docker inspect "$container" 2> /dev/null | grep '"Running": true' | wc -l)
 }
 
 function ensure_docker_compose_is_up_and_running {
@@ -207,8 +207,9 @@
 
   local is_container_running=$(check_if_container_is_running "$container_name")
   if [ "$is_container_running" -lt 1 ];then
+    printenv > "${SCRIPT_DIR}/${docker_compose_file}.env"
     echo "[$log_label] Starting docker containers"
-    docker-compose -f "${SCRIPT_DIR}/${docker_compose_file}" up -d
+    $SUDO docker compose -f "${SCRIPT_DIR}/${docker_compose_file}" --env-file "${SCRIPT_DIR}/${docker_compose_file}.env" up -d
 
     echo "[$log_label] Waiting for docker containers to start..."
     while [[ $(check_if_container_is_running "$container_name") -lt 1 ]];do sleep 10s; done
@@ -231,6 +232,8 @@
     echo
     echo "[--release-war-file]            Location to release.war file"
     echo "[--multisite-lib-file]          Location to lib multi-site.jar file"
+    echo "[--eventsbroker-lib-file]       Location to lib events-broker.jar file"
+    echo "[--globalrefdb-lib-file]        Location to lib global-refdb.jar file"
     echo
     echo "[--new-deployment]              Cleans up previous gerrit deployment and re-installs it. default true"
     echo "[--get-websession-plugin]       Download websession-broker plugin from CI lastSuccessfulBuild; default true"
@@ -247,8 +250,6 @@
     echo "[--gerrit2-httpd-port]          Gerrit Instance 2 http port; default 18081"
     echo "[--gerrit2-sshd-port]           Gerrit Instance 2 sshd port; default 49418"
     echo
-    echo "[--replication-type]            Options [file,ssh]; default ssh"
-    echo "[--replication-ssh-user]        SSH user for the replication plugin; default $(whoami)"
     echo "[--replication-delay]           Replication delay across the two instances in seconds"
     echo
     echo "[--just-cleanup-env]            Cleans up previous deployment; default false"
@@ -257,6 +258,8 @@
     echo
     echo "[--broker-type]                 events broker type; 'kafka', 'kinesis' or 'gcloud-pubsub'. Default 'kafka'"
     echo
+    echo "[--sudo]                        run docker commands with sudo"
+    echo
     exit 0
   ;;
   "--new-deployment")
@@ -284,6 +287,16 @@
     shift
     shift
   ;;
+  "--eventsbroker-lib-file" )
+    EVENTS_BROKER_LIB_LOCATION=$2
+    shift
+    shift
+  ;;
+  "--globalrefdb-lib-file" )
+    GLOBAL_REFDB_LIB_LOCATION=$2
+    shift
+    shift
+  ;;
   "--gerrit-canonical-host" )
     export GERRIT_CANONICAL_HOSTNAME=$2
     shift
@@ -319,16 +332,6 @@
     shift
     shift
   ;;
-  "--replication-ssh-user" )
-    export REPLICATION_SSH_USER=$2
-    shift
-    shift
-  ;;
-  "--replication-type")
-    export REPLICATION_TYPE=$2
-    shift
-    shift
-  ;;
   "--replication-delay")
     export REPLICATION_DELAY_SEC=$2
     shift
@@ -353,6 +356,11 @@
       exit 1
     fi
   ;;
+  "--sudo" )
+    # Boolean flag without a value: shift only once so the next option is not swallowed.
+    SUDO=sudo
+    shift
+  ;;
   *     )
     echo "Unknown option argument: $1"
     shift
@@ -376,8 +384,6 @@
 GERRIT_2_HTTPD_PORT=${GERRIT_2_HTTPD_PORT:-"18081"}
 GERRIT_1_SSHD_PORT=${GERRIT_1_SSHD_PORT:-"39418"}
 GERRIT_2_SSHD_PORT=${GERRIT_2_SSHD_PORT:-"49418"}
-REPLICATION_TYPE=${REPLICATION_TYPE:-"file"}
-REPLICATION_SSH_USER=${REPLICATION_SSH_USER:-$(whoami)}
 export REPLICATION_DELAY_SEC=${REPLICATION_DELAY_SEC:-"5"}
 export SSH_ADVERTISED_PORT=${SSH_ADVERTISED_PORT:-"29418"}
 HTTPS_ENABLED=${HTTPS_ENABLED:-"false"}
@@ -392,7 +398,8 @@
 
 RELEASE_WAR_FILE_LOCATION=${RELEASE_WAR_FILE_LOCATION:-bazel-bin/release.war}
 MULTISITE_LIB_LOCATION=${MULTISITE_LIB_LOCATION:-bazel-bin/plugins/multi-site/multi-site.jar}
-
+EVENTS_BROKER_LIB_LOCATION=${EVENTS_BROKER_LIB_LOCATION:-bazel-bin/plugins/events-broker/events-broker.jar}
+GLOBAL_REFDB_LIB_LOCATION=${GLOBAL_REFDB_LIB_LOCATION:-bazel-bin/plugins/global-refdb/global-refdb.jar}
 
 export FAKE_NFS=$COMMON_LOCATION/fake_nfs
 
@@ -413,10 +420,20 @@
 else
   cp -f $MULTISITE_LIB_LOCATION $DEPLOYMENT_LOCATION/multi-site.jar  >/dev/null 2>&1 || { echo >&2 "$MULTISITE_LIB_LOCATION: Not able to copy the file. Aborting"; exit 1; }
 fi
+
+echo "Copying events-broker library"
+  cp -f $EVENTS_BROKER_LIB_LOCATION $DEPLOYMENT_LOCATION/events-broker.jar  >/dev/null 2>&1 || { echo >&2 "$EVENTS_BROKER_LIB_LOCATION: Not able to copy the file. Aborting"; exit 1; }
+
+echo "Copying global-refdb library"
+  cp -f $GLOBAL_REFDB_LIB_LOCATION $DEPLOYMENT_LOCATION/global-refdb.jar  >/dev/null 2>&1 || { echo >&2 "$GLOBAL_REFDB_LIB_LOCATION: Not able to copy the file. Aborting"; exit 1; }
+
 if [ $DOWNLOAD_WEBSESSION_PLUGIN = "true" ];then
   echo "Downloading websession-broker plugin $GERRIT_BRANCH"
   wget $GERRIT_CI/plugin-websession-broker-bazel-$GERRIT_BRANCH/$LAST_BUILD/websession-broker/websession-broker.jar \
-  -O $DEPLOYMENT_LOCATION/websession-broker.jar || { echo >&2 "Cannot download websession-broker plugin: Check internet connection. Abort\
+  -O $DEPLOYMENT_LOCATION/websession-broker.jar || \
+  wget $GERRIT_CI/plugin-websession-broker-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/websession-broker/websession-broker.jar \
+  -O $DEPLOYMENT_LOCATION/websession-broker.jar || \
+  { echo >&2 "Cannot download websession-broker plugin: Check internet connection. Abort\
 ing"; exit 1; }
   wget $GERRIT_CI/plugin-healthcheck-bazel-$GERRIT_BRANCH/$LAST_BUILD/healthcheck/healthcheck.jar \
   -O $DEPLOYMENT_LOCATION/healthcheck.jar || { echo >&2 "Cannot download healthcheck plugin: Check internet connection. Abort\
@@ -427,30 +444,29 @@
 
 echo "Downloading zookeeper plugin $GERRIT_BRANCH"
   wget $GERRIT_CI/plugin-zookeeper-refdb-bazel-$GERRIT_BRANCH/$LAST_BUILD/zookeeper-refdb/zookeeper-refdb.jar \
-  -O $DEPLOYMENT_LOCATION/zookeeper-refdb.jar || { echo >&2 "Cannot download zookeeper plugin: Check internet connection. Abort\
-ing"; exit 1; }
-
-echo "Downloading global-refdb library $GERRIT_BRANCH"
-  wget https://repo1.maven.org/maven2/com/gerritforge/global-refdb/$GLOBAL_REFDB_VER/global-refdb-$GLOBAL_REFDB_VER.jar \
-  -O $DEPLOYMENT_LOCATION/global-refdb.jar || { echo >&2 "Cannot download global-refdb library: Check internet connection. Abort\
-ing"; exit 1; }
-
-echo "Downloading events-broker library $GERRIT_BRANCH"
-  wget https://repo1.maven.org/maven2/com/gerritforge/events-broker/$EVENTS_BROKER_VER/events-broker-$EVENTS_BROKER_VER.jar \
-  -O $DEPLOYMENT_LOCATION/events-broker.jar || { echo >&2 "Cannot download events-broker library: Check internet connection. Abort\
+  -O $DEPLOYMENT_LOCATION/zookeeper-refdb.jar || \
+  wget $GERRIT_CI/plugin-zookeeper-refdb-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/zookeeper-refdb/zookeeper-refdb.jar \
+  -O $DEPLOYMENT_LOCATION/zookeeper-refdb.jar || \
+  { echo >&2 "Cannot download zookeeper plugin: Check internet connection. Abort\
 ing"; exit 1; }
 
 if [ "$BROKER_TYPE" = "kafka" ]; then
 echo "Downloading events-kafka plugin $GERRIT_BRANCH"
   wget $GERRIT_CI/plugin-events-kafka-bazel-$GERRIT_BRANCH/$LAST_BUILD/events-kafka/events-kafka.jar \
-  -O $DEPLOYMENT_LOCATION/events-kafka.jar || { echo >&2 "Cannot download events-kafka plugin: Check internet connection. Abort\
+  -O $DEPLOYMENT_LOCATION/events-kafka.jar || \
+  wget $GERRIT_CI/plugin-events-kafka-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/events-kafka/events-kafka.jar \
+  -O $DEPLOYMENT_LOCATION/events-kafka.jar || \
+  { echo >&2 "Cannot download events-kafka plugin: Check internet connection. Abort\
 ing"; exit 1; }
 fi
 
 if [ "$BROKER_TYPE" = "kinesis" ]; then
 echo "Downloading events-aws-kinesis plugin $GERRIT_BRANCH"
   wget $GERRIT_CI/plugin-events-aws-kinesis-bazel-$GERRIT_BRANCH/$LAST_BUILD/events-aws-kinesis/events-aws-kinesis.jar \
-  -O $DEPLOYMENT_LOCATION/events-aws-kinesis.jar || { echo >&2 "Cannot download events-aws-kinesis plugin: Check internet connection. Abort\
+  -O $DEPLOYMENT_LOCATION/events-aws-kinesis.jar || \
+  wget $GERRIT_CI/plugin-events-aws-kinesis-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/events-aws-kinesis/events-aws-kinesis.jar \
+  -O $DEPLOYMENT_LOCATION/events-aws-kinesis.jar || \
+  { echo >&2 "Cannot download events-aws-kinesis plugin: Check internet connection. Abort\
 ing"; exit 1; }
 fi
 
@@ -458,19 +474,25 @@
 if [ "$BROKER_TYPE" = "gcloud-pubsub" ]; then
 echo "Downloading events-gcloud-pubsub plugin $GERRIT_BRANCH"
   wget $GERRIT_CI/plugin-events-gcloud-pubsub-bazel-$GERRIT_BRANCH/$LAST_BUILD/events-gcloud-pubsub/events-gcloud-pubsub.jar \
-  -O $DEPLOYMENT_LOCATION/events-gcloud-pubsub.jar || { echo >&2 "Cannot download events-gcloud-pubsub plugin: Check internet connection. Abort\
+  -O $DEPLOYMENT_LOCATION/events-gcloud-pubsub.jar || \
+  wget $GERRIT_CI/plugin-events-gcloud-pubsub-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/events-gcloud-pubsub/events-gcloud-pubsub.jar \
+  -O $DEPLOYMENT_LOCATION/events-gcloud-pubsub.jar || \
+  { echo >&2 "Cannot download events-gcloud-pubsub plugin: Check internet connection. Abort\
 ing"; exit 1; }
 fi
 
 echo "Downloading metrics-reporter-prometheus plugin $GERRIT_BRANCH"
+  wget $GERRIT_CI/plugin-metrics-reporter-prometheus-bazel-$GERRIT_BRANCH/$LAST_BUILD/metrics-reporter-prometheus/metrics-reporter-prometheus.jar \
+  -O $DEPLOYMENT_LOCATION/metrics-reporter-prometheus.jar || \
   wget $GERRIT_CI/plugin-metrics-reporter-prometheus-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/metrics-reporter-prometheus/metrics-reporter-prometheus.jar \
-  -O $DEPLOYMENT_LOCATION/metrics-reporter-prometheus.jar || { echo >&2 "Cannot download metrics-reporter-prometheus plugin: Check internet connection. Abort\
+  -O $DEPLOYMENT_LOCATION/metrics-reporter-prometheus.jar || \
+  { echo >&2 "Cannot download metrics-reporter-prometheus plugin: Check internet connection. Abort\
 ing"; exit 1; }
 
-if [ "$REPLICATION_TYPE" = "ssh" ];then
-  echo "Using 'SSH' replication type"
-  echo "Make sure ~/.ssh/authorized_keys and ~/.ssh/known_hosts are configured correctly"
-fi
+echo "Downloading pull-replication plugin $GERRIT_BRANCH"
+  wget $GERRIT_CI/plugin-pull-replication-bazel-$GERRIT_BRANCH/$LAST_BUILD/pull-replication/pull-replication.jar \
+  -O $DEPLOYMENT_LOCATION/pull-replication.jar || { echo >&2 "Cannot download pull-replication plugin: Check internet connection. Abort\
+ing"; exit 1; }
 
 if [ "$HTTPS_ENABLED" = "true" ];then
   export HTTP_PROTOCOL="https"
@@ -530,13 +552,18 @@
   echo "Replicating environment"
   cp -fR $LOCATION_TEST_SITE_1/* $LOCATION_TEST_SITE_2
 
-  echo "Link replication plugin"
-  ln -s $LOCATION_TEST_SITE_1/plugins/replication.jar $LOCATION_TEST_SITE_1/lib/replication.jar
-  ln -s $LOCATION_TEST_SITE_2/plugins/replication.jar $LOCATION_TEST_SITE_2/lib/replication.jar
+  echo "Link pullreplication plugin"
+  ln -s $LOCATION_TEST_SITE_1/plugins/pull-replication.jar $LOCATION_TEST_SITE_1/lib/pull-replication.jar
+  ln -s $LOCATION_TEST_SITE_2/plugins/pull-replication.jar $LOCATION_TEST_SITE_2/lib/pull-replication.jar
 
   echo "Link multi-site library to plugin directory"
   ln -s $LOCATION_TEST_SITE_1/lib/multi-site.jar $LOCATION_TEST_SITE_1/plugins/multi-site.jar
   ln -s $LOCATION_TEST_SITE_2/lib/multi-site.jar $LOCATION_TEST_SITE_2/plugins/multi-site.jar
+
+  echo "Copy pull-replication plugin"
+  cp -f $DEPLOYMENT_LOCATION/pull-replication.jar $LOCATION_TEST_SITE_1/plugins/pull-replication.jar
+  cp -f $DEPLOYMENT_LOCATION/pull-replication.jar $LOCATION_TEST_SITE_2/plugins/pull-replication.jar
+
 fi
 
 DOCKER_HOST_ENV=$(docker_host_env)
@@ -558,8 +585,12 @@
 
 echo "Re-deploying configuration files"
 deploy_config_files $GERRIT_1_HOSTNAME $GERRIT_1_HTTPD_PORT $GERRIT_1_SSHD_PORT $GERRIT_2_HOSTNAME $GERRIT_2_HTTPD_PORT $GERRIT_2_SSHD_PORT
+echo "Remove replication plugin from gerrit site 1"
+rm -f $LOCATION_TEST_SITE_1/plugins/replication.jar # -f: jar is already gone when re-running on an existing deployment
 echo "Starting gerrit site 1"
 $LOCATION_TEST_SITE_1/bin/gerrit.sh restart
+echo "Remove replication plugin from gerrit site 2"
+rm -f $LOCATION_TEST_SITE_2/plugins/replication.jar # -f: jar is already gone when re-running on an existing deployment
 echo "Starting gerrit site 2"
 $LOCATION_TEST_SITE_2/bin/gerrit.sh restart
 
@@ -574,8 +605,6 @@
 echo "==============================="
 echo "The admin password is 'secret'"
 echo "deployment-location=$DEPLOYMENT_LOCATION"
-echo "replication-type=$REPLICATION_TYPE"
-echo "replication-ssh-user=$REPLICATION_SSH_USER"
 echo "replication-delay=$REPLICATION_DELAY_SEC"
 echo "enable-https=$HTTPS_ENABLED"
 echo
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index 6b77713..2d67eab 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -23,15 +23,19 @@
 import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.config.ConfigUtil;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.google.inject.spi.Message;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
 import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
@@ -50,6 +54,10 @@
   static final String THREAD_POOL_SIZE_KEY = "threadPoolSize";
   static final int DEFAULT_THREAD_POOL_SIZE = 4;
 
+  private static final String REF_DATABASE = "ref-database";
+  private static final String REPLICATION_LAG_REFRESH_INTERVAL = "replicationLagRefreshInterval";
+  private static final Duration REPLICATION_LAG_REFRESH_INTERVAL_DEFAULT = Duration.ofSeconds(60);
+
   private static final String REPLICATION_CONFIG = "replication.config";
   // common parameters to cache and index sections
   private static final int DEFAULT_INDEX_MAX_TRIES = 2;
@@ -64,7 +72,9 @@
   private final Supplier<SharedRefDbConfiguration> sharedRefDb;
   private final Supplier<Collection<Message>> replicationConfigValidation;
   private final Supplier<Broker> broker;
+  private final Supplier<ReplicationFilter> replicationFilter;
   private final Config multiSiteConfig;
+  private final Supplier<Duration> replicationLagRefreshInterval;
 
   @Inject
   Configuration(SitePaths sitePaths) {
@@ -86,6 +96,18 @@
                 new SharedRefDbConfiguration(
                     enableSharedRefDbByDefault(lazyMultiSiteCfg.get()), PLUGIN_NAME));
     broker = memoize(() -> new Broker(lazyMultiSiteCfg));
+    replicationFilter = memoize(() -> new ReplicationFilter(lazyMultiSiteCfg));
+    replicationLagRefreshInterval =
+        memoize(
+            () ->
+                Duration.ofMillis(
+                    ConfigUtil.getTimeUnit(
+                        lazyMultiSiteCfg.get(),
+                        REF_DATABASE,
+                        null,
+                        REPLICATION_LAG_REFRESH_INTERVAL,
+                        REPLICATION_LAG_REFRESH_INTERVAL_DEFAULT.toMillis(),
+                        TimeUnit.MILLISECONDS)));
   }
 
   public Config getMultiSiteConfig() {
@@ -116,6 +138,14 @@
     return projects.get();
   }
 
+  public ReplicationFilter replicationFilter() {
+    return replicationFilter.get();
+  }
+
+  public Duration replicationLagRefreshInterval() {
+    return replicationLagRefreshInterval.get();
+  }
+
   public Collection<Message> validate() {
     return replicationConfigValidation.get();
   }
@@ -311,6 +341,115 @@
     }
   }
 
+  /**
+   * Wait settings of the push and fetch replication filters, read from the {@code push-filter}
+   * and {@code fetch-filter} subsections of the {@code replication} configuration section.
+   *
+   * <p>Each value is loaded on first access and memoized afterwards.
+   */
+  public static class ReplicationFilter {
+    static final String REPLICATION_FILTER_SECTION = "replication";
+    static final String REPLICATION_PUSH_FILTER_SUBSECTION = "push-filter";
+    static final String REPLICATION_FETCH_FILTER_SUBSECTION = "fetch-filter";
+
+    private static final String MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS =
+        "minWaitBeforeReloadLocalVersionMs";
+    private static final String RANDOM_WAIT_MAX_BOUND_BEFORE_RELOAD_LOCAL_VERSION_MS =
+        "maxRandomWaitBeforeReloadLocalVersionMs";
+
+    private static final int MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS_DEFAULT = 1000;
+    private static final int RANDOM_WAIT_MAX_BOUND_BEFORE_RELOAD_LOCAL_VERSION_MS_DEFAULT = 1000;
+    private final Supplier<Integer> fetchMinWaitBeforeReloadLocalVersionMs;
+    private final Supplier<Integer> fetchWaitBeforeReloadLocalVersionMs;
+    private final Supplier<Integer> pushMinWaitBeforeReloadLocalVersionMs;
+    private final Supplier<Integer> pushWaitBeforeReloadLocalVersionMs;
+
+    /**
+     * Sets up the memoized settings; {@code cfg} is only queried when a value is first read.
+     *
+     * @param cfg supplier of the configuration holding the {@code replication} section
+     */
+    public ReplicationFilter(Supplier<Config> cfg) {
+      fetchMinWaitBeforeReloadLocalVersionMs =
+          memoize(
+              () ->
+                  cfg.get()
+                      .getInt(
+                          REPLICATION_FILTER_SECTION,
+                          REPLICATION_FETCH_FILTER_SUBSECTION,
+                          MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS,
+                          MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS_DEFAULT));
+      fetchWaitBeforeReloadLocalVersionMs =
+          memoize(
+              () ->
+                  cfg.get()
+                      .getInt(
+                          REPLICATION_FILTER_SECTION,
+                          REPLICATION_FETCH_FILTER_SUBSECTION,
+                          RANDOM_WAIT_MAX_BOUND_BEFORE_RELOAD_LOCAL_VERSION_MS,
+                          RANDOM_WAIT_MAX_BOUND_BEFORE_RELOAD_LOCAL_VERSION_MS_DEFAULT));
+      pushMinWaitBeforeReloadLocalVersionMs =
+          memoize(
+              () ->
+                  cfg.get()
+                      .getInt(
+                          REPLICATION_FILTER_SECTION,
+                          REPLICATION_PUSH_FILTER_SUBSECTION,
+                          MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS,
+                          MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS_DEFAULT));
+      pushWaitBeforeReloadLocalVersionMs =
+          memoize(
+              () ->
+                  cfg.get()
+                      .getInt(
+                          REPLICATION_FILTER_SECTION,
+                          REPLICATION_PUSH_FILTER_SUBSECTION,
+                          RANDOM_WAIT_MAX_BOUND_BEFORE_RELOAD_LOCAL_VERSION_MS,
+                          RANDOM_WAIT_MAX_BOUND_BEFORE_RELOAD_LOCAL_VERSION_MS_DEFAULT));
+    }
+
+    /** Returns {@code true} when the fetch filter's maximum random wait is a positive value. */
+    public boolean isFetchFilterRandomSleepEnabled() {
+      return fetchWaitBeforeReloadLocalVersionMs.get() > 0;
+    }
+
+    /**
+     * Returns the fetch filter's minimum wait plus a random amount below its maximum random wait,
+     * in milliseconds.
+     */
+    public Integer fetchFilterRandomSleepTimeMs() {
+      return randomSleepTimeMs(
+          fetchMinWaitBeforeReloadLocalVersionMs.get(), fetchWaitBeforeReloadLocalVersionMs.get());
+    }
+
+    /** Returns {@code true} when the push filter's maximum random wait is a positive value. */
+    public boolean isPushFilterRandomSleepEnabled() {
+      return pushWaitBeforeReloadLocalVersionMs.get() > 0;
+    }
+
+    /**
+     * Returns the push filter's minimum wait plus a random amount below its maximum random wait,
+     * in milliseconds.
+     */
+    public Integer pushFilterRandomSleepTimeMs() {
+      return randomSleepTimeMs(
+          pushMinWaitBeforeReloadLocalVersionMs.get(), pushWaitBeforeReloadLocalVersionMs.get());
+    }
+
+    /**
+     * Adds a random amount in {@code [0, maxRandomWaitMs)} to {@code minWaitMs}.
+     *
+     * <p>{@link Random#nextInt(int)} rejects a non-positive bound, so a configured value of zero
+     * or less yields just {@code minWaitMs} instead of an {@link IllegalArgumentException}.
+     */
+    private static int randomSleepTimeMs(int minWaitMs, int maxRandomWaitMs) {
+      if (maxRandomWaitMs <= 0) {
+        return minWaitMs;
+      }
+      return minWaitMs + new Random().nextInt(maxRandomWaitMs);
+    }
+  }
+
   static boolean getBoolean(
       Supplier<Config> cfg, String section, String subsection, String name, boolean defaultValue) {
     try {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
index 7c88655..95c417f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
@@ -14,9 +14,9 @@
 
 package com.googlesource.gerrit.plugins.multisite;
 
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.gerrit.extensions.systemstatus.ServerInformation;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGsonProvider;
 import com.google.gerrit.server.util.PluginLogFile;
 import com.google.gerrit.server.util.SystemLog;
 import com.google.gson.Gson;
@@ -41,7 +41,7 @@
   }
 
   @Override
-  public void log(Direction direction, String topic, EventMessage event) {
+  public void log(Direction direction, String topic, Event event) {
     msgLog.info("{} {} {}", direction, topic, gson.toJson(event));
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jProjectVersionLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jProjectVersionLogger.java
index c2c4b46..23c720a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jProjectVersionLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jProjectVersionLogger.java
@@ -45,4 +45,9 @@
       verLog.info("{ \"project\":\"{}\", \"version\":{} }", projectName, currentVersion);
     }
   }
+
+  @Override
+  public void logDeleted(Project.NameKey projectName) {
+    verLog.info("{ \"project\":\"{}\", \"status\":\"DELETED\" }", projectName);
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
index b1f3e79..cc64b02 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gerrit.server.events.Event;
 
 public interface MessageLogger {
 
@@ -23,5 +23,5 @@
     CONSUME;
   }
 
-  public void log(Direction direction, String topic, EventMessage event);
+  public void log(Direction direction, String topic, Event event);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index 91a927b..9392077 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -16,30 +16,17 @@
 
 import com.gerritforge.gerrit.globalrefdb.validation.LibModule;
 import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.CreationException;
 import com.google.inject.Inject;
-import com.google.inject.Provides;
-import com.google.inject.Singleton;
 import com.google.inject.spi.Message;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
 import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
-import com.googlesource.gerrit.plugins.multisite.event.EventModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.RouterModule;
 import com.googlesource.gerrit.plugins.multisite.index.IndexModule;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.Collection;
-import java.util.UUID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class Module extends LifecycleModule {
-  private static final Logger log = LoggerFactory.getLogger(Module.class);
   private Configuration config;
 
   @Inject
@@ -49,6 +36,7 @@
 
   @Override
   protected void configure() {
+    boolean brokerRouterNeeded = false;
 
     Collection<Message> validationErrors = config.validate();
     if (!validationErrors.isEmpty()) {
@@ -64,65 +52,19 @@
 
     if (config.cache().synchronize()) {
       install(new CacheModule());
+      brokerRouterNeeded = true;
     }
     if (config.event().synchronize()) {
-      install(new EventModule(config));
+      brokerRouterNeeded = true;
     }
     if (config.index().synchronize()) {
       install(new IndexModule());
+      brokerRouterNeeded = true;
     }
 
-    install(new RouterModule());
-  }
-
-  @Provides
-  @Singleton
-  @InstanceId
-  UUID getInstanceId(SitePaths sitePaths) throws IOException {
-    UUID instanceId = null;
-    Path dataDir = sitePaths.data_dir.resolve(Configuration.PLUGIN_NAME);
-    if (!dataDir.toFile().exists()) {
-      dataDir.toFile().mkdirs();
+    if (brokerRouterNeeded) {
+      install(new BrokerModule());
+      install(new RouterModule());
     }
-    String serverIdFile =
-        dataDir.toAbsolutePath().toString() + "/" + Configuration.INSTANCE_ID_FILE;
-
-    instanceId = tryToLoadSavedInstanceId(serverIdFile);
-
-    if (instanceId == null) {
-      instanceId = UUID.randomUUID();
-      Files.createFile(Paths.get(serverIdFile));
-      try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(serverIdFile))) {
-        writer.write(instanceId.toString());
-      } catch (IOException e) {
-        log.warn(
-            String.format(
-                "Cannot write instance ID, a new one will be generated at instance restart. (%s)",
-                e.getMessage()));
-      }
-    }
-    return instanceId;
-  }
-
-  private UUID tryToLoadSavedInstanceId(String serverIdFile) {
-    if (Files.exists(Paths.get(serverIdFile))) {
-      try (BufferedReader br = Files.newBufferedReader(Paths.get(serverIdFile))) {
-        return UUID.fromString(br.readLine());
-      } catch (IOException e) {
-        log.warn(
-            String.format(
-                "Cannot read instance ID from path '%s', deleting the old file and generating a new ID: (%s)",
-                serverIdFile, e.getMessage()));
-        try {
-          Files.delete(Paths.get(serverIdFile));
-        } catch (IOException e1) {
-          log.warn(
-              String.format(
-                  "Cannot delete old instance ID file at path '%s' with instance ID while generating a new one: (%s)",
-                  serverIdFile, e1.getMessage()));
-        }
-      }
-    }
-    return null;
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
index 20aaef6..db06c9f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
@@ -15,38 +15,99 @@
 package com.googlesource.gerrit.plugins.multisite;
 
 import com.gerritforge.gerrit.globalrefdb.validation.ProjectDeletedSharedDbCleanup;
+import com.google.common.collect.ImmutableList;
+import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.ProvisionException;
 import com.google.inject.Scopes;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.consumer.MultiSiteConsumerRunner;
 import com.googlesource.gerrit.plugins.multisite.consumer.ReplicationStatusModule;
 import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
+import com.googlesource.gerrit.plugins.multisite.event.EventModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerForwarderModule;
 
 public class PluginModule extends LifecycleModule {
-  private Configuration config;
+  private static final FluentLogger log = FluentLogger.forEnclosingClass();
+  private static final String[] FILTER_MODULES_CLASS_NAMES =
+      new String[] {
+        /* Class names are kept as Strings so that this class can still be loaded
+         * when either the replication or the pull-replication plugin is missing.
+         */
+        "com.googlesource.gerrit.plugins.multisite.validation.PullReplicationFilterModule",
+        "com.googlesource.gerrit.plugins.multisite.validation.PushReplicationFilterModule"
+      };
+
+  private final Configuration config;
+  private final WorkQueue workQueue;
+  private final Injector parentInjector;
 
   @Inject
-  public PluginModule(Configuration config) {
+  public PluginModule(Configuration config, WorkQueue workQueue, Injector parentInjector) {
     this.config = config;
+    this.workQueue = workQueue;
+    this.parentInjector = parentInjector;
   }
 
   @Override
   protected void configure() {
-    bind(BrokerApiWrapper.class).in(Scopes.SINGLETON);
-    install(new SubscriberModule());
+    if (config.index().synchronize()
+        || config.cache().synchronize()
+        || config.event().synchronize()) {
+      install(new EventModule(config));
+      bind(BrokerApiWrapper.class).in(Scopes.SINGLETON);
+      install(new SubscriberModule());
 
-    install(new BrokerForwarderModule());
-    listener().to(MultiSiteConsumerRunner.class);
+      install(new BrokerForwarderModule());
+      listener().to(MultiSiteConsumerRunner.class);
 
-    install(new ReplicationStatusModule());
+      install(new ReplicationStatusModule(workQueue));
+    }
+
     if (config.getSharedRefDbConfiguration().getSharedRefDb().isEnabled()) {
       listener().to(PluginStartup.class);
       DynamicSet.bind(binder(), ProjectDeletedListener.class)
           .to(ProjectDeletedSharedDbCleanup.class);
     }
+
+    detectFilterModules()
+        .forEach(
+            mod -> {
+              install(mod);
+              log.atInfo().log(
+                  "Replication filter module %s installed successfully",
+                  mod.getClass().getSimpleName());
+            });
+  }
+
+  /** Returns the replication filter modules whose classes could be loaded and instantiated. */
+  private Iterable<AbstractModule> detectFilterModules() {
+    ImmutableList.Builder<AbstractModule> filterModulesBuilder = ImmutableList.builder();
+    for (String filterClassName : FILTER_MODULES_CLASS_NAMES) {
+      try {
+        // asSubclass() rejects a class that is not an AbstractModule, unlike an unchecked cast
+        Class<? extends AbstractModule> filterClass =
+            Class.forName(filterClassName).asSubclass(AbstractModule.class);
+        AbstractModule filterModule = parentInjector.getInstance(filterClass);
+        // Check if the filterModule would be valid for creating a child Guice Injector
+        parentInjector.createChildInjector(filterModule);
+
+        filterModulesBuilder.add(filterModule);
+      } catch (NoClassDefFoundError | ClassNotFoundException e) {
+        log.atFine().withCause(e).log(
+            "Not loading %s because of missing the associated replication plugin", filterClassName);
+      } catch (Exception e) {
+        throw new ProvisionException(
+            "Unable to instantiate replication filter " + filterClassName, e);
+      }
+    }
+
+    return filterModulesBuilder.build();
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/ProjectVersionLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/ProjectVersionLogger.java
index 6ababb6..2ee2c13 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/ProjectVersionLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/ProjectVersionLogger.java
@@ -19,4 +19,6 @@
 public interface ProjectVersionLogger {
 
   public void log(Project.NameKey projectName, long currentVersion, long replicationLag);
+
+  public void logDeleted(Project.NameKey projectName);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
index 71be5e6..f58efa7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
@@ -15,62 +15,128 @@
 package com.googlesource.gerrit.plugins.multisite.broker;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
 import java.util.Set;
-import java.util.UUID;
+import java.util.concurrent.Executor;
 import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class BrokerApiWrapper implements BrokerApi {
+  private static final Logger log = LoggerFactory.getLogger(BrokerApiWrapper.class);
+  private final Executor executor;
   private final DynamicItem<BrokerApi> apiDelegate;
   private final BrokerMetrics metrics;
   private final MessageLogger msgLog;
-  private final UUID instanceId;
 
   @Inject
   public BrokerApiWrapper(
+      @BrokerExecutor Executor executor,
       DynamicItem<BrokerApi> apiDelegate,
       BrokerMetrics metrics,
-      MessageLogger msgLog,
-      @InstanceId UUID instanceId) {
+      MessageLogger msgLog) {
     this.apiDelegate = apiDelegate;
+    this.executor = executor;
     this.metrics = metrics;
     this.msgLog = msgLog;
-    this.instanceId = instanceId;
   }
 
-  public boolean send(String topic, Event event) {
-    return send(topic, apiDelegate.get().newMessage(instanceId, event));
-  }
-
-  @Override
-  public boolean send(String topic, EventMessage message) {
-    if (Context.isForwardedEvent()) {
-      return true;
-    }
-    boolean succeeded = false;
+  /**
+   * Publishes {@code event} to {@code topic} and waits for the outcome.
+   *
+   * @param topic name of the topic to publish to
+   * @param event event to publish
+   * @return the result the publish future completed with, or {@code false} when publishing
+   *     failed or the wait was interrupted
+   */
+  public boolean sendSync(String topic, Event event) {
     try {
-      succeeded = apiDelegate.get().send(topic, message);
-    } finally {
-      if (succeeded) {
-        msgLog.log(Direction.PUBLISH, topic, message);
-        metrics.incrementBrokerPublishedMessage();
-      } else {
-        metrics.incrementBrokerFailedToPublishMessage();
-      }
+      return send(topic, event).get();
+    } catch (Throwable e) {
+      if (e instanceof InterruptedException) {
+        // Future.get() cleared the interrupt flag: restore it for the callers up the stack
+        Thread.currentThread().interrupt();
+      }
+      log.error(
+          "Failed to publish event '{}' to topic '{}' - error: {} - stack trace: {}",
+          event,
+          topic,
+          e.getMessage(),
+          e.getStackTrace());
+      metrics.incrementBrokerFailedToPublishMessage();
+      return false;
     }
-    return succeeded;
   }
 
+  /**
+   * Publishes {@code message} to {@code topic} through the delegate broker.
+   *
+   * <p>Forwarded events and events without an instance id are not handed to the delegate: for
+   * them the returned future is already completed with {@code true}.
+   *
+   * @param topic name of the topic to publish to
+   * @param message event to publish
+   * @return the delegate's future, or a completed one when the event was not handed over
+   */
   @Override
-  public void receiveAsync(String topic, Consumer<EventMessage> messageConsumer) {
+  public ListenableFuture<Boolean> send(String topic, Event message) {
+    SettableFuture<Boolean> resultFuture = SettableFuture.create();
+    if (Context.isForwardedEvent()) {
+      resultFuture.set(true);
+      return resultFuture;
+    }
+
+    if (Strings.isNullOrEmpty(message.instanceId)) {
+      log.warn(
+          "Dropping event '{}' because event instance id cannot be null or empty",
+          message.toString());
+      resultFuture.set(true);
+      return resultFuture;
+    }
+
+    ListenableFuture<Boolean> publishFuture = apiDelegate.get().send(topic, message);
+    Futures.addCallback(
+        publishFuture,
+        new FutureCallback<Boolean>() {
+          @Override
+          public void onSuccess(Boolean result) {
+            // Only a true result counts as published; anything else is accounted as a failure
+            if (Boolean.TRUE.equals(result)) {
+              msgLog.log(Direction.PUBLISH, topic, message);
+              metrics.incrementBrokerPublishedMessage();
+            } else {
+              metrics.incrementBrokerFailedToPublishMessage();
+            }
+          }
+
+          @Override
+          public void onFailure(Throwable throwable) {
+            log.error(
+                "Failed to publish message '{}' to topic '{}' - error: {}",
+                message.toString(),
+                topic,
+                throwable.getMessage());
+            metrics.incrementBrokerFailedToPublishMessage();
+          }
+        },
+        executor);
+
+    return publishFuture;
+  }
+
+  @Override
+  public void receiveAsync(String topic, Consumer<Event> messageConsumer) {
     apiDelegate.get().receiveAsync(topic, messageConsumer);
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/InstanceId.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutor.java
similarity index 83%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/InstanceId.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutor.java
index 87306a2..aa24eb1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/InstanceId.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutor.java
@@ -1,4 +1,4 @@
-// Copyright (C) 2019 The Android Open Source Project
+// Copyright (C) 2021 The Android Open Source Project
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.multisite;
+package com.googlesource.gerrit.plugins.multisite.broker;
 
 import static java.lang.annotation.RetentionPolicy.RUNTIME;
 
@@ -21,4 +21,4 @@
 
 @Retention(RUNTIME)
 @BindingAnnotation
-public @interface InstanceId {}
+@interface BrokerExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutorProvider.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutorProvider.java
new file mode 100644
index 0000000..e843263
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutorProvider.java
@@ -0,0 +1,29 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.ExecutorProvider;
+
+@Singleton
+class BrokerExecutorProvider extends ExecutorProvider {
+
+  @Inject
+  BrokerExecutorProvider(WorkQueue workQueue) {
+    super(workQueue, 1, "Multi-Site-Broker");
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
new file mode 100644
index 0000000..a5dac4a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
@@ -0,0 +1,28 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import com.google.gerrit.lifecycle.LifecycleModule;
+import java.util.concurrent.Executor;
+
+public class BrokerModule extends LifecycleModule {
+
+  @Override
+  protected void configure() {
+    bind(Executor.class)
+        .annotatedWith(BrokerExecutor.class)
+        .toProvider(BrokerExecutorProvider.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java
index 2990264..e418da5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandler.java
@@ -17,6 +17,7 @@
 import com.google.common.cache.RemovalNotification;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.cache.CacheRemovalListener;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
@@ -28,21 +29,25 @@
   private final Executor executor;
   private final DynamicSet<CacheEvictionForwarder> forwarders;
   private final CachePatternMatcher matcher;
+  private final String instanceId;
 
   @Inject
   CacheEvictionHandler(
       DynamicSet<CacheEvictionForwarder> forwarders,
       @CacheExecutor Executor executor,
-      CachePatternMatcher matcher) {
+      CachePatternMatcher matcher,
+      @GerritInstanceId String instanceId) {
     this.forwarders = forwarders;
     this.executor = executor;
     this.matcher = matcher;
+    this.instanceId = instanceId;
   }
 
   @Override
   public void onRemoval(String plugin, String cache, RemovalNotification<K, V> notification) {
     if (!Context.isForwardedEvent() && !notification.wasEvicted() && matcher.matches(cache)) {
-      executor.execute(new CacheEvictionTask(new CacheEvictionEvent(cache, notification.getKey())));
+      executor.execute(
+          new CacheEvictionTask(new CacheEvictionEvent(cache, notification.getKey(), instanceId)));
     }
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CachePatternMatcher.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CachePatternMatcher.java
index b8521a3..2dcb09a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CachePatternMatcher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/CachePatternMatcher.java
@@ -25,7 +25,7 @@
 
 @Singleton
 class CachePatternMatcher {
-  private static final List<String> DEFAULT_PATTERNS =
+  private static final ImmutableList<String> DEFAULT_PATTERNS =
       ImmutableList.of("^groups.*", "ldap_groups", "ldap_usernames", "projects", "sshkeys");
 
   private final Pattern pattern;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
index fdc6fc3..44f8417 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandler.java
@@ -19,6 +19,7 @@
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.events.ProjectEvent;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
@@ -33,15 +34,18 @@
   private final DynamicSet<ProjectListUpdateForwarder> forwarders;
   private final Executor executor;
   private final ProjectsFilter projectsFilter;
+  private final String instanceId;
 
   @Inject
   public ProjectListUpdateHandler(
       DynamicSet<ProjectListUpdateForwarder> forwarders,
       @CacheExecutor Executor executor,
-      ProjectsFilter filter) {
+      ProjectsFilter filter,
+      @GerritInstanceId String instanceId) {
     this.forwarders = forwarders;
     this.executor = executor;
     this.projectsFilter = filter;
+    this.instanceId = instanceId;
   }
 
   @Override
@@ -59,7 +63,8 @@
   private void process(ProjectEvent event, boolean delete) {
     if (!Context.isForwardedEvent() && projectsFilter.matches(event.getProjectName())) {
       executor.execute(
-          new ProjectListUpdateTask(new ProjectListUpdateEvent(event.getProjectName(), delete)));
+          new ProjectListUpdateTask(
+              new ProjectListUpdateEvent(event.getProjectName(), delete, instanceId)));
     }
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index b37f434..7107b87 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -14,20 +14,19 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
 import com.google.common.base.Strings;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
 import java.io.IOException;
-import java.util.UUID;
 import java.util.function.Consumer;
 
 public abstract class AbstractSubcriber {
@@ -44,13 +43,13 @@
   public AbstractSubcriber(
       ForwardedEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @InstanceId UUID instanceId,
+      @GerritInstanceId String gerritInstanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
       Configuration cfg) {
     this.eventRouter = eventRouter;
     this.droppedEventListeners = droppedEventListeners;
-    this.instanceId = instanceId.toString();
+    this.instanceId = gerritInstanceId;
     this.msgLog = msgLog;
     this.subscriberMetrics = subscriberMetrics;
     this.cfg = cfg;
@@ -59,19 +58,22 @@
 
   protected abstract EventTopic getTopic();
 
-  public Consumer<EventMessage> getConsumer() {
+  protected abstract Boolean shouldConsumeEvent(Event event);
+
+  public Consumer<Event> getConsumer() {
     return this::processRecord;
   }
 
-  private void processRecord(EventMessage event) {
-    String sourceInstanceId = event.getHeader().sourceInstanceId;
+  private void processRecord(Event event) {
+    String sourceInstanceId = event.instanceId;
 
-    if (Strings.isNullOrEmpty(sourceInstanceId) || instanceId.equals(sourceInstanceId)) {
+    if ((Strings.isNullOrEmpty(sourceInstanceId) || instanceId.equals(sourceInstanceId))
+        || !shouldConsumeEvent(event)) {
       if (Strings.isNullOrEmpty(sourceInstanceId)) {
         logger.atWarning().log(
             String.format(
                 "Dropping event %s because sourceInstanceId cannot be null", event.toString()));
-      } else {
+      } else if (instanceId.equals(sourceInstanceId)) {
         logger.atFiner().log(
             String.format(
                 "Dropping event %s produced by our instanceId %s", event.toString(), instanceId));
@@ -80,16 +82,16 @@
     } else {
       try {
         msgLog.log(Direction.CONSUME, topic, event);
-        eventRouter.route(event.getEvent());
+        eventRouter.route(event);
         subscriberMetrics.incrementSubscriberConsumedMessage();
-        subscriberMetrics.updateReplicationStatusMetrics(event);
       } catch (IOException e) {
-        logger.atSevere().withCause(e).log("Malformed event '%s'", event.getHeader());
+        logger.atSevere().withCause(e).log("Malformed event '%s'", event);
         subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
       } catch (PermissionBackendException | CacheNotFoundException e) {
-        logger.atSevere().withCause(e).log("Cannot handle message '%s'", event.getHeader());
+        logger.atSevere().withCause(e).log("Cannot handle message '%s'", event);
         subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
       }
     }
+    subscriberMetrics.updateReplicationStatusMetrics(event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
index 5bbaee0..cdbf220 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriber.java
@@ -14,31 +14,49 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
-import java.util.UUID;
 
 @Singleton
 public class BatchIndexEventSubscriber extends AbstractSubcriber {
+  private final ProjectsFilter projectsFilter;
+
   @Inject
   public BatchIndexEventSubscriber(
       IndexEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @InstanceId UUID instanceId,
+      @GerritInstanceId String instanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
-      Configuration cfg) {
+      Configuration cfg,
+      ProjectsFilter projectsFilter) {
     super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+    this.projectsFilter = projectsFilter;
   }
 
   @Override
   protected EventTopic getTopic() {
     return EventTopic.BATCH_INDEX_TOPIC;
   }
+
+  @Override
+  protected Boolean shouldConsumeEvent(Event event) {
+    if (event instanceof ChangeIndexEvent) {
+      return projectsFilter.matches(((ChangeIndexEvent) event).projectName);
+    }
+    if (event instanceof ProjectIndexEvent) {
+      return projectsFilter.matches(((ProjectIndexEvent) event).projectName);
+    }
+    return true;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
index eae66b4..d8342b0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
@@ -15,14 +15,14 @@
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.CacheEvictionEventRouter;
-import java.util.UUID;
 
 @Singleton
 public class CacheEvictionEventSubscriber extends AbstractSubcriber {
@@ -30,7 +30,7 @@
   public CacheEvictionEventSubscriber(
       CacheEvictionEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @InstanceId UUID instanceId,
+      @GerritInstanceId String instanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
       Configuration cfg) {
@@ -42,4 +42,9 @@
   protected EventTopic getTopic() {
     return EventTopic.CACHE_TOPIC;
   }
+
+  @Override
+  protected Boolean shouldConsumeEvent(Event event) {
+    return true;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
index 6f4680c..b34e02c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gerrit.server.events.Event;
 
 public interface DroppedEventListener {
   /**
@@ -22,5 +22,5 @@
    *
    * @param event information about the event.
    */
-  void onEventDropped(EventMessage event);
+  void onEventDropped(Event event);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
index 49d470a..480fc50 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
@@ -14,31 +14,68 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
+import com.google.gerrit.entities.Change;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.change.ChangeFinder;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
-import java.util.UUID;
+import java.util.Optional;
 
 @Singleton
 public class IndexEventSubscriber extends AbstractSubcriber {
+  private final ProjectsFilter projectsFilter;
+  private final ChangeFinder changeFinder;
+
   @Inject
   public IndexEventSubscriber(
       IndexEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @InstanceId UUID instanceId,
+      @GerritInstanceId String instanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
-      Configuration cfg) {
+      Configuration cfg,
+      ProjectsFilter projectsFilter,
+      ChangeFinder changeFinder) {
     super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+    this.projectsFilter = projectsFilter;
+    this.changeFinder = changeFinder;
   }
 
   @Override
   protected EventTopic getTopic() {
     return EventTopic.INDEX_TOPIC;
   }
+
+  @Override
+  protected Boolean shouldConsumeEvent(Event event) {
+    if (event instanceof ChangeIndexEvent) {
+      ChangeIndexEvent changeIndexEvent = (ChangeIndexEvent) event;
+      String projectName = changeIndexEvent.projectName;
+      if (isDeletedChangeWithEmptyProject(changeIndexEvent)) {
+        projectName = findProjectFromChangeId(changeIndexEvent.changeId).orElse(projectName);
+      }
+      return projectsFilter.matches(projectName);
+    }
+    if (event instanceof ProjectIndexEvent) {
+      return projectsFilter.matches(((ProjectIndexEvent) event).projectName);
+    }
+    return true;
+  }
+
+  private boolean isDeletedChangeWithEmptyProject(ChangeIndexEvent changeIndexEvent) {
+    return changeIndexEvent.deleted && changeIndexEvent.projectName.isEmpty();
+  }
+
+  private Optional<String> findProjectFromChangeId(int changeId) {
+    return changeFinder.findOne(Change.id(changeId)).map(c -> c.getChange().getProject().get());
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
index 6ff0969..069a516 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
@@ -14,31 +14,49 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
-import java.util.UUID;
 
 @Singleton
 public class ProjectUpdateEventSubscriber extends AbstractSubcriber {
+  private final ProjectsFilter projectsFilter;
+
   @Inject
   public ProjectUpdateEventSubscriber(
       ProjectListUpdateRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @InstanceId UUID instanceId,
+      @GerritInstanceId String instanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
-      Configuration cfg) {
+      Configuration cfg,
+      ProjectsFilter projectsFilter) {
     super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+    this.projectsFilter = projectsFilter;
   }
 
   @Override
   protected EventTopic getTopic() {
     return EventTopic.PROJECT_LIST_TOPIC;
   }
+
+  /**
+   * Applies the projects filter to project-list updates. Any other event type is not filtered
+   * here (returns true) instead of failing on the downcast, matching the other subscribers.
+   */
+  @Override
+  protected Boolean shouldConsumeEvent(Event event) {
+    if (event instanceof ProjectListUpdateEvent) {
+      return projectsFilter.matches(((ProjectListUpdateEvent) event).projectName);
+    }
+    return true;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java
index 7360b8f..0253588 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatus.java
@@ -19,14 +19,20 @@
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.metrics.CallbackMetric1;
+import com.google.gerrit.metrics.MetricMaker;
 import com.google.gerrit.server.cache.CacheModule;
 import com.google.gerrit.server.cache.serialize.JavaCacheSerializer;
 import com.google.gerrit.server.cache.serialize.StringCacheSerializer;
+import com.google.gerrit.server.git.WorkQueue;
 import com.google.gerrit.server.project.ProjectCache;
 import com.google.inject.Inject;
 import com.google.inject.Module;
 import com.google.inject.Singleton;
 import com.google.inject.name.Named;
+import com.google.inject.name.Names;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
 import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
 import java.util.Collection;
@@ -36,23 +42,29 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 @Singleton
-public class ReplicationStatus implements LifecycleListener {
+public class ReplicationStatus implements LifecycleListener, ProjectDeletedListener {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
   private final Map<String, Long> replicationStatusPerProject = new HashMap<>();
-  static final String REPLICATION_STATUS_CACHE = "replication_status";
+  static final String REPLICATION_STATUS = "replication_status";
 
-  public static Module cacheModule() {
+  public static Module cacheModule(WorkQueue queue) {
     return new CacheModule() {
       @Override
       protected void configure() {
-        persist(REPLICATION_STATUS_CACHE, String.class, Long.class)
+        persist(REPLICATION_STATUS, String.class, Long.class)
             .version(1)
             .keySerializer(StringCacheSerializer.INSTANCE)
             .valueSerializer(new JavaCacheSerializer<>());
+
+        bind(ScheduledExecutorService.class)
+            .annotatedWith(Names.named(REPLICATION_STATUS))
+            .toInstance(queue.createQueue(0, REPLICATION_STATUS));
       }
     };
   }
@@ -62,20 +74,35 @@
   private final Optional<ProjectVersionRefUpdate> projectVersionRefUpdate;
   private final ProjectVersionLogger verLogger;
   private final ProjectCache projectCache;
+  private final ScheduledExecutorService statusScheduler;
+
+  private final Configuration config;
+
+  private final MetricMaker metricMaker;
 
   @Inject
   public ReplicationStatus(
-      @Named(REPLICATION_STATUS_CACHE) Cache<String, Long> cache,
+      @Named(REPLICATION_STATUS) Cache<String, Long> cache,
       Optional<ProjectVersionRefUpdate> projectVersionRefUpdate,
       ProjectVersionLogger verLogger,
-      ProjectCache projectCache) {
+      ProjectCache projectCache,
+      @Named(REPLICATION_STATUS) ScheduledExecutorService statusScheduler,
+      Configuration config,
+      MetricMaker metricMaker) {
     this.cache = cache;
     this.projectVersionRefUpdate = projectVersionRefUpdate;
     this.verLogger = verLogger;
     this.projectCache = projectCache;
+    this.statusScheduler = statusScheduler;
+    this.config = config;
+    this.metricMaker = metricMaker;
   }
 
   public Long getMaxLag() {
+    return getMaxLagMillis() / 1000;
+  }
+
+  public Long getMaxLagMillis() {
     Collection<Long> lags = replicationStatusPerProject.values();
     if (lags.isEmpty()) {
       return 0L;
@@ -121,15 +148,75 @@
     }
   }
 
+  /** Stops tracking a project's lag when it has no local version (or no version tracker). */
+  void removeProjectFromReplicationLagMetrics(Project.NameKey projectName) {
+    Optional<Long> localVersion =
+        projectVersionRefUpdate.flatMap(u -> u.getProjectLocalVersion(projectName.get()));
+    if (!localVersion.isPresent() && replicationStatusPerProject.containsKey(projectName.get())) {
+      cache.invalidate(projectName.get());
+      replicationStatusPerProject.remove(projectName.get());
+      localVersionPerProject.remove(projectName.get());
+      verLogger.logDeleted(projectName);
+      logger.atFine().log("Removed project '%s' from replication lag metrics", projectName);
+    }
+  }
+
+  @VisibleForTesting
+  Runnable replicationLagMetricPerProject(CallbackMetric1<String, Long> metricCallback) {
+    return () -> {
+      if (replicationStatusPerProject.isEmpty()) {
+        metricCallback.forceCreate("");
+      } else {
+        replicationStatusPerProject.entrySet().stream()
+            .filter(e -> e.getValue() > 0)
+            .forEach(
+                e ->
+                    metricCallback.set(
+                        SubscriberMetrics.sanitizeProjectName(e.getKey()), e.getValue()));
+        metricCallback.prune();
+      }
+    };
+  }
+
   @VisibleForTesting
   public void doUpdateLag(Project.NameKey projectName, Long lag) {
     cache.put(projectName.get(), lag);
     replicationStatusPerProject.put(projectName.get(), lag);
   }
 
+  @VisibleForTesting
+  Long getReplicationStatus(String projectName) {
+    return replicationStatusPerProject.get(projectName);
+  }
+
+  @VisibleForTesting
+  Long getLocalVersion(String projectName) {
+    return localVersionPerProject.get(projectName);
+  }
+
   @Override
   public void start() {
     loadAllFromCache();
+
+    long replicationLagPollingInterval = config.replicationLagRefreshInterval().toMillis();
+
+    if (replicationLagPollingInterval > 0) {
+      statusScheduler.scheduleAtFixedRate(
+          this::refreshProjectsWithLag,
+          replicationLagPollingInterval,
+          replicationLagPollingInterval,
+          TimeUnit.MILLISECONDS);
+    }
+  }
+
+  @VisibleForTesting
+  public void refreshProjectsWithLag() {
+    logger.atFine().log("Refreshing projects version lags triggered ...");
+    replicationStatusPerProject.entrySet().stream()
+        .filter(entry -> entry.getValue() > 0)
+        .map(Map.Entry::getKey)
+        .map(Project::nameKey)
+        .forEach(this::updateReplicationLag);
   }
 
   @Override
@@ -140,4 +227,9 @@
         projectCache.all().stream().map(Project.NameKey::get).collect(Collectors.toSet());
     replicationStatusPerProject.putAll(cache.getAllPresent(cachedProjects));
   }
+
+  @Override
+  public void onProjectDeleted(Event event) {
+    removeProjectFromReplicationLagMetrics(Project.nameKey(event.getProjectName()));
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java
index e954bc4..605e119 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusModule.java
@@ -14,14 +14,27 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
 import com.google.inject.Scopes;
 
 public class ReplicationStatusModule extends LifecycleModule {
+
+  private final WorkQueue workQueue;
+
+  @Inject
+  public ReplicationStatusModule(WorkQueue workQueue) {
+    this.workQueue = workQueue;
+  }
+
   @Override
   protected void configure() {
     bind(ReplicationStatus.class).in(Scopes.SINGLETON);
-    install(ReplicationStatus.cacheModule());
+    install(ReplicationStatus.cacheModule(workQueue));
     listener().to(ReplicationStatus.class);
+    DynamicSet.bind(binder(), ProjectDeletedListener.class).to(ReplicationStatus.class);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
index 20c355e..ab64651 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
@@ -14,31 +14,45 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectEvent;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
-import java.util.UUID;
 
 @Singleton
 public class StreamEventSubscriber extends AbstractSubcriber {
+  private final ProjectsFilter projectsFilter;
+
   @Inject
   public StreamEventSubscriber(
       StreamEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
-      @InstanceId UUID instanceId,
+      @GerritInstanceId String instanceId,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics,
-      Configuration cfg) {
+      Configuration cfg,
+      ProjectsFilter projectsFilter) {
     super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
+    this.projectsFilter = projectsFilter;
   }
 
   @Override
   protected EventTopic getTopic() {
     return EventTopic.STREAM_EVENT_TOPIC;
   }
+
+  @Override
+  protected Boolean shouldConsumeEvent(Event event) {
+    if (event instanceof ProjectEvent) {
+      return projectsFilter.matches(((ProjectEvent) event).getProjectNameKey().get());
+    }
+    return true;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
index 899233e..bb971ff 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
@@ -14,32 +14,43 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
-import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.metrics.CallbackMetric1;
 import com.google.gerrit.metrics.Counter1;
 import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Field;
 import com.google.gerrit.metrics.MetricMaker;
 import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectEvent;
 import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.logging.Metadata;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.MultiSiteMetrics;
-import com.googlesource.gerrit.plugins.replication.RefReplicatedEvent;
-import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
-import com.googlesource.gerrit.plugins.replication.ReplicationScheduledEvent;
+import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationSucceededEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
+import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 @Singleton
 public class SubscriberMetrics extends MultiSiteMetrics {
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private static final String SUBSCRIBER_SUCCESS_COUNTER = "subscriber_msg_consumer_counter";
   private static final String SUBSCRIBER_FAILURE_COUNTER =
       "subscriber_msg_consumer_failure_counter";
-  private static final String REPLICATION_LAG_SEC =
+  public static final String REPLICATION_LAG_SEC =
       "multi_site/subscriber/subscriber_replication_status/sec_behind";
+  private static final String REPLICATION_LAG_MSEC =
+      "multi_site/subscriber/subscriber_replication_status/msec_behind";
+  private static final String REPLICATION_LAG_MSEC_PROJECT =
+      "multi_site/subscriber/subscriber_replication_status/msec_behind/per_project";
 
   private final Counter1<String> subscriberSuccessCounter;
   private final Counter1<String> subscriberFailureCounter;
   private final ReplicationStatus replicationStatus;
+  private static final Pattern isValidMetricNamePattern = Pattern.compile("[a-zA-Z0-9_-]");
+  private static final Field<String> PROJECT_NAME =
+      Field.ofString("project_name", Metadata.Builder::cacheName).build();
 
   @Inject
   public SubscriberMetrics(MetricMaker metricMaker, ReplicationStatus replicationStatus) {
@@ -64,6 +75,51 @@
         Long.class,
         new Description("Replication lag (sec)").setGauge().setUnit(Description.Units.SECONDS),
         replicationStatus::getMaxLag);
+    metricMaker.newCallbackMetric(
+        REPLICATION_LAG_MSEC,
+        Long.class,
+        new Description("Replication lag (msec)")
+            .setGauge()
+            .setUnit(Description.Units.MILLISECONDS),
+        replicationStatus::getMaxLagMillis);
+
+    CallbackMetric1<String, Long> metrics =
+        metricMaker.newCallbackMetric(
+            SubscriberMetrics.REPLICATION_LAG_MSEC_PROJECT,
+            Long.class,
+            new Description("Per-project replication lag (msec)")
+                .setGauge()
+                .setUnit(Description.Units.MILLISECONDS),
+            PROJECT_NAME);
+    metricMaker.newTrigger(metrics, replicationStatus.replicationLagMetricPerProject(metrics));
+  }
+
+  /**
+   * Makes a project name usable as a Prometheus metric name, since the characters allowed in a
+   * project name differ from those allowed in a metric name. Every {@code _} is replaced with
+   * {@code __}, and every character that is not an ASCII letter, a digit or a hyphen is replaced
+   * with {@code _<hex_code>}. Doubling the underscore keeps escaped characters distinguishable
+   * from literal underscores, which avoids name clashes between sanitized project names.
+   *
+   * @param name project name to sanitize
+   * @return the sanitized name
+   */
+  public static String sanitizeProjectName(String name) {
+    StringBuilder escaped = new StringBuilder();
+    for (char ch : name.toCharArray()) {
+      Matcher allowed = isValidMetricNamePattern.matcher(String.valueOf(ch));
+      if (!allowed.find()) {
+        // Outside the allowed set: '_' followed by the character's hex code.
+        escaped.append("_").append(Integer.toHexString(ch));
+      } else if (ch == '_') {
+        // Underscores are doubled so they stay distinct from the escape prefix.
+        escaped.append("__");
+      } else {
+        // Everything else the pattern allows is copied verbatim.
+        escaped.append(ch);
+      }
+    }
+    return escaped.toString();
   }
 
   public void incrementSubscriberConsumedMessage() {
@@ -74,20 +130,18 @@
     subscriberFailureCounter.increment(SUBSCRIBER_FAILURE_COUNTER);
   }
 
-  public void updateReplicationStatusMetrics(EventMessage eventMessage) {
-    Event event = eventMessage.getEvent();
-    if (event instanceof RefReplicationDoneEvent) {
-      RefReplicationDoneEvent replicationDone = (RefReplicationDoneEvent) event;
-      replicationStatus.updateReplicationLag(replicationDone.getProjectNameKey());
-    } else if (event instanceof RefReplicatedEvent) {
-      RefReplicatedEvent replicated = (RefReplicatedEvent) event;
-      replicationStatus.updateReplicationLag(replicated.getProjectNameKey());
-    } else if (event instanceof ReplicationScheduledEvent) {
-      ReplicationScheduledEvent updated = (ReplicationScheduledEvent) event;
-      replicationStatus.updateReplicationLag(updated.getProjectNameKey());
-    } else if (event instanceof RefUpdatedEvent) {
-      RefUpdatedEvent updated = (RefUpdatedEvent) event;
-      replicationStatus.updateReplicationLag(updated.getProjectNameKey());
+  public void updateReplicationStatusMetrics(Event event) {
+
+    if (event instanceof RefReplicationDoneEvent
+        || event instanceof RefReplicatedEvent
+        || event instanceof ReplicationScheduledEvent
+        || event instanceof RefUpdatedEvent) {
+      ProjectEvent projectEvent = (ProjectEvent) event;
+      replicationStatus.updateReplicationLag(projectEvent.getProjectNameKey());
+    } else if (event instanceof ProjectDeletionReplicationSucceededEvent) {
+      ProjectDeletionReplicationSucceededEvent projectDeletion =
+          (ProjectDeletionReplicationSucceededEvent) event;
+      replicationStatus.removeProjectFromReplicationLagMetrics(projectDeletion.getProjectNameKey());
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/Context.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/Context.java
index 1c29d75..269388c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/Context.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/Context.java
@@ -14,14 +14,22 @@
 
 package com.googlesource.gerrit.plugins.multisite.forwarder;
 
+import static com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationEndpoints.APPLY_OBJECTS_API_ENDPOINT;
+import static com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationEndpoints.APPLY_OBJECT_API_ENDPOINT;
+
 /** Allows to tag a forwarded event to avoid infinitely looping events. */
 public class Context {
+  public static final String PULL_REPLICATION_PLUGIN_NAME = "pull-replication";
   private static final ThreadLocal<Boolean> forwardedEvent = ThreadLocal.withInitial(() -> false);
 
   private Context() {}
 
   public static Boolean isForwardedEvent() {
-    return forwardedEvent.get();
+    return forwardedEvent.get()
+        ||
+        // An event that results from a pull-replication apply-object call is treated as a
+        // "forwarded" action because it did not originate on this node.
+        isPullReplicationApplyObjectIndexing();
   }
 
   public static void setForwardedEvent(Boolean b) {
@@ -31,4 +39,10 @@
   public static void unsetForwardedEvent() {
     forwardedEvent.remove();
   }
+
+  public static boolean isPullReplicationApplyObjectIndexing() {
+    String threadName = Thread.currentThread().getName();
+    return threadName.contains(PULL_REPLICATION_PLUGIN_NAME + "~" + APPLY_OBJECT_API_ENDPOINT)
+        || threadName.contains(PULL_REPLICATION_PLUGIN_NAME + "~" + APPLY_OBJECTS_API_ENDPOINT);
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
index 8d41500..522a78f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
@@ -63,17 +63,25 @@
   protected void attemptToIndex(String id, Optional<ChangeIndexEvent> indexEvent, int retryCount) {
     ChangeChecker checker = changeCheckerFactory.create(id);
     Optional<ChangeNotes> changeNotes = checker.getChangeNotes();
-    if (changeNotes.isPresent()) {
+    boolean changeIsPresent = changeNotes.isPresent();
+    // Consistency only makes sense for a change that exists locally; skipping the check for a
+    // missing change avoids a second lookup and keeps the two failure reasons apart.
+    boolean changeIsConsistent = changeIsPresent && checker.isChangeConsistent();
+    if (changeIsConsistent) {
       reindexAndCheckIsUpToDate(id, indexEvent, checker, retryCount);
     } else {
       log.warn(
-          "Change {} not present yet in local Git repository (event={}) after {} attempt(s)",
+          "Change {} {} in local Git repository (event={}) after {} attempt(s)",
           id,
+          changeIsPresent ? "is not consistent" : "not present yet",
           indexEvent,
           retryCount);
       if (!rescheduleIndex(id, indexEvent, retryCount + 1)) {
         log.error(
-            "Change {} could not be found in the local Git repository (event={})", id, indexEvent);
+            "Change {} {} in the local Git repository (event={})",
+            id,
+            changeIsPresent ? "was not consistent" : "could not be found",
+            indexEvent);
       }
     }
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
index 975916a..19936ef 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarder.java
@@ -49,6 +49,6 @@
       return true;
     }
 
-    return broker.send(eventTopic.topic(cfg), event);
+    return broker.sendSync(eventTopic.topic(cfg), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
index 9ff4688..8a020fc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
@@ -35,6 +35,6 @@
 
   @Override
   public boolean send(Event event) {
-    return broker.send(EventTopic.STREAM_EVENT_TOPIC.topic(cfg), event);
+    return broker.sendSync(EventTopic.STREAM_EVENT_TOPIC.topic(cfg), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/AccountIndexEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/AccountIndexEvent.java
index 317eb76..065ef7e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/AccountIndexEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/AccountIndexEvent.java
@@ -21,8 +21,8 @@
 
   public int accountId;
 
-  public AccountIndexEvent(int accountId) {
-    super(TYPE);
+  public AccountIndexEvent(int accountId, String instanceId) {
+    super(TYPE, instanceId);
     this.accountId = accountId;
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/CacheEvictionEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/CacheEvictionEvent.java
index 1c2185f..4444756 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/CacheEvictionEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/CacheEvictionEvent.java
@@ -22,8 +22,8 @@
   public String cacheName;
   public Object key;
 
-  public CacheEvictionEvent(String cacheName, Object key) {
-    super(TYPE);
+  public CacheEvictionEvent(String cacheName, Object key, String instanceId) {
+    super(TYPE, instanceId);
     this.cacheName = cacheName;
     this.key = key;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ChangeIndexEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ChangeIndexEvent.java
index ab4ddf4..64fbdfb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ChangeIndexEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ChangeIndexEvent.java
@@ -28,8 +28,8 @@
   public String targetSha;
   public boolean deleted;
 
-  public ChangeIndexEvent(String projectName, int changeId, boolean deleted) {
-    super(TYPE);
+  public ChangeIndexEvent(String projectName, int changeId, boolean deleted, String instanceId) {
+    super(TYPE, instanceId);
     this.projectName = projectName;
     this.changeId = changeId;
     this.deleted = deleted;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/GroupIndexEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/GroupIndexEvent.java
index 4981b2f..05ac198 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/GroupIndexEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/GroupIndexEvent.java
@@ -24,8 +24,8 @@
   public final String groupUUID;
   public final ObjectId sha1;
 
-  public GroupIndexEvent(String groupUUID, @Nullable ObjectId sha1) {
-    super(TYPE);
+  public GroupIndexEvent(String groupUUID, @Nullable ObjectId sha1, String instanceId) {
+    super(TYPE, instanceId);
     this.groupUUID = groupUUID;
     this.sha1 = sha1;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/IndexEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/IndexEvent.java
index ea2c3fb..2fdda72 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/IndexEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/IndexEvent.java
@@ -15,7 +15,11 @@
 package com.googlesource.gerrit.plugins.multisite.forwarder.events;
 
+/**
+ * Base class for index events. Its constructor only forwards the event type and the instance id
+ * to {@link MultiSiteEvent}.
+ */
 public abstract class IndexEvent extends MultiSiteEvent {
-  protected IndexEvent(String type) {
-    super(type);
+  protected IndexEvent(String type, String instanceId) {
+    super(type, instanceId);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/MultiSiteEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/MultiSiteEvent.java
index 404d168..f29204b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/MultiSiteEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/MultiSiteEvent.java
@@ -29,7 +29,8 @@
     register(ProjectListUpdateEvent.TYPE, ProjectListUpdateEvent.class);
   }
 
-  protected MultiSiteEvent(String type) {
+  protected MultiSiteEvent(String type, String instanceId) {
     super(type);
+    this.instanceId = instanceId;
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectIndexEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectIndexEvent.java
index 8bdb7b5..954befb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectIndexEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectIndexEvent.java
@@ -21,8 +21,8 @@
 
   public String projectName;
 
-  public ProjectIndexEvent(String projectName) {
-    super(TYPE);
+  public ProjectIndexEvent(String projectName, String instanceId) {
+    super(TYPE, instanceId);
     this.projectName = projectName;
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectListUpdateEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectListUpdateEvent.java
index d030e5b..0e18b27 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectListUpdateEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/ProjectListUpdateEvent.java
@@ -22,8 +22,8 @@
   public String projectName;
   public boolean remove;
 
-  public ProjectListUpdateEvent(String projectName, boolean remove) {
-    super(TYPE);
+  public ProjectListUpdateEvent(String projectName, boolean remove, String instanceId) {
+    super(TYPE, instanceId);
     this.projectName = projectName;
     this.remove = remove;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
index 202fb42..a647bce 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
@@ -21,6 +21,10 @@
 import com.google.gerrit.entities.Account;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.server.config.AllUsersName;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventListener;
+import com.google.gerrit.server.events.RefEvent;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexAccountHandler;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexChangeHandler;
@@ -32,12 +36,12 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
-import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
 import java.io.IOException;
 import java.util.Optional;
 import java.util.Set;
 
-public class IndexEventRouter implements ForwardedEventRouter<IndexEvent>, LifecycleListener {
+public class IndexEventRouter
+    implements ForwardedEventRouter<IndexEvent>, EventListener, LifecycleListener {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
   private final ForwardedIndexAccountHandler indexAccountHandler;
@@ -45,6 +49,7 @@
   private final ForwardedIndexGroupHandler indexGroupHandler;
   private final ForwardedIndexProjectHandler indexProjectHandler;
   private final AllUsersName allUsersName;
+  private final String gerritInstanceId;
 
   @Inject
   public IndexEventRouter(
@@ -52,12 +57,14 @@
       ForwardedIndexChangeHandler indexChangeHandler,
       ForwardedIndexGroupHandler indexGroupHandler,
       ForwardedIndexProjectHandler indexProjectHandler,
-      AllUsersName allUsersName) {
+      AllUsersName allUsersName,
+      @GerritInstanceId String gerritInstanceId) {
     this.indexAccountHandler = indexAccountHandler;
     this.indexChangeHandler = indexChangeHandler;
     this.indexGroupHandler = indexGroupHandler;
     this.indexProjectHandler = indexProjectHandler;
     this.allUsersName = allUsersName;
+    this.gerritInstanceId = gerritInstanceId;
   }
 
   @Override
@@ -85,7 +92,7 @@
     }
   }
 
-  public void onRefReplicated(RefReplicationDoneEvent replicationEvent) throws IOException {
+  public void onRefReplicated(RefEvent replicationEvent) throws IOException {
     if (replicationEvent.getProjectNameKey().equals(allUsersName)) {
       Account.Id accountId = Account.Id.fromRef(replicationEvent.getRefName());
       if (accountId != null) {
@@ -97,6 +104,30 @@
   }
 
+  /**
+   * Handles events whose type contains {@code fetch-ref-replicated} or {@code
+   * fetch-ref-replication-done}.
+   *
+   * <p>Only {@link RefEvent}s carrying this node's own instance id are passed on to {@link
+   * #onRefReplicated(RefEvent)}; everything else is ignored. An {@link IOException} raised while
+   * handling the event is logged and not propagated.
+   *
+   * @param event the event delivered to this listener
+   */
   @Override
+  public void onEvent(Event event) {
+    if (event instanceof RefEvent
+        && (event.getType().contains("fetch-ref-replicated")
+            || event.getType().contains("fetch-ref-replication-done"))
+        && gerritInstanceId.equals(event.instanceId)) {
+      try {
+        onRefReplicated((RefEvent) event);
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error while processing event %s", event);
+      }
+    }
+  }
+
+  @Override
   public void start() {}
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/RouterModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/RouterModule.java
index bac907e..2193034 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/RouterModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/RouterModule.java
@@ -14,7 +14,9 @@
 
 package com.googlesource.gerrit.plugins.multisite.forwarder.router;
 
+import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.events.EventListener;
 import com.google.inject.Scopes;
 
 public class RouterModule extends LifecycleModule {
@@ -22,6 +24,8 @@
   protected void configure() {
     bind(IndexEventRouter.class).in(Scopes.SINGLETON);
     listener().to(IndexEventRouter.class).in(Scopes.SINGLETON);
+    DynamicSet.bind(binder(), EventListener.class).to(IndexEventRouter.class);
+
     bind(CacheEvictionEventRouter.class).in(Scopes.SINGLETON);
     bind(ProjectListUpdateRouter.class).in(Scopes.SINGLETON);
     bind(StreamEventRouter.class).in(Scopes.SINGLETON);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
index 4ef3426..95a3e66 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
@@ -18,7 +18,7 @@
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedEventHandler;
-import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
 import java.io.IOException;
 
 public class StreamEventRouter implements ForwardedEventRouter<Event> {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/http/HttpModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/http/HttpModule.java
index 2ae2d9b..26bfb53 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/http/HttpModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/http/HttpModule.java
@@ -14,14 +14,27 @@
 
 package com.googlesource.gerrit.plugins.multisite.http;
 
+import com.google.inject.Inject;
 import com.google.inject.servlet.ServletModule;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
 
+/** Servlet module that serves the replication-lag endpoint, subject to the plugin configuration. */
 public class HttpModule extends ServletModule {
 
   public static final String LAG_ENDPOINT_SEGMENT = "replication-lag";
 
+  private final Configuration config;
+
+  @Inject
+  public HttpModule(Configuration config) {
+    this.config = config;
+  }
+
   @Override
   protected void configureServlets() {
-    serve(String.format("/%s", LAG_ENDPOINT_SEGMENT)).with(ReplicationStatusServlet.class);
+    // The lag endpoint is registered only when config.event().synchronize() returns true.
+    if (config.event().synchronize()) {
+      serve(String.format("/%s", LAG_ENDPOINT_SEGMENT)).with(ReplicationStatusServlet.class);
+    }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeChecker.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeChecker.java
index 9ee59eb..3277150 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeChecker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeChecker.java
@@ -59,4 +59,11 @@
    * @throws IOException if an I/O error occurred while reading the local Change
    */
   public Optional<Long> getComputedChangeTs() throws IOException;
+
+  /**
+   * Check if the local Change contains current patchset refs
+   *
+   * @return true if local change contains meta and current patchset refs
+   */
+  public boolean isChangeConsistent();
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
index 08b26f7..4ef5b7f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
@@ -19,6 +19,7 @@
 import com.google.gerrit.exceptions.StorageException;
 import com.google.gerrit.server.CommentsUtil;
 import com.google.gerrit.server.change.ChangeFinder;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.notedb.ChangeNotes;
 import com.google.gerrit.server.util.ManualRequestContext;
@@ -30,8 +31,11 @@
 import java.sql.Timestamp;
 import java.util.Objects;
 import java.util.Optional;
+import org.eclipse.jgit.errors.MissingObjectException;
+import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevWalk;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,6 +46,7 @@
   private final OneOffRequestContext oneOffReqCtx;
   private final String changeId;
   private final ChangeFinder changeFinder;
+  private final String instanceId;
   private Optional<Long> computedChangeTs = Optional.empty();
   private Optional<ChangeNotes> changeNotes = Optional.empty();
 
@@ -55,12 +60,14 @@
       CommentsUtil commentsUtil,
       ChangeFinder changeFinder,
       OneOffRequestContext oneOffReqCtx,
+      @GerritInstanceId String instanceId,
       @Assisted String changeId) {
     this.changeFinder = changeFinder;
     this.gitRepoMgr = gitRepoMgr;
     this.commentsUtil = commentsUtil;
     this.oneOffReqCtx = oneOffReqCtx;
     this.changeId = changeId;
+    this.instanceId = instanceId;
   }
 
   @Override
@@ -69,7 +76,8 @@
     return getComputedChangeTs()
         .map(
             ts -> {
-              ChangeIndexEvent event = new ChangeIndexEvent(projectName, changeId, deleted);
+              ChangeIndexEvent event =
+                  new ChangeIndexEvent(projectName, changeId, deleted, instanceId);
               event.eventCreatedOn = ts;
               event.targetSha = getBranchTargetSha();
               return event;
@@ -140,6 +148,40 @@
     }
   }
 
+  @Override
+  public boolean isChangeConsistent() {
+    Optional<ChangeNotes> notes = getChangeNotes();
+    if (!notes.isPresent()) {
+      // Without notes there is nothing to verify; report the change as consistent.
+      log.warn("Unable to compute change notes for change {}", changeId);
+      return true;
+    }
+    // Work on the notes that were just checked for presence rather than re-reading the
+    // changeNotes field, so this method does not depend on getChangeNotes() having updated it.
+    ChangeNotes currentNotes = notes.get();
+    ObjectId currentPatchSetCommitId = currentNotes.getCurrentPatchSet().commitId();
+    try (Repository repo = gitRepoMgr.openRepository(currentNotes.getProjectName());
+        RevWalk walk = new RevWalk(repo)) {
+      // Parsing succeeds only if the current patch-set commit is readable from the repository.
+      walk.parseCommit(currentPatchSetCommitId);
+    } catch (StorageException | MissingObjectException e) {
+      log.warn(
+          String.format(
+              "Consistency check failed for change %s, missing current patchset commit %s",
+              changeId, currentPatchSetCommitId.getName()),
+          e);
+      return false;
+    } catch (IOException e) {
+      // Any other I/O failure is inconclusive, so the change is given the benefit of the doubt.
+      log.warn(
+          String.format(
+              "Cannot check consistency for change %s, current patchset commit %s. Assuming change is consistent",
+              changeId, currentPatchSetCommitId.getName()),
+          e);
+    }
+    return true;
+  }
+
   private Optional<Long> computeLastChangeTs() {
     return getChangeNotes().map(notes -> getTsFromChangeAndDraftComments(notes));
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
index eef3e4b..02f1b1c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
@@ -21,6 +21,7 @@
 import com.google.gerrit.extensions.events.GroupIndexedListener;
 import com.google.gerrit.extensions.events.ProjectIndexedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderTask;
@@ -48,6 +49,7 @@
   private final ChangeCheckerImpl.Factory changeChecker;
   private final ProjectsFilter projectsFilter;
   private final GroupChecker groupChecker;
+  private final String instanceId;
 
   @Inject
   IndexEventHandler(
@@ -55,18 +57,20 @@
       DynamicSet<IndexEventForwarder> forwarders,
       ChangeCheckerImpl.Factory changeChecker,
       ProjectsFilter projectsFilter,
-      GroupChecker groupChecker) {
+      GroupChecker groupChecker,
+      @GerritInstanceId String instanceId) {
     this.forwarders = forwarders;
     this.executor = executor;
     this.changeChecker = changeChecker;
     this.projectsFilter = projectsFilter;
     this.groupChecker = groupChecker;
+    this.instanceId = instanceId;
   }
 
   @Override
   public void onAccountIndexed(int id) {
     if (!Context.isForwardedEvent()) {
-      IndexAccountTask task = new IndexAccountTask(new AccountIndexEvent(id));
+      IndexAccountTask task = new IndexAccountTask(new AccountIndexEvent(id, instanceId));
       if (queuedTasks.add(task)) {
         executor.execute(task);
       }
@@ -87,7 +91,8 @@
   public void onGroupIndexed(String groupUUID) {
     if (!Context.isForwardedEvent()) {
       IndexGroupTask task =
-          new IndexGroupTask(new GroupIndexEvent(groupUUID, groupChecker.getGroupHead(groupUUID)));
+          new IndexGroupTask(
+              new GroupIndexEvent(groupUUID, groupChecker.getGroupHead(groupUUID), instanceId));
       if (queuedTasks.add(task)) {
         executor.execute(task);
       }
@@ -97,7 +102,7 @@
   @Override
   public void onProjectIndexed(String projectName) {
     if (!Context.isForwardedEvent() && projectsFilter.matches(projectName)) {
-      IndexProjectTask task = new IndexProjectTask(new ProjectIndexEvent(projectName));
+      IndexProjectTask task = new IndexProjectTask(new ProjectIndexEvent(projectName, instanceId));
       if (queuedTasks.add(task)) {
         executor.execute(task);
       }
@@ -132,7 +137,7 @@
 
   private void executeDeleteChangeTask(int id) {
     if (!Context.isForwardedEvent()) {
-      IndexChangeTask task = new IndexChangeTask(new ChangeIndexEvent("", id, true));
+      IndexChangeTask task = new IndexChangeTask(new ChangeIndexEvent("", id, true, instanceId));
       if (queuedTasks.add(task)) {
         executor.execute(task);
       }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationFetchFilter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationFetchFilter.java
new file mode 100644
index 0000000..d851420
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationFetchFilter.java
@@ -0,0 +1,226 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation;
+
+import static com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF;
+import static com.googlesource.gerrit.plugins.replication.pull.PullReplicationLogger.repLog;
+
+import com.gerritforge.gerrit.globalrefdb.GlobalRefDbLockException;
+import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.Project.NameKey;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.replication.pull.ReplicationFetchFilter;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefDatabase;
+import org.eclipse.jgit.lib.Repository;
+
+/**
+ * Fetch filter that drops refs which, according to the shared ref-database, do not need to be
+ * fetched: refs recorded there as all-zeros (removed) and refs whose local SHA-1 already equals
+ * the value stored in the shared ref-database. The multi-site versioning ref is exempt from the
+ * local-vs-shared SHA-1 comparison.
+ */
+@Singleton
+public class MultisiteReplicationFetchFilter implements ReplicationFetchFilter {
+  private static final String ZERO_ID_NAME = ObjectId.zeroId().name();
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final SharedRefDatabaseWrapper sharedRefDb;
+  private final GitRepositoryManager gitRepositoryManager;
+  private final Configuration config;
+
+  @Inject
+  public MultisiteReplicationFetchFilter(
+      SharedRefDatabaseWrapper sharedRefDb,
+      GitRepositoryManager gitRepositoryManager,
+      Configuration config) {
+    this.sharedRefDb = sharedRefDb;
+    this.gitRepositoryManager = gitRepositoryManager;
+    this.config = config;
+  }
+
+  /**
+   * Returns the subset of {@code refs} that still need to be fetched for {@code projectName}.
+   *
+   * @param projectName name of the project whose repository is opened for the local lookups
+   * @param refs names of the refs that are candidates for fetching
+   * @return the refs left after filtering, or an empty set if the repository cannot be opened
+   */
+  @Override
+  public Set<String> filter(String projectName, Set<String> refs) {
+    try (Repository repository =
+        gitRepositoryManager.openRepository(Project.nameKey(projectName))) {
+      RefDatabase refDb = repository.getRefDatabase();
+      return refs.stream()
+          .filter(ref -> !hasBeenRemovedFromGlobalRefDb(projectName, ref))
+          .filter(
+              ref -> {
+                if (shouldNotBeTrackedAnymoreOnGlobalRefDb(ref)) {
+                  return true;
+                }
+                Optional<ObjectId> localRefOid =
+                    getLocalSha1IfEqualsToExistingGlobalRefDb(
+                        repository, projectName, refDb, ref, true);
+                localRefOid.ifPresent(
+                    oid ->
+                        repLog.info(
+                            "{}:{}={} is already up-to-date with the shared-refdb and thus will NOT BE"
+                                + " fetched",
+                            projectName,
+                            ref,
+                            oid.getName()));
+
+                return !localRefOid.isPresent();
+              })
+          .collect(Collectors.toSet());
+    } catch (IOException ioe) {
+      String message = String.format("Error while opening project: '%s'", projectName);
+      repLog.error(message);
+      logger.atSevere().withCause(ioe).log(message);
+      return Collections.emptySet();
+    }
+  }
+
+  /*
+   * Since ac43a5f94c773c9db7a73d44035961d69d13fa53 the 'refs/multi-site/version' is
+   * not updated anymore on the global-refdb; however, the values stored already
+   * on the global-refdb could get in the way and prevent replication from happening
+   * as expected.
+   *
+   * Exclude the 'refs/multi-site/version' from local vs. global refdb checking
+   * pretending that the global-refdb for that ref did not exist.
+   */
+  private boolean shouldNotBeTrackedAnymoreOnGlobalRefDb(String ref) {
+    return MULTI_SITE_VERSIONING_REF.equals(ref);
+  }
+
+  /* If the ref to fetch has been set to all zeros on the global-refdb, it means
+   * that whatever is the situation locally, we do not need to fetch it:
+   * - If the remote still has it, fetching it will be useless because the global
+   *   state is that the ref should be removed.
+   * - If the remote doesn't have it anymore, trying to fetch the ref won't do
+   *   anything because you can't just remove local refs by fetching.
+   */
+  private boolean hasBeenRemovedFromGlobalRefDb(String projectName, String ref) {
+    if (foundAsZeroInSharedRefDb(Project.nameKey(projectName), ref)) {
+      repLog.info(
+          "{}:{} is found as zeros (removed) in shared-refdb thus will NOT BE fetched",
+          projectName,
+          ref);
+      return true;
+    }
+    return false;
+  }
+
+  /** Returns true if the shared ref-database stores the all-zeros object id for the ref. */
+  private boolean foundAsZeroInSharedRefDb(NameKey projectName, String ref) {
+    return sharedRefDb
+        .get(projectName, ref, String.class)
+        .map(r -> ZERO_ID_NAME.equals(r))
+        .orElse(false);
+  }
+
+  /**
+   * Returns the local object id of {@code ref} when the local ref exists and its SHA-1 equals the
+   * value stored in the shared ref-database; otherwise empty. When nothing matches and {@code
+   * retryWithRandomSleep} is set, waits (if enabled in the configuration) and checks once more.
+   * A locked shared ref-database entry or an I/O error is logged and reported as empty.
+   */
+  private Optional<ObjectId> getLocalSha1IfEqualsToExistingGlobalRefDb(
+      Repository repository,
+      String projectName,
+      RefDatabase refDb,
+      String ref,
+      boolean retryWithRandomSleep) {
+    try {
+      Optional<ObjectId> localRefObjectId =
+          Optional.ofNullable(refDb.exactRef(ref))
+              .filter(
+                  r ->
+                      sharedRefDb
+                          .get(Project.nameKey(projectName), r.getName(), String.class)
+                          .map(sharedRefObjId -> r.getObjectId().getName().equals(sharedRefObjId))
+                          .orElse(false))
+              .map(Ref::getObjectId);
+
+      if (!localRefObjectId.isPresent() && retryWithRandomSleep) {
+        randomSleepForMitigatingConditionWhereLocalRefHaveJustBeenChanged(
+            projectName, localRefObjectId, ref);
+        localRefObjectId =
+            getLocalSha1IfEqualsToExistingGlobalRefDb(repository, projectName, refDb, ref, false);
+      }
+
+      return localRefObjectId;
+    } catch (GlobalRefDbLockException gle) {
+      String message = String.format("%s is locked on shared-refdb", ref);
+      repLog.error(message);
+      logger.atSevere().withCause(gle).log(message);
+      return Optional.empty();
+    } catch (IOException ioe) {
+      String message =
+          String.format("Error while extracting ref '%s' for project '%s'", ref, projectName);
+      repLog.error(message);
+      logger.atSevere().withCause(ioe).log(message);
+      return Optional.empty();
+    }
+  }
+
+  /**
+   * Pauses for the configured time before the caller re-checks the ref, or returns immediately
+   * when the fetch-filter sleep is disabled in the configuration.
+   */
+  private void randomSleepForMitigatingConditionWhereLocalRefHaveJustBeenChanged(
+      String projectName, Optional<ObjectId> refObjectId, String ref) {
+    if (!config.replicationFilter().isFetchFilterRandomSleepEnabled()) {
+      repLog.debug(
+          "'{}' is not up-to-date for project '{}' [local='{}']. Random sleep is disabled,"
+              + " reload local ref without delay and re-check",
+          ref,
+          projectName,
+          refObjectId);
+      return;
+    }
+
+    int randomSleepTimeMsec = config.replicationFilter().fetchFilterRandomSleepTimeMs();
+    repLog.debug(
+        "'{}' is not up-to-date for project '{}' [local='{}']. Reload local ref in '{} ms' and"
+            + " re-check",
+        ref,
+        projectName,
+        refObjectId,
+        randomSleepTimeMsec);
+    try {
+      Thread.sleep(randomSleepTimeMsec);
+    } catch (InterruptedException ie) {
+      String message =
+          String.format("Error while waiting for next check for '%s', ref '%s'", projectName, ref);
+      repLog.error(message);
+      logger.atWarning().withCause(ie).log(message);
+      // Restore the interrupt status so that callers up the stack can still observe it.
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
index 98f4897..6f9c2e7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
@@ -22,12 +22,13 @@
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.replication.ReplicationPushFilter;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Random;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.eclipse.jgit.lib.ObjectId;
@@ -42,20 +43,22 @@
 public class MultisiteReplicationPushFilter implements ReplicationPushFilter {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private static final String REF_META_SUFFIX = "/meta";
-  public static final int MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS = 1000;
-  public static final int RANDOM_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS = 1000;
 
   static final String REPLICATION_LOG_NAME = "replication_log";
   static final Logger repLog = LoggerFactory.getLogger(REPLICATION_LOG_NAME);
 
   private final SharedRefDatabaseWrapper sharedRefDb;
   private final GitRepositoryManager gitRepositoryManager;
+  private Configuration config;
 
   @Inject
   public MultisiteReplicationPushFilter(
-      SharedRefDatabaseWrapper sharedRefDb, GitRepositoryManager gitRepositoryManager) {
+      SharedRefDatabaseWrapper sharedRefDb,
+      GitRepositoryManager gitRepositoryManager,
+      Configuration config) {
     this.sharedRefDb = sharedRefDb;
     this.gitRepositoryManager = gitRepositoryManager;
+    this.config = config;
   }
 
   @Override
@@ -66,19 +69,23 @@
         gitRepositoryManager.openRepository(Project.nameKey(projectName))) {
       List<RemoteRefUpdate> filteredRefUpdates =
           remoteUpdatesList.stream()
-              .filter(
+              .map(
                   refUpdate -> {
-                    boolean refUpToDate = isUpToDateWithRetry(projectName, repository, refUpdate);
-                    if (!refUpToDate) {
+                    Optional<RemoteRefUpdate> updatedRefUpdate =
+                        isUpToDateWithRetry(projectName, repository, refUpdate);
+                    if (!updatedRefUpdate.isPresent()) {
                       repLog.warn(
-                          "{} is not up-to-date with the shared-refdb and thus will NOT BE replicated",
+                          "{} is not up-to-date with the shared-refdb and thus will NOT BE"
+                              + " replicated",
                           refUpdate);
                       if (refUpdate.getSrcRef().endsWith(REF_META_SUFFIX)) {
                         outdatedChanges.add(getRootChangeRefPrefix(refUpdate.getSrcRef()));
                       }
                     }
-                    return refUpToDate;
+                    return updatedRefUpdate;
                   })
+              .filter(Optional::isPresent)
+              .map(Optional::get)
               .collect(Collectors.toList());
 
       return filteredRefUpdates.stream()
@@ -102,45 +109,67 @@
     }
   }
 
-  private boolean isUpToDateWithRetry(
+  private Optional<RemoteRefUpdate> isUpToDateWithRetry(
       String projectName, Repository repository, RemoteRefUpdate refUpdate) {
     String ref = refUpdate.getSrcRef();
     try {
       if (sharedRefDb.isUpToDate(
           Project.nameKey(projectName),
           new ObjectIdRef.Unpeeled(Ref.Storage.NETWORK, ref, refUpdate.getNewObjectId()))) {
-        return true;
+        return Optional.of(refUpdate);
       }
 
       randomSleepForMitigatingConditionWhereLocalRefHaveJustBeenChanged(
           projectName, refUpdate, ref);
 
+      ObjectId reloadedNewObjectId = getNotNullExactRef(repository, ref);
+      RemoteRefUpdate refUpdateReloaded =
+          newRemoteRefUpdateWithObjectId(repository, refUpdate, reloadedNewObjectId);
       return sharedRefDb.isUpToDate(
-          Project.nameKey(projectName),
-          new ObjectIdRef.Unpeeled(Ref.Storage.NETWORK, ref, getNotNullExactRef(repository, ref)));
+              Project.nameKey(projectName),
+              new ObjectIdRef.Unpeeled(
+                  Ref.Storage.NETWORK, ref, refUpdateReloaded.getNewObjectId()))
+          ? Optional.of(refUpdateReloaded)
+          : Optional.empty();
     } catch (GlobalRefDbLockException gle) {
       String message =
           String.format("%s is locked on shared-refdb and thus will NOT BE replicated", ref);
       repLog.error(message);
       logger.atSevere().withCause(gle).log(message);
-      return false;
+      return Optional.empty();
     } catch (IOException ioe) {
       String message =
           String.format("Error while extracting ref '%s' for project '%s'", ref, projectName);
       repLog.error(message);
       logger.atSevere().withCause(ioe).log(message);
-      return false;
+      return Optional.empty();
     }
   }
 
+  private RemoteRefUpdate newRemoteRefUpdateWithObjectId(
+      Repository localDb, RemoteRefUpdate refUpdate, ObjectId reloadedNewObjectId)
+      throws IOException {
+    return new RemoteRefUpdate(
+        localDb,
+        refUpdate.getSrcRef(),
+        reloadedNewObjectId,
+        refUpdate.getRemoteName(),
+        refUpdate.isForceUpdate(),
+        null,
+        refUpdate.getExpectedOldObjectId());
+  }
+
   private void randomSleepForMitigatingConditionWhereLocalRefHaveJustBeenChanged(
       String projectName, RemoteRefUpdate refUpdate, String ref) {
-    int randomSleepTimeMsec =
-        MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS
-            + new Random().nextInt(RANDOM_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS);
+    if (!config.replicationFilter().isPushFilterRandomSleepEnabled()) {
+      return;
+    }
+
+    int randomSleepTimeMsec = config.replicationFilter().pushFilterRandomSleepTimeMs();
     repLog.debug(
         String.format(
-            "'%s' is not up-to-date for project '%s' [local='%s']. Reload local ref in '%d ms' and re-check",
+            "'%s' is not up-to-date for project '%s' [local='%s']. Reload local ref in '%d ms' and"
+                + " re-check",
             ref, projectName, refUpdate.getNewObjectId(), randomSleepTimeMsec));
     try {
       Thread.sleep(randomSleepTimeMsec);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateImpl.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateImpl.java
index 469122b..e07fef0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateImpl.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateImpl.java
@@ -20,27 +20,27 @@
 import com.gerritforge.gerrit.globalrefdb.GlobalRefDbSystemError;
 import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
+import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.Project.NameKey;
 import com.google.gerrit.entities.RefNames;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventListener;
 import com.google.gerrit.server.events.RefUpdatedEvent;
 import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
 import com.google.gerrit.server.git.GitRepositoryManager;
-import com.google.gerrit.server.notedb.IntBlob;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Optional;
 import java.util.Set;
 import org.eclipse.jgit.errors.RepositoryNotFoundException;
 import org.eclipse.jgit.lib.ObjectId;
-import org.eclipse.jgit.lib.ObjectIdRef;
 import org.eclipse.jgit.lib.ObjectInserter;
-import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.RefUpdate;
 import org.eclipse.jgit.lib.Repository;
 
@@ -106,8 +106,7 @@
       if (newProjectVersionRefUpdate.isPresent()) {
         verLogger.log(projectNameKey, newVersion, 0L);
 
-        if (updateSharedProjectVersion(
-            projectNameKey, newProjectVersionRefUpdate.get().getNewObjectId(), newVersion)) {
+        if (updateSharedProjectVersion(projectNameKey, newVersion)) {
           gitReferenceUpdated.fire(projectNameKey, newProjectVersionRefUpdate.get(), null);
         }
       } else {
@@ -137,20 +136,9 @@
     return newId;
   }
 
-  private boolean updateSharedProjectVersion(
-      Project.NameKey projectNameKey, ObjectId newObjectId, Long newVersion)
+  private boolean updateSharedProjectVersion(Project.NameKey projectNameKey, Long newVersion)
       throws SharedProjectVersionUpdateException {
 
-    Ref sharedRef =
-        sharedRefDb
-            .get(projectNameKey, MULTI_SITE_VERSIONING_REF, String.class)
-            .map(
-                (String objectId) ->
-                    new ObjectIdRef.Unpeeled(
-                        Ref.Storage.NEW, MULTI_SITE_VERSIONING_REF, ObjectId.fromString(objectId)))
-            .orElse(
-                new ObjectIdRef.Unpeeled(
-                    Ref.Storage.NEW, MULTI_SITE_VERSIONING_REF, ObjectId.zeroId()));
     Optional<Long> sharedVersion =
         sharedRefDb
             .get(projectNameKey, MULTI_SITE_VERSIONING_VALUE_REF, String.class)
@@ -160,59 +148,50 @@
       if (sharedVersion.isPresent() && sharedVersion.get() >= newVersion) {
         logger.atWarning().log(
             String.format(
-                "NOT Updating project %s version %s (value=%d) in shared ref-db because is more recent than the local one %s (value=%d) ",
-                projectNameKey.get(),
-                newObjectId,
-                newVersion,
-                sharedRef.getObjectId().getName(),
-                sharedVersion.get()));
+                "NOT Updating project %s value=%d in shared ref-db because is more recent than the local value=%d",
+                projectNameKey.get(), newVersion, sharedVersion.get()));
         return false;
       }
 
       logger.atFine().log(
           String.format(
-              "Updating shared project %s version to %s (value=%d)",
-              projectNameKey.get(), newObjectId, newVersion));
+              "Updating shared project %s value to %d", projectNameKey.get(), newVersion));
 
-      boolean success = sharedRefDb.compareAndPut(projectNameKey, sharedRef, newObjectId);
-      if (!success) {
-        String message =
-            String.format(
-                "Project version blob update failed for %s. Current value %s, new value: %s",
-                projectNameKey.get(), safeGetObjectId(sharedRef), newObjectId);
-        logger.atSevere().log(message);
-        throw new SharedProjectVersionUpdateException(message);
-      }
-
-      success =
-          sharedRefDb.compareAndPut(
-              projectNameKey,
-              MULTI_SITE_VERSIONING_VALUE_REF,
-              sharedVersion.map(Object::toString).orElse(null),
-              newVersion.toString());
-      if (!success) {
-        String message =
-            String.format(
-                "Project version update failed for %s. Current value %s, new value: %s",
-                projectNameKey.get(), safeGetObjectId(sharedRef), newObjectId);
-        logger.atSevere().log(message);
-        throw new SharedProjectVersionUpdateException(message);
-      }
-
+      updateProjectVersionValue(projectNameKey, newVersion, sharedVersion);
       return true;
     } catch (GlobalRefDbSystemError refDbSystemError) {
       String message =
           String.format(
-              "Error while updating shared project version for %s. Current value %s, new value: %s. Error: %s",
+              "Error while updating shared project value for %s. Current value %s, new value: %s. Error: %s",
               projectNameKey.get(),
-              sharedRef.getObjectId(),
-              newObjectId,
+              sharedVersion.map(Object::toString).orElse(null),
+              newVersion,
               refDbSystemError.getMessage());
       logger.atSevere().withCause(refDbSystemError).log(message);
       throw new SharedProjectVersionUpdateException(message);
     }
   }
 
+  private void updateProjectVersionValue(
+      NameKey projectNameKey, Long newVersion, Optional<Long> sharedVersion) {
+    try {
+      if (sharedRefDb.isSetOperationSupported()) {
+        sharedRefDb.put(projectNameKey, MULTI_SITE_VERSIONING_VALUE_REF, newVersion.toString());
+        return;
+      }
+    } catch (NoSuchMethodError e) {
+      logger.atSevere().log(
+          "Global-refdb library is outdated and is not supporting "
+              + "'put' method, update global-refdb to the newest version. Falling back to 'compareAndPut'");
+    }
+
+    sharedRefDb.compareAndPut(
+        projectNameKey,
+        MULTI_SITE_VERSIONING_VALUE_REF,
+        sharedVersion.map(Object::toString).orElse(null),
+        newVersion.toString());
+  }
+
   /* (non-Javadoc)
    * @see com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate#getProjectLocalVersion(java.lang.String)
    */
@@ -220,9 +199,9 @@
   public Optional<Long> getProjectLocalVersion(String projectName) {
     try (Repository repository =
         gitRepositoryManager.openRepository(Project.NameKey.parse(projectName))) {
-      Optional<IntBlob> blob = IntBlob.parse(repository, MULTI_SITE_VERSIONING_REF);
+      Optional<Long> blob = longBlobParse(repository, MULTI_SITE_VERSIONING_REF);
       if (blob.isPresent()) {
-        Long repoVersion = Integer.toUnsignedLong(blob.get().value());
+        Long repoVersion = blob.get();
         logger.atFine().log("Local project '%s' has version %d", projectName, repoVersion);
         return Optional.of(repoVersion);
       }
@@ -234,6 +213,22 @@
     return Optional.empty();
   }
 
+  private Optional<Long> longBlobParse(Repository repo, String refName) throws IOException {
+    return Optional.ofNullable(repo.exactRef(refName))
+        .map(
+            (ref) -> {
+              try {
+                return Long.parseLong(
+                    new String(repo.open(ref.getObjectId()).getBytes(), StandardCharsets.UTF_8));
+              } catch (IOException e) {
+                logger.atSevere().withCause(e).log(
+                    "Unable to extract long BLOB from %s:%s", repo.getDirectory(), ref);
+                return null;
+              }
+            })
+        .filter(Predicates.notNull());
+  }
+
   /* (non-Javadoc)
    * @see com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate#getProjectRemoteVersion(java.lang.String)
    */
@@ -245,10 +240,6 @@
     return globalVersion.flatMap(longString -> getLongValueOf(longString));
   }
 
-  private Object safeGetObjectId(Ref currentRef) {
-    return currentRef == null ? "null" : currentRef.getObjectId();
-  }
-
   private Optional<Long> getLongValueOf(String longString) {
     try {
       return Optional.ofNullable(Long.parseLong(longString));
@@ -286,7 +277,7 @@
   }
 
   private long getCurrentGlobalVersionNumber() {
-    return System.currentTimeMillis() / 1000;
+    return System.currentTimeMillis();
   }
 
   private Boolean isSuccessful(RefUpdate.Result result) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/PullReplicationFilterModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/PullReplicationFilterModule.java
new file mode 100644
index 0000000..b8ad99d
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/PullReplicationFilterModule.java
@@ -0,0 +1,28 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation;
+
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.inject.AbstractModule;
+import com.googlesource.gerrit.plugins.replication.pull.ReplicationFetchFilter;
+
+public class PullReplicationFilterModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    DynamicItem.bind(binder(), ReplicationFetchFilter.class)
+        .to(MultisiteReplicationFetchFilter.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/PushReplicationFilterModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/PushReplicationFilterModule.java
new file mode 100644
index 0000000..8a8e826
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/PushReplicationFilterModule.java
@@ -0,0 +1,28 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation;
+
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.inject.AbstractModule;
+import com.googlesource.gerrit.plugins.replication.ReplicationPushFilter;
+
+public class PushReplicationFilterModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    DynamicItem.bind(binder(), ReplicationPushFilter.class)
+        .to(MultisiteReplicationPushFilter.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
index 8601cf2..fc4d505 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
@@ -30,14 +30,11 @@
 import com.gerritforge.gerrit.globalrefdb.validation.dfsrefdb.SharedRefEnforcement;
 import com.google.common.collect.ImmutableSet;
 import com.google.gerrit.extensions.config.FactoryModule;
-import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.config.RepositoryConfig;
 import com.google.inject.Scopes;
 import com.google.inject.TypeLiteral;
 import com.google.inject.name.Names;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.replication.ReplicationExtensionPointModule;
-import com.googlesource.gerrit.plugins.replication.ReplicationPushFilter;
 
 public class ValidationModule extends FactoryModule {
   private final Configuration cfg;
@@ -50,8 +47,6 @@
 
   @Override
   protected void configure() {
-    install(new ReplicationExtensionPointModule());
-
     bind(SharedRefDatabaseWrapper.class).in(Scopes.SINGLETON);
     bind(SharedRefLogger.class).to(Log4jSharedRefLogger.class);
     factory(LockWrapper.Factory.class);
@@ -70,8 +65,6 @@
                 ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF,
                 ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_VALUE_REF));
     install(new RepositoryManagerModule(repoConfig));
-    DynamicItem.bind(binder(), ReplicationPushFilter.class)
-        .to(MultisiteReplicationPushFilter.class);
 
     if (cfg.getSharedRefDbConfiguration().getSharedRefDb().getEnforcementRules().isEmpty()) {
       bind(SharedRefEnforcement.class).to(DefaultSharedRefEnforcement.class).in(Scopes.SINGLETON);
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index 623871b..d7c5648 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -7,7 +7,8 @@
 message broker for aligning with the other masters over different sites.
 
 The masters must be:
-
+* Gerrit instance id is mandatory for the @PLUGIN@ plugin. All the masters
+  must have [gerrit.instanceId](https://gerrit-review.googlesource.com/Documentation/config-gerrit.html#gerrit) populated.
 * events-broker library must be installed as a library module in the
   `$GERRIT_SITE/lib` directory of all the masters
 * global-refdb library must be installed as a library module in the
@@ -98,3 +99,7 @@
 * Subscriber replication lag (sec behind the producer)
 
 `metric=site/multi_site/subscriber/subscriber_replication_status/sec_behind, type=com.google.gerrit.metrics.dropwizard.CallbackMetricImpl`
+
+* Subscriber replication lag (millisec behind the producer)
+
+`metric=site/multi_site/subscriber/subscriber_replication_status/msec_behind, type=com.google.gerrit.metrics.dropwizard.CallbackMetricImpl`
diff --git a/src/main/resources/Documentation/build.md b/src/main/resources/Documentation/build.md
index bb38288..146341e 100644
--- a/src/main/resources/Documentation/build.md
+++ b/src/main/resources/Documentation/build.md
@@ -3,15 +3,29 @@
 This plugin can be built with Bazel in the Gerrit tree.
 
 Clone or link this plugin to the plugins directory of Gerrit's
-source tree. Put the external dependency Bazel build file into
-the Gerrit /plugins directory, replacing the existing empty one.
+source tree.
+
+Clone the [pull-replication](https://gerrit.googlesource.com/plugins/pull-replication) plugin on
+the same branch as the @PLUGIN@ plugin and link it to the `gerrit/plugins` directory.
 
 ```
+  export BRANCH=$(git --git-dir=@PLUGIN@ branch)
+  git clone https://gerrit.googlesource.com/plugins/pull-replication
   cd gerrit/plugins
   rm external_plugin_deps.bzl
   ln -s @PLUGIN@/external_plugin_deps.bzl .
 ```
 
+Clone the [global-refdb](https://gerrit.googlesource.com/modules/global-refdb) module on
+the same branch as the @PLUGIN@ plugin and link it to the `gerrit/plugins` directory.
+
+```
+  export BRANCH=$(git --git-dir=@PLUGIN@ branch)
+  git clone "https://gerrit.googlesource.com/modules/global-refdb"
+  cd gerrit/plugins
+  ln -s ../../global-refdb .
+```
+
 From the Gerrit source tree issue the command:
 
 ```
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 02806a1..e451cb3 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -3,15 +3,19 @@
 =========================
 
 The @PLUGIN@ plugin must be installed as a library module in the
-`$GERRIT_SITE/lib` folder of all the instances. Configuration should
-be specified in the `$site_path/etc/@PLUGIN@.config` file.
+`$GERRIT_SITE/lib` folder of all the instances and linked to
+`$GERRIT_SITE/plugins`, which enables its use as both a libModule
+and a plugin.
+Configuration should be specified in the `$site_path/etc/@PLUGIN@.config` file.
 
 ## Configuration parameters
 
 ```cache.synchronize```
-:   Whether to run cache evictions synchronously. It requires disabling the
-    background cache evictions notifications in `gerrit.config` by setting
-    `cache.threads = 0`.
+:   Whether to synchronize cache evictions. Set to false when relying on
+    low cache TTLs, in which case cache eviction is not strictly needed.
+    Synchronizing cache evictions requires disabling the background cache
+    eviction notifications in `gerrit.config` by setting `cache.threads = 0`.
+
     Defaults to true.
 
 ```cache.threadPoolSize```
@@ -29,7 +33,8 @@
     forwarded.
 
 ```event.synchronize```
-:   Whether to synchronize stream events.
+:   Whether to synchronize stream events. Set to false when not using the SSH
+    stream events.
     Defaults to true.
 
 ```index.numStripedLocks```
@@ -37,7 +42,9 @@
     Defaults to 10
 
 ```index.synchronize```
-:   Whether to synchronize secondary indexes.
+:   Whether to synchronize secondary indexes. Set to false when using multi-site
+    on Gerrit replicas that do not have an index, or when using an external
+    service such as ElasticSearch.
     Defaults to true.
 
 ```index.threadPoolSize```
@@ -72,10 +79,19 @@
 :   Name of the topic to use for publishing cache eviction events
     Defaults to GERRIT.EVENT.PROJECT.LIST
 
+**NOTE**: All broker settings are ignored when the `cache`, `index`
+and `event` synchronizations are all disabled.
+
 ```ref-database.enabled```
 :   Enable the use of a shared ref-database
     Defaults: true
 
+```ref-database.replicationLagRefreshInterval```
+:   Interval of the auto-refresh of the metrics that trace the auto-replication
+    lag by polling on a regular basis. Set to zero to disable the polling
+    mechanism.
+    Defaults: 60 min
+
 ```ref-database.enforcementRules.<policy>```
 :   Level of consistency enforcement across sites on a project:refs basis.
     Supports two values for enforcing the policy on multiple projects or refs.
@@ -123,3 +139,80 @@
     the project `foo/bar`, but no other project.
 
     By default, all projects are matched.
+
+```replication.push-filter.minWaitBeforeReloadLocalVersionMs```
+:   Specifies the minimum amount of time in milliseconds the replication plugin filter will
+    wait before retrying the check for a ref which is not up to date with the global-refdb.
+
+    By default: 1000 milliseconds
+
+```replication.push-filter.maxRandomWaitBeforeReloadLocalVersionMs```
+:   Specifies the maximum additional random amount of time in milliseconds the replication filter
+    will wait before retrying the check for a ref which is not up to date with the global-refdb.
+
+    If `maxRandomWaitBeforeReloadLocalVersionMs` is set to zero, the random sleep for refs that
+    are not in sync is disabled.
+
+    By default: 1000 milliseconds
+
+```replication.fetch-filter.minWaitBeforeReloadLocalVersionMs```
+:   Specifies the minimum amount of time in milliseconds the pull-replication filter will wait
+    before retrying the check for a ref which is not up to date with the global-refdb.
+
+    By default: 1000 milliseconds
+
+```replication.fetch-filter.maxRandomWaitBeforeReloadLocalVersionMs```
+:   Specifies the maximum additional random amount of time in milliseconds the pull-replication
+    filter will wait before retrying the check for a ref which is not up to date with the global-refdb.
+
+    If `maxRandomWaitBeforeReloadLocalVersionMs` is set to zero, the random sleep for refs that
+    are not in sync is disabled.
+
+    By default: 1000 milliseconds
+
+## Replication filters
+
+The @PLUGIN@ plugin is also responsible for filtering out replication events that
+risk creating a split-brain situation.
+It integrates the push and pull replication filtering extension points for validating
+the refs to be replicated and dropping some of them.
+
+**Replication plugin**
+
+When using the Gerrit core replication plugin, also known as push-replication, link the
+`replication.jar` to the `$GERRIT_SITE/lib` directory and add the following libModule
+to `gerrit.config`:
+
+```
+[gerrit]
+  installModule = com.googlesource.gerrit.plugins.replication.ReplicationExtensionPointModule
+```
+
+The above configuration would be automatically detected by the @PLUGIN@ plugin which would then
+install the PushReplicationFilterModule for filtering outgoing replication refs based
+on their global-refdb status:
+
+- Outgoing replication of refs that are NOT up-to-date with the global-refdb will be
+  discarded, because they may cause split-brain on the remote replication endpoints.
+
+- All other refs will be pushed as normal to the remote replication endpoints.
+
+**Pull-replication plugin**
+
+When using the [pull-replication](https://gerrit.googlesource.com/plugins/pull-replication)
+plugin, link the `pull-replication.jar` to the `$GERRIT_SITE/lib` directory and add the following
+libModule to `gerrit.config`:
+
+```
+[gerrit]
+        installModule = com.googlesource.gerrit.plugins.replication.pull.ReplicationExtensionPointModule
+```
+
+The above configuration would be automatically detected by the @PLUGIN@ plugin which would then
+install the PullReplicationFilterModule for filtering incoming fetch replication refs based
+on their global-refdb status.
+
+- Incoming replication of refs that locally are already up-to-date with the global-refdb will be
+  discarded, because they would not add anything more to the current status of the local refs.
+
+- All other refs will be fetched as normal from the replication sources.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
deleted file mode 100644
index 2df60dd..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-// Copyright (C) 2017 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite;
-
-import static com.google.common.truth.Truth.assertThat;
-
-import com.google.gerrit.server.config.SitePaths;
-import java.io.File;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.UUID;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.mockito.Answers;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class ModuleTest {
-
-  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
-  private Configuration configMock;
-
-  @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
-
-  private Module module;
-
-  @Before
-  public void setup() {
-    module = new Module(configMock);
-  }
-
-  @Test
-  public void shouldGetInstanceId() throws Exception {
-    File tmpSitePath = tempFolder.newFolder();
-    File tmpPluginDataPath =
-        Paths.get(tmpSitePath.getPath(), "data", Configuration.PLUGIN_NAME).toFile();
-    tmpPluginDataPath.mkdirs();
-    Path path = Paths.get(tmpPluginDataPath.getPath(), Configuration.INSTANCE_ID_FILE);
-    SitePaths sitePaths = new SitePaths(Paths.get(tmpSitePath.getPath()));
-    assertThat(path.toFile().exists()).isFalse();
-
-    UUID gotUUID1 = module.getInstanceId(sitePaths);
-    assertThat(gotUUID1).isNotNull();
-    assertThat(path.toFile().exists()).isTrue();
-
-    UUID gotUUID2 = module.getInstanceId(sitePaths);
-    assertThat(gotUUID1).isEqualTo(gotUUID2);
-  }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
index 92fa101..7d1751c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
@@ -1,15 +1,19 @@
 package com.googlesource.gerrit.plugins.multisite.broker;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.only;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import java.util.UUID;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -22,28 +26,35 @@
   @Mock private BrokerApi brokerApi;
   @Mock Event event;
   @Mock MessageLogger msgLog;
-  private UUID instanceId = UUID.randomUUID();
   private String topic = "index";
 
   private BrokerApiWrapper objectUnderTest;
 
   @Before
   public void setUp() {
+    event.instanceId = "instance-id";
     objectUnderTest =
         new BrokerApiWrapper(
-            DynamicItem.itemOf(BrokerApi.class, brokerApi), brokerMetrics, msgLog, instanceId);
+            MoreExecutors.directExecutor(),
+            DynamicItem.itemOf(BrokerApi.class, brokerApi),
+            brokerMetrics,
+            msgLog);
   }
 
   @Test
   public void shouldIncrementBrokerMetricCounterWhenMessagePublished() {
-    when(brokerApi.send(any(), any())).thenReturn(true);
+    SettableFuture<Boolean> resultF = SettableFuture.create();
+    resultF.set(true);
+    when(brokerApi.send(any(), any())).thenReturn(resultF);
     objectUnderTest.send(topic, event);
     verify(brokerMetrics, only()).incrementBrokerPublishedMessage();
   }
 
   @Test
   public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailed() {
-    when(brokerApi.send(any(), any())).thenReturn(false);
+    SettableFuture<Boolean> resultF = SettableFuture.create();
+    resultF.setException(new Exception("Force Future failure"));
+    when(brokerApi.send(any(), any())).thenReturn(resultF);
     objectUnderTest.send(topic, event);
     verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
   }
@@ -53,10 +64,26 @@
     when(brokerApi.send(any(), any()))
         .thenThrow(new RuntimeException("Unexpected runtime exception"));
     try {
-      objectUnderTest.send(topic, event);
+      objectUnderTest.sendSync(topic, event);
     } catch (RuntimeException e) {
       // expected
     }
     verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
   }
+
+  @Test // an event whose instanceId is null must never be handed to the broker
+  public void shouldSkipMessageSendingWhenInstanceIdIsNull() {
+    ProjectCreatedEvent event = new ProjectCreatedEvent(); // shadows the mocked event field
+    event.instanceId = null;
+    objectUnderTest.send(topic, event);
+    verify(brokerApi, never()).send(any(), eq(event)); // any topic, this exact event
+  }
+
+  @Test // an event whose instanceId is the empty string must never be handed to the broker
+  public void shouldSkipMessageSendingWhenInstanceIdIsEmpty() {
+    ProjectCreatedEvent event = new ProjectCreatedEvent(); // shadows the mocked event field
+    event.instanceId = "";
+    objectUnderTest.send(topic, event);
+    verify(brokerApi, never()).send(any(), eq(event)); // any topic, this exact event
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java
index 67be583..ce222d0 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/CacheEvictionHandlerTest.java
@@ -36,9 +36,10 @@
 
   @Test
   public void shouldNotPublishAccountsCacheEvictions() {
-
+    String instanceId = "instance-id";
     final CacheEvictionHandler<String, String> handler =
-        new CacheEvictionHandler<>(DynamicSet.emptySet(), executorMock, defaultCacheMatcher);
+        new CacheEvictionHandler<>(
+            DynamicSet.emptySet(), executorMock, defaultCacheMatcher, instanceId);
 
     handler.onRemoval(
         "test", "accounts", RemovalNotification.create("test", "accounts", RemovalCause.EXPLICIT));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
index 4263ddb..c8216bd 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/cache/ProjectListUpdateHandlerTest.java
@@ -41,6 +41,8 @@
 @RunWith(MockitoJUnitRunner.class)
 public class ProjectListUpdateHandlerTest {
 
+  private static final String INSTANCE_ID = "instance-id";
+
   private ProjectListUpdateHandler handler;
 
   @Mock private ProjectListUpdateForwarder forwarder;
@@ -51,7 +53,7 @@
     when(projectsFilter.matches(any(String.class))).thenReturn(true);
     handler =
         new ProjectListUpdateHandler(
-            asDynamicSet(forwarder), MoreExecutors.directExecutor(), projectsFilter);
+            asDynamicSet(forwarder), MoreExecutors.directExecutor(), projectsFilter, INSTANCE_ID);
   }
 
   private DynamicSet<ProjectListUpdateForwarder> asDynamicSet(
@@ -69,7 +71,8 @@
     handler.onNewProjectCreated(event);
     verify(forwarder)
         .updateProjectList(
-            any(ProjectListUpdateTask.class), eq(new ProjectListUpdateEvent(projectName, false)));
+            any(ProjectListUpdateTask.class),
+            eq(new ProjectListUpdateEvent(projectName, false, INSTANCE_ID)));
   }
 
   @Test
@@ -80,7 +83,8 @@
     handler.onProjectDeleted(event);
     verify(forwarder)
         .updateProjectList(
-            any(ProjectListUpdateTask.class), eq(new ProjectListUpdateEvent(projectName, true)));
+            any(ProjectListUpdateTask.class),
+            eq(new ProjectListUpdateEvent(projectName, true, INSTANCE_ID)));
   }
 
   @Test
@@ -101,18 +105,22 @@
     handler.onNewProjectCreated(event);
     verify(forwarder, never())
         .updateProjectList(
-            any(ProjectListUpdateTask.class), eq(new ProjectListUpdateEvent(projectName, true)));
+            any(ProjectListUpdateTask.class),
+            eq(new ProjectListUpdateEvent(projectName, true, INSTANCE_ID)));
   }
 
   @Test
   public void testProjectUpdateTaskToString() throws Exception {
     String projectName = "someProjectName";
     ProjectListUpdateTask task =
-        handler.new ProjectListUpdateTask(new ProjectListUpdateEvent(projectName, false));
+        handler
+        .new ProjectListUpdateTask(new ProjectListUpdateEvent(projectName, false, INSTANCE_ID));
     assertThat(task.toString())
         .isEqualTo(String.format("Update project list in target instance: add '%s'", projectName));
 
-    task = handler.new ProjectListUpdateTask(new ProjectListUpdateEvent(projectName, true));
+    task =
+        handler
+        .new ProjectListUpdateTask(new ProjectListUpdateEvent(projectName, true, INSTANCE_ID));
     assertThat(task.toString())
         .isEqualTo(
             String.format("Update project list in target instance: remove '%s'", projectName));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java
new file mode 100644
index 0000000..cb1f78e
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubscriberTestBase.java
@@ -0,0 +1,162 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.Configuration.Broker;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import java.io.IOException;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+@Ignore // NOTE(review): presumably keeps runners from executing this abstract base directly
+public abstract class AbstractSubscriberTestBase {
+  protected static final String NODE_INSTANCE_ID = "node-instance-id"; // id of the local node
+  protected static final String INSTANCE_ID = "other-node-instance-id"; // id of a remote node
+  protected static final String PROJECT_NAME = "project-name";
+
+  @Mock protected DroppedEventListener droppedEventListeners;
+  @Mock protected MessageLogger msgLog;
+  @Mock protected SubscriberMetrics subscriberMetrics;
+  @Mock protected Configuration cfg;
+  @Mock protected Broker brokerCfg;
+  @Mock protected ProjectsFilter projectsFilter;
+
+  @SuppressWarnings("rawtypes")
+  protected ForwardedEventRouter eventRouter; // assigned in setup() from eventRouter()
+
+  protected AbstractSubcriber objectUnderTest; // spelling follows the referenced type's name
+
+  @Before
+  public void setup() {
+    when(cfg.broker()).thenReturn(brokerCfg);
+    when(brokerCfg.getTopic(any(), any())).thenReturn("test-topic");
+    eventRouter = eventRouter(); // set first: subclasses' objectUnderTest() reads this field
+    objectUnderTest = objectUnderTest();
+  }
+
+  @Test // every event from events() is routed when the projects filter accepts it
+  public void shouldConsumeEventsWhenNotFilteredByProjectName()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    for (Event event : events()) {
+      when(projectsFilter.matches(any(String.class))).thenReturn(true); // re-stub after reset
+      objectUnderTest.getConsumer().accept(event);
+      verifyConsumed(event);
+    }
+  }
+
+  @Test // every event from events() is dropped when the projects filter rejects it
+  public void shouldSkipEventsWhenFilteredByProjectName()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    for (Event event : events()) {
+      when(projectsFilter.matches(any(String.class))).thenReturn(false); // re-stub after reset
+      objectUnderTest.getConsumer().accept(event);
+      verifySkipped(event);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test // events stamped with the local instance id are dropped, never filtered or routed
+  public void shouldSkipLocalEvents()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    for (Event event : events()) {
+      event.instanceId = NODE_INSTANCE_ID;
+      when(projectsFilter.matches(any(String.class))).thenReturn(true);
+
+      objectUnderTest.getConsumer().accept(event);
+
+      verify(projectsFilter, never()).matches(PROJECT_NAME);
+      verify(eventRouter, never()).route(event);
+      verify(droppedEventListeners, times(1)).onEventDropped(event);
+      reset(projectsFilter, eventRouter, droppedEventListeners); // clean state for next event
+    }
+  }
+
+  @Test // replication status metrics are updated even for events with the local instance id
+  public void shouldUpdateReplicationMetricsWithLocalEvents() {
+    for (Event event : events()) {
+      event.instanceId = NODE_INSTANCE_ID;
+      when(projectsFilter.matches(any(String.class))).thenReturn(true);
+
+      objectUnderTest.getConsumer().accept(event);
+
+      verify(subscriberMetrics, times(1)).updateReplicationStatusMetrics(event);
+      reset(projectsFilter, eventRouter, droppedEventListeners, subscriberMetrics);
+    }
+  }
+
+  @Test // replication status metrics are updated for events with another node's instance id
+  public void shouldUpdateReplicationMetricsWithNonLocalEvents() {
+    for (Event event : events()) {
+      event.instanceId = INSTANCE_ID;
+      when(projectsFilter.matches(any(String.class))).thenReturn(true);
+
+      objectUnderTest.getConsumer().accept(event);
+
+      verify(subscriberMetrics, times(1)).updateReplicationStatusMetrics(event);
+      reset(projectsFilter, eventRouter, droppedEventListeners, subscriberMetrics);
+    }
+  }
+
+  protected abstract AbstractSubcriber objectUnderTest(); // subscriber built in setup()
+
+  protected abstract List<Event> events(); // events fed to the consumer by the tests above
+
+  @SuppressWarnings("rawtypes")
+  protected abstract ForwardedEventRouter eventRouter(); // must be a mock: tests verify on it
+
+  @SuppressWarnings("unchecked")
+  protected void verifySkipped(Event event) // filter asked once, event dropped and not routed
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    verify(projectsFilter, times(1)).matches(PROJECT_NAME);
+    verify(eventRouter, never()).route(event);
+    verify(droppedEventListeners, times(1)).onEventDropped(event);
+    reset(projectsFilter, eventRouter, droppedEventListeners); // clean state for next event
+  }
+
+  @SuppressWarnings("unchecked")
+  protected void verifyConsumed(Event event) // filter asked once, event routed and not dropped
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    verify(projectsFilter, times(1)).matches(PROJECT_NAME);
+    verify(eventRouter, times(1)).route(event);
+    verify(droppedEventListeners, never()).onEventDropped(event);
+    reset(projectsFilter, eventRouter, droppedEventListeners); // clean state for next event
+  }
+
+  protected DynamicSet<DroppedEventListener> asDynamicSet(DroppedEventListener listener) {
+    DynamicSet<DroppedEventListener> result = new DynamicSet<>();
+    result.add("multi-site", listener); // single listener registered under "multi-site"
+    return result;
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriberTest.java
new file mode 100644
index 0000000..5031f36
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/BatchIndexEventSubscriberTest.java
@@ -0,0 +1,77 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import java.io.IOException;
+import java.util.List;
+import org.junit.Test;
+
+public class BatchIndexEventSubscriberTest extends AbstractSubscriberTestBase {
+  private static final boolean DELETED = false; // third ChangeIndexEvent argument in events()
+  private static final int CHANGE_ID = 1; // second ChangeIndexEvent argument in events()
+
+  @SuppressWarnings("unchecked")
+  @Test // account index events are routed without asking the filter about PROJECT_NAME
+  public void shouldConsumeNonProjectAndNonChangeIndexingEventsTypes()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    IndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
+
+    objectUnderTest.getConsumer().accept(event);
+
+    verify(projectsFilter, never()).matches(PROJECT_NAME);
+    verify(eventRouter, times(1)).route(event);
+    verify(droppedEventListeners, never()).onEventDropped(event);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  protected ForwardedEventRouter eventRouter() {
+    return mock(IndexEventRouter.class); // a mock, so the tests can verify route() calls
+  }
+
+  @Override
+  protected List<Event> events() {
+    return ImmutableList.of( // one project index and one change index event for PROJECT_NAME
+        new ProjectIndexEvent(PROJECT_NAME, INSTANCE_ID),
+        new ChangeIndexEvent(PROJECT_NAME, CHANGE_ID, DELETED, INSTANCE_ID));
+  }
+
+  @Override
+  protected AbstractSubcriber objectUnderTest() {
+    return new BatchIndexEventSubscriber(
+        (IndexEventRouter) eventRouter, // base-class field, expected to hold eventRouter()
+        asDynamicSet(droppedEventListeners),
+        NODE_INSTANCE_ID,
+        msgLog,
+        subscriberMetrics,
+        cfg,
+        projectsFilter);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/CallbackMetricMaker.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/CallbackMetricMaker.java
new file mode 100644
index 0000000..a197c91
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/CallbackMetricMaker.java
@@ -0,0 +1,26 @@
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import com.google.gerrit.metrics.Counter1;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.DisabledMetricMaker;
+import com.google.gerrit.metrics.Field;
+import org.junit.Ignore;
+
+@Ignore // NOTE(review): presumably tells test runners that this helper is not a test class
+public class CallbackMetricMaker extends DisabledMetricMaker {
+  private int counter1Requests; // newCounter(name, desc, field1) calls since the last reset
+
+  @Override
+  public <F1> Counter1<F1> newCounter(String name, Description desc, Field<F1> field1) {
+    counter1Requests++; // record the request, then delegate to the disabled implementation
+    return super.newCounter(name, desc, field1);
+  }
+
+  public int getCallbackMetricCounter() {
+    return counter1Requests;
+  }
+
+  public void resetCallbackMetricCounter() {
+    counter1Requests = 0;
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java
new file mode 100644
index 0000000..0e63528
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriberTest.java
@@ -0,0 +1,133 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.entities.Account;
+import com.google.gerrit.entities.BranchNameKey;
+import com.google.gerrit.entities.Change;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.change.ChangeFinder;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.notedb.ChangeNotes;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.google.gerrit.server.util.time.TimeUtil;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import org.junit.Test;
+import org.mockito.Mock;
+
+public class IndexEventSubscriberTest extends AbstractSubscriberTestBase {
+  private static final boolean DELETED = false; // third ChangeIndexEvent argument in events()
+  private static final int CHANGE_ID = 1;
+  private static final String EMPTY_PROJECT_NAME = ""; // event that carries no project name
+
+  @Mock protected ChangeFinder changeFinderMock;
+
+  @SuppressWarnings("unchecked")
+  @Test // account index events are routed without asking the filter about PROJECT_NAME
+  public void shouldConsumeNonProjectAndNonChangeIndexingEventsTypes()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    IndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
+
+    objectUnderTest.getConsumer().accept(event);
+
+    verify(projectsFilter, never()).matches(PROJECT_NAME);
+    verify(eventRouter, times(1)).route(event);
+    verify(droppedEventListeners, never()).onEventDropped(event);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test // expects the filter to be asked about the project of the change ChangeFinder returns
+  public void shouldConsumeDeleteChangeIndexEventWithEmptyProjectNameWhenFound()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    ChangeIndexEvent event = new ChangeIndexEvent(EMPTY_PROJECT_NAME, CHANGE_ID, true, INSTANCE_ID);
+
+    ChangeNotes changeNotesMock = mock(ChangeNotes.class);
+    when(changeNotesMock.getChange()).thenReturn(newChange()); // change lives in PROJECT_NAME
+    when(changeFinderMock.findOne(any(Change.Id.class))).thenReturn(Optional.of(changeNotesMock));
+    when(projectsFilter.matches(PROJECT_NAME)).thenReturn(true);
+
+    objectUnderTest.getConsumer().accept(event);
+
+    verify(projectsFilter, times(1)).matches(PROJECT_NAME);
+    verify(eventRouter, times(1)).route(event);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test // with no change found, the filter sees the empty name and the event is not routed
+  public void shouldNOTConsumeDeleteChangeIndexEventWithEmptyProjectNameWhenNotFound()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    ChangeIndexEvent event = new ChangeIndexEvent(EMPTY_PROJECT_NAME, CHANGE_ID, true, INSTANCE_ID);
+
+    when(changeFinderMock.findOne(any(Change.Id.class))).thenReturn(Optional.empty());
+    when(projectsFilter.matches(EMPTY_PROJECT_NAME)).thenReturn(false);
+
+    objectUnderTest.getConsumer().accept(event);
+
+    verify(projectsFilter, times(1)).matches(EMPTY_PROJECT_NAME);
+    verify(eventRouter, never()).route(event);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  protected ForwardedEventRouter eventRouter() {
+    return mock(IndexEventRouter.class); // a mock, so the tests can verify route() calls
+  }
+
+  @Override
+  protected List<Event> events() {
+    return ImmutableList.of( // one project index and one change index event for PROJECT_NAME
+        new ProjectIndexEvent(PROJECT_NAME, INSTANCE_ID),
+        new ChangeIndexEvent(PROJECT_NAME, CHANGE_ID, DELETED, INSTANCE_ID));
+  }
+
+  @Override
+  protected AbstractSubcriber objectUnderTest() {
+    return new IndexEventSubscriber(
+        (IndexEventRouter) eventRouter, // base-class field, expected to hold eventRouter()
+        asDynamicSet(droppedEventListeners),
+        NODE_INSTANCE_ID,
+        msgLog,
+        subscriberMetrics,
+        cfg,
+        projectsFilter,
+        changeFinderMock);
+  }
+
+  private Change newChange() { // Change with id CHANGE_ID on a branch of PROJECT_NAME
+    return new Change(
+        Change.key(Integer.toString(CHANGE_ID)),
+        Change.id(CHANGE_ID),
+        Account.id(9999),
+        BranchNameKey.create(Project.nameKey(PROJECT_NAME), "refs/heads/master"),
+        TimeUtil.nowTs());
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriberTest.java
new file mode 100644
index 0000000..201ee81
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriberTest.java
@@ -0,0 +1,50 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.Mockito.mock;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.events.Event;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
+import java.util.List;
+
+public class ProjectUpdateEventSubscriberTest extends AbstractSubscriberTestBase {
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  protected ForwardedEventRouter eventRouter() {
+    return mock(ProjectListUpdateRouter.class); // a mock, so route() calls can be verified
+  }
+
+  @Override
+  protected List<Event> events() { // a single project-list update for PROJECT_NAME
+    return ImmutableList.of(new ProjectListUpdateEvent(PROJECT_NAME, false, INSTANCE_ID));
+  }
+
+  @Override
+  protected AbstractSubcriber objectUnderTest() {
+    return new ProjectUpdateEventSubscriber(
+        (ProjectListUpdateRouter) eventRouter, // base-class field, expected to hold eventRouter()
+        asDynamicSet(droppedEventListeners),
+        NODE_INSTANCE_ID,
+        msgLog,
+        subscriberMetrics,
+        cfg,
+        projectsFilter);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java
index ce82954..7828062 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/ReplicationStatusTest.java
@@ -15,16 +15,27 @@
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
 import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ImmutableSortedSet;
 import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.metrics.CallbackMetric1;
+import com.google.gerrit.metrics.DisabledMetricMaker;
 import com.google.gerrit.server.project.ProjectCache;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
 import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
 import java.util.Optional;
+import java.util.concurrent.Executors;
+import org.eclipse.jgit.lib.Config;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -37,6 +48,7 @@
   @Mock private ProjectVersionLogger verLogger;
   @Mock private ProjectCache projectCache;
   @Mock private ProjectVersionRefUpdate projectVersionRefUpdate;
+  @Mock private CallbackMetric1<String, Long> perProjectReplicationLagMetricCallback;
   private ReplicationStatus objectUnderTest;
   private Cache<String, Long> replicationStatusCache;
 
@@ -48,7 +60,13 @@
     replicationStatusCache = CacheBuilder.newBuilder().build();
     objectUnderTest =
         new ReplicationStatus(
-            replicationStatusCache, Optional.of(projectVersionRefUpdate), verLogger, projectCache);
+            replicationStatusCache,
+            Optional.of(projectVersionRefUpdate),
+            verLogger,
+            projectCache,
+            Executors.newScheduledThreadPool(1),
+            new Configuration(new Config(), new Config()),
+            new DisabledMetricMaker());
   }
 
   @Test
@@ -57,6 +75,14 @@
     replicationStatusCache.put("projectB", 3L);
 
     objectUnderTest.start();
+    assertThat(objectUnderTest.getMaxLagMillis()).isEqualTo(10L);
+  }
+
+  @Test
+  public void shouldConvertMillisLagFromPersistedCacheOnStartToSecs() {
+    replicationStatusCache.put("projectA", 10000L);
+
+    objectUnderTest.start();
     assertThat(objectUnderTest.getMaxLag()).isEqualTo(10L);
   }
 
@@ -67,7 +93,7 @@
     objectUnderTest.start();
 
     objectUnderTest.doUpdateLag(Project.nameKey("projectA"), 20L);
-    assertThat(objectUnderTest.getMaxLag()).isEqualTo(20L);
+    assertThat(objectUnderTest.getMaxLagMillis()).isEqualTo(20L);
   }
 
   @Test
@@ -87,4 +113,127 @@
 
     assertThat(replicationStatusCache.getIfPresent("projectA")).isEqualTo(20L);
   }
+
+  @Test
+  public void shouldRemoveProjectFromPersistedCache() {
+    // With no local version left, the deletion event must evict the persisted cache entry.
+    String deletedProject = "projectA";
+    setupReplicationLag(deletedProject, 100);
+    when(projectVersionRefUpdate.getProjectLocalVersion(deletedProject))
+        .thenReturn(Optional.empty());
+
+    objectUnderTest.onProjectDeleted(projectDeletedEvent(deletedProject));
+    assertThat(replicationStatusCache.getIfPresent(deletedProject)).isNull();
+  }
+
+  @Test
+  public void shouldRemoveProjectFromReplicationLags() {
+    String deletedProject = "projectA";
+    setupReplicationLag(deletedProject, 100);
+    when(projectVersionRefUpdate.getProjectLocalVersion(deletedProject))
+        .thenReturn(Optional.empty());
+
+    // Sanity check: the lag is reported before the deletion event arrives.
+    assertThat(objectUnderTest.getReplicationLags(1).keySet()).containsExactly(deletedProject);
+    objectUnderTest.onProjectDeleted(projectDeletedEvent(deletedProject));
+
+    assertThat(objectUnderTest.getReplicationLags(1).keySet()).isEmpty();
+  }
+
+  @Test
+  public void shouldNotRemoveProjectFromReplicationLagsIfLocalVersionStillExists() {
+    String survivingProject = "projectA";
+    setupReplicationLag(survivingProject, 100);
+    // A local version is still reported, so the deletion event must keep the project's lag.
+    Optional<Long> stillPresentVersion = Optional.of(System.currentTimeMillis());
+    when(projectVersionRefUpdate.getProjectLocalVersion(survivingProject))
+        .thenReturn(stillPresentVersion);
+
+    objectUnderTest.onProjectDeleted(projectDeletedEvent(survivingProject));
+    assertThat(objectUnderTest.getReplicationLags(1).keySet()).containsExactly(survivingProject);
+  }
+
+  @Test
+  public void shouldNotEvictFromPersistentCacheIfLocalVersionStillExists() {
+    String survivingProject = "projectA";
+    long expectedLag = 100;
+    setupReplicationLag(survivingProject, expectedLag);
+    Optional<Long> stillPresentVersion = Optional.of(System.currentTimeMillis());
+    when(projectVersionRefUpdate.getProjectLocalVersion(survivingProject))
+        .thenReturn(stillPresentVersion);
+    // Deleting while a local version is still reported must keep the cached lag.
+    objectUnderTest.onProjectDeleted(projectDeletedEvent(survivingProject));
+    assertThat(replicationStatusCache.getIfPresent(survivingProject)).isEqualTo(expectedLag);
+  }
+
+  @Test
+  public void shouldUpdateReplicationLagForProject() {
+    String project = "projectA";
+    long localVersion = 10L;
+    long remoteVersion = 20L;
+    long expectedLag = remoteVersion - localVersion;
+
+    when(projectVersionRefUpdate.getProjectLocalVersion(eq(project)))
+        .thenReturn(Optional.of(localVersion));
+    when(projectVersionRefUpdate.getProjectRemoteVersion(eq(project)))
+        .thenReturn(Optional.of(remoteVersion));
+
+    objectUnderTest.updateReplicationLag(Project.nameKey(project));
+    // Running the callback must hand the lag recorded above to the per-project metric.
+    objectUnderTest.replicationLagMetricPerProject(perProjectReplicationLagMetricCallback).run();
+
+    assertThat(replicationStatusCache.getIfPresent(project)).isEqualTo(expectedLag);
+    verify(perProjectReplicationLagMetricCallback).set(eq(project), eq(expectedLag));
+  }
+
+  @Test
+  public void shouldNotGenerateCallbackMetricIfNoReplicationLag() {
+    String projectName = "projectA";
+    when(projectVersionRefUpdate.getProjectLocalVersion(eq(projectName)))
+        .thenReturn(Optional.of(10L));
+
+    objectUnderTest.updateReplicationLag(Project.nameKey(projectName));
+    // Run the metric callback; without this the never() check below could not fail.
+    objectUnderTest.replicationLagMetricPerProject(perProjectReplicationLagMetricCallback).run();
+    assertThat(replicationStatusCache.getIfPresent(projectName)).isNull();
+    verify(perProjectReplicationLagMetricCallback, never()).set(any(), any());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void shouldAutoRefreshReplicationLagForProject() {
+    String project = "projectA";
+    long staleVersion = 10L;
+    long latestVersion = 20L;
+    // The local version is stale on the first lookup and caught up on the second one.
+    when(projectVersionRefUpdate.getProjectLocalVersion(eq(project)))
+        .thenReturn(Optional.of(staleVersion), Optional.of(latestVersion));
+    when(projectVersionRefUpdate.getProjectRemoteVersion(eq(project)))
+        .thenReturn(Optional.of(latestVersion));
+
+    objectUnderTest.updateReplicationLag(Project.nameKey(project));
+    assertThat(replicationStatusCache.getIfPresent(project))
+        .isEqualTo(latestVersion - staleVersion);
+
+    objectUnderTest.refreshProjectsWithLag();
+
+    assertThat(replicationStatusCache.getIfPresent(project)).isEqualTo(0);
+  }
+
+  // Seeds the cache, stubs versions that are `lag` apart, then triggers a lag update.
+  private void setupReplicationLag(String projectName, long lag) {
+    long localVersion = System.currentTimeMillis();
+    replicationStatusCache.put(projectName, 3L);
+    when(projectVersionRefUpdate.getProjectLocalVersion(projectName))
+        .thenReturn(Optional.of(localVersion));
+    when(projectVersionRefUpdate.getProjectRemoteVersion(projectName))
+        .thenReturn(Optional.of(localVersion + lag));
+    objectUnderTest.updateReplicationLag(Project.nameKey(projectName));
+  }
+
+  private ProjectDeletedListener.Event projectDeletedEvent(String projectName) {
+    ProjectDeletedListener.Event deletion = mock(ProjectDeletedListener.Event.class);
+    when(deletion.getProjectName()).thenReturn(projectName); // the only stubbed accessor
+    return deletion;
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriberTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriberTest.java
new file mode 100644
index 0000000..71fd828
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriberTest.java
@@ -0,0 +1,122 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.consumer;
+
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.Project.NameKey;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.RefEvent;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.AccountIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
+import com.googlesource.gerrit.plugins.replication.events.ReplicationScheduledEvent;
+import java.io.IOException;
+import java.util.List;
+import org.junit.Test;
+import org.mockito.Mock;
+
+public class StreamEventSubscriberTest extends AbstractSubscriberTestBase {
+  private static final NameKey PROJECT_NAME_KEY = NameKey.parse(PROJECT_NAME);
+  @Mock private RefUpdatedEvent refUpdatedEvent;
+  @Mock private RefReplicationDoneEvent refReplicationDoneEvent;
+  @Mock private ReplicationScheduledEvent replicationScheduledEvent;
+  @Mock private RefReplicatedEvent refReplicatedEvent;
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  protected ForwardedEventRouter eventRouter() {
+    return mock(StreamEventRouter.class);
+  }
+
+  // Every mocked event reports the test project and carries the test instance id.
+  @Override
+  protected List<Event> events() {
+    when(refUpdatedEvent.getProjectNameKey()).thenReturn(PROJECT_NAME_KEY);
+    when(refReplicationDoneEvent.getProjectNameKey()).thenReturn(PROJECT_NAME_KEY);
+    when(replicationScheduledEvent.getProjectNameKey()).thenReturn(PROJECT_NAME_KEY);
+    when(refReplicatedEvent.getProjectNameKey()).thenReturn(PROJECT_NAME_KEY);
+
+    refUpdatedEvent.instanceId = INSTANCE_ID;
+    refReplicationDoneEvent.instanceId = INSTANCE_ID;
+    replicationScheduledEvent.instanceId = INSTANCE_ID;
+    refReplicatedEvent.instanceId = INSTANCE_ID;
+    return ImmutableList.of(
+        refUpdatedEvent, refReplicationDoneEvent, replicationScheduledEvent, refReplicatedEvent);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void shouldNotConsumeNonProjectEventTypeEvents()
+      throws IOException, PermissionBackendException, CacheNotFoundException {
+    IndexEvent accountIndexEvent = new AccountIndexEvent(1, INSTANCE_ID);
+
+    objectUnderTest.getConsumer().accept(accountIndexEvent);
+
+    // The index event must be routed, not dropped, without asking the filter about the project.
+    verify(projectsFilter, never()).matches(PROJECT_NAME);
+    verify(eventRouter, times(1)).route(accountIndexEvent);
+    verify(droppedEventListeners, never()).onEventDropped(accountIndexEvent);
+  }
+
+  @Test
+  public void shouldUpdateReplicationMetricsWithRemoteRefEvent() {
+    Event refReplicated =
+        new RefEvent("ref-replicated") {
+          @Override
+          public String getRefName() {
+            return "foo-ref";
+          }
+
+          @Override
+          public NameKey getProjectNameKey() {
+            return Project.nameKey(PROJECT_NAME);
+          }
+        };
+    refReplicated.instanceId = INSTANCE_ID;
+    when(projectsFilter.matches(eq(PROJECT_NAME))).thenReturn(true);
+
+    objectUnderTest.getConsumer().accept(refReplicated);
+
+    verify(subscriberMetrics, times(1)).updateReplicationStatusMetrics(refReplicated);
+    reset(projectsFilter, eventRouter, droppedEventListeners, subscriberMetrics);
+  }
+
+  @Override
+  protected AbstractSubcriber objectUnderTest() {
+    return new StreamEventSubscriber(
+        (StreamEventRouter) eventRouter,
+        asDynamicSet(droppedEventListeners),
+        NODE_INSTANCE_ID,
+        msgLog,
+        subscriberMetrics,
+        cfg,
+        projectsFilter);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
index 4c45e07..6413b18 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetricsTest.java
@@ -14,23 +14,28 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import static com.google.common.truth.Truth.assertThat;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
-import com.gerritforge.gerrit.eventbroker.EventMessage;
-import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
 import com.google.common.base.Suppliers;
 import com.google.common.cache.CacheBuilder;
 import com.google.gerrit.entities.Project;
+import com.google.gerrit.metrics.DisabledMetricMaker;
 import com.google.gerrit.metrics.MetricMaker;
 import com.google.gerrit.server.data.RefUpdateAttribute;
+import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.RefUpdatedEvent;
-import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
 import com.google.gerrit.server.project.ProjectCache;
 import com.googlesource.gerrit.plugins.multisite.ProjectVersionLogger;
 import com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate;
+import com.googlesource.gerrit.plugins.replication.events.ProjectDeletionReplicationSucceededEvent;
+import java.net.URISyntaxException;
 import java.util.Optional;
-import java.util.UUID;
+import java.util.concurrent.Executors;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.transport.URIish;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -43,37 +48,36 @@
   private static final Project.NameKey A_TEST_PROJECT_NAME_KEY =
       Project.nameKey(A_TEST_PROJECT_NAME);
 
-  @Mock private SharedRefDatabaseWrapper sharedRefDb;
-  @Mock private GitReferenceUpdated gitReferenceUpdated;
   @Mock private MetricMaker metricMaker;
   @Mock private ProjectVersionLogger verLogger;
   @Mock private ProjectCache projectCache;
   @Mock private ProjectVersionRefUpdate projectVersionRefUpdate;
   private SubscriberMetrics metrics;
-  private EventMessage.Header msgHeader;
+  private ReplicationStatus replicationStatus;
 
   @Before
   public void setup() throws Exception {
-    msgHeader = new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID());
-    metrics =
-        new SubscriberMetrics(
-            metricMaker,
-            new ReplicationStatus(
-                CacheBuilder.newBuilder().build(),
-                Optional.of(projectVersionRefUpdate),
-                verLogger,
-                projectCache));
+    replicationStatus =
+        new ReplicationStatus(
+            CacheBuilder.newBuilder().build(),
+            Optional.of(projectVersionRefUpdate),
+            verLogger,
+            projectCache,
+            Executors.newScheduledThreadPool(1), // NOTE(review): executor shutdown not visible
+            new com.googlesource.gerrit.plugins.multisite.Configuration(new Config(), new Config()),
+            new DisabledMetricMaker());
+    metrics = new SubscriberMetrics(metricMaker, replicationStatus);
   }
 
   @Test
   public void shouldLogProjectVersionWhenReceivingRefUpdatedEventWithoutLag() {
-    Optional<Long> globalRefDbVersion = Optional.of(System.currentTimeMillis() / 1000);
+    Optional<Long> globalRefDbVersion = Optional.of(System.currentTimeMillis());
     when(projectVersionRefUpdate.getProjectRemoteVersion(A_TEST_PROJECT_NAME))
         .thenReturn(globalRefDbVersion);
     when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
         .thenReturn(globalRefDbVersion);
 
-    EventMessage eventMessage = new EventMessage(msgHeader, newRefUpdateEvent());
+    Event eventMessage = newRefUpdateEvent();
 
     metrics.updateReplicationStatusMetrics(eventMessage);
 
@@ -82,20 +86,137 @@
 
   @Test
   public void shouldLogProjectVersionWhenReceivingRefUpdatedEventWithALag() {
-    Optional<Long> globalRefDbVersion = Optional.of(System.currentTimeMillis() / 1000);
+    Optional<Long> globalRefDbVersion = Optional.of(System.currentTimeMillis());
     long replicationLag = 60;
     when(projectVersionRefUpdate.getProjectRemoteVersion(A_TEST_PROJECT_NAME))
         .thenReturn(globalRefDbVersion.map(ts -> ts + replicationLag));
     when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
         .thenReturn(globalRefDbVersion);
 
-    EventMessage eventMessage = new EventMessage(msgHeader, newRefUpdateEvent());
+    Event eventMessage = newRefUpdateEvent();
 
     metrics.updateReplicationStatusMetrics(eventMessage);
 
     verify(verLogger).log(A_TEST_PROJECT_NAME_KEY, globalRefDbVersion.get(), replicationLag);
   }
 
+  @Test
+  public void
+      shouldLogUponProjectDeletionSuccessWhenLocalVersionDoesNotExistAndSubscriberMetricsExist()
+          throws Exception {
+    // The version comes from currentTimeMillis(), so it and the lag offset are milliseconds.
+    long nowMillis = System.currentTimeMillis();
+    long replicationLagMillis = 60;
+    Optional<Long> globalRefDbVersion = Optional.of(nowMillis);
+    when(projectVersionRefUpdate.getProjectRemoteVersion(A_TEST_PROJECT_NAME))
+        .thenReturn(globalRefDbVersion.map(ts -> ts + replicationLagMillis));
+    when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
+        .thenReturn(globalRefDbVersion);
+
+    metrics.updateReplicationStatusMetrics(newRefUpdateEvent());
+
+    assertThat(replicationStatus.getReplicationStatus(A_TEST_PROJECT_NAME))
+        .isEqualTo(replicationLagMillis);
+    assertThat(replicationStatus.getLocalVersion(A_TEST_PROJECT_NAME)).isEqualTo(nowMillis);
+
+    // Make the local version disappear before the deletion-succeeded event is processed.
+    when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
+        .thenReturn(Optional.empty());
+
+    metrics.updateReplicationStatusMetrics(projectDeletionSuccess());
+
+    verify(verLogger).logDeleted(A_TEST_PROJECT_NAME_KEY);
+  }
+
+  @Test
+  public void shouldNotLogUponProjectDeletionSuccessWhenSubscriberMetricsDoNotExist()
+      throws Exception {
+    when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
+        .thenReturn(Optional.empty());
+
+    // Precondition: nothing has been recorded for the project yet.
+    assertThat(replicationStatus.getReplicationStatus(A_TEST_PROJECT_NAME)).isNull();
+    assertThat(replicationStatus.getLocalVersion(A_TEST_PROJECT_NAME)).isNull();
+
+    metrics.updateReplicationStatusMetrics(projectDeletionSuccess());
+
+    verifyZeroInteractions(verLogger);
+  }
+
+  @Test
+  public void shouldNotLogUponProjectDeletionSuccessWhenLocalVersionStillExists() throws Exception {
+    // The local version is still reported; the concrete value is arbitrary.
+    when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
+        .thenReturn(Optional.of(System.currentTimeMillis()));
+    Event deletionSucceeded = projectDeletionSuccess();
+
+    metrics.updateReplicationStatusMetrics(deletionSucceeded);
+
+    verifyZeroInteractions(verLogger);
+  }
+
+  @Test
+  public void shouldRemoveProjectMetricsUponProjectDeletionSuccess() throws Exception {
+    // The version comes from currentTimeMillis(), so it and the lag offset are milliseconds.
+    long nowMillis = System.currentTimeMillis();
+    long replicationLagMillis = 60;
+    Optional<Long> globalRefDbVersion = Optional.of(nowMillis);
+    when(projectVersionRefUpdate.getProjectRemoteVersion(A_TEST_PROJECT_NAME))
+        .thenReturn(globalRefDbVersion.map(ts -> ts + replicationLagMillis));
+    when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
+        .thenReturn(globalRefDbVersion);
+
+    metrics.updateReplicationStatusMetrics(newRefUpdateEvent());
+
+    assertThat(replicationStatus.getReplicationStatus(A_TEST_PROJECT_NAME))
+        .isEqualTo(replicationLagMillis);
+    assertThat(replicationStatus.getLocalVersion(A_TEST_PROJECT_NAME)).isEqualTo(nowMillis);
+
+    // Make the local version disappear before the deletion-succeeded event is processed.
+    when(projectVersionRefUpdate.getProjectLocalVersion(A_TEST_PROJECT_NAME))
+        .thenReturn(Optional.empty());
+
+    metrics.updateReplicationStatusMetrics(projectDeletionSuccess());
+
+    // Both the lag and the local version tracked for the project must be gone.
+    assertThat(replicationStatus.getReplicationStatus(A_TEST_PROJECT_NAME)).isNull();
+    assertThat(replicationStatus.getLocalVersion(A_TEST_PROJECT_NAME)).isNull();
+  }
+
+  @Test
+  public void shouldDoNothingIfNameIsValid() {
+    // This name holds only letters, digits and '-', and must come back unchanged.
+    String alreadySafe = "aValidProject-Name123";
+    assertThat(metrics.sanitizeProjectName(alreadySafe)).isEqualTo(alreadySafe);
+  }
+
+  @Test
+  public void shouldSanitizeNameWithDot() {
+    // The '.' in the name is expected to come back as "_2e".
+    String dottedName = "nameWithA.InTheMiddle";
+    assertThat(metrics.sanitizeProjectName(dottedName)).isEqualTo("nameWithA_2eInTheMiddle");
+  }
+
+  @Test
+  public void shouldSanitizeNameWithSlash() {
+    // The '/' in the name is expected to come back as "_2f".
+    String slashedName = "nameWithA/InTheMiddle";
+    assertThat(metrics.sanitizeProjectName(slashedName)).isEqualTo("nameWithA_2fInTheMiddle");
+  }
+
+  @Test
+  public void shouldDoubleUnderscoresInName() {
+    // The single '_' in the name is expected to come back doubled.
+    String underscoredName = "nameWithA_InTheMiddle";
+    assertThat(metrics.sanitizeProjectName(underscoredName)).isEqualTo("nameWithA__InTheMiddle");
+  }
+
+  private ProjectDeletionReplicationSucceededEvent projectDeletionSuccess()
+      throws URISyntaxException {
+    URIish targetUri = new URIish("git://target"); // parsing may throw URISyntaxException
+    return new ProjectDeletionReplicationSucceededEvent(A_TEST_PROJECT_NAME, targetUri);
+  }
+
   private RefUpdatedEvent newRefUpdateEvent() {
     RefUpdateAttribute refUpdate = new RefUpdateAttribute();
     refUpdate.project = A_TEST_PROJECT_NAME;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java
index 76c001f..6762ba2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java
@@ -34,6 +34,7 @@
 @RunWith(MockitoJUnitRunner.class)
 public class CacheEvictionEventRouterTest {
 
+  private static final String INSTANCE_ID = "instance-id";
   private static Gson gson = new EventGsonProvider().get();
   private CacheEvictionEventRouter router;
   @Mock private ForwardedCacheEvictionHandler cacheEvictionHandler;
@@ -45,7 +46,7 @@
 
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_CacheEviction() throws Exception {
-    final CacheEvictionEvent event = new CacheEvictionEvent("cache", "key");
+    final CacheEvictionEvent event = new CacheEvictionEvent("cache", "key", INSTANCE_ID);
     router.route(event);
 
     verify(cacheEvictionHandler).evict(CacheEntry.from(event.cacheName, event.key));
@@ -54,7 +55,7 @@
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_CacheEvictionWithSlash()
       throws Exception {
-    final CacheEvictionEvent event = new CacheEvictionEvent("cache", "some/key");
+    final CacheEvictionEvent event = new CacheEvictionEvent("cache", "some/key", INSTANCE_ID);
     router.route(event);
 
     verify(cacheEvictionHandler).evict(CacheEntry.from(event.cacheName, event.key));
@@ -63,7 +64,8 @@
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_ProjectCacheEvictionWithSlash()
       throws Exception {
-    final CacheEvictionEvent event = new CacheEvictionEvent(Constants.PROJECTS, "some/project");
+    final CacheEvictionEvent event =
+        new CacheEvictionEvent(Constants.PROJECTS, "some/project", INSTANCE_ID);
     router.route(event);
 
     verify(cacheEvictionHandler)
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
index 1dc07bd..413b6c1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
@@ -33,7 +33,7 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
-import com.googlesource.gerrit.plugins.replication.RefReplicationDoneEvent;
+import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
 import java.util.Optional;
 import org.eclipse.jgit.lib.ObjectId;
 import org.junit.Before;
@@ -44,7 +44,7 @@
 
 @RunWith(MockitoJUnitRunner.class)
 public class IndexEventRouterTest {
-
+  private static final String INSTANCE_ID = "instance-id";
   private IndexEventRouter router;
   @Mock private ForwardedIndexAccountHandler indexAccountHandler;
   @Mock private ForwardedIndexChangeHandler indexChangeHandler;
@@ -61,12 +61,13 @@
             indexChangeHandler,
             indexGroupHandler,
             indexProjectHandler,
-            allUsersName);
+            allUsersName,
+            INSTANCE_ID);
   }
 
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_AccountIndex() throws Exception {
-    final AccountIndexEvent event = new AccountIndexEvent(1);
+    final AccountIndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
     router.route(event);
 
     verify(indexAccountHandler)
@@ -80,7 +81,7 @@
 
     StreamEventRouter streamEventRouter = new StreamEventRouter(forwardedEventHandler, router);
 
-    final AccountIndexEvent event = new AccountIndexEvent(1);
+    final AccountIndexEvent event = new AccountIndexEvent(1, INSTANCE_ID);
     router.route(event);
 
     verify(indexAccountHandler)
@@ -96,7 +97,7 @@
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_GroupIndex() throws Exception {
     final String groupId = "12";
-    final GroupIndexEvent event = new GroupIndexEvent(groupId, ObjectId.zeroId());
+    final GroupIndexEvent event = new GroupIndexEvent(groupId, ObjectId.zeroId(), INSTANCE_ID);
     router.route(event);
 
     verify(indexGroupHandler)
@@ -108,7 +109,7 @@
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_ProjectIndex() throws Exception {
     final String projectName = "projectName";
-    final ProjectIndexEvent event = new ProjectIndexEvent(projectName);
+    final ProjectIndexEvent event = new ProjectIndexEvent(projectName, INSTANCE_ID);
     router.route(event);
 
     verify(indexProjectHandler)
@@ -119,7 +120,7 @@
 
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_ChangeIndex() throws Exception {
-    final ChangeIndexEvent event = new ChangeIndexEvent("projectName", 3, false);
+    final ChangeIndexEvent event = new ChangeIndexEvent("projectName", 3, false, INSTANCE_ID);
     router.route(event);
 
     verify(indexChangeHandler)
@@ -133,7 +134,7 @@
 
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_ChangeIndexDelete() throws Exception {
-    final ChangeIndexEvent event = new ChangeIndexEvent("projectName", 3, true);
+    final ChangeIndexEvent event = new ChangeIndexEvent("projectName", 3, true, INSTANCE_ID);
     router.route(event);
 
     verify(indexChangeHandler)
@@ -147,7 +148,7 @@
 
   @Test
   public void routerShouldFailForNotRecognisedEvents() throws Exception {
-    final IndexEvent newEventType = new IndexEvent("new-type") {};
+    final IndexEvent newEventType = new IndexEvent("new-type", INSTANCE_ID) {};
 
     assertThrows(UnsupportedOperationException.class, () -> router.route(newEventType));
     verifyZeroInteractions(
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java
index 93daf92..8a21c39 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java
@@ -38,7 +38,8 @@
 
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_ProjectListUpdate() throws Exception {
-    final ProjectListUpdateEvent event = new ProjectListUpdateEvent("project", false);
+    String instanceId = "instance-id";
+    final ProjectListUpdateEvent event = new ProjectListUpdateEvent("project", false, instanceId);
     router.route(event);
 
     verify(projectListUpdateHandler).update(event);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
index e3d1ae0..28bf56d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/BrokerForwarderTest.java
@@ -68,10 +68,10 @@
     }
   }
 
-  public class TestEvent extends MultiSiteEvent {
+  public static class TestEvent extends MultiSiteEvent {
 
     protected TestEvent() {
-      super("test");
+      super("test", "instance-id");
     }
   }
 
@@ -93,7 +93,7 @@
   @Test
   public void shouldSendEventToBrokerFromGenericSourceThread() {
     brokerForwarder.send(newForwarderTask(), testTopic, testEvent);
-    verify(brokerMock).send(eq(testTopicName), eq(testEvent));
+    verify(brokerMock).sendSync(eq(testTopicName), eq(testEvent));
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java
index 826e154..75596ed 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.Sets;
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.config.GerritConfig;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.api.projects.ProjectInput;
 import com.google.gerrit.extensions.registration.DynamicSet;
@@ -123,8 +124,10 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
   public void shouldEvictProjectCache() throws Exception {
-    objectUnderTest.route(new CacheEvictionEvent(ProjectCacheImpl.CACHE_NAME, project.get()));
+    objectUnderTest.route(
+        new CacheEvictionEvent(ProjectCacheImpl.CACHE_NAME, project.get(), "instance-id"));
     evictionsCacheTracker.waitForExpectedEvictions();
 
     assertThat(evictionsCacheTracker.trackedEvictionsFor(ProjectCacheImpl.CACHE_NAME))
@@ -132,6 +135,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = "instance-id")
   public void shouldEvictProjectCacheWithSlash() throws Exception {
     ProjectInput in = new ProjectInput();
     in.name = name("my/project");
@@ -141,7 +145,7 @@
     restartCacheEvictionsTracking();
 
     objectUnderTest.route(
-        new CacheEvictionEvent(ProjectCacheImpl.CACHE_NAME, projectNameKey.get()));
+        new CacheEvictionEvent(ProjectCacheImpl.CACHE_NAME, projectNameKey.get(), "instance-id"));
 
     evictionsCacheTracker.waitForExpectedEvictions();
     assertThat(evictionsCacheTracker.trackedEvictionsFor(ProjectCacheImpl.CACHE_NAME))
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerTest.java
index 90ae8da..dbd358d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerTest.java
@@ -15,6 +15,7 @@
 package com.googlesource.gerrit.plugins.multisite.forwarder;
 
 import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
 import static org.mockito.Mockito.doReturn;
 
 import com.google.common.cache.Cache;
@@ -48,10 +49,12 @@
   public void shouldThrowAnExceptionWhenCacheNotFound() throws Exception {
     CacheEntry entry = new CacheEntry("somePlugin", "unexistingCache", null);
 
-    exception.expect(CacheNotFoundException.class);
-    exception.expectMessage(
-        String.format("cache %s.%s not found", entry.getPluginName(), entry.getCacheName()));
-    handler.evict(entry);
+    CacheNotFoundException thrown =
+        assertThrows(CacheNotFoundException.class, () -> handler.evict(entry));
+    assertThat(thrown)
+        .hasMessageThat()
+        .isEqualTo(
+            String.format("cache %s.%s not found", entry.getPluginName(), entry.getCacheName()));
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandlerTest.java
index 32c6319..02efb77 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexAccountHandlerTest.java
@@ -61,9 +61,11 @@
 
   @Test
   public void deleteIsNotSupported() throws Exception {
-    exception.expect(UnsupportedOperationException.class);
-    exception.expectMessage("Delete from account index not supported");
-    handler.index(id, Operation.DELETE, Optional.empty());
+    UnsupportedOperationException thrown =
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> handler.index(id, Operation.DELETE, Optional.empty()));
+    assertThat(thrown).hasMessageThat().isEqualTo("Delete from account index not supported");
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
index 96470b6..b827cdf 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
@@ -17,6 +17,7 @@
 import static com.google.common.truth.Truth.assertThat;
 import static com.google.gerrit.testing.GerritJUnit.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.never;
@@ -49,6 +50,7 @@
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.OngoingStubbing;
 
 @RunWith(MockitoJUnitRunner.class)
 public class ForwardedIndexChangeHandlerTest {
@@ -62,6 +64,8 @@
   private static final boolean THROW_STORAGE_EXCEPTION = true;
   private static final boolean CHANGE_UP_TO_DATE = true;
   private static final boolean CHANGE_OUTDATED = false;
+  private static final boolean CHANGE_CONSISTENT = true;
+  private static final boolean CHANGE_INCONSISTENT = false;
 
   @Rule public ExpectedException exception = ExpectedException.none();
   @Mock private ChangeIndexer indexerMock;
@@ -88,6 +92,7 @@
     when(changeCheckerFactoryMock.create(any())).thenReturn(changeCheckerAbsentMock);
     when(configurationMock.index()).thenReturn(index);
     when(index.numStripedLocks()).thenReturn(10);
+    when(index.maxTries()).thenReturn(1);
     handler =
         new ForwardedIndexChangeHandler(
             indexerMock, configurationMock, indexExecutorMock, ctxMock, changeCheckerFactoryMock);
@@ -95,16 +100,40 @@
 
   @Test
   public void changeIsIndexedWhenUpToDate() throws Exception {
-    setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_UP_TO_DATE);
+    setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_UP_TO_DATE, CHANGE_CONSISTENT);
     handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty());
     verify(indexerMock, times(1)).index(any(Change.class));
   }
 
   @Test
   public void changeIsStillIndexedEvenWhenOutdated() throws Exception {
-    setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_OUTDATED);
+    setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_OUTDATED, CHANGE_CONSISTENT);
     handler.index(
-        TEST_CHANGE_ID, Operation.INDEX, Optional.of(new ChangeIndexEvent("foo", 1, false)));
+        TEST_CHANGE_ID,
+        Operation.INDEX,
+        Optional.of(new ChangeIndexEvent("foo", 1, false, "instance-id")));
+    verify(indexerMock, times(1)).index(any(Change.class));
+  }
+
+  @Test
+  public void changeIsIndexedAtFirstRetryWhenInitiallyInconsistent() throws Exception {
+    setupChangeAccessRelatedMocks(
+        CHANGE_EXISTS,
+        DO_NOT_THROW_STORAGE_EXCEPTION,
+        CHANGE_UP_TO_DATE,
+        CHANGE_INCONSISTENT,
+        CHANGE_CONSISTENT);
+    handler.index(
+        TEST_CHANGE_ID,
+        Operation.INDEX,
+        Optional.of(new ChangeIndexEvent("foo", 1, false, "instance-id")));
+    verify(indexerMock, never()).index(any(Change.class));
+    verify(indexExecutorMock, times(1)).schedule(any(Runnable.class), anyLong(), any());
+    // The consistency stub returns false, then true: this second attempt must index.
+    handler.index(
+        TEST_CHANGE_ID,
+        Operation.INDEX,
+        Optional.of(new ChangeIndexEvent("foo", 1, false, "instance-id")));
     verify(indexerMock, times(1)).index(any(Change.class));
   }
 
@@ -124,9 +153,11 @@
 
   @Test
   public void indexerThrowsStorageExceptionTryingToIndexChange() throws Exception {
-    setupChangeAccessRelatedMocks(CHANGE_EXISTS, THROW_STORAGE_EXCEPTION, CHANGE_UP_TO_DATE);
-    exception.expect(StorageException.class);
-    handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty());
+    setupChangeAccessRelatedMocks(
+        CHANGE_EXISTS, THROW_STORAGE_EXCEPTION, CHANGE_UP_TO_DATE, CHANGE_CONSISTENT);
+    assertThrows(
+        StorageException.class,
+        () -> handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty()));
   }
 
   @Test
@@ -175,11 +206,21 @@
 
   private void setupChangeAccessRelatedMocks(boolean changeExist, boolean changeUpToDate)
       throws Exception {
-    setupChangeAccessRelatedMocks(changeExist, DO_NOT_THROW_STORAGE_EXCEPTION, changeUpToDate);
+    setupChangeAccessRelatedMocks(
+        changeExist, DO_NOT_THROW_STORAGE_EXCEPTION, changeUpToDate, CHANGE_CONSISTENT);
   }
 
   private void setupChangeAccessRelatedMocks(
-      boolean changeExists, boolean storageException, boolean changeIsUpToDate)
+      boolean changeExist, boolean changeUpToDate, boolean changeConsistent) throws Exception {
+    setupChangeAccessRelatedMocks(
+        changeExist, DO_NOT_THROW_STORAGE_EXCEPTION, changeUpToDate, changeConsistent);
+  }
+
+  private void setupChangeAccessRelatedMocks(
+      boolean changeExists,
+      boolean storageException,
+      boolean changeIsUpToDate,
+      boolean... changeConsistentReturnValues)
       throws StorageException {
     if (changeExists) {
       when(changeCheckerFactoryMock.create(TEST_CHANGE_ID)).thenReturn(changeCheckerPresentMock);
@@ -190,5 +231,11 @@
     }
 
     when(changeCheckerPresentMock.isUpToDate(any())).thenReturn(changeIsUpToDate);
+
+    OngoingStubbing<Boolean> changeConsistentCall =
+        when(changeCheckerPresentMock.isChangeConsistent());
+    for (boolean changeConsistent : changeConsistentReturnValues) {
+      changeConsistentCall = changeConsistentCall.thenReturn(changeConsistent);
+    }
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java
index 982ac52..b49dbfa 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java
@@ -74,9 +74,11 @@
 
   @Test
   public void deleteIsNotSupported() throws Exception {
-    exception.expect(UnsupportedOperationException.class);
-    exception.expectMessage("Delete from group index not supported");
-    handler.index(uuid, Operation.DELETE, Optional.empty());
+    UnsupportedOperationException thrown =
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> handler.index(uuid, Operation.DELETE, Optional.empty()));
+    assertThat(thrown).hasMessageThat().isEqualTo("Delete from group index not supported");
   }
 
   @Test
@@ -141,6 +143,6 @@
   }
 
   private Optional<GroupIndexEvent> groupIndexEvent(String uuid) {
-    return Optional.of(new GroupIndexEvent(uuid, null));
+    return Optional.of(new GroupIndexEvent(uuid, null, "instance-id"));
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandlerTest.java
index 3ce5e14..72b9427 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandlerTest.java
@@ -73,9 +73,11 @@
 
   @Test
   public void deleteIsNotSupported() throws Exception {
-    exception.expect(UnsupportedOperationException.class);
-    exception.expectMessage("Delete from project index not supported");
-    handler.index(nameKey, Operation.DELETE, Optional.empty());
+    UnsupportedOperationException thrown =
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> handler.index(nameKey, Operation.DELETE, Optional.empty()));
+    assertThat(thrown).hasMessageThat().isEqualTo("Delete from project index not supported");
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedProjectListUpdateHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedProjectListUpdateHandlerTest.java
index 9893ce7..2412c79 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedProjectListUpdateHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedProjectListUpdateHandlerTest.java
@@ -34,6 +34,7 @@
 @RunWith(MockitoJUnitRunner.class)
 public class ForwardedProjectListUpdateHandlerTest {
 
+  private static final String INSTANCE_ID = "instance-id";
   private static final String PROJECT_NAME = "someProject";
   private static final String SOME_MESSAGE = "someMessage";
   private static final Project.NameKey PROJECT_KEY = Project.nameKey(PROJECT_NAME);
@@ -48,13 +49,13 @@
 
   @Test
   public void testSuccessfulAdd() throws Exception {
-    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false));
+    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false, INSTANCE_ID));
     verify(projectCacheMock).onCreateProject(PROJECT_KEY);
   }
 
   @Test
   public void testSuccessfulRemove() throws Exception {
-    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true));
+    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true, INSTANCE_ID));
     verify(projectCacheMock).remove(PROJECT_KEY);
   }
 
@@ -72,7 +73,7 @@
         .onCreateProject(PROJECT_KEY);
 
     assertThat(Context.isForwardedEvent()).isFalse();
-    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false));
+    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false, INSTANCE_ID));
     assertThat(Context.isForwardedEvent()).isFalse();
 
     verify(projectCacheMock).onCreateProject(PROJECT_KEY);
@@ -92,7 +93,7 @@
         .remove(PROJECT_KEY);
 
     assertThat(Context.isForwardedEvent()).isFalse();
-    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true));
+    handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true, INSTANCE_ID));
     assertThat(Context.isForwardedEvent()).isFalse();
 
     verify(projectCacheMock).remove(PROJECT_KEY);
@@ -113,7 +114,7 @@
     RuntimeException thrown =
         assertThrows(
             RuntimeException.class,
-            () -> handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false)));
+            () -> handler.update(new ProjectListUpdateEvent(PROJECT_NAME, false, INSTANCE_ID)));
     assertThat(thrown).hasMessageThat().isEqualTo(SOME_MESSAGE);
     assertThat(Context.isForwardedEvent()).isFalse();
 
@@ -135,7 +136,7 @@
     RuntimeException thrown =
         assertThrows(
             RuntimeException.class,
-            () -> handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true)));
+            () -> handler.update(new ProjectListUpdateEvent(PROJECT_NAME, true, INSTANCE_ID)));
     assertThat(thrown).hasMessageThat().isEqualTo(SOME_MESSAGE);
     assertThat(Context.isForwardedEvent()).isFalse();
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServletIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServletIT.java
index 0739eab..f940fc4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServletIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/http/ReplicationStatusServletIT.java
@@ -24,9 +24,12 @@
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.RestResponse;
 import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.config.GerritConfig;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.httpd.restapi.RestApiServlet;
+import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
 import com.google.inject.Scopes;
 import com.google.inject.multibindings.OptionalBinder;
 import com.googlesource.gerrit.plugins.multisite.Log4jProjectVersionLogger;
@@ -56,13 +59,15 @@
   private ReplicationStatus replicationStatus;
 
   public static class TestModule extends AbstractModule {
+    @Inject WorkQueue workQueue;
+
     @Override
     protected void configure() {
       install(new ForwarderModule());
       install(new CacheModule());
       install(new RouterModule());
       install(new IndexModule());
-      install(new ReplicationStatusModule());
+      install(new ReplicationStatusModule(workQueue));
       SharedRefDbConfiguration sharedRefDbConfig =
           new SharedRefDbConfiguration(new Config(), "multi-site");
       bind(SharedRefDbConfiguration.class).toInstance(sharedRefDbConfig);
@@ -81,6 +86,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
   public void shouldSucceedForAdminUsers() throws Exception {
     RestResponse result = adminRestSession.get(LAG_ENDPOINT);
 
@@ -89,6 +95,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
   public void shouldFailWhenUserHasNoAdminServerCapability() throws Exception {
     RestResponse result = userRestSession.get(LAG_ENDPOINT);
     result.assertForbidden();
@@ -96,6 +103,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
   public void shouldReturnCurrentProjectLag() throws Exception {
     replicationStatus.doUpdateLag(Project.nameKey("foo"), 123L);
 
@@ -106,6 +114,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
   public void shouldReturnProjectsOrderedDescendinglyByLag() throws Exception {
     replicationStatus.doUpdateLag(Project.nameKey("bar"), 123L);
     replicationStatus.doUpdateLag(Project.nameKey("foo"), 3L);
@@ -118,6 +127,7 @@
   }
 
   @Test
+  @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
   public void shouldHonourTheLimitParameter() throws Exception {
     replicationStatus.doUpdateLag(Project.nameKey("bar"), 1L);
     replicationStatus.doUpdateLag(Project.nameKey("foo"), 2L);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java
index 6403cce..09f8f59 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java
@@ -101,7 +101,7 @@
   }
 
   private Optional<GroupIndexEvent> groupIndexEvent(String uuid, @Nullable ObjectId sha1) {
-    return Optional.of(new GroupIndexEvent(uuid, sha1));
+    return Optional.of(new GroupIndexEvent(uuid, sha1, "instance-id"));
   }
 
   private void setCommitExistsInRepo(boolean commitExists) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupEventIndexTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupEventIndexTest.java
index 93cec05..099f1dd 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupEventIndexTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupEventIndexTest.java
@@ -16,7 +16,7 @@
 
 import static com.google.common.truth.Truth.assertThat;
 
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
+import com.google.gerrit.server.events.EventGsonProvider;
 import com.google.gson.Gson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
 import java.util.UUID;
@@ -24,13 +24,14 @@
 import org.junit.Test;
 
 public class GroupEventIndexTest {
+  private static final String INSTANCE_ID = "instance-id";
   private static final Gson gson = new EventGsonProvider().get();
 
   @Test
   public void groupEventIndexRoundTripWithSha1() {
     String aGroupUUID = UUID.randomUUID().toString();
     ObjectId anObjectId = ObjectId.fromString("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef");
-    GroupIndexEvent original = new GroupIndexEvent(aGroupUUID, anObjectId);
+    GroupIndexEvent original = new GroupIndexEvent(aGroupUUID, anObjectId, INSTANCE_ID);
 
     assertThat(gson.fromJson(gson.toJson(original), GroupIndexEvent.class)).isEqualTo(original);
   }
@@ -38,7 +39,7 @@
   @Test
   public void groupEventIndexRoundTripWithoutSha1() {
     String aGroupUUID = UUID.randomUUID().toString();
-    GroupIndexEvent original = new GroupIndexEvent(aGroupUUID, null);
+    GroupIndexEvent original = new GroupIndexEvent(aGroupUUID, null, INSTANCE_ID);
 
     assertThat(gson.fromJson(gson.toJson(original), GroupIndexEvent.class)).isEqualTo(original);
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
index 660a302..8cf6ea9 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
@@ -14,16 +14,21 @@
 
 package com.googlesource.gerrit.plugins.multisite.index;
 
+import static com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationEndpoints.APPLY_OBJECT_API_ENDPOINT;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
 import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
 import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.index.IndexEventHandler.IndexProjectTask;
@@ -36,6 +41,8 @@
 @RunWith(MockitoJUnitRunner.class)
 public class IndexEventHandlerTest {
 
+  private static final String INSTANCE_ID = "instance-id";
+
   private IndexEventHandler eventHandler;
 
   @Mock private ProjectsFilter projectsFilter;
@@ -50,7 +57,8 @@
             asDynamicSet(forwarder),
             changeChecker,
             projectsFilter,
-            new TestGroupChecker(true));
+            new TestGroupChecker(true),
+            INSTANCE_ID);
   }
 
   private DynamicSet<IndexEventForwarder> asDynamicSet(IndexEventForwarder forwarder) {
@@ -65,7 +73,7 @@
 
     eventHandler.onProjectIndexed("test_project");
     verify(forwarder, never())
-        .index(any(IndexProjectTask.class), eq(new ProjectIndexEvent("test_project")));
+        .index(any(IndexProjectTask.class), eq(new ProjectIndexEvent("test_project", INSTANCE_ID)));
   }
 
   @Test
@@ -76,4 +84,25 @@
     eventHandler.onChangeIndexed("test_project", changeId);
     verifyZeroInteractions(changeChecker);
   }
+
+  @Test
+  public void shouldNotForwardIndexChangeIfCurrentThreadIsPullReplicationApplyObject()
+      throws Exception {
+    Thread self = Thread.currentThread();
+    String originalThreadName = self.getName();
+    try {
+      // Make the current thread look like a pull-replication apply-object thread.
+      self.setName("pull-replication~" + APPLY_OBJECT_API_ENDPOINT);
+      Context.setForwardedEvent(false);
+      lenient().when(projectsFilter.matches(any(String.class))).thenReturn(true);
+      lenient()
+          .when(changeChecker.create(anyString()))
+          .thenThrow(
+              new IllegalStateException("Change indexing event should have not been triggered"));
+      eventHandler.onChangeIndexed("test_project", 1);
+      verifyNoInteractions(changeChecker);
+    } finally {
+      // Always restore the name so later tests on this thread are unaffected.
+      self.setName(originalThreadName);
+    }
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/DisabledSharedRefLogger.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/DisabledSharedRefLogger.java
new file mode 100644
index 0000000..fc2259f
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/DisabledSharedRefLogger.java
@@ -0,0 +1,42 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation;
+
+import com.gerritforge.gerrit.globalrefdb.validation.SharedRefLogger;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+import org.junit.Ignore;
+
+@Ignore
+public class DisabledSharedRefLogger implements SharedRefLogger {
+  // All SharedRefLogger callbacks below have empty bodies: this logger records nothing.
+  @Override
+  public void logRefUpdate(String project, Ref currRef, ObjectId newRefValue) {}
+
+  @Override
+  public void logProjectDelete(String project) {}
+
+  @Override
+  public void logLockAcquisition(String project, String refName) {}
+
+  @Override
+  public void logLockRelease(String project, String refName) {}
+
+  @Override
+  public <T> void logRefUpdate(String project, String refName, T currRef, T newRefValue) {}
+
+  @Override
+  public <T> void logRefUpdate(String project, String refName, T newRefValue) {}
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/FakeSharedRefDatabaseWrapper.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/FakeSharedRefDatabaseWrapper.java
new file mode 100644
index 0000000..bf98a4b
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/FakeSharedRefDatabaseWrapper.java
@@ -0,0 +1,84 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation;
+
+import com.gerritforge.gerrit.globalrefdb.GlobalRefDatabase;
+import com.gerritforge.gerrit.globalrefdb.GlobalRefDbLockException;
+import com.gerritforge.gerrit.globalrefdb.GlobalRefDbSystemError;
+import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDBMetrics;
+import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.metrics.DisabledMetricMaker;
+import java.util.Arrays;
+import java.util.Optional;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+import org.junit.Ignore;
+
+@Ignore
+public class FakeSharedRefDatabaseWrapper extends SharedRefDatabaseWrapper {
+  // Test double whose backing GlobalRefDatabase reports the given refs as out of date.
+  public FakeSharedRefDatabaseWrapper(String... rejectedRefs) {
+    // Stub: exists() -> true, compareAndPut() -> false, get() -> empty, lockRef() -> null.
+    super(
+        DynamicItem.itemOf(
+            GlobalRefDatabase.class,
+            new GlobalRefDatabase() {
+              // Only refs named in rejectedRefs are reported as out of date.
+              @Override
+              public boolean isUpToDate(Project.NameKey project, Ref ref)
+                  throws GlobalRefDbLockException {
+                return Arrays.stream(rejectedRefs).noneMatch(r -> r.equals(ref.getName()));
+              }
+
+              @Override
+              public boolean exists(Project.NameKey project, String refName) {
+                return true;
+              }
+
+              @Override
+              public boolean compareAndPut(
+                  Project.NameKey project, Ref currRef, ObjectId newRefValue)
+                  throws GlobalRefDbSystemError {
+                return false;
+              }
+
+              @Override
+              public <T> boolean compareAndPut(
+                  Project.NameKey project, String refName, T currValue, T newValue)
+                  throws GlobalRefDbSystemError {
+                return false;
+              }
+
+              @Override
+              public AutoCloseable lockRef(Project.NameKey project, String refName)
+                  throws GlobalRefDbLockException {
+                return null;
+              }
+
+              @Override
+              public void remove(Project.NameKey project) throws GlobalRefDbSystemError {}
+
+              @Override
+              public <T> Optional<T> get(Project.NameKey project, String refName, Class<T> clazz)
+                  throws GlobalRefDbSystemError {
+                return Optional.empty();
+              }
+            }),
+        new DisabledSharedRefLogger(),
+        new SharedRefDBMetrics(new DisabledMetricMaker()));
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationFetchFilterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationFetchFilterTest.java
new file mode 100644
index 0000000..e09f6f6
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationFetchFilterTest.java
@@ -0,0 +1,258 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.testing.InMemoryRepositoryManager;
+import com.google.gerrit.testing.InMemoryTestEnvironment;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.RefFixture;
+import java.util.Optional;
+import java.util.Set;
+import org.eclipse.jgit.internal.storage.dfs.InMemoryRepository;
+import org.eclipse.jgit.junit.LocalDiskRepositoryTestCase;
+import org.eclipse.jgit.junit.TestRepository;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class MultisiteReplicationFetchFilterTest extends LocalDiskRepositoryTestCase
+    implements RefFixture {
+
+  @Rule public InMemoryTestEnvironment testEnvironment = new InMemoryTestEnvironment();
+
+  @Mock SharedRefDatabaseWrapper sharedRefDatabaseMock;
+  @Mock Configuration config;
+  @Mock Configuration.ReplicationFilter replicationFilterConfig;
+
+  @Inject private InMemoryRepositoryManager gitRepositoryManager;
+
+  String project = A_TEST_PROJECT_NAME;
+  Project.NameKey projectName = A_TEST_PROJECT_NAME_KEY;
+
+  private TestRepository<InMemoryRepository> repo;
+
+  @Before
+  public void setupTestRepo() throws Exception {
+    // Create and wrap the project repository, then stub the mocked configuration.
+    InMemoryRepository projectRepo = gitRepositoryManager.createRepository(projectName);
+    repo = new TestRepository<>(projectRepo);
+    doReturn(replicationFilterConfig).when(config).replicationFilter();
+  }
+
+  @Test
+  public void shouldReturnEmptyRefsWhenAllUpToDate() throws Exception {
+    String fooBranch = "refs/heads/foo";
+    String barBranch = "refs/heads/bar";
+    ObjectId fooTip = newRef(fooBranch).getId();
+    ObjectId barTip = newRef(barBranch).getId();
+    // The shared ref-db reports the same SHA-1 that each branch has locally.
+    doReturn(Optional.of(fooTip.getName()))
+        .when(sharedRefDatabaseMock)
+        .get(eq(projectName), eq(fooBranch), eq(String.class));
+    doReturn(Optional.of(barTip.getName()))
+        .when(sharedRefDatabaseMock)
+        .get(eq(projectName), eq(barBranch), eq(String.class));
+    MultisiteReplicationFetchFilter filterUnderTest =
+        new MultisiteReplicationFetchFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+
+    // Both candidates are already current, so nothing should be left to fetch.
+    Set<String> remaining = filterUnderTest.filter(project, Set.of(fooBranch, barBranch));
+
+    assertThat(remaining).isEmpty();
+  }
+
+  @Test
+  public void shouldFilterOutOneUpToDateRef() throws Exception {
+    String currentBranch = "refs/heads/uptodate";
+    String staleBranch = "refs/heads/outdated";
+    ObjectId currentTip = newRef(currentBranch).getId();
+    newRef(staleBranch);
+
+    // Only the first branch matches the shared ref-db; the second one is stubbed with
+    // AN_OUTDATED_OBJECT_ID instead of its local tip.
+    doReturn(Optional.of(currentTip.getName()))
+        .when(sharedRefDatabaseMock)
+        .get(eq(projectName), eq(currentBranch), eq(String.class));
+    doReturn(Optional.of(AN_OUTDATED_OBJECT_ID.getName()))
+        .when(sharedRefDatabaseMock)
+        .get(eq(projectName), eq(staleBranch), eq(String.class));
+    MultisiteReplicationFetchFilter filterUnderTest =
+        new MultisiteReplicationFetchFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+
+    Set<String> remaining = filterUnderTest.filter(project, Set.of(currentBranch, staleBranch));
+    assertThat(remaining).containsExactly(staleBranch);
+  }
+
+  @Test
+  public void shouldLoadLocalVersionAndFilterOut() throws Exception {
+    String branch = "refs/heads/temporaryOutdated";
+    RevCommit localTip = newRef(branch);
+
+    // First lookup finds nothing in the shared ref-db, the second one returns the local tip.
+    doReturn(Optional.empty())
+        .doReturn(Optional.of(localTip.getId().getName()))
+        .when(sharedRefDatabaseMock)
+        .get(eq(projectName), eq(branch), eq(String.class));
+    MultisiteReplicationFetchFilter filterUnderTest =
+        new MultisiteReplicationFetchFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+
+    Set<String> remaining = filterUnderTest.filter(project, Set.of(branch));
+
+    assertThat(remaining).isEmpty();
+    // Exactly two shared ref-db reads are expected for this scenario.
+    verify(sharedRefDatabaseMock, times(2)).get(any(), any(), any());
+  }
+
+  @Test
+  public void shouldLoadLocalVersionAndNotFilter() throws Exception {
+    String branch = "refs/heads/temporaryOutdated";
+    newRef(branch);
+
+    // The shared ref-db keeps answering with AN_OUTDATED_OBJECT_ID rather than the local tip.
+    doReturn(Optional.of(AN_OUTDATED_OBJECT_ID.getName()))
+        .doReturn(Optional.of(AN_OUTDATED_OBJECT_ID.getName()))
+        .when(sharedRefDatabaseMock)
+        .get(eq(projectName), eq(branch), eq(String.class));
+    MultisiteReplicationFetchFilter filterUnderTest =
+        new MultisiteReplicationFetchFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+
+    Set<String> remaining = filterUnderTest.filter(project, Set.of(branch));
+
+    assertThat(remaining).hasSize(1);
+    verify(sharedRefDatabaseMock, times(3))
+        .get(eq(projectName), eq(branch), eq(String.class));
+  }
+
+  @Test
+  public void shouldNotFilterOutWhenMissingInTheSharedRefDb() throws Exception {
+    String branch = "refs/heads/temporaryOutdated";
+    newRef(branch);
+
+    // The branch exists locally but the shared ref-db has no entry for it.
+    doReturn(Optional.empty())
+        .when(sharedRefDatabaseMock)
+        .get(eq(projectName), eq(branch), eq(String.class));
+    MultisiteReplicationFetchFilter filterUnderTest =
+        new MultisiteReplicationFetchFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+
+    Set<String> remaining = filterUnderTest.filter(project, Set.of(branch));
+
+    // The ref must stay in the set returned by the filter.
+    assertThat(remaining).hasSize(1);
+  }
+
+  @Test
+  public void shouldNotFilterOutWhenRefsMultisiteVersionIsPresentInSharedRefDb() throws Exception {
+    String versionRef = ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF;
+    RevCommit versionTip = newRef(versionRef);
+
+    // Local and shared values agree for the multi-site versioning ref.
+    doReturn(Optional.of(versionTip.getId().getName()))
+        .when(sharedRefDatabaseMock)
+        .get(eq(projectName), eq(versionRef), eq(String.class));
+    MultisiteReplicationFetchFilter filterUnderTest =
+        new MultisiteReplicationFetchFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+
+    Set<String> remaining = filterUnderTest.filter(project, Set.of(versionRef));
+
+    // Despite the matching values, this ref is expected to be returned by the filter.
+    assertThat(remaining).hasSize(1);
+  }
+
+  @Test
+  public void shouldFilterOutWhenRefIsDeletedInTheSharedRefDb() throws Exception {
+    String branch = "refs/heads/temporaryOutdated";
+    newRef(branch);
+
+    // The shared ref-db holds the all-zero object id for this branch.
+    doReturn(Optional.of(ObjectId.zeroId().getName()))
+        .when(sharedRefDatabaseMock)
+        .get(eq(projectName), eq(branch), eq(String.class));
+    MultisiteReplicationFetchFilter filterUnderTest =
+        new MultisiteReplicationFetchFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+
+    Set<String> remaining = filterUnderTest.filter(project, Set.of(branch));
+
+    assertThat(remaining).isEmpty();
+    verify(sharedRefDatabaseMock).get(eq(projectName), any(), any());
+  }
+
+  @Test
+  public void shouldNotFilterOutWhenRefIsMissingOnlyInTheLocalRepository() throws Exception {
+    String sharedSha1 = "0000000000000000000000000000000000000001";
+    String branchMissingLocally = "refs/heads/temporaryOutdated";
+
+    // No local ref is created; the shared ref-db mock returns a value for any ref name.
+    doReturn(Optional.of(sharedSha1))
+        .when(sharedRefDatabaseMock)
+        .get(eq(projectName), any(), any());
+    MultisiteReplicationFetchFilter filterUnderTest =
+        new MultisiteReplicationFetchFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+
+    Set<String> remaining = filterUnderTest.filter(project, Set.of(branchMissingLocally));
+
+    assertThat(remaining).hasSize(1);
+  }
+
+  @Test
+  public void shouldNotFilterOutRefThatDoesntExistLocallyOrInSharedRefDb() throws Exception {
+    // Neither a local ref nor a shared ref-db stub is set up for this name.
+    String unknownBranch = "refs/heads/non-existing-ref";
+    MultisiteReplicationFetchFilter filterUnderTest =
+        new MultisiteReplicationFetchFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+
+    Set<String> remaining = filterUnderTest.filter(project, Set.of(unknownBranch));
+
+    // The filter is expected to keep the ref as a fetch candidate.
+    assertThat(remaining).hasSize(1);
+  }
+
+  @Test
+  public void shouldFilterOutRefMissingInTheLocalRepositoryAndDeletedInSharedRefDb()
+      throws Exception {
+    String branchMissingLocally = "refs/heads/temporaryOutdated";
+
+    // No local ref exists and the shared ref-db answers with the all-zero object id.
+    doReturn(Optional.of(ObjectId.zeroId().getName()))
+        .when(sharedRefDatabaseMock)
+        .get(eq(projectName), any(), any());
+    MultisiteReplicationFetchFilter filterUnderTest =
+        new MultisiteReplicationFetchFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+
+    Set<String> remaining = filterUnderTest.filter(project, Set.of(branchMissingLocally));
+
+    assertThat(remaining).isEmpty();
+  }
+
+  private RevCommit newRef(String refName) throws Exception {
+    return repo.branch(refName).commit().create(); // new commit on refName in the test repo
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilterTest.java
new file mode 100644
index 0000000..b4a5e5d
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilterTest.java
@@ -0,0 +1,159 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.testing.InMemoryRepositoryManager;
+import com.google.gerrit.testing.InMemoryTestEnvironment;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.RefFixture;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.eclipse.jgit.internal.storage.dfs.InMemoryRepository;
+import org.eclipse.jgit.junit.LocalDiskRepositoryTestCase;
+import org.eclipse.jgit.junit.TestRepository;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectIdRef;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class MultisiteReplicationPushFilterTest extends LocalDiskRepositoryTestCase
+    implements RefFixture {
+
+  @Rule public InMemoryTestEnvironment testEnvironment = new InMemoryTestEnvironment();
+
+  @Mock SharedRefDatabaseWrapper sharedRefDatabaseMock;
+  @Mock Configuration config;
+  @Mock Configuration.ReplicationFilter replicationFilterConfig;
+
+  @Inject private InMemoryRepositoryManager gitRepositoryManager;
+
+  String project = A_TEST_PROJECT_NAME;
+  Project.NameKey projectName = A_TEST_PROJECT_NAME_KEY;
+
+  private TestRepository<InMemoryRepository> repo;
+
+  @Before
+  public void setupTestRepo() throws Exception {
+    // Create the project repository, stub the mocked configuration, then wrap the repository.
+    InMemoryRepository projectRepo = gitRepositoryManager.createRepository(projectName);
+    doReturn(replicationFilterConfig).when(config).replicationFilter();
+    repo = new TestRepository<>(projectRepo);
+  }
+
+  @Test
+  public void shouldReturnAllRefUpdatesWhenAllUpToDate() throws Exception {
+    List<RemoteRefUpdate> candidates =
+        Arrays.asList(refUpdate("refs/heads/foo"), refUpdate("refs/heads/bar"));
+    // Every ref is reported as up to date by the shared ref-db mock.
+    doReturn(true).when(sharedRefDatabaseMock).isUpToDate(eq(projectName), any());
+    MultisiteReplicationPushFilter filterUnderTest =
+        new MultisiteReplicationPushFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+
+    List<RemoteRefUpdate> accepted = filterUnderTest.filter(project, candidates);
+    assertThat(accepted).containsExactlyElementsIn(candidates);
+  }
+
+  @Test
+  public void shouldFilterOutOneOutdatedRef() throws Exception {
+    RemoteRefUpdate current = refUpdate("refs/heads/uptodate");
+    RemoteRefUpdate stale = refUpdate("refs/heads/outdated");
+    // The fake wrapper is constructed with the source ref of the stale update only.
+    SharedRefDatabaseWrapper fakeRefDb = new FakeSharedRefDatabaseWrapper(stale.getSrcRef());
+    MultisiteReplicationPushFilter filterUnderTest =
+        new MultisiteReplicationPushFilter(fakeRefDb, gitRepositoryManager, config);
+
+    List<RemoteRefUpdate> accepted =
+        filterUnderTest.filter(project, Arrays.asList(current, stale));
+
+    assertThat(accepted).containsExactly(current);
+  }
+
+  @Test
+  public void shouldLoadLocalVersionAndNotFilter() throws Exception {
+    String branch = "refs/heads/temporaryOutdated";
+    RemoteRefUpdate candidate = refUpdate(branch);
+    ObjectId localTip = repo.getRepository().exactRef(branch).getObjectId();
+
+    // First check answers "not up to date", the following one answers "up to date".
+    doReturn(false).doReturn(true).when(sharedRefDatabaseMock).isUpToDate(eq(projectName), any());
+    MultisiteReplicationPushFilter filterUnderTest =
+        new MultisiteReplicationPushFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+
+    List<RemoteRefUpdate> accepted =
+        filterUnderTest.filter(project, Collections.singletonList(candidate));
+    // The surviving update must point at the tip currently stored in the local repository.
+    assertThat(accepted).hasSize(1);
+    assertThat(accepted.get(0).getNewObjectId()).isEqualTo(localTip);
+    verify(sharedRefDatabaseMock, times(2)).isUpToDate(any(), any());
+  }
+
+  @Test
+  public void shouldLoadLocalVersionAndFilter() throws Exception {
+    String branch = "refs/heads/temporaryOutdated";
+    RemoteRefUpdate candidate = refUpdate(branch);
+    repo.branch(branch).commit().create();
+    // The shared ref-db mock answers "not up to date" on both checks.
+    doReturn(false).doReturn(false).when(sharedRefDatabaseMock).isUpToDate(eq(projectName), any());
+    MultisiteReplicationPushFilter filterUnderTest =
+        new MultisiteReplicationPushFilter(sharedRefDatabaseMock, gitRepositoryManager, config);
+    List<RemoteRefUpdate> accepted =
+        filterUnderTest.filter(project, Collections.singletonList(candidate));
+    assertThat(accepted).isEmpty();
+    verify(sharedRefDatabaseMock, times(2)).isUpToDate(any(), any());
+  }
+
+  @Test
+  public void shouldFilterOutAllOutdatedChangesRef() throws Exception {
+    RemoteRefUpdate branchUpdate = refUpdate("refs/heads/uptodate");
+    RemoteRefUpdate otherChangePatchSet = refUpdate("refs/changes/25/1225/2");
+    RemoteRefUpdate staleChangeMeta = refUpdate("refs/changes/12/4512/meta");
+    RemoteRefUpdate staleChangePatchSet = refUpdate("refs/changes/12/4512/1");
+    List<RemoteRefUpdate> candidates =
+        Arrays.asList(branchUpdate, otherChangePatchSet, staleChangeMeta, staleChangePatchSet);
+    // Only the meta ref of change 4512 is handed to the fake wrapper.
+    SharedRefDatabaseWrapper fakeRefDb =
+        new FakeSharedRefDatabaseWrapper(staleChangeMeta.getSrcRef());
+    MultisiteReplicationPushFilter filterUnderTest =
+        new MultisiteReplicationPushFilter(fakeRefDb, gitRepositoryManager, config);
+    // The patch-set ref of that change is expected to be dropped together with its meta ref.
+    List<RemoteRefUpdate> accepted = filterUnderTest.filter(project, candidates);
+    assertThat(accepted).containsExactly(branchUpdate, otherChangePatchSet);
+  }
+
+  private RemoteRefUpdate refUpdate(String refName) throws Exception {
+    repo.branch(refName).commit().create(); // side effect: refName gets a commit locally
+    ObjectId fixedId = ObjectId.fromString("0000000000000000000000000000000000000001");
+    Ref source = new ObjectIdRef.Unpeeled(Ref.Storage.NEW, refName, fixedId);
+    return new RemoteRefUpdate(null, source, "origin", false, "origin", fixedId);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
index 88d2270..d99fb4b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/ProjectVersionRefUpdateTest.java
@@ -18,16 +18,21 @@
 import static com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF;
 import static com.googlesource.gerrit.plugins.multisite.validation.ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_VALUE_REF;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
 import com.gerritforge.gerrit.globalrefdb.validation.ProjectsFilter;
 import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDatabaseWrapper;
+import com.google.common.base.Suppliers;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.server.data.RefUpdateAttribute;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.RefUpdatedEvent;
 import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
@@ -95,16 +100,9 @@
     Context.setForwardedEvent(false);
     when(sharedRefDb.get(
             A_TEST_PROJECT_NAME_KEY,
-            ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF,
-            String.class))
-        .thenReturn(Optional.of("26f7ee61bf0e470e8393c884526eec8a9b943a63"));
-    when(sharedRefDb.get(
-            A_TEST_PROJECT_NAME_KEY,
             ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_VALUE_REF,
             String.class))
         .thenReturn(Optional.of("" + (masterCommit.getCommitTime() - 1)));
-    when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(Ref.class), any(ObjectId.class)))
-        .thenReturn(true);
     when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(String.class), any(), any()))
         .thenReturn(true);
     when(refUpdatedEvent.getProjectNameKey()).thenReturn(A_TEST_PROJECT_NAME_KEY);
@@ -129,6 +127,35 @@
   }
 
   @Test
+  public void producerShouldUsePutInsteadOfCompareAndPutWhenExtendedGlobalRefDb()
+      throws IOException {
+    // Same precondition as the other producer tests: the event is not a forwarded one.
+    Context.setForwardedEvent(false);
+    when(sharedRefDb.isSetOperationSupported()).thenReturn(true);
+    RefUpdatedEvent refUpdatedEvent = new RefUpdatedEvent();
+    RefUpdateAttribute refUpdatedAttribute = new RefUpdateAttribute();
+    refUpdatedAttribute.project = A_TEST_PROJECT_NAME_KEY.get();
+    refUpdatedAttribute.refName = A_TEST_REF_NAME;
+    refUpdatedEvent.refUpdate = Suppliers.memoize(() -> refUpdatedAttribute);
+
+    new ProjectVersionRefUpdateImpl(
+            repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
+        .onEvent(refUpdatedEvent);
+
+    // Match any value type so that the check cannot pass merely because the arguments
+    // are not Longs; the stubbing in the sibling tests uses the same wildcard matchers.
+    verify(sharedRefDb, never())
+        .compareAndPut(any(Project.NameKey.class), anyString(), any(), any());
+    verify(sharedRefDb).put(any(Project.NameKey.class), any(String.class), any(String.class));
+    Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
+    assertThat(ref).isNotNull();
+    ObjectLoader loader = repo.getRepository().open(ref.getObjectId());
+    long storedVersion = readLongObject(loader);
+    assertThat(storedVersion).isGreaterThan((long) masterCommit.getCommitTime());
+    verify(verLogger).log(A_TEST_PROJECT_NAME_KEY, storedVersion, 0);
+  }
+
+  @Test
   public void producerShouldUpdateProjectVersionUponForcedPushRefUpdatedEvent() throws Exception {
     Context.setForwardedEvent(false);
 
@@ -137,27 +164,20 @@
 
     Thread.sleep(1000L);
     repo.branch("master").update(masterCommit);
-
-    when(sharedRefDb.get(
-            A_TEST_PROJECT_NAME_KEY,
-            ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF,
-            String.class))
-        .thenReturn(Optional.of("26f7ee61bf0e470e8393c884526eec8a9b943a63"));
     when(sharedRefDb.get(
             A_TEST_PROJECT_NAME_KEY,
             ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_VALUE_REF,
             String.class))
         .thenReturn(Optional.of("" + (masterCommit.getCommitTime() - 1)));
-    when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(Ref.class), any(ObjectId.class)))
-        .thenReturn(true);
     when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(String.class), any(), any()))
         .thenReturn(true);
     when(refUpdatedEvent.getProjectNameKey()).thenReturn(A_TEST_PROJECT_NAME_KEY);
     when(refUpdatedEvent.getRefName()).thenReturn(A_TEST_REF_NAME);
 
-    new ProjectVersionRefUpdateImpl(
-            repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter)
-        .onEvent(refUpdatedEvent);
+    ProjectVersionRefUpdateImpl projectVersion =
+        new ProjectVersionRefUpdateImpl(
+            repoManager, sharedRefDb, gitReferenceUpdated, verLogger, projectsFilter);
+    projectVersion.onEvent(refUpdatedEvent);
 
     Ref ref = repo.getRepository().findRef(MULTI_SITE_VERSIONING_REF);
 
@@ -168,6 +188,10 @@
 
     ObjectLoader loader = repo.getRepository().open(ref.getObjectId());
     long storedVersion = readLongObject(loader);
+
+    Optional<Long> localStoredVersion = projectVersion.getProjectLocalVersion(A_TEST_PROJECT_NAME);
+    assertThat(localStoredVersion).isEqualTo(Optional.of(storedVersion));
+
     assertThat(storedVersion).isGreaterThan((long) masterPlusOneCommit.getCommitTime());
 
     verify(verLogger).log(A_TEST_PROJECT_NAME_KEY, storedVersion, 0);
@@ -179,17 +203,10 @@
     Context.setForwardedEvent(false);
     when(sharedRefDb.get(
             A_TEST_PROJECT_NAME_KEY,
-            ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_REF,
-            String.class))
-        .thenReturn(Optional.empty());
-    when(sharedRefDb.get(
-            A_TEST_PROJECT_NAME_KEY,
             ProjectVersionRefUpdate.MULTI_SITE_VERSIONING_VALUE_REF,
             String.class))
         .thenReturn(Optional.empty());
 
-    when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(Ref.class), any(ObjectId.class)))
-        .thenReturn(true);
     when(sharedRefDb.compareAndPut(any(Project.NameKey.class), any(String.class), any(), any()))
         .thenReturn(true);
     when(refUpdatedEvent.getProjectNameKey()).thenReturn(A_TEST_PROJECT_NAME_KEY);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/RefFixture.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/RefFixture.java
index baba94b..1885d27 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/RefFixture.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/RefFixture.java
@@ -33,6 +33,8 @@
   static final ObjectId AN_OBJECT_ID_1 = new ObjectId(1, 2, 3, 4, 5);
   static final ObjectId AN_OBJECT_ID_2 = new ObjectId(1, 2, 3, 4, 6);
   static final ObjectId AN_OBJECT_ID_3 = new ObjectId(1, 2, 3, 4, 7);
+  static final ObjectId AN_OUTDATED_OBJECT_ID =
+      ObjectId.fromString("da37cb098dd7df4c8662ae8afc1acae5f7567775");
   static final String A_TEST_REF_NAME = "refs/heads/master";
   static final String A_REF_NAME_OF_A_PATCHSET = "refs/changes/01/1/1";