Reload local version when in memory ref is not up to date

When multiple updates are replication events might pile up
and the ref carried in the RemoteRefUpdate event might be
behind the one locally stored.

This will lead to filter out some replication events.
The situation is restored as soon as a new replication will
be triggered. The repository will be out of sync
until the arrival of the next replication.

Read the local ref and re-try the comparison to mitigate the
issue.

Bug: Issue 12425
Change-Id: I579f277635eff5d5cdd1d71a090fe06f2be22c0f
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
index 5f25607..e4bd2e3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
@@ -14,73 +14,133 @@
 
 package com.googlesource.gerrit.plugins.multisite.validation;
 
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.SharedRefDatabaseWrapper;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedLockException;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefDatabase;
 import com.googlesource.gerrit.plugins.replication.ReplicationPushFilter;
+import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
+import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectIdRef;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Singleton
 public class MultisiteReplicationPushFilter implements ReplicationPushFilter {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private static final String REF_META_SUFFIX = "/meta";
+  public static final int MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS = 1000;
+  public static final int RANDOM_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS = 1000;
+
   static final String REPLICATION_LOG_NAME = "replication_log";
   static final Logger repLog = LoggerFactory.getLogger(REPLICATION_LOG_NAME);
 
   private final SharedRefDatabaseWrapper sharedRefDb;
+  private final GitRepositoryManager gitRepositoryManager;
 
   @Inject
-  public MultisiteReplicationPushFilter(SharedRefDatabaseWrapper sharedRefDb) {
+  public MultisiteReplicationPushFilter(
+      SharedRefDatabaseWrapper sharedRefDb, GitRepositoryManager gitRepositoryManager) {
     this.sharedRefDb = sharedRefDb;
+    this.gitRepositoryManager = gitRepositoryManager;
   }
 
   @Override
   public List<RemoteRefUpdate> filter(String projectName, List<RemoteRefUpdate> remoteUpdatesList) {
     Set<String> outdatedChanges = new HashSet<>();
-
-    List<RemoteRefUpdate> filteredRefUpdates =
-        remoteUpdatesList.stream()
-            .filter(
-                refUpdate -> {
-                  String ref = refUpdate.getSrcRef();
-                  try {
-                    if (sharedRefDb.isUpToDate(
-                        projectName, SharedRefDatabase.newRef(ref, refUpdate.getNewObjectId()))) {
-                      return true;
+    try (Repository repository =
+        gitRepositoryManager.openRepository(Project.NameKey.parse(projectName))) {
+      List<RemoteRefUpdate> filteredRefUpdates =
+          remoteUpdatesList.stream()
+              .filter(
+                  refUpdate -> {
+                    String ref = refUpdate.getSrcRef();
+                    try {
+                      if (sharedRefDb.isUpToDate(
+                          projectName, SharedRefDatabase.newRef(ref, refUpdate.getNewObjectId()))) {
+                        return true;
+                      }
+                      // The ref coming from the event might be old compared to the local version.
+                      // Valid refs won't be replicated because of this misalignment.
+                      // Reading the local ref and re-trying the comparison, after a short sleep,
+                      // could mitigate the issue.
+                      int waitBeforeReloadLocalVersionMs =
+                          MIN_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS
+                              + new Random().nextInt(RANDOM_WAIT_BEFORE_RELOAD_LOCAL_VERSION_MS);
+                      repLog.debug(
+                          String.format(
+                              "'%s' is not up-to-date for project '%s' [local='%s']. Reload local ref in '%d ms' and re-check",
+                              ref,
+                              projectName,
+                              refUpdate.getNewObjectId(),
+                              waitBeforeReloadLocalVersionMs));
+                      Thread.sleep(waitBeforeReloadLocalVersionMs);
+                      Optional<ObjectId> objectIdVersion =
+                          getProjectLocalObjectIdVersion(repository, projectName, ref);
+                      if (objectIdVersion.isPresent()
+                          && sharedRefDb.isUpToDate(
+                              projectName,
+                              new ObjectIdRef.Unpeeled(
+                                  Ref.Storage.NETWORK, ref, objectIdVersion.get()))) {
+                        repLog.debug("{} is up-to-date after retrying", objectIdVersion);
+                        return true;
+                      }
+                      repLog.warn(
+                          "{} is not up-to-date with the shared-refdb and thus will NOT BE replicated",
+                          refUpdate);
+                    } catch (SharedLockException e) {
+                      repLog.warn(
+                          "{} is locked on shared-refdb and thus will NOT BE replicated",
+                          refUpdate);
+                    } catch (InterruptedException ie) {
+                      String message =
+                          String.format(
+                              "Error while waiting for next check for '%s', ref '%s'",
+                              projectName, ref);
+                      repLog.error(message);
+                      logger.atSevere().withCause(ie).log(message);
+                      return false;
                     }
-                    repLog.warn(
-                        "{} is not up-to-date with the shared-refdb and thus will NOT BE replicated",
-                        refUpdate);
-                  } catch (SharedLockException e) {
-                    repLog.warn(
-                        "{} is locked on shared-refdb and thus will NOT BE replicated", refUpdate);
-                  }
-                  if (ref.endsWith(REF_META_SUFFIX)) {
-                    outdatedChanges.add(getRootChangeRefPrefix(ref));
-                  }
-                  return false;
-                })
-            .collect(Collectors.toList());
+                    if (ref.endsWith(REF_META_SUFFIX)) {
+                      outdatedChanges.add(getRootChangeRefPrefix(ref));
+                    }
+                    return false;
+                  })
+              .collect(Collectors.toList());
 
-    return filteredRefUpdates.stream()
-        .filter(
-            refUpdate -> {
-              if (outdatedChanges.contains(changePrefix(refUpdate.getSrcRef()))) {
-                repLog.warn(
-                    "{} belongs to an outdated /meta ref and thus will NOT BE replicated",
-                    refUpdate);
-                return false;
-              }
-              return true;
-            })
-        .collect(Collectors.toList());
+      return filteredRefUpdates.stream()
+          .filter(
+              refUpdate -> {
+                if (outdatedChanges.contains(changePrefix(refUpdate.getSrcRef()))) {
+                  repLog.warn(
+                      "{} belongs to an outdated /meta ref and thus will NOT BE replicated",
+                      refUpdate);
+                  return false;
+                }
+                return true;
+              })
+          .collect(Collectors.toList());
+
+    } catch (IOException ioe) {
+      String message = String.format("Error while opening project: '%s'", projectName);
+      repLog.error(message);
+      logger.atSevere().withCause(ioe).log(message);
+      return Collections.emptyList();
+    }
   }
 
   private String changePrefix(String changeRef) {
@@ -102,4 +162,18 @@
 
     return changeMetaRef;
   }
+
+  private Optional<ObjectId> getProjectLocalObjectIdVersion(
+      Repository repository, String projectName, String ref) {
+    try {
+      return Optional.ofNullable(repository.exactRef(ref).getObjectId());
+
+    } catch (IOException ioe) {
+      String message =
+          String.format("Error while extracting ref '%s' for project '%s'", ref, projectName);
+      repLog.error(message);
+      logger.atSevere().withCause(ioe).log(message);
+    }
+    return Optional.empty();
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java
index caf0d69..d6b92e4 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java
@@ -18,31 +18,57 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.testing.InMemoryRepositoryManager;
+import com.google.gerrit.testing.InMemoryTestEnvironment;
+import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.SharedRefDatabaseWrapper;
 import com.googlesource.gerrit.plugins.multisite.validation.DisabledSharedRefLogger;
 import com.googlesource.gerrit.plugins.multisite.validation.MultisiteReplicationPushFilter;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.zookeeper.RefFixture;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import org.eclipse.jgit.internal.storage.dfs.InMemoryRepository;
+import org.eclipse.jgit.junit.LocalDiskRepositoryTestCase;
+import org.eclipse.jgit.junit.TestRepository;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.ObjectIdRef;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.transport.RemoteRefUpdate;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
-public class MultisiteReplicationPushFilterTest {
+public class MultisiteReplicationPushFilterTest extends LocalDiskRepositoryTestCase
+    implements RefFixture {
+
+  @Rule public InMemoryTestEnvironment testEnvironment = new InMemoryTestEnvironment();
 
   @Mock SharedRefDatabaseWrapper sharedRefDatabaseMock;
 
-  String project = "fooProject";
+  @Inject private InMemoryRepositoryManager gitRepositoryManager;
+
+  String project = A_TEST_PROJECT_NAME;
+
+  private TestRepository<InMemoryRepository> repo;
+
+  @Before
+  public void setUp() throws Exception {
+    InMemoryRepository inMemoryRepo =
+        gitRepositoryManager.createRepository(A_TEST_PROJECT_NAME_KEY);
+    repo = new TestRepository<>(inMemoryRepo);
+  }
 
   @Test
   public void shouldReturnAllRefUpdatesWhenAllUpToDate() throws Exception {
@@ -51,7 +77,7 @@
     doReturn(true).when(sharedRefDatabaseMock).isUpToDate(eq(project), any());
 
     MultisiteReplicationPushFilter pushFilter =
-        new MultisiteReplicationPushFilter(sharedRefDatabaseMock);
+        new MultisiteReplicationPushFilter(sharedRefDatabaseMock, gitRepositoryManager);
     List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
 
     assertThat(filteredRefUpdates).containsExactlyElementsIn(refUpdates);
@@ -65,13 +91,42 @@
     SharedRefDatabaseWrapper sharedRefDatabase = newSharedRefDatabase(outdatedRef.getSrcRef());
 
     MultisiteReplicationPushFilter pushFilter =
-        new MultisiteReplicationPushFilter(sharedRefDatabase);
+        new MultisiteReplicationPushFilter(sharedRefDatabase, gitRepositoryManager);
     List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
 
     assertThat(filteredRefUpdates).containsExactly(refUpToDate);
   }
 
   @Test
+  public void shouldLoadLocalVersionAndNotFilter() throws Exception {
+    RemoteRefUpdate temporaryOutdated = refUpdate("refs/heads/temporaryOutdated");
+    List<RemoteRefUpdate> refUpdates = Collections.singletonList(temporaryOutdated);
+    doReturn(false).doReturn(true).when(sharedRefDatabaseMock).isUpToDate(eq(project), any());
+
+    MultisiteReplicationPushFilter pushFilter =
+        new MultisiteReplicationPushFilter(sharedRefDatabaseMock, gitRepositoryManager);
+    List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
+
+    assertThat(filteredRefUpdates).containsExactly(temporaryOutdated);
+    verify(sharedRefDatabaseMock, times(2)).isUpToDate(any(), any());
+  }
+
+  @Test
+  public void shouldLoadLocalVersionAndFilter() throws Exception {
+    RemoteRefUpdate temporaryOutdated = refUpdate("refs/heads/temporaryOutdated");
+    repo.branch("refs/heads/temporaryOutdated").commit().create();
+    List<RemoteRefUpdate> refUpdates = Collections.singletonList(temporaryOutdated);
+    doReturn(false).doReturn(false).when(sharedRefDatabaseMock).isUpToDate(eq(project), any());
+
+    MultisiteReplicationPushFilter pushFilter =
+        new MultisiteReplicationPushFilter(sharedRefDatabaseMock, gitRepositoryManager);
+    List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
+
+    assertThat(filteredRefUpdates).isEmpty();
+    verify(sharedRefDatabaseMock, times(2)).isUpToDate(any(), any());
+  }
+
+  @Test
   public void shouldFilterOutAllOutdatedChangesRef() throws Exception {
     RemoteRefUpdate refUpToDate = refUpdate("refs/heads/uptodate");
     RemoteRefUpdate refChangeUpToDate = refUpdate("refs/changes/25/1225/2");
@@ -82,7 +137,7 @@
     SharedRefDatabaseWrapper sharedRefDatabase = newSharedRefDatabase(changeMetaRef.getSrcRef());
 
     MultisiteReplicationPushFilter pushFilter =
-        new MultisiteReplicationPushFilter(sharedRefDatabase);
+        new MultisiteReplicationPushFilter(sharedRefDatabase, gitRepositoryManager);
     List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
 
     assertThat(filteredRefUpdates).containsExactly(refUpToDate, refChangeUpToDate);
@@ -124,9 +179,10 @@
         new DisabledSharedRefLogger());
   }
 
-  private RemoteRefUpdate refUpdate(String refName) throws IOException {
+  private RemoteRefUpdate refUpdate(String refName) throws Exception {
     ObjectId srcObjId = ObjectId.fromString("0000000000000000000000000000000000000001");
     Ref srcRef = new ObjectIdRef.Unpeeled(Ref.Storage.NEW, refName, srcObjId);
+    repo.branch(refName).commit().create();
     return new RemoteRefUpdate(null, srcRef, "origin", false, "origin", srcObjId);
   }
 }