Merge remote-tracking branch 'origin/stable-2.16' into stable-3.0

* origin/stable-2.16:
  Improve readability of the refs filtering during replication
  Reload local version when in memory ref is not up to date
  Remove EventConsumerIT test

Change-Id: I963f190f06f979c65043a7e7e1c8f6f50ee65433
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
index 42f7194..b666c02 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
@@ -15,76 +15,141 @@
 package com.googlesource.gerrit.plugins.multisite.validation;
 
 import com.gerritforge.gerrit.globalrefdb.GlobalRefDbLockException;
+import com.google.common.base.Preconditions;
+import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.SharedRefDatabaseWrapper;
 import com.googlesource.gerrit.plugins.replication.ReplicationPushFilter;
+import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.ObjectIdRef;
 import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Singleton
 public class MultisiteReplicationPushFilter implements ReplicationPushFilter {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private static final String REF_META_SUFFIX = "/meta";
+  public static final int MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS = 1000;
+  public static final int RANDOM_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS = 1000;
+
   static final String REPLICATION_LOG_NAME = "replication_log";
   static final Logger repLog = LoggerFactory.getLogger(REPLICATION_LOG_NAME);
 
   private final SharedRefDatabaseWrapper sharedRefDb;
+  private final GitRepositoryManager gitRepositoryManager;
 
   @Inject
-  public MultisiteReplicationPushFilter(SharedRefDatabaseWrapper sharedRefDb) {
+  public MultisiteReplicationPushFilter(
+      SharedRefDatabaseWrapper sharedRefDb, GitRepositoryManager gitRepositoryManager) {
     this.sharedRefDb = sharedRefDb;
+    this.gitRepositoryManager = gitRepositoryManager;
   }
 
   @Override
   public List<RemoteRefUpdate> filter(String projectName, List<RemoteRefUpdate> remoteUpdatesList) {
     Set<String> outdatedChanges = new HashSet<>();
 
-    List<RemoteRefUpdate> filteredRefUpdates =
-        remoteUpdatesList.stream()
-            .filter(
-                refUpdate -> {
-                  String ref = refUpdate.getSrcRef();
-                  try {
-                    if (sharedRefDb.isUpToDate(
-                        new Project.NameKey(projectName),
-                        new ObjectIdRef.Unpeeled(
-                            Ref.Storage.NETWORK, ref, refUpdate.getNewObjectId()))) {
-                      return true;
+    try (Repository repository =
+        gitRepositoryManager.openRepository(Project.NameKey.parse(projectName))) {
+      List<RemoteRefUpdate> filteredRefUpdates =
+          remoteUpdatesList.stream()
+              .filter(
+                  refUpdate -> {
+                    boolean refUpToDate = isUpToDateWithRetry(projectName, repository, refUpdate);
+                    if (!refUpToDate) {
+                      repLog.warn(
+                          "{} is not up-to-date with the shared-refdb and thus will NOT BE replicated",
+                          refUpdate);
+                      if (refUpdate.getSrcRef().endsWith(REF_META_SUFFIX)) {
+                        outdatedChanges.add(getRootChangeRefPrefix(refUpdate.getSrcRef()));
+                      }
                     }
-                    repLog.warn(
-                        "{} is not up-to-date with the shared-refdb and thus will NOT BE replicated",
-                        refUpdate);
-                  } catch (GlobalRefDbLockException e) {
-                    repLog.warn(
-                        "{} is locked on shared-refdb and thus will NOT BE replicated", refUpdate);
-                  }
-                  if (ref.endsWith(REF_META_SUFFIX)) {
-                    outdatedChanges.add(getRootChangeRefPrefix(ref));
-                  }
-                  return false;
-                })
-            .collect(Collectors.toList());
+                    return refUpToDate;
+                  })
+              .collect(Collectors.toList());
 
-    return filteredRefUpdates.stream()
-        .filter(
-            refUpdate -> {
-              if (outdatedChanges.contains(changePrefix(refUpdate.getSrcRef()))) {
-                repLog.warn(
-                    "{} belongs to an outdated /meta ref and thus will NOT BE replicated",
-                    refUpdate);
-                return false;
-              }
-              return true;
-            })
-        .collect(Collectors.toList());
+      return filteredRefUpdates.stream()
+          .filter(
+              refUpdate -> {
+                if (outdatedChanges.contains(changePrefix(refUpdate.getSrcRef()))) {
+                  repLog.warn(
+                      "{} belongs to an outdated /meta ref and thus will NOT BE replicated",
+                      refUpdate);
+                  return false;
+                }
+                return true;
+              })
+          .collect(Collectors.toList());
+
+    } catch (IOException ioe) {
+      String message = String.format("Error while opening project: '%s'", projectName);
+      repLog.error(message);
+      logger.atSevere().withCause(ioe).log(message);
+      return Collections.emptyList();
+    }
+  }
+
+  private boolean isUpToDateWithRetry(
+      String projectName, Repository repository, RemoteRefUpdate refUpdate) {
+    String ref = refUpdate.getSrcRef();
+    try {
+      if (sharedRefDb.isUpToDate(
+          new Project.NameKey(projectName),
+          new ObjectIdRef.Unpeeled(Ref.Storage.NETWORK, ref, refUpdate.getNewObjectId()))) {
+        return true;
+      }
+
+      randomSleepForMitigatingConditionWhereLocalRefHaveJustBeenChanged(
+          projectName, refUpdate, ref);
+
+      return sharedRefDb.isUpToDate(
+          new Project.NameKey(projectName),
+          new ObjectIdRef.Unpeeled(Ref.Storage.NETWORK, ref, getNotNullExactRef(repository, ref)));
+    } catch (GlobalRefDbLockException gle) {
+      String message =
+          String.format("%s is locked on shared-refdb and thus will NOT BE replicated", ref);
+      repLog.error(message);
+      logger.atSevere().withCause(gle).log(message);
+      return false;
+    } catch (IOException ioe) {
+      String message =
+          String.format("Error while extracting ref '%s' for project '%s'", ref, projectName);
+      repLog.error(message);
+      logger.atSevere().withCause(ioe).log(message);
+      return false;
+    }
+  }
+
+  private void randomSleepForMitigatingConditionWhereLocalRefHaveJustBeenChanged(
+      String projectName, RemoteRefUpdate refUpdate, String ref) {
+    int randomSleepTimeMsec =
+        MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS
+            + new Random().nextInt(RANDOM_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS);
+    repLog.debug(
+        String.format(
+            "'%s' is not up-to-date for project '%s' [local='%s']. Reload local ref in '%d ms' and re-check",
+            ref, projectName, refUpdate.getNewObjectId(), randomSleepTimeMsec));
+    try {
+      Thread.sleep(randomSleepTimeMsec);
+    } catch (InterruptedException ie) {
+      String message =
+          String.format("Error while waiting for next check for '%s', ref '%s'", projectName, ref);
+      repLog.error(message);
+      logger.atWarning().withCause(ie).log(message);
+    }
   }
 
   private String changePrefix(String changeRef) {
@@ -106,4 +171,10 @@
 
     return changeMetaRef;
   }
+
+  private ObjectId getNotNullExactRef(Repository repository, String refName) throws IOException {
+    Ref ref = repository.exactRef(refName);
+    Preconditions.checkNotNull(ref);
+    return ref.getObjectId();
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java
index e2de613..fb8eb40 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java
@@ -18,37 +18,61 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import com.gerritforge.gerrit.globalrefdb.GlobalRefDatabase;
 import com.gerritforge.gerrit.globalrefdb.GlobalRefDbLockException;
 import com.gerritforge.gerrit.globalrefdb.GlobalRefDbSystemError;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.testing.InMemoryRepositoryManager;
+import com.google.gerrit.testing.InMemoryTestEnvironment;
+import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.SharedRefDatabaseWrapper;
 import com.googlesource.gerrit.plugins.multisite.validation.DisabledSharedRefLogger;
 import com.googlesource.gerrit.plugins.multisite.validation.MultisiteReplicationPushFilter;
-import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import org.eclipse.jgit.internal.storage.dfs.InMemoryRepository;
+import org.eclipse.jgit.junit.LocalDiskRepositoryTestCase;
+import org.eclipse.jgit.junit.TestRepository;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.ObjectIdRef;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
-public class MultisiteReplicationPushFilterTest {
+public class MultisiteReplicationPushFilterTest extends LocalDiskRepositoryTestCase
+    implements RefFixture {
+
+  @Rule public InMemoryTestEnvironment testEnvironment = new InMemoryTestEnvironment();
 
   @Mock SharedRefDatabaseWrapper sharedRefDatabaseMock;
 
-  String project = "fooProject";
-  Project.NameKey projectName = new Project.NameKey(project);
+  @Inject private InMemoryRepositoryManager gitRepositoryManager;
+
+  String project = A_TEST_PROJECT_NAME;
+  Project.NameKey projectName = A_TEST_PROJECT_NAME_KEY;
+
+  private TestRepository<InMemoryRepository> repo;
+
+  @Before
+  public void setUp() throws Exception {
+    InMemoryRepository inMemoryRepo =
+        gitRepositoryManager.createRepository(A_TEST_PROJECT_NAME_KEY);
+    repo = new TestRepository<>(inMemoryRepo);
+  }
 
   @Test
   public void shouldReturnAllRefUpdatesWhenAllUpToDate() throws Exception {
@@ -57,7 +81,7 @@
     doReturn(true).when(sharedRefDatabaseMock).isUpToDate(eq(projectName), any());
 
     MultisiteReplicationPushFilter pushFilter =
-        new MultisiteReplicationPushFilter(sharedRefDatabaseMock);
+        new MultisiteReplicationPushFilter(sharedRefDatabaseMock, gitRepositoryManager);
     List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
 
     assertThat(filteredRefUpdates).containsExactlyElementsIn(refUpdates);
@@ -71,13 +95,42 @@
     SharedRefDatabaseWrapper sharedRefDatabase = newSharedRefDatabase(outdatedRef.getSrcRef());
 
     MultisiteReplicationPushFilter pushFilter =
-        new MultisiteReplicationPushFilter(sharedRefDatabase);
+        new MultisiteReplicationPushFilter(sharedRefDatabase, gitRepositoryManager);
     List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
 
     assertThat(filteredRefUpdates).containsExactly(refUpToDate);
   }
 
   @Test
+  public void shouldLoadLocalVersionAndNotFilter() throws Exception {
+    RemoteRefUpdate temporaryOutdated = refUpdate("refs/heads/temporaryOutdated");
+    List<RemoteRefUpdate> refUpdates = Collections.singletonList(temporaryOutdated);
+    doReturn(false).doReturn(true).when(sharedRefDatabaseMock).isUpToDate(eq(projectName), any());
+
+    MultisiteReplicationPushFilter pushFilter =
+        new MultisiteReplicationPushFilter(sharedRefDatabaseMock, gitRepositoryManager);
+    List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
+
+    assertThat(filteredRefUpdates).containsExactly(temporaryOutdated);
+    verify(sharedRefDatabaseMock, times(2)).isUpToDate(any(), any());
+  }
+
+  @Test
+  public void shouldLoadLocalVersionAndFilter() throws Exception {
+    RemoteRefUpdate temporaryOutdated = refUpdate("refs/heads/temporaryOutdated");
+    repo.branch("refs/heads/temporaryOutdated").commit().create();
+    List<RemoteRefUpdate> refUpdates = Collections.singletonList(temporaryOutdated);
+    doReturn(false).doReturn(false).when(sharedRefDatabaseMock).isUpToDate(eq(projectName), any());
+
+    MultisiteReplicationPushFilter pushFilter =
+        new MultisiteReplicationPushFilter(sharedRefDatabaseMock, gitRepositoryManager);
+    List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
+
+    assertThat(filteredRefUpdates).isEmpty();
+    verify(sharedRefDatabaseMock, times(2)).isUpToDate(any(), any());
+  }
+
+  @Test
   public void shouldFilterOutAllOutdatedChangesRef() throws Exception {
     RemoteRefUpdate refUpToDate = refUpdate("refs/heads/uptodate");
     RemoteRefUpdate refChangeUpToDate = refUpdate("refs/changes/25/1225/2");
@@ -88,7 +141,7 @@
     SharedRefDatabaseWrapper sharedRefDatabase = newSharedRefDatabase(changeMetaRef.getSrcRef());
 
     MultisiteReplicationPushFilter pushFilter =
-        new MultisiteReplicationPushFilter(sharedRefDatabase);
+        new MultisiteReplicationPushFilter(sharedRefDatabase, gitRepositoryManager);
     List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
 
     assertThat(filteredRefUpdates).containsExactly(refUpToDate, refChangeUpToDate);
@@ -145,9 +198,10 @@
         new DisabledSharedRefLogger());
   }
 
-  private RemoteRefUpdate refUpdate(String refName) throws IOException {
+  private RemoteRefUpdate refUpdate(String refName) throws Exception {
     ObjectId srcObjId = ObjectId.fromString("0000000000000000000000000000000000000001");
     Ref srcRef = new ObjectIdRef.Unpeeled(Ref.Storage.NEW, refName, srcObjId);
+    repo.branch(refName).commit().create();
     return new RemoteRefUpdate(null, srcRef, "origin", false, "origin", srcObjId);
   }
 }