Merge "Log4jMessageLogger: Use EventGsonProvider instance from gerrit core"
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 82cd5e8..fc1b8ad 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -9,6 +9,6 @@
 
     maven_jar(
         name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.3.1",
-        sha1 = "90775e671946b20e52be3a11277d1ed33973d66e",
+        artifact = "com.gerritforge:events-broker:3.4.0-rc0",
+        sha1 = "8c34c88103d4783eb4c4decde6d93541bc1cf064",
     )
diff --git a/setup_local_env/README.md b/setup_local_env/README.md
index 5b99996..fca46d8 100644
--- a/setup_local_env/README.md
+++ b/setup_local_env/README.md
@@ -4,7 +4,8 @@
 The environment is composed by:
 
 - 2 gerrit instances deployed by default in /tmp
-- 1 kafka node and 1 zookeeper node
+- 1 zookeeper node
+- 1 Broker node (kafka, kinesis or gcloud-pubsub)
 - 1 HA-PROXY
 
 ## Requirements
@@ -14,15 +15,36 @@
 - wget
 - envsubst
 - haproxy
+- aws-cli (only when broker_type is "kinesis")
 
 ## Examples
 
-Simplest setup with all default values and cleanup previous deployment
+Simplest setup with all default values and cleanup previous deployment. This
+will deploy kafka broker
 
 ```bash
 sh setup_local_env/setup.sh --release-war-file /path/to/gerrit.war --multisite-lib-file /path/to/multi-site.jar
 ```
 
+Deploy Kinesis broker
+
+```bash
+sh setup_local_env/setup.sh \
+    --release-war-file /path/to/gerrit.war \
+    --multisite-lib-file /path/to/multi-site.jar \
+    --broker-type kinesis
+```
+
+Deploy GCloud PubSub broker
+
+```bash
+sh setup_local_env/setup.sh \
+    --release-war-file /path/to/gerrit.war \
+    --multisite-lib-file /path/to/multi-site.jar \
+    --broker-type gcloud-pubsub
+```
+
+
 Cleanup the previous deployments
 
 ```bash
@@ -59,6 +81,8 @@
 [--just-cleanup-env]            Cleans up previous deployment; default false
 
 [--enabled-https]               Enabled https; default true
+
+[--broker_type]                 events broker type; 'kafka', 'kinesis' or 'gcloud-pubsub'. Default 'kafka'
 ```
 
 ## Limitations
diff --git a/setup_local_env/configs/gerrit.config b/setup_local_env/configs/gerrit.config
index f9eca89..368f94d 100644
--- a/setup_local_env/configs/gerrit.config
+++ b/setup_local_env/configs/gerrit.config
@@ -18,6 +18,7 @@
 [container]
     javaOptions = "-Dflogger.backend_factory=com.google.common.flogger.backend.log4j.Log4jBackendFactory#getInstance"
     javaOptions = "-Dflogger.logging_context=com.google.gerrit.server.logging.LoggingContext#getInstance"
+    javaOptions = "-DPUBSUB_EMULATOR_HOST=localhost:$BROKER_PORT"
     javaOptions = "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$REMOTE_DEBUG_PORT"
 [index]
     type = LUCENE
@@ -41,13 +42,25 @@
     directory = $FAKE_NFS
 [plugin "kafka-events"]
     sendAsync = true
-    bootstrapServers = localhost:$KAFKA_PORT
-    groupId = $KAFKA_GROUP_ID
+    bootstrapServers = localhost:$BROKER_PORT
+    groupId = $GROUP_ID
     numberOfSubscribers = 6
     securityProtocol = PLAINTEXT
     pollingIntervalMs = 1000
     enableAutoCommit = true
     autoCommitIntervalMs = 1000
     autoOffsetReset = latest
+[plugin "events-aws-kinesis"]
+    numberOfSubscribers = 6
+    pollingIntervalMs = 1000
+    region = us-east-1
+    endpoint = http://localhost:$BROKER_PORT
+    applicationName = $GROUP_ID
+    initialPosition = trim_horizon
+[plugin "events-gcloud-pubsub"]
+    numberOfSubscribers = 6
+    gcloudProject="test-project"
+    subscriptionId=$GROUP_ID
+    privateKeyLocation="not used in local mode"
 [plugin "metrics-reporter-prometheus"]
     prometheusBearerToken = token
diff --git a/setup_local_env/docker-compose.yaml b/setup_local_env/docker-compose-core.yaml
similarity index 61%
rename from setup_local_env/docker-compose.yaml
rename to setup_local_env/docker-compose-core.yaml
index c386d46..dab168d 100644
--- a/setup_local_env/docker-compose.yaml
+++ b/setup_local_env/docker-compose-core.yaml
@@ -5,15 +5,8 @@
     ports:
       - "2181:2181"
     container_name: zk_test_node
-  kafka:
-    image: wurstmeister/kafka:2.12-2.1.0
-    ports:
-      - "9092:9092"
-    container_name: kafka_test_node
-    environment:
-      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
   prometheus:
+    container_name: prometheus_test_node
     image: prom/prometheus:v2.16.0
     user: root
     volumes:
diff --git a/setup_local_env/docker-compose-gcloud-pubsub.yaml b/setup_local_env/docker-compose-gcloud-pubsub.yaml
new file mode 100644
index 0000000..983e784
--- /dev/null
+++ b/setup_local_env/docker-compose-gcloud-pubsub.yaml
@@ -0,0 +1,13 @@
+version: '3'
+services:
+  pubsub:
+    image: gcr.io/google.com/cloudsdktool/cloud-sdk:316.0.0-emulators
+    ports:
+      - "8085:8085"
+    container_name: gcloud-pubsub_test_node
+    entrypoint: gcloud beta emulators pubsub start --project test-project --host-port 0.0.0.0:8085
+    networks:
+      - setup_local_env_default
+networks:
+  setup_local_env_default:
+    external: true
diff --git a/setup_local_env/docker-compose-kafka.yaml b/setup_local_env/docker-compose-kafka.yaml
new file mode 100644
index 0000000..8a31502
--- /dev/null
+++ b/setup_local_env/docker-compose-kafka.yaml
@@ -0,0 +1,15 @@
+version: '3'
+services:
+  kafka:
+    image: wurstmeister/kafka:2.12-2.1.0
+    ports:
+      - "9092:9092"
+    container_name: kafka_test_node
+    environment:
+      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+    networks:
+      - setup_local_env_default
+networks:
+  setup_local_env_default:
+    external: true
diff --git a/setup_local_env/docker-compose-kinesis.yaml b/setup_local_env/docker-compose-kinesis.yaml
new file mode 100644
index 0000000..15c609e
--- /dev/null
+++ b/setup_local_env/docker-compose-kinesis.yaml
@@ -0,0 +1,17 @@
+version: '3'
+services:
+  kinesis:
+    image: localstack/localstack:0.12.8
+    ports:
+      - "4566:4566"
+      - "4751:4751"
+    container_name: kinesis_test_node
+    environment:
+      SERVICES: dynamodb,cloudwatch,kinesis
+      AWS_REGION: us-east-1
+      USE_SSL: "true"
+    networks:
+      - setup_local_env_default
+networks:
+  setup_local_env_default:
+    external: true
\ No newline at end of file
diff --git a/setup_local_env/haproxy-config/haproxy.cfg b/setup_local_env/haproxy-config/haproxy.cfg
index 94b22d8..de4f709 100644
--- a/setup_local_env/haproxy-config/haproxy.cfg
+++ b/setup_local_env/haproxy-config/haproxy.cfg
@@ -65,4 +65,5 @@
     timeout connect 10s
     timeout server 5m
     server ssh_node1 $HA_GERRIT_SITE1_HOSTNAME:$HA_GERRIT_SITE1_SSHD_PORT check inter 10s check port $HA_GERRIT_SITE1_HTTPD_PORT inter 10s
-    server ssh_node2 $HA_GERRIT_SITE2_HOSTNAME:$HA_GERRIT_SITE2_SSHD_PORT check inter 10s check port $HA_GERRIT_SITE2_HTTPD_PORT inter 10s backup
\ No newline at end of file
+    server ssh_node2 $HA_GERRIT_SITE2_HOSTNAME:$HA_GERRIT_SITE2_SSHD_PORT check inter 10s check port $HA_GERRIT_SITE2_HTTPD_PORT inter 10s backup
+
diff --git a/setup_local_env/setup.sh b/setup_local_env/setup.sh
index f25281c..f3f786b 100755
--- a/setup_local_env/setup.sh
+++ b/setup_local_env/setup.sh
@@ -16,7 +16,7 @@
 
 
 SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
-GERRIT_BRANCH=stable-3.3
+GERRIT_BRANCH=stable-3.4
 GERRIT_CI=https://gerrit-ci.gerritforge.com/view/Plugins-$GERRIT_BRANCH/job
 LAST_BUILD=lastSuccessfulBuild/artifact/bazel-bin/plugins
 EVENTS_BROKER_VER=`grep 'com.gerritforge:events-broker' $(dirname $0)/../external_plugin_deps.bzl | cut -d '"' -f 2 | cut -d ':' -f 3`
@@ -30,6 +30,9 @@
   type wget >/dev/null 2>&1 || { echo >&2 "Require wget but it's not installed. Aborting."; exit 1; }
   type envsubst >/dev/null 2>&1 || { echo >&2 "Require envsubst but it's not installed. Aborting."; exit 1; }
   type openssl >/dev/null 2>&1 || { echo >&2 "Require openssl but it's not installed. Aborting."; exit 1; }
+  if [ "$BROKER_TYPE" = "kinesis" ];  then
+    type aws >/dev/null 2>&1 || { echo >&2 "Require aws-cli but it's not installed. Aborting."; exit 1; }
+  fi
 }
 
 function get_replication_url {
@@ -67,7 +70,7 @@
     export GERRIT_HOSTNAME=$7
     export REPLICATION_HOSTNAME=$8
     export REMOTE_DEBUG_PORT=$9
-    export KAFKA_GROUP_ID=${10}
+    export GROUP_ID=${10}
     export REPLICATION_URL=$(get_replication_url $REPLICATION_LOCATION_TEST_SITE $REPLICATION_HOSTNAME)
 
     echo "Replacing variables for file $file and copying to $CONFIG_TEST_SITE/$file_name"
@@ -99,10 +102,17 @@
   haproxy -f $HA_PROXY_CONFIG_DIR/haproxy.cfg &
 }
 
-function deploy_config_files {
-  # KAFKA configuration
-  export KAFKA_PORT=9092
+function export_broker_port {
+  if [ "$BROKER_TYPE" = "kinesis" ]; then
+    export BROKER_PORT=4566
+  elif [ "$BROKER_TYPE" =  "kafka" ]; then
+    export BROKER_PORT=9092
+  elif [ "$BROKER_TYPE" =  "gcloud-pubsub" ]; then
+    export BROKER_PORT=8085
+  fi
+}
 
+function deploy_config_files {
   # ZK configuration
   export ZK_PORT=2181
 
@@ -112,21 +122,21 @@
   GERRIT_SITE1_SSHD_PORT=$3
   CONFIG_TEST_SITE_1=$LOCATION_TEST_SITE_1/etc
   GERRIT_SITE1_REMOTE_DEBUG_PORT="5005"
-  GERRIT_SITE1_KAFKA_GROUP_ID="instance-1"
+  GERRIT_SITE1_GROUP_ID="instance-1"
   # SITE 2
   GERRIT_SITE2_HOSTNAME=$4
   GERRIT_SITE2_HTTPD_PORT=$5
   GERRIT_SITE2_SSHD_PORT=$6
   CONFIG_TEST_SITE_2=$LOCATION_TEST_SITE_2/etc
   GERRIT_SITE2_REMOTE_DEBUG_PORT="5006"
-  GERRIT_SITE2_KAFKA_GROUP_ID="instance-2"
+  GERRIT_SITE2_GROUP_ID="instance-2"
 
   # Set config SITE1
-  copy_config_files $CONFIG_TEST_SITE_1 $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_SSHD_PORT $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE1_REMOTE_DEBUG_PORT $GERRIT_SITE1_KAFKA_GROUP_ID
+  copy_config_files $CONFIG_TEST_SITE_1 $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_SSHD_PORT $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE1_REMOTE_DEBUG_PORT $GERRIT_SITE1_GROUP_ID
 
 
   # Set config SITE2
-  copy_config_files $CONFIG_TEST_SITE_2 $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE2_SSHD_PORT $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE2_REMOTE_DEBUG_PORT $GERRIT_SITE2_KAFKA_GROUP_ID
+  copy_config_files $CONFIG_TEST_SITE_2 $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE2_SSHD_PORT $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE2_REMOTE_DEBUG_PORT $GERRIT_SITE2_GROUP_ID
 }
 
 function is_docker_desktop {
@@ -142,13 +152,33 @@
   fi
 }
 
+function create_kinesis_streams {
+  for stream in "gerrit_batch_index" "gerrit_cache_eviction" "gerrit_index" "gerrit_list_project" "gerrit_stream" "gerrit_web_session" "gerrit"
+  do
+    create_kinesis_stream $stream
+  done
+}
+
+function create_kinesis_stream {
+  local stream=$1
+
+  export AWS_PAGER=''
+  echo "[KINESIS] Create stream $stream"
+  until aws --endpoint-url=http://localhost:$BROKER_PORT kinesis create-stream --shard-count 1 --stream-name "$stream"
+  do
+      echo "[KINESIS stream $stream] Creation failed. Retrying in 5 seconds..."
+      sleep 5s
+  done
+}
 
 function cleanup_environment {
   echo "Killing existing HA-PROXY setup"
   kill $(ps -ax | grep haproxy | grep "gerrit_setup/ha-proxy-config" | awk '{print $1}') 2> /dev/null
 
-  echo "Stopping docker containers"
-  docker-compose -f $SCRIPT_DIR/docker-compose.yaml down 2> /dev/null
+  echo "Stopping $BROKER_TYPE docker container"
+  docker-compose -f "${SCRIPT_DIR}/docker-compose-${BROKER_TYPE}.yaml" down 2> /dev/null
+  echo "Stopping core docker containers"
+  docker-compose -f "${SCRIPT_DIR}/docker-compose-core.yaml" down 2> /dev/null
 
   echo "Stopping GERRIT instances"
   $1/bin/gerrit.sh stop 2> /dev/null
@@ -158,8 +188,32 @@
   rm -rf $3 2> /dev/null
 }
 
-function check_if_kafka_is_running {
-  echo $(docker inspect kafka_test_node 2> /dev/null | grep '"Running": true' | wc -l)
+function check_if_container_is_running {
+  local container=$1;
+  echo $(docker inspect "$container" 2> /dev/null | grep '"Running": true' | wc -l)
+}
+
+function ensure_docker_compose_is_up_and_running {
+  local log_label=$1
+  local container_name=$2
+  local docker_compose_file=$3
+
+  local is_container_running=$(check_if_container_is_running "$container_name")
+  if [ "$is_container_running" -lt 1 ];then
+    echo "[$log_label] Starting docker containers"
+    docker-compose -f "${SCRIPT_DIR}/${docker_compose_file}" up -d
+
+    echo "[$log_label] Waiting for docker containers to start..."
+    while [[ $(check_if_container_is_running "$container_name") -lt 1 ]];do sleep 10s; done
+  else
+    echo "[$log_label] Containers already running, nothing to do"
+  fi
+}
+
+function prepare_broker_data {
+  if [ "$BROKER_TYPE" = "kinesis" ]; then
+    create_kinesis_streams
+  fi
 }
 
 while [ $# -ne 0 ]
@@ -194,6 +248,8 @@
     echo
     echo "[--enabled-https]               Enabled https; default true"
     echo
+    echo "[--broker-type]                 events broker type; 'kafka', 'kinesis' or 'gcloud-pubsub'. Default 'kafka'"
+    echo
     exit 0
   ;;
   "--new-deployment")
@@ -281,6 +337,15 @@
     shift
     shift
   ;;
+ "--broker-type" )
+    BROKER_TYPE=$2
+    shift
+    shift
+    if [ ! "$BROKER_TYPE" = "kafka" ] && [ ! "$BROKER_TYPE" = "kinesis" ] && [ ! "$BROKER_TYPE" = "gcloud-pubsub" ]; then
+      echo >&2 "broker type: '$BROKER_TYPE' not valid. Please supply 'kafka','kinesis' or 'gcloud-pubsub'. Aborting"
+      exit 1
+    fi
+  ;;
   *     )
     echo "Unknown option argument: $1"
     shift
@@ -309,6 +374,7 @@
 export REPLICATION_DELAY_SEC=${REPLICATION_DELAY_SEC:-"5"}
 export SSH_ADVERTISED_PORT=${SSH_ADVERTISED_PORT:-"29418"}
 HTTPS_ENABLED=${HTTPS_ENABLED:-"false"}
+BROKER_TYPE=${BROKER_TYPE:-"kafka"}
 
 export COMMON_LOCATION=$DEPLOYMENT_LOCATION/gerrit_setup
 LOCATION_TEST_SITE_1=$COMMON_LOCATION/instance-1
@@ -342,10 +408,10 @@
 fi
 if [ $DOWNLOAD_WEBSESSION_PLUGIN = "true" ];then
   echo "Downloading websession-broker plugin $GERRIT_BRANCH"
-  wget $GERRIT_CI/plugin-websession-broker-bazel-$GERRIT_BRANCH/$LAST_BUILD/websession-broker/websession-broker.jar \
+  wget $GERRIT_CI/plugin-websession-broker-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/websession-broker/websession-broker.jar \
   -O $DEPLOYMENT_LOCATION/websession-broker.jar || { echo >&2 "Cannot download websession-broker plugin: Check internet connection. Abort\
 ing"; exit 1; }
-  wget $GERRIT_CI/plugin-healthcheck-bazel-$GERRIT_BRANCH/$LAST_BUILD/healthcheck/healthcheck.jar \
+  wget $GERRIT_CI/plugin-healthcheck-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/healthcheck/healthcheck.jar \
   -O $DEPLOYMENT_LOCATION/healthcheck.jar || { echo >&2 "Cannot download healthcheck plugin: Check internet connection. Abort\
 ing"; exit 1; }
 else
@@ -353,7 +419,7 @@
 fi
 
 echo "Downloading zookeeper plugin $GERRIT_BRANCH"
-  wget $GERRIT_CI/plugin-zookeeper-refdb-bazel-$GERRIT_BRANCH/$LAST_BUILD/zookeeper-refdb/zookeeper-refdb.jar \
+  wget $GERRIT_CI/plugin-zookeeper-refdb-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/zookeeper-refdb/zookeeper-refdb.jar \
   -O $DEPLOYMENT_LOCATION/zookeeper-refdb.jar || { echo >&2 "Cannot download zookeeper plugin: Check internet connection. Abort\
 ing"; exit 1; }
 
@@ -367,10 +433,27 @@
   -O $DEPLOYMENT_LOCATION/events-broker.jar || { echo >&2 "Cannot download events-broker library: Check internet connection. Abort\
 ing"; exit 1; }
 
+if [ "$BROKER_TYPE" = "kafka" ]; then
 echo "Downloading kafka-events plugin $GERRIT_BRANCH"
   wget $GERRIT_CI/plugin-kafka-events-bazel-$GERRIT_BRANCH/$LAST_BUILD/kafka-events/kafka-events.jar \
   -O $DEPLOYMENT_LOCATION/kafka-events.jar || { echo >&2 "Cannot download kafka-events plugin: Check internet connection. Abort\
 ing"; exit 1; }
+fi
+
+if [ "$BROKER_TYPE" = "kinesis" ]; then
+echo "Downloading events-aws-kinesis plugin $GERRIT_BRANCH"
+  wget $GERRIT_CI/plugin-events-aws-kinesis-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/events-aws-kinesis/events-aws-kinesis.jar \
+  -O $DEPLOYMENT_LOCATION/events-aws-kinesis.jar || { echo >&2 "Cannot download events-aws-kinesis plugin: Check internet connection. Abort\
+ing"; exit 1; }
+fi
+
+
+if [ "$BROKER_TYPE" = "gcloud-pubsub" ]; then
+echo "Downloading events-gcloud-pubsub plugin $GERRIT_BRANCH"
+  wget $GERRIT_CI/plugin-events-gcloud-pubsub-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/events-gcloud-pubsub/events-gcloud-pubsub.jar \
+  -O $DEPLOYMENT_LOCATION/events-gcloud-pubsub.jar || { echo >&2 "Cannot download events-gcloud-pubsub plugin: Check internet connection. Abort\
+ing"; exit 1; }
+fi
 
 echo "Downloading metrics-reporter-prometheus plugin $GERRIT_BRANCH"
   wget $GERRIT_CI/plugin-metrics-reporter-prometheus-bazel-master-$GERRIT_BRANCH/$LAST_BUILD/metrics-reporter-prometheus/metrics-reporter-prometheus.jar \
@@ -422,8 +505,14 @@
   echo "Copy events broker library"
   cp -f $DEPLOYMENT_LOCATION/events-broker.jar $LOCATION_TEST_SITE_1/lib/events-broker.jar
 
-  echo "Copy kafka events plugin"
-  cp -f $DEPLOYMENT_LOCATION/kafka-events.jar $LOCATION_TEST_SITE_1/plugins/kafka-events.jar
+  echo "Copy $BROKER_TYPE events plugin"
+  if [ $BROKER_TYPE = "kinesis" ]; then
+     cp -f $DEPLOYMENT_LOCATION/events-aws-kinesis.jar $LOCATION_TEST_SITE_1/plugins/events-aws-kinesis.jar
+  elif [ $BROKER_TYPE = "gcloud-pubsub" ]; then
+    cp -f $DEPLOYMENT_LOCATION/events-gcloud-pubsub.jar $LOCATION_TEST_SITE_1/plugins/events-gcloud-pubsub.jar
+  else
+     cp -f $DEPLOYMENT_LOCATION/$BROKER_TYPE-events.jar $LOCATION_TEST_SITE_1/plugins/$BROKER_TYPE-events.jar
+  fi
 
   echo "Copy metrics-reporter-prometheus plugin"
   cp -f $DEPLOYMENT_LOCATION/metrics-reporter-prometheus.jar $LOCATION_TEST_SITE_1/plugins/metrics-reporter-prometheus.jar
@@ -455,14 +544,10 @@
 
 cat $SCRIPT_DIR/configs/prometheus.yml | envsubst > $COMMON_LOCATION/prometheus.yml
 
-IS_KAFKA_RUNNING=$(check_if_kafka_is_running)
-if [ $IS_KAFKA_RUNNING -lt 1 ];then
-
-  echo "Starting zk and kafka"
-  docker-compose -f $SCRIPT_DIR/docker-compose.yaml up -d
-  echo "Waiting for kafka to start..."
-  while [[ $(check_if_kafka_is_running) -lt 1 ]];do sleep 10s; done
-fi
+export_broker_port
+ensure_docker_compose_is_up_and_running "core" "prometheus_test_node" "docker-compose-core.yaml"
+ensure_docker_compose_is_up_and_running "$BROKER_TYPE" "${BROKER_TYPE}_test_node" "docker-compose-$BROKER_TYPE.yaml"
+prepare_broker_data
 
 echo "Re-deploying configuration files"
 deploy_config_files $GERRIT_1_HOSTNAME $GERRIT_1_HTTPD_PORT $GERRIT_1_SSHD_PORT $GERRIT_2_HOSTNAME $GERRIT_2_HTTPD_PORT $GERRIT_2_SSHD_PORT
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index 816edc9..1f8766a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -76,12 +76,10 @@
         subscriberMetrics.incrementSubscriberConsumedMessage();
         subscriberMetrics.updateReplicationStatusMetrics(event);
       } catch (IOException e) {
-        logger.atSevere().withCause(e).log(
-            "Malformed event '%s': [Exception: %s]", event.getHeader());
+        logger.atSevere().withCause(e).log("Malformed event '%s'", event.getHeader());
         subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
       } catch (PermissionBackendException | CacheNotFoundException e) {
-        logger.atSevere().withCause(e).log(
-            "Cannot handle message %s: [Exception: %s]", event.getHeader());
+        logger.atSevere().withCause(e).log("Cannot handle message '%s'", event.getHeader());
         subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
       }
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/GsonParser.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/CacheKeyJsonParser.java
similarity index 96%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/GsonParser.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/CacheKeyJsonParser.java
index b69cb91..80b2445 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/GsonParser.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/CacheKeyJsonParser.java
@@ -24,11 +24,11 @@
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.cache.Constants;
 
-public final class GsonParser {
+public final class CacheKeyJsonParser {
   private final Gson gson;
 
   @Inject
-  public GsonParser(@EventGson Gson gson) {
+  public CacheKeyJsonParser(@EventGson Gson gson) {
     this.gson = gson;
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
index 1e21e84..8d41500 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
@@ -23,15 +23,12 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.Configuration.Index;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.index.ChangeChecker;
 import com.googlesource.gerrit.plugins.multisite.index.ChangeCheckerImpl;
 import com.googlesource.gerrit.plugins.multisite.index.ForwardedIndexExecutor;
 import java.util.Optional;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Index a change using {@link ChangeIndexer}. This class is meant to be used on the receiving side
@@ -41,12 +38,8 @@
  */
 @Singleton
 public class ForwardedIndexChangeHandler
-    extends ForwardedIndexingHandler<String, ChangeIndexEvent> {
+    extends ForwardedIndexingHandlerWithRetries<String, ChangeIndexEvent> {
   private final ChangeIndexer indexer;
-  private final ScheduledExecutorService indexExecutor;
-  private final OneOffRequestContext oneOffCtx;
-  private final int retryInterval;
-  private final int maxTries;
   private final ChangeCheckerImpl.Factory changeCheckerFactory;
 
   @Inject
@@ -56,43 +49,22 @@
       @ForwardedIndexExecutor ScheduledExecutorService indexExecutor,
       OneOffRequestContext oneOffCtx,
       ChangeCheckerImpl.Factory changeCheckerFactory) {
-    super(configuration.index().numStripedLocks());
+    super(indexExecutor, configuration, oneOffCtx);
     this.indexer = indexer;
-    this.indexExecutor = indexExecutor;
-    this.oneOffCtx = oneOffCtx;
     this.changeCheckerFactory = changeCheckerFactory;
-
-    Index indexConfig = configuration.index();
-    this.retryInterval = indexConfig != null ? indexConfig.retryInterval() : 0;
-    this.maxTries = indexConfig != null ? indexConfig.maxTries() : 0;
   }
 
   @Override
   protected void doIndex(String id, Optional<ChangeIndexEvent> indexEvent) {
-    doIndex(id, indexEvent, 0);
+    attemptToIndex(id, indexEvent, 0);
   }
 
-  private void doIndex(String id, Optional<ChangeIndexEvent> indexEvent, int retryCount) {
+  @Override
+  protected void attemptToIndex(String id, Optional<ChangeIndexEvent> indexEvent, int retryCount) {
     ChangeChecker checker = changeCheckerFactory.create(id);
     Optional<ChangeNotes> changeNotes = checker.getChangeNotes();
     if (changeNotes.isPresent()) {
-      ChangeNotes notes = changeNotes.get();
-      reindex(notes);
-
-      if (checker.isChangeUpToDate(indexEvent)) {
-        if (retryCount > 0) {
-          log.warn("Change {} has been eventually indexed after {} attempt(s)", id, retryCount);
-        } else {
-          log.debug("Change {} successfully indexed", id);
-        }
-      } else {
-        log.warn(
-            "Change {} seems too old compared to the event timestamp (event={} >> change-Ts={})",
-            id,
-            indexEvent,
-            checker);
-        rescheduleIndex(id, indexEvent, retryCount + 1);
-      }
+      reindexAndCheckIsUpToDate(id, indexEvent, checker, retryCount);
     } else {
       log.warn(
           "Change {} not present yet in local Git repository (event={}) after {} attempt(s)",
@@ -106,42 +78,20 @@
     }
   }
 
-  private void reindex(ChangeNotes notes) {
+  @Override
+  protected void reindex(String id) {
     try (ManualRequestContext ctx = oneOffCtx.open()) {
+      ChangeChecker checker = changeCheckerFactory.create(id);
+      Optional<ChangeNotes> changeNotes = checker.getChangeNotes();
+      ChangeNotes notes = changeNotes.get();
       notes.reload();
       indexer.index(notes.getChange());
     }
   }
 
-  private boolean rescheduleIndex(
-      String id, Optional<ChangeIndexEvent> indexEvent, int retryCount) {
-    if (retryCount > maxTries) {
-      log.error(
-          "Change {} could not be indexed after {} retries. Change index could be stale.",
-          id,
-          retryCount);
-      return false;
-    }
-
-    log.warn(
-        "Retrying for the #{} time to index Change {} after {} msecs",
-        retryCount,
-        id,
-        retryInterval);
-    @SuppressWarnings("unused")
-    Future<?> possiblyIgnoredError =
-        indexExecutor.schedule(
-            () -> {
-              try (ManualRequestContext ctx = oneOffCtx.open()) {
-                Context.setForwardedEvent(true);
-                doIndex(id, indexEvent, retryCount);
-              } catch (Exception e) {
-                log.warn("Change {} could not be indexed", id, e);
-              }
-            },
-            retryInterval,
-            TimeUnit.MILLISECONDS);
-    return true;
+  @Override
+  protected String indexName() {
+    return "change";
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java
index 4e154ef..c4906a9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java
@@ -16,11 +16,15 @@
 
 import com.google.gerrit.entities.AccountGroup;
 import com.google.gerrit.server.index.group.GroupIndexer;
+import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.index.ForwardedIndexExecutor;
+import com.googlesource.gerrit.plugins.multisite.index.GroupChecker;
 import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * Index a group using {@link GroupIndexer}. This class is meant to be used on the receiving side of
@@ -29,19 +33,42 @@
  * done for the same group uuid
  */
 @Singleton
-public class ForwardedIndexGroupHandler extends ForwardedIndexingHandler<String, GroupIndexEvent> {
+public class ForwardedIndexGroupHandler
+    extends ForwardedIndexingHandlerWithRetries<String, GroupIndexEvent> {
   private final GroupIndexer indexer;
+  private final GroupChecker groupChecker;
 
   @Inject
-  ForwardedIndexGroupHandler(GroupIndexer indexer, Configuration config) {
-    super(config.index().numStripedLocks());
+  ForwardedIndexGroupHandler(
+      GroupIndexer indexer,
+      Configuration config,
+      GroupChecker groupChecker,
+      OneOffRequestContext oneOffRequestContext,
+      @ForwardedIndexExecutor ScheduledExecutorService indexExecutor) {
+    super(indexExecutor, config, oneOffRequestContext);
     this.indexer = indexer;
+    this.groupChecker = groupChecker;
   }
 
   @Override
   protected void doIndex(String uuid, Optional<GroupIndexEvent> event) {
-    indexer.index(AccountGroup.uuid(uuid));
-    log.debug("Group {} successfully indexed", uuid);
+    attemptToIndex(uuid, event, 0);
+  }
+
+  @Override
+  protected void reindex(String id) {
+    indexer.index(AccountGroup.uuid(id));
+  }
+
+  @Override
+  protected String indexName() {
+    return "group";
+  }
+
+  @Override
+  protected void attemptToIndex(
+      String uuid, Optional<GroupIndexEvent> groupIndexEvent, int retryCount) {
+    reindexAndCheckIsUpToDate(uuid, groupIndexEvent, groupChecker, retryCount);
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java
index 6da9681..3787a80 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java
@@ -16,6 +16,7 @@
 
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.index.project.ProjectIndexer;
+import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
@@ -23,9 +24,7 @@
 import com.googlesource.gerrit.plugins.multisite.index.ForwardedIndexExecutor;
 import com.googlesource.gerrit.plugins.multisite.index.ProjectChecker;
 import java.util.Optional;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Index a project using {@link ProjectIndexer}. This class is meant to be used on the receiving
@@ -35,80 +34,40 @@
  */
 @Singleton
 public class ForwardedIndexProjectHandler
-    extends ForwardedIndexingHandler<String, ProjectIndexEvent> {
+    extends ForwardedIndexingHandlerWithRetries<String, ProjectIndexEvent> {
   private final ProjectIndexer indexer;
-  private final int retryInterval;
-  private final int maxTries;
   private final ProjectChecker projectChecker;
-  private final ScheduledExecutorService indexExecutor;
 
   @Inject
   ForwardedIndexProjectHandler(
       ProjectIndexer indexer,
       ProjectChecker projectChecker,
+      OneOffRequestContext oneOffRequestContext,
       @ForwardedIndexExecutor ScheduledExecutorService indexExecutor,
       Configuration config) {
-    super(config.index().numStripedLocks());
+    super(indexExecutor, config, oneOffRequestContext);
     this.indexer = indexer;
-    Configuration.Index indexConfig = config.index();
-    this.retryInterval = indexConfig != null ? indexConfig.retryInterval() : 0;
-    this.maxTries = indexConfig != null ? indexConfig.maxTries() : 0;
-    this.indexExecutor = indexExecutor;
     this.projectChecker = projectChecker;
   }
 
   @Override
   protected void doIndex(String projectName, Optional<ProjectIndexEvent> event) {
-    if (!attemptIndex(projectName, event)) {
-      log.warn("First Attempt failed, scheduling again after {} msecs", retryInterval);
-      rescheduleIndex(projectName, event, 1);
-    }
+    attemptToIndex(projectName, event, 0);
   }
 
-  public boolean attemptIndex(String projectName, Optional<ProjectIndexEvent> event) {
-    log.debug("Attempt to index project {}, event: [{}]", projectName, event);
-    final Project.NameKey projectNameKey = Project.nameKey(projectName);
-    if (projectChecker.isProjectUpToDate(projectNameKey)) {
-      indexer.index(projectNameKey);
-      log.debug("Project {} successfully indexed", projectName);
-      return true;
-    }
-    return false;
+  @Override
+  protected void reindex(String id) {
+    indexer.index(Project.nameKey(id));
   }
 
-  public void rescheduleIndex(
-      String projectName, Optional<ProjectIndexEvent> event, int retryCount) {
-    if (retryCount > maxTries) {
-      log.error(
-          "Project {} could not be indexed after {} retries. index could be stale.",
-          projectName,
-          retryCount);
+  @Override
+  protected String indexName() {
+    return "project";
+  }
 
-      return;
-    }
-
-    log.warn(
-        "Retrying for the #{} time to index project {} after {} msecs",
-        retryCount,
-        projectName,
-        retryInterval);
-
-    @SuppressWarnings("unused")
-    Future<?> possiblyIgnoredError =
-        indexExecutor.schedule(
-            () -> {
-              Context.setForwardedEvent(true);
-              if (!attemptIndex(projectName, event)) {
-                log.warn(
-                    "Attempt {} to index project {} failed, scheduling again after {} msecs",
-                    retryCount,
-                    projectName,
-                    retryInterval);
-                rescheduleIndex(projectName, event, retryCount + 1);
-              }
-            },
-            retryInterval,
-            TimeUnit.MILLISECONDS);
+  @Override
+  protected void attemptToIndex(String id, Optional<ProjectIndexEvent> indexEvent, int retryCount) {
+    reindexAndCheckIsUpToDate(id, indexEvent, projectChecker, retryCount);
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java
new file mode 100644
index 0000000..5c64431
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java
@@ -0,0 +1,109 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.forwarder;
+
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
+import com.googlesource.gerrit.plugins.multisite.index.UpToDateChecker;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class to handle forwarded indexing. This class is meant to be extended by classes used on
+ * the receiving side of the {@link IndexEvent} since it will prevent indexing to be forwarded again
+ * causing an infinite forwarding loop between the 2 nodes. It will also make sure no concurrent
+ * indexing is done for the same id.
+ */
+public abstract class ForwardedIndexingHandlerWithRetries<T, E extends IndexEvent>
+    extends ForwardedIndexingHandler<T, E> {
+
+  private final int retryInterval;
+  private final int maxTries;
+  private final ScheduledExecutorService indexExecutor;
+  protected final OneOffRequestContext oneOffCtx;
+
+  ForwardedIndexingHandlerWithRetries(
+      ScheduledExecutorService indexExecutor,
+      Configuration configuration,
+      OneOffRequestContext oneOffCtx) {
+    super(configuration.index().numStripedLocks());
+
+    Configuration.Index indexConfig = configuration.index();
+    this.oneOffCtx = oneOffCtx;
+    this.indexExecutor = indexExecutor;
+    this.retryInterval = indexConfig != null ? indexConfig.retryInterval() : 0;
+    this.maxTries = indexConfig != null ? indexConfig.maxTries() : 0;
+  }
+
+  protected abstract void reindex(T id);
+
+  protected abstract String indexName();
+
+  protected abstract void attemptToIndex(T id, Optional<E> indexEvent, int retryCount);
+
+  protected boolean rescheduleIndex(T id, Optional<E> indexEvent, int retryCount) {
+    if (retryCount > maxTries) {
+      log.error(
+          "{} {} could not be indexed after {} retries. {} index could be stale.",
+          indexName(),
+          id,
+          retryCount,
+          indexName());
+      return false;
+    }
+
+    log.warn(
+        "Retrying for the #{} time to index {} {} after {} msecs",
+        retryCount,
+        indexName(),
+        id,
+        retryInterval);
+    @SuppressWarnings("unused")
+    Future<?> possiblyIgnoredError =
+        indexExecutor.schedule(
+            () -> {
+              try (ManualRequestContext ctx = oneOffCtx.open()) {
+                Context.setForwardedEvent(true);
+                attemptToIndex(id, indexEvent, retryCount);
+              } catch (Exception e) {
+                log.warn("{} {} could not be indexed", indexName(), id, e);
+              }
+            },
+            retryInterval,
+            TimeUnit.MILLISECONDS);
+    return true;
+  }
+
+  public final void reindexAndCheckIsUpToDate(
+      T id, Optional<E> indexEvent, UpToDateChecker<E> upToDateChecker, int retryCount) {
+    reindex(id);
+
+    if (!upToDateChecker.isUpToDate(indexEvent)) {
+      log.warn("{} {} is not up-to-date. Rescheduling", indexName(), id);
+      rescheduleIndex(id, indexEvent, retryCount + 1);
+      return;
+    }
+    if (retryCount > 0) {
+      log.warn(
+          "{} {} has been eventually indexed after {} attempt(s)", indexName(), id, retryCount);
+    } else {
+      log.debug("{} {} successfully indexed", indexName(), id);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/GroupIndexEvent.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/GroupIndexEvent.java
index e452e27..4981b2f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/GroupIndexEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/GroupIndexEvent.java
@@ -15,27 +15,30 @@
 package com.googlesource.gerrit.plugins.multisite.forwarder.events;
 
 import com.google.common.base.Objects;
+import com.google.gerrit.common.Nullable;
+import org.eclipse.jgit.lib.ObjectId;
 
 public class GroupIndexEvent extends IndexEvent {
   static final String TYPE = "group-index";
 
-  public String groupUUID;
+  public final String groupUUID;
+  public final ObjectId sha1;
 
-  public GroupIndexEvent(String groupUUID) {
+  public GroupIndexEvent(String groupUUID, @Nullable ObjectId sha1) {
     super(TYPE);
     this.groupUUID = groupUUID;
+    this.sha1 = sha1;
   }
 
-  @Override
   public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;
     GroupIndexEvent that = (GroupIndexEvent) o;
-    return Objects.equal(groupUUID, that.groupUUID);
+    return Objects.equal(groupUUID, that.groupUUID) && Objects.equal(sha1, that.sha1);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hashCode(groupUUID);
+    return Objects.hashCode(groupUUID, sha1);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/CacheEvictionEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/CacheEvictionEventRouter.java
index 0fb0c0a..a77e168 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/CacheEvictionEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/CacheEvictionEventRouter.java
@@ -16,18 +16,18 @@
 
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEntry;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheKeyJsonParser;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedCacheEvictionHandler;
-import com.googlesource.gerrit.plugins.multisite.forwarder.GsonParser;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
 
 public class CacheEvictionEventRouter implements ForwardedEventRouter<CacheEvictionEvent> {
   private final ForwardedCacheEvictionHandler cacheEvictionHanlder;
-  private final GsonParser gsonParser;
+  private final CacheKeyJsonParser gsonParser;
 
   @Inject
   public CacheEvictionEventRouter(
-      ForwardedCacheEvictionHandler cacheEvictionHanlder, GsonParser gsonParser) {
+      ForwardedCacheEvictionHandler cacheEvictionHanlder, CacheKeyJsonParser gsonParser) {
     this.cacheEvictionHanlder = cacheEvictionHanlder;
     this.gsonParser = gsonParser;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeChecker.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeChecker.java
index 3646b3a..9ee59eb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeChecker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeChecker.java
@@ -20,7 +20,7 @@
 import java.util.Optional;
 
 /** Encapsulates the logic of verifying the up-to-date status of a change. */
-public interface ChangeChecker {
+public interface ChangeChecker extends UpToDateChecker<ChangeIndexEvent> {
 
   /**
    * Return the Change nodes read from ReviewDb or NoteDb.
@@ -48,7 +48,7 @@
    * @param indexEvent indexing event
    * @return true if the local Change is up-to-date, false otherwise.
    */
-  public boolean isChangeUpToDate(Optional<ChangeIndexEvent> indexEvent);
+  public boolean isUpToDate(Optional<ChangeIndexEvent> indexEvent);
 
   /**
    * Return the last computed up-to-date Change time-stamp.
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
index 60611f4..08b26f7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ChangeCheckerImpl.java
@@ -85,7 +85,7 @@
   }
 
   @Override
-  public boolean isChangeUpToDate(Optional<ChangeIndexEvent> indexEvent) {
+  public boolean isUpToDate(Optional<ChangeIndexEvent> indexEvent) {
     getComputedChangeTs();
     if (!computedChangeTs.isPresent()) {
       log.warn("Unable to compute last updated ts for change {}", changeId);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/GroupChecker.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/GroupChecker.java
new file mode 100644
index 0000000..c19d04a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/GroupChecker.java
@@ -0,0 +1,27 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.index;
+
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
+import java.util.Optional;
+import org.eclipse.jgit.lib.ObjectId;
+
+public interface GroupChecker extends UpToDateChecker<GroupIndexEvent> {
+
+  @Override
+  boolean isUpToDate(Optional<GroupIndexEvent> groupIndexEvent);
+
+  ObjectId getGroupHead(String groupUUID);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImpl.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImpl.java
new file mode 100644
index 0000000..e9e40f3
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImpl.java
@@ -0,0 +1,101 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.index;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.AccountGroup;
+import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.server.config.AllUsersName;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
+import java.io.IOException;
+import java.util.Optional;
+import org.eclipse.jgit.errors.MissingObjectException;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevWalk;
+
+@Singleton
+class GroupCheckerImpl implements GroupChecker {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private final GitRepositoryManager repoManager;
+  private final AllUsersName allUsers;
+
+  @Inject
+  GroupCheckerImpl(GitRepositoryManager repoManager, AllUsersName allUsers) {
+    this.repoManager = repoManager;
+    this.allUsers = allUsers;
+  }
+
+  @Override
+  public boolean isUpToDate(Optional<GroupIndexEvent> groupIndexEvent) {
+    if (!groupIndexEvent.isPresent()) {
+      logger.atWarning().log("Group Index empty, considering this group up-to-date");
+      return true;
+    }
+    GroupIndexEvent event = groupIndexEvent.get();
+    AccountGroup.UUID groupUUID = AccountGroup.uuid(event.groupUUID);
+
+    if (event.sha1 == null) {
+      logger.atWarning().log(
+          "Event for group '%s' does not contain sha1, consider group up-to-date for compatibility.",
+          groupUUID);
+      return true;
+    }
+
+    try (Repository repo = repoManager.openRepository(allUsers)) {
+      if (commitExistsInRepo(repo, event.sha1)) {
+        logger.atInfo().log(
+            "Group '%s' up-to-date: sha1 '%s' exists in All-Users", groupUUID, event.sha1);
+        return true;
+      } else {
+        logger.atWarning().log(
+            "Group '%s' NOT up-to-date: sha1 '%s' still missing in All-Users",
+            groupUUID, event.sha1);
+      }
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log(
+          "Could not check whether Group '%s' is up-to-date", groupUUID);
+    }
+    return false;
+  }
+
+  @Override
+  public ObjectId getGroupHead(String groupUUID) {
+    try (Repository repo = repoManager.openRepository(allUsers)) {
+      return Optional.ofNullable(repo.exactRef(RefNames.refsGroups(AccountGroup.uuid(groupUUID))))
+          .map(Ref::getObjectId)
+          .orElse(ObjectId.zeroId());
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log("Fatal: could not get head of group %s.", groupUUID);
+      return ObjectId.zeroId();
+    }
+  }
+
+  @VisibleForTesting
+  boolean commitExistsInRepo(Repository repo, ObjectId sha1) throws IOException {
+    try (RevWalk revWalk = new RevWalk(repo)) {
+      revWalk.parseCommit(sha1);
+      return true;
+    } catch (MissingObjectException e) {
+      logger.atWarning().log("Commit %s does not exist in All-Users", sha1);
+    }
+    return false;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
index a00e2ad..eef3e4b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandler.java
@@ -47,17 +47,20 @@
   private final Set<IndexTask> queuedTasks = Collections.newSetFromMap(new ConcurrentHashMap<>());
   private final ChangeCheckerImpl.Factory changeChecker;
   private final ProjectsFilter projectsFilter;
+  private final GroupChecker groupChecker;
 
   @Inject
   IndexEventHandler(
       @IndexExecutor Executor executor,
       DynamicSet<IndexEventForwarder> forwarders,
       ChangeCheckerImpl.Factory changeChecker,
-      ProjectsFilter projectsFilter) {
+      ProjectsFilter projectsFilter,
+      GroupChecker groupChecker) {
     this.forwarders = forwarders;
     this.executor = executor;
     this.changeChecker = changeChecker;
     this.projectsFilter = projectsFilter;
+    this.groupChecker = groupChecker;
   }
 
   @Override
@@ -83,7 +86,8 @@
   @Override
   public void onGroupIndexed(String groupUUID) {
     if (!Context.isForwardedEvent()) {
-      IndexGroupTask task = new IndexGroupTask(new GroupIndexEvent(groupUUID));
+      IndexGroupTask task =
+          new IndexGroupTask(new GroupIndexEvent(groupUUID, groupChecker.getGroupHead(groupUUID)));
       if (queuedTasks.add(task)) {
         executor.execute(task);
       }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexModule.java
index 1f54385..075388f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/IndexModule.java
@@ -39,6 +39,7 @@
     DynamicSet.bind(binder(), ProjectIndexedListener.class).to(IndexEventHandler.class);
 
     bind(ProjectChecker.class).to(ProjectCheckerImpl.class);
+    bind(GroupChecker.class).to(GroupCheckerImpl.class);
 
     install(
         new FactoryModuleBuilder()
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ProjectChecker.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ProjectChecker.java
index ece2443..e040fce 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ProjectChecker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ProjectChecker.java
@@ -14,9 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.index;
 
-import com.google.gerrit.entities.Project;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
 
 /** Encapsulates the logic of verifying the up-to-date status of a project. */
-public interface ProjectChecker {
-  boolean isProjectUpToDate(Project.NameKey projectName);
-}
+public interface ProjectChecker extends UpToDateChecker<ProjectIndexEvent> {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ProjectCheckerImpl.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ProjectCheckerImpl.java
index d9851f3..8655824 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ProjectCheckerImpl.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/ProjectCheckerImpl.java
@@ -17,6 +17,8 @@
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.project.ProjectCache;
 import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
+import java.util.Optional;
 
 public class ProjectCheckerImpl implements ProjectChecker {
   private final ProjectCache projectCache;
@@ -27,7 +29,9 @@
   }
 
   @Override
-  public boolean isProjectUpToDate(Project.NameKey projectName) {
-    return projectCache.get(projectName).isPresent();
+  public boolean isUpToDate(Optional<ProjectIndexEvent> indexEvent) {
+    return indexEvent
+        .flatMap(event -> projectCache.get(Project.nameKey(event.projectName)))
+        .isPresent();
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/index/UpToDateChecker.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/UpToDateChecker.java
new file mode 100644
index 0000000..8a7a231
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/index/UpToDateChecker.java
@@ -0,0 +1,28 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.index;
+
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
+import java.util.Optional;
+
+public interface UpToDateChecker<E extends IndexEvent> {
+  /**
+   * Check if the local Change is aligned with the indexEvent received.
+   *
+   * @param indexEvent indexing event
+   * @return true if the local Change is up-to-date, false otherwise.
+   */
+  boolean isUpToDate(Optional<E> indexEvent);
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java
index a632b74..bf5c5d9 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java
@@ -18,8 +18,8 @@
 
 import com.google.gson.Gson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEntry;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheKeyJsonParser;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedCacheEvictionHandler;
-import com.googlesource.gerrit.plugins.multisite.forwarder.GsonParser;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.CacheEvictionEventRouter;
 import org.junit.Before;
@@ -36,7 +36,7 @@
 
   @Before
   public void setUp() {
-    router = new CacheEvictionEventRouter(cacheEvictionHandler, new GsonParser(new Gson()));
+    router = new CacheEvictionEventRouter(cacheEvictionHandler, new CacheKeyJsonParser(new Gson()));
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
index 348a79d..f22cb14 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
@@ -35,6 +35,7 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
 import com.googlesource.gerrit.plugins.replication.events.RefReplicationDoneEvent;
 import java.util.Optional;
+import org.eclipse.jgit.lib.ObjectId;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -95,7 +96,7 @@
   @Test
   public void routerShouldSendEventsToTheAppropriateHandler_GroupIndex() throws Exception {
     final String groupId = "12";
-    final GroupIndexEvent event = new GroupIndexEvent(groupId);
+    final GroupIndexEvent event = new GroupIndexEvent(groupId, ObjectId.zeroId());
     router.route(event);
 
     verify(indexGroupHandler)
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/GsonParserTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/CacheKeyJsonParserTest.java
similarity index 95%
rename from src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/GsonParserTest.java
rename to src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/CacheKeyJsonParserTest.java
index cb4a223..7efd4dd 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/GsonParserTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/CacheKeyJsonParserTest.java
@@ -24,11 +24,11 @@
 import com.googlesource.gerrit.plugins.multisite.cache.Constants;
 import org.junit.Test;
 
-public class GsonParserTest {
+public class CacheKeyJsonParserTest {
   private static final Object EMPTY_JSON = "{}";
 
   private final Gson gson = new EventGsonProvider().get();
-  private final GsonParser gsonParser = new GsonParser(gson);
+  private final CacheKeyJsonParser gsonParser = new CacheKeyJsonParser(gson);
 
   @Test
   public void accountIDParse() {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java
new file mode 100644
index 0000000..fb2a50b
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedCacheEvictionHandlerIT.java
@@ -0,0 +1,132 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.forwarder;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDbConfiguration;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.extensions.registration.RegistrationHandle;
+import com.google.gerrit.server.cache.CacheRemovalListener;
+import com.google.gerrit.server.events.EventGson;
+import com.google.gerrit.server.project.ProjectCacheImpl;
+import com.google.gson.Gson;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.CacheEvictionEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.RouterModule;
+import com.googlesource.gerrit.plugins.multisite.index.IndexModule;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.eclipse.jgit.lib.Config;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+@TestPlugin(
+    name = "multi-site",
+    sysModule =
+        "com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedCacheEvictionHandlerIT$TestModule")
+public class ForwardedCacheEvictionHandlerIT extends LightweightPluginDaemonTest {
+  private static final Duration CACHE_EVICTIONS_WAIT_TIMEOUT = Duration.ofMinutes(1);
+
+  @SuppressWarnings("rawtypes")
+  @Inject
+  private DynamicSet<CacheRemovalListener> cacheRemovalListeners;
+
+  @Inject private CacheEvictionEventRouter objectUnderTest;
+  @Inject @EventGson private Gson gson;
+  private CacheEvictionsTracker<?, ?> evictionsCacheTracker;
+  private RegistrationHandle cacheEvictionRegistrationHandle;
+
+  public static class TestModule extends AbstractModule {
+    @Override
+    protected void configure() {
+      install(new ForwarderModule());
+      install(new CacheModule());
+      install(new RouterModule());
+      install(new IndexModule());
+      SharedRefDbConfiguration sharedRefDbConfig =
+          new SharedRefDbConfiguration(new Config(), "multi-site");
+      bind(SharedRefDbConfiguration.class).toInstance(sharedRefDbConfig);
+    }
+  }
+
+  public static class CacheEvictionsTracker<K, V> implements CacheRemovalListener<K, V> {
+    private final Map<String, Set<Object>> trackedEvictions;
+    private final CountDownLatch allExpectedEvictionsArrived;
+
+    public CacheEvictionsTracker(int numExpectedEvictions) {
+      allExpectedEvictionsArrived = new CountDownLatch(numExpectedEvictions);
+      trackedEvictions = Maps.newHashMap();
+    }
+
+    public Set<Object> trackedEvictionsFor(String cacheName) {
+      return trackedEvictions.getOrDefault(cacheName, Collections.emptySet());
+    }
+
+    public void waitForExpectedEvictions() throws InterruptedException {
+      allExpectedEvictionsArrived.await(
+          CACHE_EVICTIONS_WAIT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void onRemoval(
+        String pluginName, String cacheName, RemovalNotification<K, V> notification) {
+      trackedEvictions.compute(
+          cacheName,
+          (k, v) -> {
+            if (v == null) {
+              return Sets.newHashSet(notification.getKey());
+            }
+            v.add(notification.getKey());
+            return v;
+          });
+      allExpectedEvictionsArrived.countDown();
+    }
+  }
+
+  @Before
+  public void startTrackingCacheEvictions() {
+    evictionsCacheTracker = new CacheEvictionsTracker<>(1);
+    cacheEvictionRegistrationHandle = cacheRemovalListeners.add("gerrit", evictionsCacheTracker);
+  }
+
+  @After
+  public void stopTrackingCacheEvictions() {
+    cacheEvictionRegistrationHandle.remove();
+  }
+
+  @Test
+  public void shouldEvictProjectCache() throws Exception {
+    objectUnderTest.route(
+        new CacheEvictionEvent(ProjectCacheImpl.CACHE_NAME, gson.toJson(project)));
+    evictionsCacheTracker.waitForExpectedEvictions();
+
+    assertThat(evictionsCacheTracker.trackedEvictionsFor(ProjectCacheImpl.CACHE_NAME))
+        .contains(project);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
index 04d83c5..e64759c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
@@ -190,6 +190,6 @@
       }
     }
 
-    when(changeCheckerPresentMock.isChangeUpToDate(any())).thenReturn(changeIsUpToDate);
+    when(changeCheckerPresentMock.isUpToDate(any())).thenReturn(changeIsUpToDate);
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java
index c703512..65d8d20 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java
@@ -16,16 +16,23 @@
 
 import static com.google.common.truth.Truth.assertThat;
 import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import com.google.gerrit.entities.AccountGroup;
 import com.google.gerrit.server.index.group.GroupIndexer;
+import com.google.gerrit.server.util.OneOffRequestContext;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexingHandler.Operation;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.index.TestGroupChecker;
 import java.io.IOException;
 import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -40,16 +47,22 @@
 
   @Rule public ExpectedException exception = ExpectedException.none();
   @Mock private GroupIndexer indexerMock;
+  @Mock private OneOffRequestContext ctxMock;
+  @Mock private ScheduledExecutorService indexExecutorMock;
   @Mock private Configuration config;
   @Mock private Configuration.Index index;
   private ForwardedIndexGroupHandler handler;
   private String uuid;
+  private static final int RETRY_INTERVAL = 1000;
+  private static final int MAX_TRIES = 2;
 
   @Before
   public void setUp() throws Exception {
     when(config.index()).thenReturn(index);
     when(index.numStripedLocks()).thenReturn(10);
-    handler = new ForwardedIndexGroupHandler(indexerMock, config);
+    when(index.retryInterval()).thenReturn(RETRY_INTERVAL);
+    when(index.maxTries()).thenReturn(MAX_TRIES);
+    handler = groupHandler(true);
     uuid = "123";
   }
 
@@ -108,4 +121,28 @@
 
     verify(indexerMock).index(AccountGroup.uuid(uuid));
   }
+
+  @Test
+  public void shouldChangeIndexEventWheNotUpToDate() throws IOException {
+    ForwardedIndexGroupHandler groupHandlerWithOutdatedEvent = groupHandler(false);
+    groupHandlerWithOutdatedEvent.index(uuid, Operation.INDEX, groupIndexEvent(uuid));
+    verify(indexerMock).index(AccountGroup.uuid(uuid));
+  }
+
+  @Test
+  public void shouldRescheduleGroupIndexingWhenItIsNotUpToDate() throws IOException {
+    ForwardedIndexGroupHandler groupHandlerWithOutdatedEvent = groupHandler(false);
+    groupHandlerWithOutdatedEvent.index(uuid, Operation.INDEX, groupIndexEvent(uuid));
+    verify(indexExecutorMock)
+        .schedule(any(Runnable.class), eq(new Long(RETRY_INTERVAL)), eq(TimeUnit.MILLISECONDS));
+  }
+
+  private ForwardedIndexGroupHandler groupHandler(boolean checkIsUpToDate) {
+    return new ForwardedIndexGroupHandler(
+        indexerMock, config, new TestGroupChecker(checkIsUpToDate), ctxMock, indexExecutorMock);
+  }
+
+  private Optional<GroupIndexEvent> groupIndexEvent(String uuid) {
+    return Optional.of(new GroupIndexEvent(uuid, null));
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandlerTest.java
index 9ed0f7d..72b9427 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandlerTest.java
@@ -23,6 +23,7 @@
 
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.index.project.ProjectIndexer;
+import com.google.gerrit.server.util.OneOffRequestContext;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexingHandler.Operation;
 import com.googlesource.gerrit.plugins.multisite.index.ProjectChecker;
@@ -44,6 +45,7 @@
   @Rule public ExpectedException exception = ExpectedException.none();
   @Mock private ProjectIndexer indexerMock;
   @Mock private Configuration configMock;
+  @Mock private OneOffRequestContext ctxMock;
   @Mock private ProjectChecker projectCheckerMock;
   @Mock private Configuration.Index indexMock;
   @Mock private ScheduledExecutorService indexExecutorMock;
@@ -56,10 +58,10 @@
     when(indexMock.numStripedLocks()).thenReturn(10);
     when(indexMock.retryInterval()).thenReturn(0);
     when(indexMock.maxTries()).thenReturn(2);
-    when(projectCheckerMock.isProjectUpToDate(any())).thenReturn(true);
+    when(projectCheckerMock.isUpToDate(any())).thenReturn(true);
     handler =
         new ForwardedIndexProjectHandler(
-            indexerMock, projectCheckerMock, indexExecutorMock, configMock);
+            indexerMock, projectCheckerMock, ctxMock, indexExecutorMock, configMock);
     nameKey = "project/name";
   }
 
@@ -118,13 +120,4 @@
 
     verify(indexerMock).index(Project.nameKey(nameKey));
   }
-
-  @Test
-  public void indexAttemptShouldFailWhenCheckerFails() throws Exception {
-    handler =
-        new ForwardedIndexProjectHandler(
-            indexerMock, (projectName) -> false, indexExecutorMock, configMock);
-
-    assertThat(handler.attemptIndex(nameKey, Optional.empty())).isFalse();
-  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java
new file mode 100644
index 0000000..6403cce
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupCheckerImplTest.java
@@ -0,0 +1,123 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.index;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.doReturn;
+
+import com.google.gerrit.common.Nullable;
+import com.google.gerrit.entities.AccountGroup;
+import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.server.config.AllUsersName;
+import com.google.gerrit.server.config.AllUsersNameProvider;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.UUID;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectIdRef;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefDatabase;
+import org.eclipse.jgit.lib.Repository;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class GroupCheckerImplTest {
+  ObjectId AN_OBJECT_ID = ObjectId.fromString("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
+  AllUsersName allUsers = new AllUsersName(AllUsersNameProvider.DEFAULT);
+
+  GroupCheckerImpl objectUnderTest;
+  @Mock private GitRepositoryManager repoManagerMock;
+  @Mock private RefDatabase refDatabaseMock;
+  @Mock private Repository repoMock;
+
+  @Before
+  public void setUp() throws Exception {
+    doReturn(repoMock).when(repoManagerMock).openRepository(allUsers);
+    doReturn(refDatabaseMock).when(repoMock).getRefDatabase();
+    objectUnderTest = new GroupCheckerImpl(repoManagerMock, allUsers);
+  }
+
+  @Test
+  public void isGroupUpToDate_shouldReturnTrueWhenEventIsEmpty() {
+    assertThat(objectUnderTest.isUpToDate(Optional.empty())).isTrue();
+  }
+
+  @Test
+  public void isGroupUpToDate_shouldReturnFalseWhenSha1DoesNotExistInAllUsers() {
+    setCommitExistsInRepo(false);
+    assertThat(
+            objectUnderTest.isUpToDate(groupIndexEvent(UUID.randomUUID().toString(), AN_OBJECT_ID)))
+        .isFalse();
+  }
+
+  @Test
+  public void isGroupUpToDate_shouldReturnFalseWhenSha1ExistsInAllUsers() {
+    setCommitExistsInRepo(true);
+    assertThat(
+            objectUnderTest.isUpToDate(groupIndexEvent(UUID.randomUUID().toString(), AN_OBJECT_ID)))
+        .isTrue();
+  }
+
+  @Test
+  public void isGroupUpToDate_shouldReturnTrueWhenSha1IsNotDefined() {
+    UUID groupUUID = UUID.randomUUID();
+    setCommitExistsInRepo(true);
+
+    assertThat(objectUnderTest.isUpToDate(groupIndexEvent(groupUUID.toString(), null))).isTrue();
+  }
+
+  @Test
+  public void getGroupHead_shouldReturnTheExactReValueWhenDefined() throws IOException {
+    UUID groupUUID = UUID.randomUUID();
+    setupExactRefInGroup(groupUUID, AN_OBJECT_ID);
+
+    assertThat(objectUnderTest.getGroupHead(groupUUID.toString())).isEqualTo(AN_OBJECT_ID);
+  }
+
+  @Test
+  public void getGroupHead_shouldReturnObjectIdZeroWhenExactRefIsNull() throws IOException {
+    UUID groupUUID = UUID.randomUUID();
+    setupExactRefInGroup(groupUUID, null);
+
+    assertThat(objectUnderTest.getGroupHead(groupUUID.toString())).isEqualTo(ObjectId.zeroId());
+  }
+
+  private Optional<GroupIndexEvent> groupIndexEvent(String uuid, @Nullable ObjectId sha1) {
+    return Optional.of(new GroupIndexEvent(uuid, sha1));
+  }
+
+  private void setCommitExistsInRepo(boolean commitExists) {
+    objectUnderTest =
+        new GroupCheckerImpl(repoManagerMock, allUsers) {
+          @Override
+          boolean commitExistsInRepo(Repository repo, ObjectId sha1) {
+            return commitExists;
+          }
+        };
+  }
+
+  private void setupExactRefInGroup(UUID groupUUID, @Nullable ObjectId objectId)
+      throws IOException {
+    String groupRefName = RefNames.refsGroups(AccountGroup.uuid(groupUUID.toString()));
+    ObjectIdRef.Unpeeled aRef = new ObjectIdRef.Unpeeled(Ref.Storage.LOOSE, groupRefName, objectId);
+    doReturn(objectId == null ? null : aRef).when(refDatabaseMock).exactRef(groupRefName);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupEventIndexTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupEventIndexTest.java
new file mode 100644
index 0000000..93cec05
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/GroupEventIndexTest.java
@@ -0,0 +1,45 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.index;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
+import com.google.gson.Gson;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
+import java.util.UUID;
+import org.eclipse.jgit.lib.ObjectId;
+import org.junit.Test;
+
+public class GroupEventIndexTest {
+  private static final Gson gson = new EventGsonProvider().get();
+
+  @Test
+  public void groupEventIndexRoundTripWithSha1() {
+    String aGroupUUID = UUID.randomUUID().toString();
+    ObjectId anObjectId = ObjectId.fromString("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef");
+    GroupIndexEvent original = new GroupIndexEvent(aGroupUUID, anObjectId);
+
+    assertThat(gson.fromJson(gson.toJson(original), GroupIndexEvent.class)).isEqualTo(original);
+  }
+
+  @Test
+  public void groupEventIndexRoundTripWithoutSha1() {
+    String aGroupUUID = UUID.randomUUID().toString();
+    GroupIndexEvent original = new GroupIndexEvent(aGroupUUID, null);
+
+    assertThat(gson.fromJson(gson.toJson(original), GroupIndexEvent.class)).isEqualTo(original);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
index b26532c..660a302 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/IndexEventHandlerTest.java
@@ -46,7 +46,11 @@
   public void setUp() {
     eventHandler =
         new IndexEventHandler(
-            MoreExecutors.directExecutor(), asDynamicSet(forwarder), changeChecker, projectsFilter);
+            MoreExecutors.directExecutor(),
+            asDynamicSet(forwarder),
+            changeChecker,
+            projectsFilter,
+            new TestGroupChecker(true));
   }
 
   private DynamicSet<IndexEventForwarder> asDynamicSet(IndexEventForwarder forwarder) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/index/TestGroupChecker.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/TestGroupChecker.java
new file mode 100644
index 0000000..0af7e9e
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/index/TestGroupChecker.java
@@ -0,0 +1,42 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.index;
+
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
+import java.util.Optional;
+import org.eclipse.jgit.lib.ObjectId;
+import org.junit.Ignore;
+
+@Ignore
+public class TestGroupChecker implements GroupChecker {
+
+  private final boolean isUpToDate;
+
+  public TestGroupChecker(boolean isUpToDate) {
+    this.isUpToDate = isUpToDate;
+  }
+
+  private static final String someObjectId = "deadbeefdeadbeefdeadbeefdeadbeefdeadbeef";
+
+  @Override
+  public boolean isUpToDate(Optional<GroupIndexEvent> groupIndexEvent) {
+    return isUpToDate;
+  }
+
+  @Override
+  public ObjectId getGroupHead(String groupUUID) {
+    return ObjectId.fromString(someObjectId);
+  }
+}
diff --git a/src/test/scala/com/googlesource/gerrit/plugins/multisite/scenarios/CreateChangeUsingMultiGerrit.scala b/src/test/scala/com/googlesource/gerrit/plugins/multisite/scenarios/CreateChangeUsingMultiGerrit.scala
index eade1cf..fdacd81 100644
--- a/src/test/scala/com/googlesource/gerrit/plugins/multisite/scenarios/CreateChangeUsingMultiGerrit.scala
+++ b/src/test/scala/com/googlesource/gerrit/plugins/multisite/scenarios/CreateChangeUsingMultiGerrit.scala
@@ -25,7 +25,6 @@
 class CreateChangeUsingMultiGerrit extends GerritSimulation {
   private val data: FeederBuilder = jsonFile(resource).convert(keys).queue
   private val projectName = className
-  private val numberKey = "_number"
 
   override def relativeRuntimeWeight = 10
 
@@ -33,7 +32,7 @@
     .feed(data)
     .exec(httpRequest
       .body(ElFileBody(body)).asJson
-      .check(regex("\"" + numberKey + "\":(\\d+),").saveAs(numberKey)))
+      .check(regex("\"_" + numberKey + "\":(\\d+),").saveAs(numberKey)))
     .exec(session => {
       deleteChange.number = Some(session(numberKey).as[Int])
       session
diff --git a/src/test/scala/com/googlesource/gerrit/plugins/multisite/scenarios/DeleteChangeUsingMultiGerrit1.scala b/src/test/scala/com/googlesource/gerrit/plugins/multisite/scenarios/DeleteChangeUsingMultiGerrit1.scala
index 457de0d..b509a19 100644
--- a/src/test/scala/com/googlesource/gerrit/plugins/multisite/scenarios/DeleteChangeUsingMultiGerrit1.scala
+++ b/src/test/scala/com/googlesource/gerrit/plugins/multisite/scenarios/DeleteChangeUsingMultiGerrit1.scala
@@ -40,12 +40,12 @@
     .feed(data)
     .exec(session => {
       if (number.nonEmpty) {
-        session.set("number", number.get)
+        session.set(numberKey, number.get)
       } else {
         session
       }
     })
-    .exec(http(uniqueName).delete("${url}${number}"))
+    .exec(http(uniqueName).delete("${url}${"+ numberKey +"}"))
 
   setUp(
     test.inject(