Stop replication when instance is not aligned with SharedRefDb

Replication should be blocked if an instance contains an older version
of the references when compared with the shared database. This will
stop the out of sync instance from overriding the changes of the instance
that is up to date.

Bug: Issue 11175
Change-Id: Iba80db5269cc9a79cb6f4a05e4200826b8da6c87
diff --git a/BUILD b/BUILD
index ee78d2f..d03d116 100644
--- a/BUILD
+++ b/BUILD
@@ -22,6 +22,7 @@
         "@curator-recipes//jar",
         "@kafka-client//jar",
         "@zookeeper//jar",
+        "//plugins/replication",
     ],
 )
 
diff --git a/setup_local_env/configs/gerrit.config b/setup_local_env/configs/gerrit.config
index 348b90e..18eeca4 100644
--- a/setup_local_env/configs/gerrit.config
+++ b/setup_local_env/configs/gerrit.config
@@ -3,6 +3,7 @@
     serverId = 69ec38f0-350e-4d9c-96d4-bc956f2faaac
     canonicalWebUrl = $GERRIT_CANONICAL_WEB_URL
     installModule = com.googlesource.gerrit.plugins.multisite.Module # multi-site needs to be a gerrit lib
+
 [database]
     type = h2
     database = $LOCATION_TEST_SITE/db/ReviewDB
@@ -35,4 +36,4 @@
 [plugins]
     allowRemoteAdmin = true
 [plugin "websession-flatfile"]
-    directory = $FAKE_NFS
\ No newline at end of file
+    directory = $FAKE_NFS
diff --git a/setup_local_env/setup.sh b/setup_local_env/setup.sh
index 684a22b..c42d465 100755
--- a/setup_local_env/setup.sh
+++ b/setup_local_env/setup.sh
@@ -369,6 +369,11 @@
 	# Replicating environment
 	echo "Replicating environment"
 	cp -fR $LOCATION_TEST_SITE_1/* $LOCATION_TEST_SITE_2
+
+	echo "Link replication plugin"
+	ln -s $LOCATION_TEST_SITE_1/plugins/replication.jar $LOCATION_TEST_SITE_1/lib/replication.jar
+	ln -s $LOCATION_TEST_SITE_2/plugins/replication.jar $LOCATION_TEST_SITE_2/lib/replication.jar
+
 fi
 
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
new file mode 100644
index 0000000..6b4f4cc
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/MultisiteReplicationPushFilter.java
@@ -0,0 +1,104 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedLockException;
+import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefDatabase;
+import com.googlesource.gerrit.plugins.replication.ReplicationPushFilter;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Singleton
+public class MultisiteReplicationPushFilter implements ReplicationPushFilter {
+  private static final String REF_META_SUFFIX = "/meta";
+  static final String REPLICATION_LOG_NAME = "replication_log";
+  static final Logger repLog = LoggerFactory.getLogger(REPLICATION_LOG_NAME);
+
+  private final SharedRefDatabase sharedRefDb;
+
+  @Inject
+  public MultisiteReplicationPushFilter(SharedRefDatabase sharedRefDb) {
+    this.sharedRefDb = sharedRefDb;
+  }
+
+  @Override
+  public List<RemoteRefUpdate> filter(String projectName, List<RemoteRefUpdate> remoteUpdatesList) {
+    Set<String> outdatedChanges = new HashSet<>();
+
+    List<RemoteRefUpdate> filteredRefUpdates =
+        remoteUpdatesList.stream()
+            .filter(
+                refUpdate -> {
+                  String ref = refUpdate.getSrcRef();
+                  try {
+                    if (sharedRefDb.isUpToDate(
+                        projectName, SharedRefDatabase.newRef(ref, refUpdate.getNewObjectId()))) {
+                      return true;
+                    }
+                    repLog.warn(
+                        "{} is not up-to-date with the shared-refdb and thus will NOT BE replicated",
+                        refUpdate);
+                  } catch (SharedLockException e) {
+                    repLog.warn(
+                        "{} is locked on shared-refdb and thus will NOT BE replicated", refUpdate);
+                  }
+                  if (ref.endsWith(REF_META_SUFFIX)) {
+                    outdatedChanges.add(getRootChangeRefPrefix(ref));
+                  }
+                  return false;
+                })
+            .collect(Collectors.toList());
+
+    return filteredRefUpdates.stream()
+        .filter(
+            refUpdate -> {
+              if (outdatedChanges.contains(changePrefix(refUpdate.getSrcRef()))) {
+                repLog.warn(
+                    "{} belongs to an outdated /meta ref and thus will NOT BE replicated",
+                    refUpdate);
+                return false;
+              }
+              return true;
+            })
+        .collect(Collectors.toList());
+  }
+
+  private String changePrefix(String changeRef) {
+    if (!changeRef.startsWith("refs/changes")) {
+      return changeRef;
+    }
+    if (changeRef.endsWith(REF_META_SUFFIX)) {
+      return getRootChangeRefPrefix(changeRef);
+    }
+
+    // changeRef has the form '/refs/changes/NN/NNNN/P'
+    return changeRef.substring(0, changeRef.lastIndexOf('/'));
+  }
+
+  private String getRootChangeRefPrefix(String changeMetaRef) {
+    if (changeMetaRef.endsWith(REF_META_SUFFIX)) {
+      return changeMetaRef.substring(0, changeMetaRef.length() - REF_META_SUFFIX.length());
+    }
+
+    return changeMetaRef;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
index 9626739..087c4ed 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/ValidationModule.java
@@ -16,6 +16,7 @@
 
 import com.google.gerrit.extensions.config.FactoryModule;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.inject.Scopes;
@@ -25,6 +26,8 @@
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.CustomSharedRefEnforcementByProject;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.DefaultSharedRefEnforcement;
 import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.SharedRefEnforcement;
+import com.googlesource.gerrit.plugins.replication.ReplicationExtensionPointModule;
+import com.googlesource.gerrit.plugins.replication.ReplicationPushFilter;
 
 public class ValidationModule extends FactoryModule {
   private final Configuration cfg;
@@ -37,6 +40,8 @@
 
   @Override
   protected void configure() {
+    install(new ReplicationExtensionPointModule());
+
     bind(SharedRefLogger.class).to(Log4jSharedRefLogger.class);
 
     factory(MultiSiteRepository.Factory.class);
@@ -46,6 +51,9 @@
     factory(RefUpdateValidator.Factory.class);
     factory(BatchRefUpdateValidator.Factory.class);
 
+    DynamicItem.bind(binder(), ReplicationPushFilter.class)
+        .to(MultisiteReplicationPushFilter.class);
+
     if (!disableGitRepositoryValidation) {
       bind(GitRepositoryManager.class).to(MultiSiteGitRepositoryManager.class);
     }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java
new file mode 100644
index 0000000..1332dd1
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/MultisiteReplicationPushFilterTest.java
@@ -0,0 +1,127 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+
+import com.googlesource.gerrit.plugins.multisite.validation.MultisiteReplicationPushFilter;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectIdRef;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class MultisiteReplicationPushFilterTest {
+
+  @Mock SharedRefDatabase sharedRefDatabaseMock;
+
+  String project = "fooProject";
+
+  @Test
+  public void shouldReturnAllRefUpdatesWhenAllUpToDate() throws Exception {
+    List<RemoteRefUpdate> refUpdates =
+        Arrays.asList(refUpdate("refs/heads/foo"), refUpdate("refs/heads/bar"));
+    doReturn(true).when(sharedRefDatabaseMock).isUpToDate(eq(project), any());
+
+    MultisiteReplicationPushFilter pushFilter =
+        new MultisiteReplicationPushFilter(sharedRefDatabaseMock);
+    List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
+
+    assertThat(filteredRefUpdates).containsExactlyElementsIn(refUpdates);
+  }
+
+  @Test
+  public void shouldFilterOutOneOutdatedRef() throws Exception {
+    RemoteRefUpdate refUpToDate = refUpdate("refs/heads/uptodate");
+    RemoteRefUpdate outdatedRef = refUpdate("refs/heads/outdated");
+    List<RemoteRefUpdate> refUpdates = Arrays.asList(refUpToDate, outdatedRef);
+    SharedRefDatabase sharedRefDatabase = newSharedRefDatabase(outdatedRef.getSrcRef());
+
+    MultisiteReplicationPushFilter pushFilter =
+        new MultisiteReplicationPushFilter(sharedRefDatabase);
+    List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
+
+    assertThat(filteredRefUpdates).containsExactly(refUpToDate);
+  }
+
+  @Test
+  public void shouldFilterOutAllOutdatedChangesRef() throws Exception {
+    RemoteRefUpdate refUpToDate = refUpdate("refs/heads/uptodate");
+    RemoteRefUpdate refChangeUpToDate = refUpdate("refs/changes/25/1225/2");
+    RemoteRefUpdate changeMetaRef = refUpdate("refs/changes/12/4512/meta");
+    RemoteRefUpdate changeRef = refUpdate("refs/changes/12/4512/1");
+    List<RemoteRefUpdate> refUpdates =
+        Arrays.asList(refUpToDate, refChangeUpToDate, changeMetaRef, changeRef);
+    SharedRefDatabase sharedRefDatabase = newSharedRefDatabase(changeMetaRef.getSrcRef());
+
+    MultisiteReplicationPushFilter pushFilter =
+        new MultisiteReplicationPushFilter(sharedRefDatabase);
+    List<RemoteRefUpdate> filteredRefUpdates = pushFilter.filter(project, refUpdates);
+
+    assertThat(filteredRefUpdates).containsExactly(refUpToDate, refChangeUpToDate);
+  }
+
+  private SharedRefDatabase newSharedRefDatabase(String... rejectedRefs) {
+    Set<String> rejectedSet = new HashSet<>();
+    rejectedSet.addAll(Arrays.asList(rejectedRefs));
+
+    SharedRefDatabase sharedRefDatabase =
+        new SharedRefDatabase() {
+
+          @Override
+          public void removeProject(String project) throws IOException {}
+
+          @Override
+          public AutoCloseable lockRef(String project, String refName) throws SharedLockException {
+            return null;
+          }
+
+          @Override
+          public boolean isUpToDate(String project, Ref ref) throws SharedLockException {
+            return !rejectedSet.contains(ref.getName());
+          }
+
+          @Override
+          public boolean exists(String project, String refName) {
+            return true;
+          }
+
+          @Override
+          public boolean compareAndPut(String project, Ref currRef, ObjectId newRefValue)
+              throws IOException {
+            return false;
+          }
+        };
+    return sharedRefDatabase;
+  }
+
+  private RemoteRefUpdate refUpdate(String refName) throws IOException {
+    ObjectId srcObjId = ObjectId.fromString("0000000000000000000000000000000000000001");
+    Ref srcRef = new ObjectIdRef.Unpeeled(Ref.Storage.NEW, refName, srcObjId);
+    return new RemoteRefUpdate(null, srcRef, "origin", false, "origin", srcObjId);
+  }
+}