Merge branch 'stable-3.1'

* stable-3.1:
  Use dockerized prometheus instance instead of the local one
  Improve readability of the refs filtering during replication
  Reload local version when in memory ref is not up to date
  Remove EventConsumerIT test

Change-Id: Ia2d4ce69c0433a0051ae53df4b61af51f77c5f91
diff --git a/setup_local_env/configs/prometheus.yml b/setup_local_env/configs/prometheus.yml
new file mode 100644
index 0000000..8eaa989
--- /dev/null
+++ b/setup_local_env/configs/prometheus.yml
@@ -0,0 +1,17 @@
+global:
+  scrape_interval:     15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
+  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
+  # scrape_timeout is set to the global default (10s).
+
+scrape_configs:
+ - job_name: 'metrics'
+   scheme: http
+   metrics_path: '/plugins/metrics-reporter-prometheus/metrics'
+   params:
+      format: ['prometheus']
+   bearer_token: token
+   scrape_interval: 5s
+   static_configs:
+      - targets: ['$GERRIT_SITE_HOST:18080','$GERRIT_SITE_HOST:18081']
+        labels:
+          env: 'unit'
diff --git a/setup_local_env/docker-compose.kafka-broker.yaml b/setup_local_env/docker-compose.yaml
similarity index 63%
rename from setup_local_env/docker-compose.kafka-broker.yaml
rename to setup_local_env/docker-compose.yaml
index b7e91f0..c386d46 100644
--- a/setup_local_env/docker-compose.kafka-broker.yaml
+++ b/setup_local_env/docker-compose.yaml
@@ -13,3 +13,11 @@
     environment:
       KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+  prometheus:
+    image: prom/prometheus:v2.16.0
+    user: root
+    volumes:
+     - $COMMON_LOCATION/prometheus.yml:/etc/prometheus/prometheus.yml
+    ports:
+      - "9090:9090"
+    network_mode: $NETWORK_MODE
diff --git a/setup_local_env/setup.sh b/setup_local_env/setup.sh
index affa535..f9329e7 100755
--- a/setup_local_env/setup.sh
+++ b/setup_local_env/setup.sh
@@ -23,7 +23,6 @@
 
 function check_application_requirements {
   type haproxy >/dev/null 2>&1 || { echo >&2 "Require haproxy but it's not installed. Aborting."; exit 1; }
-  type prometheus >/dev/null 2>&1 || { echo >&2 "Require prometheus but it's not installed. Aborting."; exit 1; }
   type java >/dev/null 2>&1 || { echo >&2 "Require java but it's not installed. Aborting."; exit 1; }
   type docker >/dev/null 2>&1 || { echo >&2 "Require docker but it's not installed. Aborting."; exit 1; }
   type docker-compose >/dev/null 2>&1 || { echo >&2 "Require docker-compose but it's not installed. Aborting."; exit 1; }
@@ -99,16 +98,6 @@
   haproxy -f $HA_PROXY_CONFIG_DIR/haproxy.cfg &
 }
 
-function start_prometheus {
-  mkdir -p $PROMETHEUS_CONFIG_DIR
-  cat $SCRIPT_DIR/prometheus-config/prometheus.yml | envsubst > $PROMETHEUS_CONFIG_DIR/prometheus.yml
-
-  echo "Starting Prometheus..."
-  echo "THE SCRIPT LOCATION $SCRIPT_DIR"
-  echo "THE HA SCRIPT_LOCATION $PROMETHEUS_SCRIPT_DIR"
-  prometheus --config.file $PROMETHEUS_CONFIG_DIR/prometheus.yml &
-}
-
 function deploy_config_files {
   # KAFKA configuration
   export KAFKA_PORT=9092
@@ -139,14 +128,26 @@
   copy_config_files $CONFIG_TEST_SITE_2 $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE2_SSHD_PORT $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE2_REMOTE_DEBUG_PORT $GERRIT_SITE2_KAFKA_GROUP_ID
 }
 
+function is_docker_desktop { # prints how many "docker info" lines name Docker Desktop as the OS
+  echo $(docker info | grep "Operating System: Docker Desktop" | wc -l)
+}
+
+function docker_host_env {
+  # Emits "mac" when is_docker_desktop prints exactly 1, and "linux" for any other output.
+  IS_DOCKER_DESKTOP=$(is_docker_desktop)
+  case "$IS_DOCKER_DESKTOP" in
+    1) echo "mac" ;;
+    *) echo "linux" ;;
+  esac
+}
+
 
 function cleanup_environment {
   echo "Killing existing HA-PROXY setup"
   kill $(ps -ax | grep haproxy | grep "gerrit_setup/ha-proxy-config" | awk '{print $1}') 2> /dev/null
-  echo "Killing existing Prometheus setup"
-  kill $(ps -ax | grep prometheus | grep "gerrit_setup/prometheus-config" | awk '{print $1}') 2> /dev/null
-  echo "Stopping kafka and zk"
-  docker-compose -f $SCRIPT_DIR/docker-compose.kafka-broker.yaml down 2> /dev/null
+
+  echo "Stopping docker containers"
+  docker-compose -f $SCRIPT_DIR/docker-compose.yaml down 2> /dev/null
 
   echo "Stopping GERRIT instances"
   $1/bin/gerrit.sh stop 2> /dev/null
@@ -308,7 +309,7 @@
 export SSH_ADVERTISED_PORT=${SSH_ADVERTISED_PORT:-"29418"}
 HTTPS_ENABLED=${HTTPS_ENABLED:-"false"}
 
-COMMON_LOCATION=$DEPLOYMENT_LOCATION/gerrit_setup
+export COMMON_LOCATION=$DEPLOYMENT_LOCATION/gerrit_setup
 LOCATION_TEST_SITE_1=$COMMON_LOCATION/instance-1
 LOCATION_TEST_SITE_2=$COMMON_LOCATION/instance-2
 HA_PROXY_CONFIG_DIR=$COMMON_LOCATION/ha-proxy-config
@@ -433,12 +434,23 @@
   ln -s $LOCATION_TEST_SITE_2/lib/multi-site.jar $LOCATION_TEST_SITE_2/plugins/multi-site.jar
 fi
 
+DOCKER_HOST_ENV=$(docker_host_env)
+echo "Docker host environment: $DOCKER_HOST_ENV"
+if [ "$DOCKER_HOST_ENV" = "mac" ];then
+  export GERRIT_SITE_HOST="host.docker.internal" # substituted into the prometheus.yml targets
+  export NETWORK_MODE="bridge"                   # read by docker-compose.yaml (network_mode)
+else
+  export GERRIT_SITE_HOST="localhost"
+  export NETWORK_MODE="host"
+fi
+# Render the Prometheus config template; paths are quoted so locations with spaces still work.
+envsubst < "$SCRIPT_DIR/configs/prometheus.yml" > "$COMMON_LOCATION/prometheus.yml"
 
 IS_KAFKA_RUNNING=$(check_if_kafka_is_running)
 if [ $IS_KAFKA_RUNNING -lt 1 ];then
 
   echo "Starting zk and kafka"
-  docker-compose -f $SCRIPT_DIR/docker-compose.kafka-broker.yaml up -d
+  docker-compose -f $SCRIPT_DIR/docker-compose.yaml up -d
   echo "Waiting for kafka to start..."
   while [[ $(check_if_kafka_is_running) -lt 1 ]];do sleep 10s; done
 fi
@@ -456,11 +468,6 @@
   start_ha_proxy
 fi
 
-if [[ $(ps -ax | grep promtheus | grep "gerrit_setup/prometheus-config" | awk '{print $1}' | wc -l) -lt 1 ]];then
-  echo "Starting prometheus"
-  start_prometheus
-fi
-
 echo "==============================="
 echo "Current gerrit multi-site setup"
 echo "==============================="
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
index b6f4033..f831671 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
@@ -15,76 +15,141 @@
 package com.googlesource.gerrit.plugins.multisite.validation;
 
 import com.gerritforge.gerrit.globalrefdb.GlobalRefDbLockException;
+import com.google.common.base.Preconditions;
+import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.SharedRefDatabaseWrapper;
 import com.googlesource.gerrit.plugins.replication.ReplicationPushFilter;
+import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.ObjectIdRef;
 import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Singleton
 public class MultisiteReplicationPushFilter implements ReplicationPushFilter {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private static final String REF_META_SUFFIX = "/meta";
+  public static final int MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS = 1000;
+  public static final int RANDOM_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS = 1000;
+
   static final String REPLICATION_LOG_NAME = "replication_log";
   static final Logger repLog = LoggerFactory.getLogger(REPLICATION_LOG_NAME);
 
   private final SharedRefDatabaseWrapper sharedRefDb;
+  private final GitRepositoryManager gitRepositoryManager;
 
   @Inject
-  public MultisiteReplicationPushFilter(SharedRefDatabaseWrapper sharedRefDb) {
+  public MultisiteReplicationPushFilter(
+      SharedRefDatabaseWrapper sharedRefDb, GitRepositoryManager gitRepositoryManager) {
     this.sharedRefDb = sharedRefDb;
+    this.gitRepositoryManager = gitRepositoryManager;
   }
 
   @Override
   public List<RemoteRefUpdate> filter(String projectName, List<RemoteRefUpdate> remoteUpdatesList) {
     Set<String> outdatedChanges = new HashSet<>();
 
-    List<RemoteRefUpdate> filteredRefUpdates =
-        remoteUpdatesList.stream()
-            .filter(
-                refUpdate -> {
-                  String ref = refUpdate.getSrcRef();
-                  try {
-                    if (sharedRefDb.isUpToDate(
-                        Project.nameKey(projectName),
-                        new ObjectIdRef.Unpeeled(
-                            Ref.Storage.NETWORK, ref, refUpdate.getNewObjectId()))) {
-                      return true;
+    try (Repository repository =
+        gitRepositoryManager.openRepository(Project.nameKey(projectName))) {
+      List<RemoteRefUpdate> filteredRefUpdates =
+          remoteUpdatesList.stream()
+              .filter(
+                  refUpdate -> {
+                    boolean refUpToDate = isUpToDateWithRetry(projectName, repository, refUpdate);
+                    if (!refUpToDate) {
+                      repLog.warn(
+                          "{} is not up-to-date with the shared-refdb and thus will NOT BE replicated",
+                          refUpdate);
+                      if (refUpdate.getSrcRef().endsWith(REF_META_SUFFIX)) {
+                        outdatedChanges.add(getRootChangeRefPrefix(refUpdate.getSrcRef()));
+                      }
                     }
-                    repLog.warn(
-                        "{} is not up-to-date with the shared-refdb and thus will NOT BE replicated",
-                        refUpdate);
-                  } catch (GlobalRefDbLockException e) {
-                    repLog.warn(
-                        "{} is locked on shared-refdb and thus will NOT BE replicated", refUpdate);
-                  }
-                  if (ref.endsWith(REF_META_SUFFIX)) {
-                    outdatedChanges.add(getRootChangeRefPrefix(ref));
-                  }
-                  return false;
-                })
-            .collect(Collectors.toList());
+                    return refUpToDate;
+                  })
+              .collect(Collectors.toList());
 
-    return filteredRefUpdates.stream()
-        .filter(
-            refUpdate -> {
-              if (outdatedChanges.contains(changePrefix(refUpdate.getSrcRef()))) {
-                repLog.warn(
-                    "{} belongs to an outdated /meta ref and thus will NOT BE replicated",
-                    refUpdate);
-                return false;
-              }
-              return true;
-            })
-        .collect(Collectors.toList());
+      return filteredRefUpdates.stream()
+          .filter(
+              refUpdate -> {
+                if (outdatedChanges.contains(changePrefix(refUpdate.getSrcRef()))) {
+                  repLog.warn(
+                      "{} belongs to an outdated /meta ref and thus will NOT BE replicated",
+                      refUpdate);
+                  return false;
+                }
+                return true;
+              })
+          .collect(Collectors.toList());
+
+    } catch (IOException ioe) {
+      String message = String.format("Error while opening project: '%s'", projectName);
+      repLog.error(message);
+      logger.atSevere().withCause(ioe).log(message);
+      return Collections.emptyList();
+    }
+  }
+
+  private boolean isUpToDateWithRetry(
+      String projectName, Repository repository, RemoteRefUpdate refUpdate) {
+    String ref = refUpdate.getSrcRef(); // checked against the shared ref-db, at most twice
+    try {
+      if (sharedRefDb.isUpToDate(
+          Project.nameKey(projectName),
+          new ObjectIdRef.Unpeeled(Ref.Storage.NETWORK, ref, refUpdate.getNewObjectId()))) {
+        return true;
+      }
+      // Not up to date with the object id carried by the update: wait briefly, then re-check.
+      randomSleepForMitigatingConditionWhereLocalRefHaveJustBeenChanged(
+          projectName, refUpdate, ref);
+      // The second check uses the object id the ref currently has in the local repository.
+      return sharedRefDb.isUpToDate(
+          Project.nameKey(projectName),
+          new ObjectIdRef.Unpeeled(Ref.Storage.NETWORK, ref, getNotNullExactRef(repository, ref)));
+    } catch (GlobalRefDbLockException gle) {
+      String message =
+          String.format("%s is locked on shared-refdb and thus will NOT BE replicated", ref);
+      repLog.error(message);
+      logger.atSevere().withCause(gle).log(message);
+      return false; // a locked ref is reported as not up-to-date
+    } catch (IOException ioe) {
+      String message =
+          String.format("Error while extracting ref '%s' for project '%s'", ref, projectName);
+      repLog.error(message);
+      logger.atSevere().withCause(ioe).log(message);
+      return false; // a failed local read is reported as not up-to-date
+    }
+  }
+
+  private void randomSleepForMitigatingConditionWhereLocalRefHaveJustBeenChanged(
+      String projectName, RemoteRefUpdate refUpdate, String ref) {
+    int randomSleepTimeMsec =
+        MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS
+            + new Random().nextInt(RANDOM_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS); // random extra
+    repLog.debug( // parameterized: the message is only built when debug logging is enabled
+        "'{}' is not up-to-date for project '{}' [local='{}']. Reload local ref in '{} ms' and re-check",
+        ref, projectName, refUpdate.getNewObjectId(), randomSleepTimeMsec);
+    try {
+      Thread.sleep(randomSleepTimeMsec); // pause before the caller re-reads the local ref
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt(); // restore the flag so the interrupt is not lost
+      String message =
+          String.format("Error while waiting for next check for '%s', ref '%s'", projectName, ref);
+      repLog.error(message);
+      logger.atWarning().withCause(ie).log(message);
+    }
   }
 
   private String changePrefix(String changeRef) {
@@ -106,4 +171,10 @@
 
     return changeMetaRef;
   }
+
+  private ObjectId getNotNullExactRef(Repository repository, String refName) throws IOException {
+    Ref ref = repository.exactRef(refName); // null when the ref does not exist locally
+    if (ref == null) throw new IOException(String.format("Ref '%s' not found", refName));
+    return ref.getObjectId(); // a missing ref now surfaces as IOException, which the caller handles
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java
index 5242ae8..7e4593b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java
@@ -18,37 +18,61 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import com.gerritforge.gerrit.globalrefdb.GlobalRefDatabase;
 import com.gerritforge.gerrit.globalrefdb.GlobalRefDbLockException;
 import com.gerritforge.gerrit.globalrefdb.GlobalRefDbSystemError;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.testing.InMemoryRepositoryManager;
+import com.google.gerrit.testing.InMemoryTestEnvironment;
+import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.SharedRefDatabaseWrapper;
 import com.googlesource.gerrit.plugins.multisite.validation.DisabledSharedRefLogger;
 import com.googlesource.gerrit.plugins.multisite.validation.MultisiteReplicationPushFilter;
-import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import org.eclipse.jgit.internal.storage.dfs.InMemoryRepository;
+import org.eclipse.jgit.junit.LocalDiskRepositoryTestCase;
+import org.eclipse.jgit.junit.TestRepository;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.ObjectIdRef;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
-public class MultisiteReplicationPushFilterTest {
+public class MultisiteReplicationPushFilterTest extends LocalDiskRepositoryTestCase
+    implements RefFixture {
+
+  @Rule public InMemoryTestEnvironment testEnvironment = new InMemoryTestEnvironment();
 
   @Mock SharedRefDatabaseWrapper sharedRefDatabaseMock;
 
-  String project = "fooProject";
-  Project.NameKey projectName = Project.nameKey(project);
+  @Inject private InMemoryRepositoryManager gitRepositoryManager;
+
+  String project = A_TEST_PROJECT_NAME;
+  Project.NameKey projectName = A_TEST_PROJECT_NAME_KEY;
+
+  private TestRepository<InMemoryRepository> repo;
+
+  @Before
+  public void setUp() throws Exception {
+    InMemoryRepository inMemoryRepo =
+        gitRepositoryManager.createRepository(A_TEST_PROJECT_NAME_KEY);
+    repo = new TestRepository<>(inMemoryRepo); // used by the tests to create branches and commits
+  }
 
   @Test
   public void shouldReturnAllRefUpdatesWhenAllUpToDate() throws Exception {
@@ -57,7 +81,7 @@
     doReturn(true).when(sharedRefDatabaseMock).isUpToDate(eq(projectName), any());
 
     MultisiteReplicationPushFilter pushFilter =
-        new MultisiteReplicationPushFilter(sharedRefDatabaseMock);
+        new MultisiteReplicationPushFilter(sharedRefDatabaseMock, gitRepositoryManager);
     List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
 
     assertThat(filteredRefUpdates).containsExactlyElementsIn(refUpdates);
@@ -71,13 +95,42 @@
     SharedRefDatabaseWrapper sharedRefDatabase = newSharedRefDatabase(outdatedRef.getSrcRef());
 
     MultisiteReplicationPushFilter pushFilter =
-        new MultisiteReplicationPushFilter(sharedRefDatabase);
+        new MultisiteReplicationPushFilter(sharedRefDatabase, gitRepositoryManager);
     List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
 
     assertThat(filteredRefUpdates).containsExactly(refUpToDate);
   }
 
   @Test
+  public void shouldLoadLocalVersionAndNotFilter() throws Exception {
+    RemoteRefUpdate temporaryOutdated = refUpdate("refs/heads/temporaryOutdated");
+    List<RemoteRefUpdate> refUpdates = Collections.singletonList(temporaryOutdated);
+    doReturn(false).doReturn(true).when(sharedRefDatabaseMock).isUpToDate(eq(projectName), any());
+    // The stub answers false to the first isUpToDate call and true to the second one.
+    MultisiteReplicationPushFilter pushFilter =
+        new MultisiteReplicationPushFilter(sharedRefDatabaseMock, gitRepositoryManager);
+    List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
+    // The update must be kept, and the shared ref-db must have been consulted exactly twice.
+    assertThat(filteredRefUpdates).containsExactly(temporaryOutdated);
+    verify(sharedRefDatabaseMock, times(2)).isUpToDate(any(), any());
+  }
+
+  @Test
+  public void shouldLoadLocalVersionAndFilter() throws Exception {
+    RemoteRefUpdate temporaryOutdated = refUpdate("refs/heads/temporaryOutdated");
+    repo.branch("refs/heads/temporaryOutdated").commit().create();
+    List<RemoteRefUpdate> refUpdates = Collections.singletonList(temporaryOutdated);
+    doReturn(false).doReturn(false).when(sharedRefDatabaseMock).isUpToDate(eq(projectName), any());
+    // The stub answers false to both isUpToDate calls made by the filter.
+    MultisiteReplicationPushFilter pushFilter =
+        new MultisiteReplicationPushFilter(sharedRefDatabaseMock, gitRepositoryManager);
+    List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
+    // The update must be dropped, and the shared ref-db must have been consulted exactly twice.
+    assertThat(filteredRefUpdates).isEmpty();
+    verify(sharedRefDatabaseMock, times(2)).isUpToDate(any(), any());
+  }
+
+  @Test
   public void shouldFilterOutAllOutdatedChangesRef() throws Exception {
     RemoteRefUpdate refUpToDate = refUpdate("refs/heads/uptodate");
     RemoteRefUpdate refChangeUpToDate = refUpdate("refs/changes/25/1225/2");
@@ -88,7 +141,7 @@
     SharedRefDatabaseWrapper sharedRefDatabase = newSharedRefDatabase(changeMetaRef.getSrcRef());
 
     MultisiteReplicationPushFilter pushFilter =
-        new MultisiteReplicationPushFilter(sharedRefDatabase);
+        new MultisiteReplicationPushFilter(sharedRefDatabase, gitRepositoryManager);
     List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
 
     assertThat(filteredRefUpdates).containsExactly(refUpToDate, refChangeUpToDate);
@@ -145,9 +198,10 @@
         new DisabledSharedRefLogger());
   }
 
-  private RemoteRefUpdate refUpdate(String refName) throws IOException {
+  private RemoteRefUpdate refUpdate(String refName) throws Exception {
     ObjectId srcObjId = ObjectId.fromString("0000000000000000000000000000000000000001");
     Ref srcRef = new ObjectIdRef.Unpeeled(Ref.Storage.NEW, refName, srcObjId);
+    repo.branch(refName).commit().create();
     return new RemoteRefUpdate(null, srcRef, "origin", false, "origin", srcObjId);
   }
 }