Merge branch 'stable-3.9'

* stable-3.9:
  Fix ApplyObjectActionIT flakiness due to the wrong ref used
  Force async fetch as a fallback for sync replication
  Index change asynchronously upon refs replicated event
  Address follow-up comments to Change 396868
  Introduce wait for replication events to reduce flaky tests
  Fail apply-object on change /meta when missing patch-set

Change-Id: Ib0783cb2b0863f7327b415ee9917956e1cc5f1a1
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
index ac17fac..baeb330 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.replication.pull;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Queues;
@@ -42,6 +44,7 @@
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
 import com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient;
+import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient;
 import com.googlesource.gerrit.plugins.replication.pull.client.HttpResult;
 import com.googlesource.gerrit.plugins.replication.pull.client.HttpResultUtils;
 import com.googlesource.gerrit.plugins.replication.pull.filter.ApplyObjectsRefsFilter;
@@ -57,7 +60,6 @@
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -304,7 +306,7 @@
               Project.nameKey(event.projectName()), event.refs(), event.eventCreatedOn(), state);
       fetchCallsPool
           .submit(() -> allSources.parallelStream().forEach(callFunction))
-          .get(fetchCallsTimeout, TimeUnit.MILLISECONDS);
+          .get(fetchCallsTimeout, MILLISECONDS);
     } catch (InterruptedException | ExecutionException | TimeoutException e) {
       stateLog.error(
           String.format(
@@ -346,7 +348,7 @@
         if (source.enableBatchedRefs()) {
           callBatchFetch(source, project, refs, state);
         } else {
-          callFetch(source, project, refs, state);
+          callFetch(source, project, refs, state, FetchRestApiClient.FORCE_ASYNC);
         }
       }
     };
@@ -673,7 +675,8 @@
       Source source,
       Project.NameKey project,
       List<ReferenceUpdatedEvent> refs,
-      ReplicationState state) {
+      ReplicationState state,
+      boolean forceAsyncCall) {
     boolean resultIsSuccessful = true;
     for (ReferenceUpdatedEvent refEvent : refs) {
       String refName = refEvent.refName();
@@ -685,7 +688,14 @@
             repLog.info(
                 "Pull replication REST API fetch to {} for {}:{}", apiUrl, project, refName);
             long startTime = System.currentTimeMillis();
-            Optional<HttpResult> result = Optional.of(fetchClient.callFetch(project, refName, uri));
+            Optional<HttpResult> result =
+                Optional.of(
+                    fetchClient.callFetch(
+                        project,
+                        refName,
+                        uri,
+                        MILLISECONDS.toNanos(System.currentTimeMillis()),
+                        forceAsyncCall));
             long endTime = System.currentTimeMillis();
             boolean resultSuccessful = HttpResultUtils.isSuccessful(result);
             repLog.info(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectAction.java
index 9911070..01d32fb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectAction.java
@@ -19,6 +19,7 @@
 import com.google.common.base.Strings;
 import com.google.gerrit.extensions.restapi.AuthException;
 import com.google.gerrit.extensions.restapi.BadRequestException;
+import com.google.gerrit.extensions.restapi.PreconditionFailedException;
 import com.google.gerrit.extensions.restapi.ResourceConflictException;
 import com.google.gerrit.extensions.restapi.Response;
 import com.google.gerrit.extensions.restapi.RestApiException;
@@ -28,6 +29,7 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionInput;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingLatestPatchSetException;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.RefUpdateException;
 import java.io.IOException;
@@ -132,6 +134,15 @@
           input.getRevisionData(),
           e);
       throw new UnprocessableEntityException(e.getMessage());
+    } catch (MissingLatestPatchSetException e) {
+      repLog.error(
+          "Apply object API *FAILED* from {} for {}:{} - {}",
+          input.getLabel(),
+          resource.getNameKey(),
+          input.getRefName(),
+          input.getRevisionData(),
+          e);
+      throw new PreconditionFailedException(e.getMessage());
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java
index 75de6c8..157fac3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java
@@ -40,6 +40,7 @@
 import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingLatestPatchSetException;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.RefUpdateException;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObject;
@@ -91,7 +92,7 @@
       String sourceLabel,
       long eventCreatedOn)
       throws IOException, RefUpdateException, MissingParentObjectException,
-          ResourceNotFoundException {
+          ResourceNotFoundException, MissingLatestPatchSetException {
     applyObjects(name, refName, new RevisionData[] {revisionsData}, sourceLabel, eventCreatedOn);
   }
 
@@ -102,7 +103,7 @@
       String sourceLabel,
       long eventCreatedOn)
       throws IOException, RefUpdateException, MissingParentObjectException,
-          ResourceNotFoundException {
+          ResourceNotFoundException, MissingLatestPatchSetException {
 
     repLog.info(
         "Apply object from {} for {}:{} - {}",
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectsAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectsAction.java
index ffd5bfc..1088e40 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectsAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectsAction.java
@@ -19,6 +19,7 @@
 import com.google.common.base.Strings;
 import com.google.gerrit.extensions.restapi.AuthException;
 import com.google.gerrit.extensions.restapi.BadRequestException;
+import com.google.gerrit.extensions.restapi.PreconditionFailedException;
 import com.google.gerrit.extensions.restapi.ResourceConflictException;
 import com.google.gerrit.extensions.restapi.Response;
 import com.google.gerrit.extensions.restapi.RestApiException;
@@ -27,6 +28,7 @@
 import com.google.gerrit.server.project.ProjectResource;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionsInput;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingLatestPatchSetException;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.RefUpdateException;
 import java.io.IOException;
@@ -130,6 +132,15 @@
           Arrays.toString(input.getRevisionsData()),
           e);
       throw new UnprocessableEntityException(e.getMessage());
+    } catch (MissingLatestPatchSetException e) {
+      repLog.error(
+          "Apply object API *FAILED* from {} for {}:{} - {}",
+          input.getLabel(),
+          resource.getNameKey(),
+          input.getRefName(),
+          Arrays.toString(input.getRevisionsData()),
+          e);
+      throw new PreconditionFailedException(e.getMessage());
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java
index 155b6a1..a69afa4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ProjectInitializationAction.java
@@ -46,6 +46,7 @@
 import com.googlesource.gerrit.plugins.replication.LocalFS;
 import com.googlesource.gerrit.plugins.replication.pull.GerritConfigOps;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionsInput;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingLatestPatchSetException;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.RefUpdateException;
 import com.googlesource.gerrit.plugins.replication.pull.api.util.PayloadSerDes;
@@ -166,12 +167,23 @@
     }
 
     String projectName = gitRepositoryName.replace(".git", "");
-    applyObjectCommand.applyObjects(
-        Project.nameKey(projectName),
-        input.getRefName(),
-        input.getRevisionsData(),
-        input.getLabel(),
-        input.getEventCreatedOn());
+    try {
+      applyObjectCommand.applyObjects(
+          Project.nameKey(projectName),
+          input.getRefName(),
+          input.getRevisionsData(),
+          input.getLabel(),
+          input.getEventCreatedOn());
+    } catch (MissingLatestPatchSetException e) {
+      repLog.error(
+          "Init project API FAILED from {} for {} - configuration data cannot contain change meta refs: {}:{}",
+          input.getLabel(),
+          projectName,
+          input.getRefName(),
+          Arrays.toString(input.getRevisionsData()),
+          e);
+      throw new BadRequestException("Configuration data cannot contain change meta refs", e);
+    }
     projectCache.onCreateProject(Project.nameKey(projectName));
     // In case pull-replication is used in conjunction with multi-site, by convention the remote
     // label is populated with the instanceId. That's why we are passing input.getLabel()
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/exception/MissingLatestPatchSetException.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/exception/MissingLatestPatchSetException.java
new file mode 100644
index 0000000..458c74a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/exception/MissingLatestPatchSetException.java
@@ -0,0 +1,26 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api.exception;
+
+import com.google.gerrit.entities.Project;
+
+public class MissingLatestPatchSetException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  public MissingLatestPatchSetException(
+      Project.NameKey project, String refName, String errorMessage) {
+    super(String.format("%s for project %s ref name: %s", errorMessage, project.get(), refName));
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
index 27e13a7..28c74c9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
@@ -32,12 +32,17 @@
   }
 
   HttpResult callFetch(
-      Project.NameKey project, String refName, URIish targetUri, long startTimeNanos)
+      Project.NameKey project,
+      String refName,
+      URIish targetUri,
+      long startTimeNanos,
+      boolean forceAsyncFetch)
       throws IOException;
 
   default HttpResult callFetch(Project.NameKey project, String refName, URIish targetUri)
       throws IOException {
-    return callFetch(project, refName, targetUri, MILLISECONDS.toNanos(System.currentTimeMillis()));
+    return callFetch(
+        project, refName, targetUri, MILLISECONDS.toNanos(System.currentTimeMillis()), false);
   }
 
   HttpResult callBatchFetch(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
index 081661f..8531b0a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
@@ -54,6 +54,7 @@
 import org.apache.http.ParseException;
 import org.apache.http.auth.AuthenticationException;
 import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.ResponseHandler;
 import org.apache.http.client.methods.HttpDelete;
 import org.apache.http.client.methods.HttpPost;
@@ -67,6 +68,8 @@
 import org.eclipse.jgit.transport.URIish;
 
 public class FetchRestApiClient implements FetchApiClient, ResponseHandler<HttpResult> {
+  public static final boolean FORCE_ASYNC = true;
+
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   static String GERRIT_ADMIN_PROTOCOL_PREFIX = "gerrit+";
 
@@ -111,15 +114,31 @@
     this.urlAuthenticationPrefix = bearerTokenProvider.get().map(br -> "").orElse("a/");
   }
 
-  /* (non-Javadoc)
-   * @see com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient#callFetch(com.google.gerrit.entities.Project.NameKey, java.lang.String, org.eclipse.jgit.transport.URIish)
-   */
   @Override
   public HttpResult callFetch(
-      Project.NameKey project, String refName, URIish targetUri, long startTimeNanos)
+      NameKey project,
+      String refName,
+      URIish targetUri,
+      long startTimeNanos,
+      boolean forceAsyncFetch)
+      throws ClientProtocolException, IOException {
+    return doCallFetch(
+        project,
+        refName,
+        targetUri,
+        startTimeNanos,
+        forceAsyncFetch || !syncRefsFilter.match(refName));
+  }
+
+  private HttpResult doCallFetch(
+      Project.NameKey project,
+      String refName,
+      URIish targetUri,
+      long startTimeNanos,
+      boolean callAsync)
       throws IOException {
     String url = formatUrl(targetUri.toString(), project, "fetch");
-    Boolean callAsync = !syncRefsFilter.match(refName);
+
     HttpPost post = new HttpPost(url);
     post.setEntity(
         new StringEntity(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObject.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObject.java
index dc90c7f..70b4795 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObject.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObject.java
@@ -22,6 +22,7 @@
 import com.googlesource.gerrit.plugins.replication.pull.LocalGitRepositoryManagerProvider;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingLatestPatchSetException;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
 import java.io.IOException;
 import org.eclipse.jgit.errors.RepositoryNotFoundException;
@@ -46,7 +47,8 @@
   }
 
   public RefUpdateState apply(Project.NameKey name, RefSpec refSpec, RevisionData[] revisionsData)
-      throws MissingParentObjectException, IOException, ResourceNotFoundException {
+      throws MissingParentObjectException, IOException, ResourceNotFoundException,
+          MissingLatestPatchSetException {
     try (Repository git = gitManager.openRepository(name)) {
 
       ObjectId refHead = null;
@@ -64,6 +66,12 @@
                 throw new MissingParentObjectException(name, refSpec.getSource(), parent.getId());
               }
             }
+
+            StringBuffer error = new StringBuffer();
+            if (!ChangeMetaCommitValidator.isValid(
+                git, refSpec.getSource(), commit, error::append)) {
+              throw new MissingLatestPatchSetException(name, refSpec.getSource(), error.toString());
+            }
           }
 
           for (RevisionObjectData rev : revisionData.getBlobs()) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ChangeMetaCommitValidator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ChangeMetaCommitValidator.java
new file mode 100644
index 0000000..4e080af
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ChangeMetaCommitValidator.java
@@ -0,0 +1,64 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.fetch;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.RefNames;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.function.Consumer;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.FooterKey;
+import org.eclipse.jgit.revwalk.RevCommit;
+
+class ChangeMetaCommitValidator {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final FooterKey FOOTER_CHANGE_META_PATCH_SET = new FooterKey("Patch-set");
+
+  public static boolean isValid(
+      Repository repo, String refName, RevCommit commit, Consumer<String> errorCallback)
+      throws IOException {
+    if (!refName.startsWith(RefNames.REFS_CHANGES) || !refName.endsWith(RefNames.META_SUFFIX)) {
+      return true;
+    }
+
+    List<String> patchSetFooter = commit.getFooterLines(FOOTER_CHANGE_META_PATCH_SET);
+    OptionalInt latestPatchSet = patchSetFooter.stream().mapToInt(Integer::parseInt).max();
+
+    if (latestPatchSet.isEmpty()) {
+      return true;
+    }
+
+    String patchSetRef = refName.replace(RefNames.META_SUFFIX, "/" + latestPatchSet.getAsInt());
+    Optional<ObjectId> patchSetObjectId =
+        Optional.ofNullable(repo.exactRef(patchSetRef)).map(Ref::getObjectId);
+
+    if (patchSetObjectId.isEmpty()) {
+      errorCallback.accept("Unable to find latest patch-set ref " + patchSetRef);
+      return false;
+    }
+
+    RevCommit patchSetCommit = repo.parseCommit(patchSetObjectId.get());
+    logger.atFine().log(
+        "Change on repository %s ref %s has latest patch-set %d and is successfully resolved to %s with commit %s",
+        repo, refName, latestPatchSet.getAsInt(), patchSetObjectId.get().getName(), patchSetCommit);
+
+    return true;
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
index 43f6ef2..130bb41 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationITAbstract.java
@@ -548,7 +548,8 @@
     assertThatRefReplicatedEventsContainsStatus(ReplicationState.RefFetchResult.SUCCEEDED);
   }
 
-  private void assertThatEventListenerHasReceivedNumEvents(int numExpectedEvents) {
+  private void assertThatEventListenerHasReceivedNumEvents(int numExpectedEvents) throws Exception {
+    waitUntil(() -> eventListener.numEventsReceived() >= numExpectedEvents);
     assertThat(eventListener.numEventsReceived()).isEqualTo(numExpectedEvents);
   }
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
index 90c9b9c..e3eb65b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
@@ -167,14 +167,15 @@
     lenient()
         .when(fetchRestApiClient.callBatchSendObject(any(), any(), anyLong(), any()))
         .thenReturn(batchHttpResult);
-    when(fetchRestApiClient.callFetch(any(), anyString(), any())).thenReturn(fetchHttpResult);
+    when(fetchRestApiClient.callFetch(any(), anyString(), any(), anyLong(), anyBoolean()))
+        .thenReturn(fetchHttpResult);
     when(fetchRestApiClient.callBatchFetch(any(), any(), any())).thenReturn(batchFetchHttpResult);
     when(fetchRestApiClient.initProject(any(), any(), anyLong(), any()))
         .thenReturn(successfulHttpResult);
     when(successfulHttpResult.isSuccessful()).thenReturn(true);
     when(httpResult.isSuccessful()).thenReturn(true);
-    when(batchHttpResult.isSuccessful()).thenReturn(true);
-    when(fetchHttpResult.isSuccessful()).thenReturn(true);
+    lenient().when(batchHttpResult.isSuccessful()).thenReturn(true);
+    lenient().when(fetchHttpResult.isSuccessful()).thenReturn(true);
     when(batchFetchHttpResult.isSuccessful()).thenReturn(true);
     when(httpResult.isProjectMissing(any())).thenReturn(false);
     when(batchHttpResult.isProjectMissing(any())).thenReturn(false);
@@ -337,7 +338,7 @@
 
   @Test
   public void shouldFallbackToCallFetchWhenIOException() throws Exception {
-    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta");
+    BatchRefUpdateEvent event = generateBatchRefUpdateEvent("refs/changes/01/1/meta");
 
     objectUnderTest.start();
 
@@ -397,7 +398,7 @@
   @Test
   public void shouldFallbackToCallBatchFetchWhenParentObjectNotMissingButApplyObjectFails()
       throws Exception {
-    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1");
+    BatchRefUpdateEvent event = generateBatchRefUpdateEvent("refs/changes/01/1/1");
     objectUnderTest.start();
 
     when(batchHttpResult.isSuccessful()).thenReturn(false);
@@ -467,7 +468,7 @@
 
   @Test
   public void shouldCallFetchIfBatchedRefsNotEnabledAtSource() throws Exception {
-    Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1");
+    BatchRefUpdateEvent event = generateBatchRefUpdateEvent("refs/changes/01/1/1");
     when(source.enableBatchedRefs()).thenReturn(false);
     when(httpResult.isSuccessful()).thenReturn(false);
     when(httpResult.isParentObjectMissing()).thenReturn(false);
@@ -476,7 +477,7 @@
     objectUnderTest.onEvent(event);
 
     verify(fetchRestApiClient, never()).callBatchFetch(any(), any(), any());
-    verify(fetchRestApiClient).callFetch(any(), anyString(), any());
+    verifyFallbackToRestApiClientFetchAsync(event);
   }
 
   @Test
@@ -675,4 +676,15 @@
       return projectName;
     }
   }
+
+  private void verifyFallbackToRestApiClientFetchAsync(BatchRefUpdateEvent event)
+      throws IOException {
+    verify(fetchRestApiClient)
+        .callFetch(
+            eq(event.getProjectNameKey()),
+            eq(event.getRefNames().get(0)),
+            any(URIish.class),
+            any(Long.class),
+            eq(FetchRestApiClient.FORCE_ASYNC));
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ActionITBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ActionITBase.java
index 4a3fa55..f8110d1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ActionITBase.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ActionITBase.java
@@ -197,7 +197,7 @@
     return httpRequest;
   }
 
-  private Project.NameKey createTestProject(String name) throws Exception {
+  protected Project.NameKey createTestProject(String name) throws Exception {
     return projectOperations.newProject().name(name).parent(project).create();
   }
 
@@ -229,4 +229,9 @@
     secureConfig.setString("remote", remoteName, "password", password);
     secureConfig.save();
   }
+
+  protected String firstPatchSetForChangeMetaRef(String metaRefName) {
+    String patchSetRefName = metaRefName.replace(RefNames.META_SUFFIX, "/1");
+    return patchSetRefName;
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionIT.java
index 652daed..e771e7a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionIT.java
@@ -19,6 +19,8 @@
 import com.google.gerrit.acceptance.config.GerritConfig;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.Project.NameKey;
+import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.extensions.restapi.RestApiException;
 import com.google.gerrit.extensions.restapi.Url;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
 import java.util.Optional;
@@ -27,21 +29,25 @@
 
 public class ApplyObjectActionIT extends ActionITBase {
 
+  private static final String REFS_HEADS_MASTER = RefNames.REFS_HEADS + "master";
+
   @Test
   @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
   public void shouldAcceptPayloadWithAsyncField() throws Exception {
+    createTestProjectWithReplicationSuffix();
     String payloadWithAsyncFieldTemplate =
         "{\"label\":\""
             + TEST_REPLICATION_REMOTE
             + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}, \"async\":true}";
 
-    String refName = createRef();
-    Optional<RevisionData> revisionDataOption = createRevisionData(refName);
+    Optional<RevisionData> revisionDataOption = createRevisionData(REFS_HEADS_MASTER);
     assertThat(revisionDataOption.isPresent()).isTrue();
 
     RevisionData revisionData = revisionDataOption.get();
-    String sendObjectPayload = createPayload(payloadWithAsyncFieldTemplate, refName, revisionData);
+    String sendObjectPayload =
+        createPayload(payloadWithAsyncFieldTemplate, REFS_HEADS_MASTER, revisionData);
 
+    deleteTestProjectBranch(REFS_HEADS_MASTER);
     httpClientFactory
         .create(source)
         .execute(
@@ -52,19 +58,20 @@
   @Test
   @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
   public void shouldAcceptPayloadWithoutAsyncField() throws Exception {
+    createTestProjectWithReplicationSuffix();
     String payloadWithoutAsyncFieldTemplate =
         "{\"label\":\""
             + TEST_REPLICATION_REMOTE
             + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
 
-    String refName = createRef();
-    Optional<RevisionData> revisionDataOption = createRevisionData(refName);
+    Optional<RevisionData> revisionDataOption = createRevisionData(REFS_HEADS_MASTER);
     assertThat(revisionDataOption.isPresent()).isTrue();
 
     RevisionData revisionData = revisionDataOption.get();
     String sendObjectPayload =
-        createPayload(payloadWithoutAsyncFieldTemplate, refName, revisionData);
+        createPayload(payloadWithoutAsyncFieldTemplate, REFS_HEADS_MASTER, revisionData);
 
+    deleteTestProjectBranch(REFS_HEADS_MASTER);
     httpClientFactory
         .create(source)
         .execute(
@@ -76,19 +83,20 @@
   @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
   @GerritConfig(name = "container.replica", value = "true")
   public void shouldAcceptPayloadWhenNodeIsAReplica() throws Exception {
+    createTestProjectWithReplicationSuffix();
     String payloadWithoutAsyncFieldTemplate =
         "{\"label\":\""
             + TEST_REPLICATION_REMOTE
             + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
 
-    String refName = createRef();
-    Optional<RevisionData> revisionDataOption = createRevisionData(refName);
+    Optional<RevisionData> revisionDataOption = createRevisionData(REFS_HEADS_MASTER);
     assertThat(revisionDataOption.isPresent()).isTrue();
 
     RevisionData revisionData = revisionDataOption.get();
     String sendObjectPayload =
-        createPayload(payloadWithoutAsyncFieldTemplate, refName, revisionData);
+        createPayload(payloadWithoutAsyncFieldTemplate, REFS_HEADS_MASTER, revisionData);
 
+    deleteTestProjectBranch(REFS_HEADS_MASTER);
     httpClientFactory
         .create(source)
         .execute(
@@ -219,20 +227,21 @@
   @GerritConfig(name = "container.replica", value = "false")
   @GerritConfig(name = "auth.bearerToken", value = "some-bearer-token")
   public void shouldAcceptPayloadWhenNodeIsAPrimaryWithBearerToken() throws Exception {
+    createTestProjectWithReplicationSuffix();
     url = getURLWithoutAuthenticationPrefix(project.get());
     String payloadWithoutAsyncFieldTemplate =
         "{\"label\":\""
             + TEST_REPLICATION_REMOTE
             + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
 
-    String refName = createRef();
-    Optional<RevisionData> revisionDataOption = createRevisionData(refName);
+    Optional<RevisionData> revisionDataOption = createRevisionData(REFS_HEADS_MASTER);
     assertThat(revisionDataOption.isPresent()).isTrue();
 
     RevisionData revisionData = revisionDataOption.get();
     String sendObjectPayload =
-        createPayload(payloadWithoutAsyncFieldTemplate, refName, revisionData);
+        createPayload(payloadWithoutAsyncFieldTemplate, REFS_HEADS_MASTER, revisionData);
 
+    deleteTestProjectBranch(REFS_HEADS_MASTER);
     httpClientFactory
         .create(source)
         .execute(
@@ -258,4 +267,12 @@
         "%s/a/projects/%s/pull-replication~apply-object",
         adminRestSession.url(), Url.encode(projectName));
   }
+
+  private void createTestProjectWithReplicationSuffix() throws Exception {
+    createTestProject(project.get() + TEST_REPLICATION_SUFFIX);
+  }
+
+  private void deleteTestProjectBranch(String branchRefName) throws RestApiException {
+    gApi.projects().name(project.get()).branch(branchRefName).delete();
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObjectIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObjectIT.java
index c75d32a..b562933 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObjectIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObjectIT.java
@@ -28,6 +28,7 @@
 import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
 import com.google.gerrit.entities.Change;
 import com.google.gerrit.entities.Patch;
+import com.google.gerrit.entities.PatchSet;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.Project.NameKey;
 import com.google.gerrit.entities.RefNames;
@@ -42,6 +43,7 @@
 import com.googlesource.gerrit.plugins.replication.pull.RevisionReader;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingLatestPatchSetException;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
 import java.util.List;
 import java.util.Optional;
@@ -75,7 +77,9 @@
     testRepo = cloneProject(createTestProject(testRepoProjectName));
 
     Result pushResult = createChange();
-    String refName = RefNames.changeMetaRef(pushResult.getChange().getId());
+    Change.Id changeId = pushResult.getChange().getId();
+    String refName = RefNames.changeMetaRef(changeId);
+    String patchSetRefName = RefNames.patchSetRef(PatchSet.id(changeId, 1));
 
     RefSpec refSpec = new RefSpec(refName);
     Optional<RevisionData> revisionData;
@@ -83,6 +87,7 @@
 
     try (Repository repo = repoManager.openRepository(testRepoKey)) {
       revisionData = reader.read(testRepoKey, repo.exactRef(refName).getObjectId(), refName, 0);
+      objectUnderTest.apply(project, new RefSpec(patchSetRefName), toArray(revisionData));
       objectUnderTest.apply(project, refSpec, toArray(revisionData));
     }
 
@@ -124,6 +129,7 @@
 
     Result pushResult = createChange();
     Change.Id changeId = pushResult.getChange().getId();
+    String patchSetRefname = RefNames.patchSetRef(PatchSet.id(changeId, 1));
     String refName = RefNames.changeMetaRef(changeId);
     RefSpec refSpec = new RefSpec(refName);
 
@@ -131,6 +137,7 @@
     try (Repository repo = repoManager.openRepository(testRepoKey)) {
       Optional<RevisionData> revisionData =
           reader.read(testRepoKey, repo.exactRef(refName).getObjectId(), refName, 0);
+      objectUnderTest.apply(project, new RefSpec(patchSetRefname), toArray(revisionData));
       objectUnderTest.apply(project, refSpec, toArray(revisionData));
     }
 
@@ -181,6 +188,27 @@
     }
   }
 
+  @Test
+  public void shouldThrowExceptionWhenPatchSetIsMissing() throws Exception {
+    String testRepoProjectName = project + TEST_REPLICATION_SUFFIX;
+    NameKey createTestProject = createTestProject(testRepoProjectName);
+    try (Repository repo = repoManager.openRepository(createTestProject)) {
+      testRepo = cloneProject(createTestProject);
+
+      Result pushResult = createChange();
+      Change.Id changeId = pushResult.getChange().getId();
+      String refName = RefNames.changeMetaRef(changeId);
+
+      Optional<RevisionData> revisionData =
+          reader.read(createTestProject, repo.exactRef(refName).getObjectId(), refName, 0);
+
+      RefSpec refSpec = new RefSpec(refName);
+      assertThrows(
+          MissingLatestPatchSetException.class,
+          () -> objectUnderTest.apply(project, refSpec, toArray(revisionData)));
+    }
+  }
+
   private void compareObjects(RevisionData expected, Optional<RevisionData> actualOption) {
     assertThat(actualOption.isPresent()).isTrue();
     RevisionData actual = actualOption.get();