Merge branch 'stable-3.3'

* stable-3.3:
  FetchAction: Fix FloggerFormatString pattern flagged by error prone
  Add replication delay for asynchronous fetch calls
  Trigger indexing only for local ref updates
  Index change only for meta ref propagation
  Allow asynchronous fetch calls
  Comment the LocalDiskRepositoryManager in ApplyObject
  Fix issue with reading maxApiPayloadSize property
  Propagate FetchRefReplicatedEvent
  Send small refs as a fetch call payload
  Send ref content as a fetch call payload
  Index change upon propagation
  Propagate FetchRefReplicatedEvent upon a Ref fetch
  Always replicate changes regardless of the permissions

Change-Id: I5c4c575a5af3ed74232fd6942ddaedbc2866dae2
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ApplyObjectMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ApplyObjectMetrics.java
new file mode 100644
index 0000000..78b6ddb
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ApplyObjectMetrics.java
@@ -0,0 +1,59 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Field;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.metrics.Timer1;
+import com.google.gerrit.server.logging.PluginMetadata;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+public class ApplyObjectMetrics {
+  private final Timer1<String> executionTime;
+
+  @Inject
+  ApplyObjectMetrics(@PluginName String pluginName, MetricMaker metricMaker) {
+    Field<String> SOURCE_FIELD =
+        Field.ofString(
+                "source",
+                (metadataBuilder, fieldValue) ->
+                    metadataBuilder
+                        .pluginName(pluginName)
+                        .addPluginMetadata(PluginMetadata.create("source", fieldValue)))
+            .build();
+
+    executionTime =
+        metricMaker.newTimer(
+            "apply_object_latency",
+            new Description("Time spent applying object from remote source.")
+                .setCumulative()
+                .setUnit(Description.Units.MILLISECONDS),
+            SOURCE_FIELD);
+  }
+
+  /**
+   * Start the replication latency timer from a source.
+   *
+   * @param name the source name.
+   * @return the timer context.
+   */
+  public Timer1.Context<String> start(String name) {
+    return executionTime.start(name);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Context.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Context.java
new file mode 100644
index 0000000..34bf3d2
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Context.java
@@ -0,0 +1,39 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+/**
+ * Allows to tag event as local to avoid consuming remote events.
+ *
+ * <p>TODO: Gerrit v3.1 doesn't have concept of the instanceId so ThreadLocal must be used. From
+ * Gerrit v3.2 replace ThreadLocal with instanceId.
+ */
+public class Context {
+  private static final ThreadLocal<Boolean> localEvent = ThreadLocal.withInitial(() -> false);
+
+  private Context() {}
+
+  public static Boolean isLocalEvent() {
+    return localEvent.get();
+  }
+
+  public static void setLocalEvent(Boolean b) {
+    localEvent.set(b);
+  }
+
+  public static void unsetLocalEvent() {
+    localEvent.remove();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java
index baffff7..bcd86d7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java
@@ -29,7 +29,11 @@
   private final ReplicationStateListener stateLog;
 
   public interface Factory {
-    FetchAll create(String urlMatch, ReplicationFilter filter, ReplicationState state, boolean now);
+    FetchAll create(
+        String urlMatch,
+        ReplicationFilter filter,
+        ReplicationState state,
+        ReplicationType replicationType);
   }
 
   private final WorkQueue workQueue;
@@ -37,7 +41,7 @@
   private final String urlMatch;
   private final ReplicationFilter filter;
   private final ReplicationState state;
-  private final boolean now;
+  private final ReplicationType replicationType;
   private final SourcesCollection sources;
 
   @Inject
@@ -49,7 +53,7 @@
       @Assisted @Nullable String urlMatch,
       @Assisted ReplicationFilter filter,
       @Assisted ReplicationState state,
-      @Assisted boolean now) {
+      @Assisted ReplicationType replicationType) {
     this.workQueue = wq;
     this.projectCache = projectCache;
     this.stateLog = stateLog;
@@ -57,7 +61,7 @@
     this.urlMatch = urlMatch;
     this.filter = filter;
     this.state = state;
-    this.now = now;
+    this.replicationType = replicationType;
   }
 
   Future<?> schedule(long delay, TimeUnit unit) {
@@ -69,7 +73,7 @@
     try {
       for (Project.NameKey nameKey : projectCache.all()) {
         if (filter.matches(nameKey)) {
-          scheduleFullSync(nameKey, urlMatch, state, now);
+          scheduleFullSync(nameKey, urlMatch, state, replicationType);
         }
       }
     } catch (Exception e) {
@@ -79,12 +83,15 @@
   }
 
   private void scheduleFullSync(
-      Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
+      Project.NameKey project,
+      String urlMatch,
+      ReplicationState state,
+      ReplicationType replicationType) {
 
     for (Source cfg : sources.getAll()) {
       if (cfg.wouldFetchProject(project)) {
         for (URIish uri : cfg.getURIs(project, urlMatch)) {
-          cfg.schedule(project, FetchOne.ALL_REFS, uri, state, now);
+          cfg.schedule(project, FetchOne.ALL_REFS, uri, state, replicationType);
         }
       }
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefReplicatedEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefReplicatedEvent.java
index eb825b8..cad6dfb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefReplicatedEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefReplicatedEvent.java
@@ -42,6 +42,10 @@
     this.refUpdateResult = refUpdateResult;
   }
 
+  public String getStatus() {
+    return status;
+  }
+
   @Override
   public int hashCode() {
     return Objects.hash(project, ref, status, refUpdateResult);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java
index cffc1f6..ab16318 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java
@@ -18,6 +18,7 @@
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.events.RefEvent;
 import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.google.inject.Inject;
 import java.lang.ref.WeakReference;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.eclipse.jgit.lib.RefUpdate;
@@ -69,11 +70,15 @@
   }
 
   public static class CommandProcessing extends FetchResultProcessing {
+    private static final FluentLogger logger = FluentLogger.forEnclosingClass();
     private WeakReference<Command> sshCommand;
     private AtomicBoolean hasError = new AtomicBoolean();
+    private final EventDispatcher dispatcher;
 
-    public CommandProcessing(Command sshCommand) {
+    @Inject
+    public CommandProcessing(Command sshCommand, EventDispatcher dispatcher) {
       this.sshCommand = new WeakReference<>(sshCommand);
+      this.dispatcher = dispatcher;
     }
 
     @Override
@@ -112,6 +117,17 @@
         sb.append(")");
       }
       writeStdOut(sb.toString());
+      try {
+        Context.setLocalEvent(true);
+        dispatcher.postEvent(
+            new FetchRefReplicatedEvent(
+                project, ref, resolveNodeName(uri), status, refUpdateResult));
+      } catch (PermissionBackendException e) {
+        logger.atSevere().withCause(e).log(
+            "Cannot post event for ref '%s', project %s", ref, project);
+      } finally {
+        Context.unsetLocalEvent();
+      }
     }
 
     @Override
@@ -187,9 +203,12 @@
 
     private void postEvent(RefEvent event) {
       try {
+        Context.setLocalEvent(true);
         dispatcher.postEvent(event);
       } catch (PermissionBackendException e) {
         logger.atSevere().withCause(e).log("Cannot post event");
+      } finally {
+        Context.unsetLocalEvent();
       }
     }
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
index d8c4a8d..5cf8bb6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.replication.pull;
 
+import static com.googlesource.gerrit.plugins.replication.pull.ReplicationType.ASYNC;
+
 import com.google.common.util.concurrent.Atomics;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
@@ -65,7 +67,7 @@
               new FetchResultProcessing.GitUpdateProcessing(eventDispatcher.get()));
       fetchAllFuture.set(
           fetchAll
-              .create(null, ReplicationFilter.all(), state, false)
+              .create(null, ReplicationFilter.all(), state, ASYNC)
               .schedule(30, TimeUnit.SECONDS));
     }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index 9add07c..50ab913 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -44,6 +44,8 @@
 import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient;
 import com.googlesource.gerrit.plugins.replication.pull.client.HttpClient;
 import com.googlesource.gerrit.plugins.replication.pull.client.SourceHttpClient;
+import com.googlesource.gerrit.plugins.replication.pull.event.FetchRefReplicatedEventModule;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObject;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -65,8 +67,13 @@
   @Override
   protected void configure() {
 
+    bind(RevisionReader.class).in(Scopes.SINGLETON);
+    bind(ApplyObject.class);
+
     install(new PullReplicationApiModule());
 
+    install(new FetchRefReplicatedEventModule());
+
     install(
         new FactoryModuleBuilder()
             .implement(HttpClient.class, SourceHttpClient.class)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
index fc5ed7d..cdba012 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
@@ -17,6 +17,7 @@
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.Queues;
 import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.Project.NameKey;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
@@ -26,16 +27,23 @@
 import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.replication.ObservableQueue;
 import com.googlesource.gerrit.plugins.replication.pull.FetchResultProcessing.GitUpdateProcessing;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.RefUpdateException;
 import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient;
 import com.googlesource.gerrit.plugins.replication.pull.client.HttpResult;
+import com.googlesource.gerrit.plugins.replication.pull.filter.ExcludedRefsFilter;
+import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.transport.URIish;
 import org.slf4j.Logger;
@@ -58,7 +66,8 @@
   private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
   private FetchRestApiClient.Factory fetchClientFactory;
   private Integer fetchCallsTimeout;
-  private RefsFilter refsFilter;
+  private ExcludedRefsFilter refsFilter;
+  private RevisionReader revisionReader;
 
   @Inject
   ReplicationQueue(
@@ -67,7 +76,8 @@
       DynamicItem<EventDispatcher> dis,
       ReplicationStateListeners sl,
       FetchRestApiClient.Factory fetchClientFactory,
-      RefsFilter refsFilter) {
+      ExcludedRefsFilter refsFilter,
+      RevisionReader revReader) {
     workQueue = wq;
     dispatcher = dis;
     sources = rd;
@@ -75,6 +85,7 @@
     beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue();
     this.fetchClientFactory = fetchClientFactory;
     this.refsFilter = refsFilter;
+    this.revisionReader = revReader;
   }
 
   @Override
@@ -141,17 +152,10 @@
     ForkJoinPool fetchCallsPool = null;
     try {
       fetchCallsPool = new ForkJoinPool(sources.get().getAll().size());
+
+      final Consumer<Source> callFunction = callFunction(project, refName, state);
       fetchCallsPool
-          .submit(
-              () ->
-                  sources
-                      .get()
-                      .getAll()
-                      .parallelStream()
-                      .forEach(
-                          source -> {
-                            callFetch(source, project, refName, state);
-                          }))
+          .submit(() -> sources.get().getAll().parallelStream().forEach(callFunction))
           .get(fetchCallsTimeout, TimeUnit.MILLISECONDS);
     } catch (InterruptedException | ExecutionException | TimeoutException e) {
       stateLog.error(
@@ -167,6 +171,74 @@
     }
   }
 
+  private Consumer<Source> callFunction(NameKey project, String refName, ReplicationState state) {
+    CallFunction call = getCallFunction(project, refName, state);
+
+    return (source) -> {
+      try {
+        call.call(source);
+      } catch (MissingParentObjectException e) {
+        callFetch(source, project, refName, state);
+      }
+    };
+  }
+
+  private CallFunction getCallFunction(NameKey project, String refName, ReplicationState state) {
+    try {
+      Optional<RevisionData> revisionData = revisionReader.read(project, refName);
+      if (revisionData.isPresent()) {
+        return ((source) -> callSendObject(source, project, refName, revisionData.get(), state));
+      }
+    } catch (IOException | RefUpdateException e) {
+      stateLog.error(
+          String.format(
+              "Exception during reading ref: %s, project:%s, message: %s",
+              refName, project.get(), e.getMessage()),
+          e,
+          state);
+    }
+
+    return (source) -> callFetch(source, project, refName, state);
+  }
+
+  private void callSendObject(
+      Source source,
+      Project.NameKey project,
+      String refName,
+      RevisionData revision,
+      ReplicationState state)
+      throws MissingParentObjectException {
+    if (source.wouldFetchProject(project) && source.wouldFetchRef(refName)) {
+      for (String apiUrl : source.getApis()) {
+        try {
+          URIish uri = new URIish(apiUrl);
+          FetchRestApiClient fetchClient = fetchClientFactory.create(source);
+
+          HttpResult result = fetchClient.callSendObject(project, refName, revision, uri);
+          if (!result.isSuccessful()) {
+            repLog.warn(
+                String.format(
+                    "Pull replication rest api apply object call failed. Endpoint url: %s, reason:%s",
+                    apiUrl, result.getMessage().orElse("unknown")));
+            if (result.isParentObjectMissing()) {
+              throw new MissingParentObjectException(
+                  project, refName, source.getRemoteConfigName());
+            }
+          }
+        } catch (URISyntaxException e) {
+          stateLog.error(String.format("Cannot parse pull replication api url:%s", apiUrl), state);
+        } catch (IOException e) {
+          stateLog.error(
+              String.format(
+                  "Exception during the pull replication fetch rest api call. Endpoint url:%s, message:%s",
+                  apiUrl, e.getMessage()),
+              e,
+              state);
+        }
+      }
+    }
+  }
+
   private void callFetch(
       Source source, Project.NameKey project, String refName, ReplicationState state) {
     if (source.wouldFetchProject(project) && source.wouldFetchRef(refName)) {
@@ -176,7 +248,6 @@
           FetchRestApiClient fetchClient = fetchClientFactory.create(source);
 
           HttpResult result = fetchClient.callFetch(project, refName, uri);
-
           if (!result.isSuccessful()) {
             stateLog.warn(
                 String.format(
@@ -185,7 +256,7 @@
                 state);
           }
         } catch (URISyntaxException e) {
-          stateLog.warn(String.format("Cannot parse pull replication api url:%s", apiUrl), state);
+          stateLog.error(String.format("Cannot parse pull replication api url:%s", apiUrl), state);
         } catch (Exception e) {
           stateLog.error(
               String.format(
@@ -227,4 +298,9 @@
 
     public abstract ObjectId objectId();
   }
+
+  @FunctionalInterface
+  private interface CallFunction {
+    void call(Source source) throws MissingParentObjectException;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationType.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationType.java
new file mode 100644
index 0000000..308a90b
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationType.java
@@ -0,0 +1,20 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+public enum ReplicationType {
+  SYNC,
+  ASYNC
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/RevisionReader.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/RevisionReader.java
new file mode 100644
index 0000000..5718249
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/RevisionReader.java
@@ -0,0 +1,163 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.googlesource.gerrit.plugins.replication.pull.PullReplicationLogger.repLog;
+
+import com.google.common.collect.Lists;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.RefUpdateException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import org.eclipse.jgit.diff.DiffEntry;
+import org.eclipse.jgit.diff.DiffEntry.ChangeType;
+import org.eclipse.jgit.errors.CorruptObjectException;
+import org.eclipse.jgit.errors.IncorrectObjectTypeException;
+import org.eclipse.jgit.errors.LargeObjectException;
+import org.eclipse.jgit.errors.MissingObjectException;
+import org.eclipse.jgit.errors.RepositoryNotFoundException;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectLoader;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.revwalk.RevTree;
+import org.eclipse.jgit.treewalk.TreeWalk;
+import org.eclipse.jgit.treewalk.filter.TreeFilter;
+
+public class RevisionReader {
+  private static final String CONFIG_MAX_API_PAYLOAD_SIZE = "maxApiPayloadSize";
+  private static final Long DEFAULT_MAX_PAYLOAD_SIZE_IN_BYTES = 10000L;
+  private GitRepositoryManager gitRepositoryManager;
+  private Long maxRefSize;
+
+  @Inject
+  public RevisionReader(GitRepositoryManager gitRepositoryManager, ReplicationConfig cfg) {
+    this.gitRepositoryManager = gitRepositoryManager;
+    this.maxRefSize =
+        cfg.getConfig()
+            .getLong("replication", CONFIG_MAX_API_PAYLOAD_SIZE, DEFAULT_MAX_PAYLOAD_SIZE_IN_BYTES);
+  }
+
+  public Optional<RevisionData> read(Project.NameKey project, String refName)
+      throws MissingObjectException, IncorrectObjectTypeException, CorruptObjectException,
+          RepositoryNotFoundException, RefUpdateException, IOException {
+    try (Repository git = gitRepositoryManager.openRepository(project)) {
+      Ref head = git.exactRef(refName);
+      if (head == null) {
+        throw new RefUpdateException(
+            String.format("Cannot find ref %s in project %s", refName, project.get()));
+      }
+
+      Long totalRefSize = 0l;
+
+      ObjectLoader commitLoader = git.open(head.getObjectId());
+      totalRefSize += commitLoader.getSize();
+      verifySize(totalRefSize, commitLoader);
+
+      RevCommit commit = RevCommit.parse(commitLoader.getCachedBytes());
+      RevisionObjectData commitRev =
+          new RevisionObjectData(commit.getType(), commitLoader.getCachedBytes());
+
+      RevTree tree = commit.getTree();
+      ObjectLoader treeLoader = git.open(commit.getTree().toObjectId());
+      totalRefSize += treeLoader.getSize();
+      verifySize(totalRefSize, treeLoader);
+
+      RevisionObjectData treeRev =
+          new RevisionObjectData(tree.getType(), treeLoader.getCachedBytes());
+
+      List<RevisionObjectData> blobs = Lists.newLinkedList();
+      try (TreeWalk walk = new TreeWalk(git)) {
+        if (commit.getParentCount() > 0) {
+          List<DiffEntry> diffEntries = readDiffs(git, commit, tree, walk);
+          blobs = readBlobs(git, totalRefSize, diffEntries);
+        } else {
+          walk.setRecursive(true);
+          walk.addTree(tree);
+          blobs = readBlobs(git, totalRefSize, walk);
+        }
+      }
+      return Optional.of(new RevisionData(commitRev, treeRev, blobs));
+    } catch (LargeObjectException e) {
+      repLog.trace(
+          "Ref %s size for project %s is greater than configured '%s'",
+          refName, project, CONFIG_MAX_API_PAYLOAD_SIZE);
+      return Optional.empty();
+    }
+  }
+
+  private List<DiffEntry> readDiffs(Repository git, RevCommit commit, RevTree tree, TreeWalk walk)
+      throws MissingObjectException, IncorrectObjectTypeException, CorruptObjectException,
+          IOException {
+    walk.setFilter(TreeFilter.ANY_DIFF);
+    walk.reset(getParentTree(git, commit), tree);
+    return DiffEntry.scan(walk, true);
+  }
+
+  private List<RevisionObjectData> readBlobs(Repository git, Long totalRefSize, TreeWalk walk)
+      throws MissingObjectException, IncorrectObjectTypeException, CorruptObjectException,
+          IOException {
+    List<RevisionObjectData> blobs = Lists.newLinkedList();
+    while (walk.next()) {
+      ObjectId objectId = walk.getObjectId(0);
+      ObjectLoader objectLoader = git.open(objectId);
+      totalRefSize += objectLoader.getSize();
+      verifySize(totalRefSize, objectLoader);
+
+      RevisionObjectData rev =
+          new RevisionObjectData(objectLoader.getType(), objectLoader.getCachedBytes());
+      blobs.add(rev);
+    }
+    return blobs;
+  }
+
+  private List<RevisionObjectData> readBlobs(
+      Repository git, Long totalRefSize, List<DiffEntry> diffEntries)
+      throws MissingObjectException, IOException {
+    List<RevisionObjectData> blobs = Lists.newLinkedList();
+    for (DiffEntry diffEntry : diffEntries) {
+      if (!ChangeType.DELETE.equals(diffEntry.getChangeType())) {
+        ObjectLoader objectLoader = git.open(diffEntry.getNewId().toObjectId());
+        totalRefSize += objectLoader.getSize();
+        verifySize(totalRefSize, objectLoader);
+        RevisionObjectData rev =
+            new RevisionObjectData(objectLoader.getType(), objectLoader.getCachedBytes());
+        blobs.add(rev);
+      }
+    }
+    return blobs;
+  }
+
+  private RevTree getParentTree(Repository git, RevCommit commit)
+      throws MissingObjectException, IOException {
+    RevCommit parent = commit.getParent(0);
+    ObjectLoader parentLoader = git.open(parent.getId());
+    RevCommit parentCommit = RevCommit.parse(parentLoader.getCachedBytes());
+    return parentCommit.getTree();
+  }
+
+  private void verifySize(Long totalRefSize, ObjectLoader loader) throws LargeObjectException {
+    if (loader.isLarge() || totalRefSize > maxRefSize) {
+      throw new LargeObjectException();
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 565caee..2d8bce4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -16,6 +16,7 @@
 
 import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName;
 import static com.googlesource.gerrit.plugins.replication.pull.FetchResultProcessing.resolveNodeName;
+import static com.googlesource.gerrit.plugins.replication.pull.ReplicationType.SYNC;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
@@ -294,12 +295,25 @@
                   try {
                     projectState = projectCache.get(project);
                   } catch (StorageException e) {
+                    repLog.warn(
+                        "NOT scheduling replication {}:{} because could not open source project",
+                        project,
+                        ref,
+                        e);
                     return false;
                   }
                   if (!projectState.isPresent()) {
+                    repLog.warn(
+                        "NOT scheduling replication {}:{} because project does not exist",
+                        project,
+                        ref);
                     throw new NoSuchProjectException(project);
                   }
                   if (!projectState.get().statePermitsRead()) {
+                    repLog.warn(
+                        "NOT scheduling replication {}:{} because project is not readable",
+                        project,
+                        ref);
                     return false;
                   }
                   if (!shouldReplicate(projectState.get(), userProvider.get())) {
@@ -309,12 +323,18 @@
                     return true;
                   }
                   try {
-                    permissionBackend
-                        .user(userProvider.get())
-                        .project(project)
-                        .ref(ref)
-                        .check(RefPermission.READ);
+                    if (!ref.startsWith(RefNames.REFS_CHANGES)) {
+                      permissionBackend
+                          .user(userProvider.get())
+                          .project(project)
+                          .ref(ref)
+                          .check(RefPermission.READ);
+                    }
                   } catch (AuthException e) {
+                    repLog.warn(
+                        "NOT scheduling replication {}:{} because lack of permissions to access project/ref",
+                        project,
+                        ref);
                     return false;
                   }
                   return true;
@@ -327,6 +347,7 @@
       Throwables.throwIfUnchecked(e);
       throw new RuntimeException(e);
     }
+    repLog.warn("NOT scheduling replication {}:{}", project, ref);
     return false;
   }
 
@@ -360,13 +381,20 @@
   }
 
   public Future<?> schedule(
-      Project.NameKey project, String ref, ReplicationState state, boolean now) {
+      Project.NameKey project,
+      String ref,
+      ReplicationState state,
+      ReplicationType replicationType) {
     URIish uri = getURI(project);
-    return schedule(project, ref, uri, state, now);
+    return schedule(project, ref, uri, state, replicationType);
   }
 
   public Future<?> schedule(
-      Project.NameKey project, String ref, URIish uri, ReplicationState state, boolean now) {
+      Project.NameKey project,
+      String ref,
+      URIish uri,
+      ReplicationState state,
+      ReplicationType replicationType) {
 
     repLog.info("scheduling replication {}:{} => {}", uri, ref, project);
     if (!shouldReplicate(project, ref, state)) {
@@ -406,7 +434,7 @@
         addRef(e, ref);
         e.addState(ref, state);
         pending.put(uri, e);
-        f = pool.schedule(e, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
+        f = pool.schedule(e, isSyncCall(replicationType) ? 0 : config.getDelay(), TimeUnit.SECONDS);
       } else if (!e.getRefs().contains(ref)) {
         addRef(e, ref);
         e.addState(ref, state);
@@ -429,6 +457,10 @@
     postReplicationScheduledEvent(e, ref);
   }
 
+  private boolean isSyncCall(ReplicationType replicationType) {
+    return SYNC.equals(replicationType);
+  }
+
   /**
    * It schedules again a FetchOp instance.
    *
@@ -730,15 +762,20 @@
   private void postReplicationFailedEvent(FetchOne fetchOp, RefUpdate.Result result) {
     Project.NameKey project = fetchOp.getProjectNameKey();
     String sourceNode = resolveNodeName(fetchOp.getURI());
-    for (String ref : fetchOp.getRefs()) {
-      FetchRefReplicatedEvent event =
-          new FetchRefReplicatedEvent(
-              project.get(), ref, sourceNode, ReplicationState.RefFetchResult.FAILED, result);
-      try {
-        eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
-      } catch (PermissionBackendException e) {
-        repLog.error("error posting event", e);
+    try {
+      Context.setLocalEvent(true);
+      for (String ref : fetchOp.getRefs()) {
+        FetchRefReplicatedEvent event =
+            new FetchRefReplicatedEvent(
+                project.get(), ref, sourceNode, ReplicationState.RefFetchResult.FAILED, result);
+        try {
+          eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
+        } catch (PermissionBackendException e) {
+          repLog.error("error posting event", e);
+        }
       }
+    } finally {
+      Context.unsetLocalEvent();
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
index e50461d..8046d49 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
@@ -23,7 +23,7 @@
 import org.eclipse.jgit.transport.RemoteConfig;
 
 public class SourceConfiguration implements RemoteConfiguration {
-  static final int DEFAULT_REPLICATION_DELAY = 0;
+  static final int DEFAULT_REPLICATION_DELAY = 4;
   static final int DEFAULT_RESCHEDULE_DELAY = 3;
   static final int DEFAULT_SLOW_LATENCY_THRESHOLD_SECS = 900;
   static final int DEFAULT_MAX_CONNECTION_INACTIVITY_MS = 10000;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/StartFetchCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/StartFetchCommand.java
index a3497e6..97f8e9e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/StartFetchCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/StartFetchCommand.java
@@ -14,7 +14,12 @@
 
 package com.googlesource.gerrit.plugins.replication.pull;
 
+import static com.googlesource.gerrit.plugins.replication.pull.ReplicationType.ASYNC;
+import static com.googlesource.gerrit.plugins.replication.pull.ReplicationType.SYNC;
+
 import com.google.gerrit.extensions.annotations.RequiresCapability;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.sshd.CommandMetaData;
 import com.google.gerrit.sshd.SshCommand;
 import com.google.inject.Inject;
@@ -54,6 +59,8 @@
 
   @Inject private ReplicationState.Factory fetchReplicationStateFactory;
 
+  @Inject private DynamicItem<EventDispatcher> eventDispatcher;
+
   @Override
   protected void run() throws Failure {
     if (all && projectPatterns.size() > 0) {
@@ -61,7 +68,8 @@
     }
 
     ReplicationState state =
-        fetchReplicationStateFactory.create(new FetchResultProcessing.CommandProcessing(this));
+        fetchReplicationStateFactory.create(
+            new FetchResultProcessing.CommandProcessing(this, eventDispatcher.get()));
     Future<?> future = null;
 
     ReplicationFilter projectFilter;
@@ -72,7 +80,10 @@
       projectFilter = new ReplicationFilter(projectPatterns);
     }
 
-    future = fetchFactory.create(urlMatch, projectFilter, state, now).schedule(0, TimeUnit.SECONDS);
+    future =
+        fetchFactory
+            .create(urlMatch, projectFilter, state, replicationType(now))
+            .schedule(0, TimeUnit.SECONDS);
 
     if (wait) {
       if (future != null) {
@@ -100,6 +111,10 @@
     }
   }
 
+  private ReplicationType replicationType(Boolean now) {
+    return now ? SYNC : ASYNC;
+  }
+
   @Override
   public void writeStdOutSync(String message) {
     if (wait) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectAction.java
new file mode 100644
index 0000000..f029834
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectAction.java
@@ -0,0 +1,86 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import com.google.common.base.Strings;
+import com.google.gerrit.extensions.restapi.AuthException;
+import com.google.gerrit.extensions.restapi.BadRequestException;
+import com.google.gerrit.extensions.restapi.ResourceConflictException;
+import com.google.gerrit.extensions.restapi.Response;
+import com.google.gerrit.extensions.restapi.RestApiException;
+import com.google.gerrit.extensions.restapi.RestModifyView;
+import com.google.gerrit.extensions.restapi.UnprocessableEntityException;
+import com.google.gerrit.server.project.ProjectResource;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionInput;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.RefUpdateException;
+import java.io.IOException;
+import java.util.Objects;
+
+public class ApplyObjectAction implements RestModifyView<ProjectResource, RevisionInput> {
+
+  private final ApplyObjectCommand command;
+  private final FetchPreconditions preConditions;
+
+  @Inject
+  public ApplyObjectAction(ApplyObjectCommand command, FetchPreconditions preConditions) {
+    this.command = command;
+    this.preConditions = preConditions;
+  }
+
+  @Override
+  public Response<?> apply(ProjectResource resource, RevisionInput input) throws RestApiException {
+
+    if (!preConditions.canCallFetchApi()) {
+      throw new AuthException("not allowed to call fetch command");
+    }
+    try {
+      if (Strings.isNullOrEmpty(input.getLabel())) {
+        throw new BadRequestException("Source label cannot be null or empty");
+      }
+      if (Strings.isNullOrEmpty(input.getRefName())) {
+        throw new BadRequestException("Ref-update refname cannot be null or empty");
+      }
+
+      if (Objects.isNull(input.getRevisionData())) {
+        throw new BadRequestException("Ref-update revision data cannot be null or empty");
+      }
+
+      if (Objects.isNull(input.getRevisionData().getCommitObject())
+          || Objects.isNull(input.getRevisionData().getCommitObject().getContent())
+          || input.getRevisionData().getCommitObject().getContent().length == 0
+          || Objects.isNull(input.getRevisionData().getCommitObject().getType())) {
+        throw new BadRequestException("Ref-update commit object cannot be null or empty");
+      }
+
+      if (Objects.isNull(input.getRevisionData().getTreeObject())
+          || Objects.isNull(input.getRevisionData().getTreeObject().getContent())
+          || Objects.isNull(input.getRevisionData().getTreeObject().getType())) {
+        throw new BadRequestException("Ref-update tree object cannot be null");
+      }
+
+      command.applyObject(
+          resource.getNameKey(), input.getRefName(), input.getRevisionData(), input.getLabel());
+      return Response.created(input);
+    } catch (MissingParentObjectException e) {
+      throw new ResourceConflictException(e.getMessage(), e);
+    } catch (NumberFormatException | IOException e) {
+      throw RestApiException.wrap(e.getMessage(), e);
+    } catch (RefUpdateException e) {
+      throw new UnprocessableEntityException(e.getMessage());
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java
new file mode 100644
index 0000000..276aa16
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java
@@ -0,0 +1,124 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import static com.googlesource.gerrit.plugins.replication.pull.PullReplicationLogger.repLog;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.metrics.Timer1;
+import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.pull.ApplyObjectMetrics;
+import com.googlesource.gerrit.plugins.replication.pull.Context;
+import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.pull.PullReplicationStateLogger;
+import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
+import com.googlesource.gerrit.plugins.replication.pull.ReplicationState.RefFetchResult;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.RefUpdateException;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObject;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.RefUpdateState;
+import java.io.IOException;
+import java.util.Set;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.transport.RefSpec;
+
+public class ApplyObjectCommand {
+
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private static final Set<RefUpdate.Result> SUCCESSFUL_RESULTS =
+      ImmutableSet.of(
+          RefUpdate.Result.NEW,
+          RefUpdate.Result.FORCED,
+          RefUpdate.Result.NO_CHANGE,
+          RefUpdate.Result.FAST_FORWARD);
+
+  private final PullReplicationStateLogger fetchStateLog;
+  private final ApplyObject applyObject;
+  private final ApplyObjectMetrics metrics;
+  private final DynamicItem<EventDispatcher> eventDispatcher;
+
+  @Inject
+  public ApplyObjectCommand(
+      PullReplicationStateLogger fetchStateLog,
+      ApplyObject applyObject,
+      ApplyObjectMetrics metrics,
+      DynamicItem<EventDispatcher> eventDispatcher) {
+    this.fetchStateLog = fetchStateLog;
+    this.applyObject = applyObject;
+    this.metrics = metrics;
+    this.eventDispatcher = eventDispatcher;
+  }
+
+  public void applyObject(
+      Project.NameKey name, String refName, RevisionData revisionData, String sourceLabel)
+      throws IOException, RefUpdateException, MissingParentObjectException {
+
+    repLog.info("Apply object from {} for project {}, ref name {}", sourceLabel, name, refName);
+    Timer1.Context<String> context = metrics.start(sourceLabel);
+    RefUpdateState refUpdateState = applyObject.apply(name, new RefSpec(refName), revisionData);
+    long elapsed = NANOSECONDS.toMillis(context.stop());
+
+    try {
+      Context.setLocalEvent(true);
+      eventDispatcher
+          .get()
+          .postEvent(
+              new FetchRefReplicatedEvent(
+                  name.get(),
+                  refName,
+                  sourceLabel,
+                  getStatus(refUpdateState),
+                  refUpdateState.getResult()));
+    } catch (PermissionBackendException e) {
+      logger.atSevere().withCause(e).log(
+          "Cannot post event for ref '%s', project %s", refName, name);
+    } finally {
+      Context.unsetLocalEvent();
+    }
+
+    if (!isSuccessful(refUpdateState.getResult())) {
+      String message =
+          String.format(
+              "RefUpdate failed with result %s for: sourceLcabel=%s, project=%s, refName=%s",
+              refUpdateState.getResult().name(), sourceLabel, name, refName);
+      fetchStateLog.error(message);
+      throw new RefUpdateException(message);
+    }
+    repLog.info(
+        "Apply object from {} for project {}, ref name {} completed in {}ms",
+        sourceLabel,
+        name,
+        refName,
+        elapsed);
+  }
+
+  private RefFetchResult getStatus(RefUpdateState refUpdateState) {
+    return isSuccessful(refUpdateState.getResult())
+        ? ReplicationState.RefFetchResult.SUCCEEDED
+        : ReplicationState.RefFetchResult.FAILED;
+  }
+
+  private Boolean isSuccessful(RefUpdate.Result result) {
+    return SUCCESSFUL_RESULTS.contains(result);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
index c44bcbc..2767f1d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
@@ -93,7 +93,7 @@
   private Response<?> applySync(Project.NameKey project, Input input)
       throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
           TimeoutException {
-    command.fetch(project, input.label, input.refName);
+    command.fetchSync(project, input.label, input.refName);
     return Response.created(input);
   }
 
@@ -127,16 +127,14 @@
     @Override
     public void run() {
       try {
-        command.fetch(project, input.label, input.refName);
+        command.fetchAsync(project, input.label, input.refName);
       } catch (InterruptedException
           | ExecutionException
           | RemoteConfigurationMissingException
           | TimeoutException e) {
         log.atSevere().withCause(e).log(
-            "Exception during the async fetch call for project {}, label {} and ref name {}",
-            project.get(),
-            input.label,
-            input.refName);
+            "Exception during the async fetch call for project %s, label %s and ref name %s",
+            project.get(), input.label, input.refName);
       }
     }
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
index 7470c2c..e1ac9ac 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
@@ -14,12 +14,18 @@
 
 package com.googlesource.gerrit.plugins.replication.pull.api;
 
+import static com.googlesource.gerrit.plugins.replication.pull.ReplicationType.ASYNC;
+import static com.googlesource.gerrit.plugins.replication.pull.ReplicationType.SYNC;
+
 import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.server.events.EventDispatcher;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.replication.pull.Command;
 import com.googlesource.gerrit.plugins.replication.pull.FetchResultProcessing;
 import com.googlesource.gerrit.plugins.replication.pull.PullReplicationStateLogger;
 import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
+import com.googlesource.gerrit.plugins.replication.pull.ReplicationType;
 import com.googlesource.gerrit.plugins.replication.pull.Source;
 import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.RemoteConfigurationMissingException;
@@ -34,22 +40,38 @@
   private ReplicationState.Factory fetchReplicationStateFactory;
   private PullReplicationStateLogger fetchStateLog;
   private SourcesCollection sources;
+  private final DynamicItem<EventDispatcher> eventDispatcher;
 
   @Inject
   public FetchCommand(
       ReplicationState.Factory fetchReplicationStateFactory,
       PullReplicationStateLogger fetchStateLog,
-      SourcesCollection sources) {
+      SourcesCollection sources,
+      DynamicItem<EventDispatcher> eventDispatcher) {
     this.fetchReplicationStateFactory = fetchReplicationStateFactory;
     this.fetchStateLog = fetchStateLog;
     this.sources = sources;
+    this.eventDispatcher = eventDispatcher;
   }
 
-  public void fetch(Project.NameKey name, String label, String refName)
+  public void fetchAsync(Project.NameKey name, String label, String refName)
+      throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
+          TimeoutException {
+    fetch(name, label, refName, ASYNC);
+  }
+
+  public void fetchSync(Project.NameKey name, String label, String refName)
+      throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
+          TimeoutException {
+    fetch(name, label, refName, SYNC);
+  }
+
+  private void fetch(Project.NameKey name, String label, String refName, ReplicationType fetchType)
       throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
           TimeoutException {
     ReplicationState state =
-        fetchReplicationStateFactory.create(new FetchResultProcessing.CommandProcessing(this));
+        fetchReplicationStateFactory.create(
+            new FetchResultProcessing.CommandProcessing(this, eventDispatcher.get()));
     Optional<Source> source =
         sources.getAll().stream().filter(s -> s.getRemoteConfigName().equals(label)).findFirst();
     if (!source.isPresent()) {
@@ -60,7 +82,7 @@
 
     try {
       state.markAllFetchTasksScheduled();
-      Future<?> future = source.get().schedule(name, refName, state, true);
+      Future<?> future = source.get().schedule(name, refName, state, fetchType);
       future.get(source.get().getTimeout(), TimeUnit.SECONDS);
     } catch (ExecutionException
         | IllegalStateException
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiModule.java
index 1663ad2..f94f49b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiModule.java
@@ -26,7 +26,9 @@
   @Override
   protected void configure() {
     bind(FetchAction.class).in(Scopes.SINGLETON);
+    bind(ApplyObjectAction.class).in(Scopes.SINGLETON);
     post(PROJECT_KIND, "fetch").to(FetchAction.class);
+    post(PROJECT_KIND, "apply-object").to(ApplyObjectAction.class);
 
     bind(FetchPreconditions.class).in(Scopes.SINGLETON);
     bind(CapabilityDefinition.class)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/data/RevisionData.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/data/RevisionData.java
new file mode 100644
index 0000000..bcd4e05
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/data/RevisionData.java
@@ -0,0 +1,46 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api.data;
+
+import java.util.List;
+
+public class RevisionData {
+  private RevisionObjectData commitObject;
+
+  private RevisionObjectData treeObject;
+
+  private List<RevisionObjectData> blobs;
+
+  public RevisionData(
+      RevisionObjectData commitObject,
+      RevisionObjectData treeObject,
+      List<RevisionObjectData> blobs) {
+    this.commitObject = commitObject;
+    this.treeObject = treeObject;
+    this.blobs = blobs;
+  }
+
+  public RevisionObjectData getCommitObject() {
+    return commitObject;
+  }
+
+  public RevisionObjectData getTreeObject() {
+    return treeObject;
+  }
+
+  public List<RevisionObjectData> getBlobs() {
+    return blobs;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/data/RevisionInput.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/data/RevisionInput.java
new file mode 100644
index 0000000..bc3e218
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/data/RevisionInput.java
@@ -0,0 +1,41 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api.data;
+
+public class RevisionInput {
+  private String label;
+
+  private String refName;
+
+  private RevisionData revisionData;
+
+  public RevisionInput(String label, String refName, RevisionData revisionData) {
+    this.label = label;
+    this.refName = refName;
+    this.revisionData = revisionData;
+  }
+
+  public String getLabel() {
+    return label;
+  }
+
+  public String getRefName() {
+    return refName;
+  }
+
+  public RevisionData getRevisionData() {
+    return revisionData;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/data/RevisionObjectData.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/data/RevisionObjectData.java
new file mode 100644
index 0000000..02ba06c
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/data/RevisionObjectData.java
@@ -0,0 +1,35 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api.data;
+
+import java.util.Base64;
+
+public class RevisionObjectData {
+  private final Integer type;
+  private final String content;
+
+  public RevisionObjectData(int type, byte[] content) {
+    this.type = type;
+    this.content = content == null ? "" : Base64.getEncoder().encodeToString(content);
+  }
+
+  public Integer getType() {
+    return type;
+  }
+
+  public byte[] getContent() {
+    return Base64.getDecoder().decode(content);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/exception/MissingParentObjectException.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/exception/MissingParentObjectException.java
new file mode 100644
index 0000000..a96c1ea
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/exception/MissingParentObjectException.java
@@ -0,0 +1,37 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api.exception;
+
+import com.google.gerrit.entities.Project;
+import org.eclipse.jgit.lib.ObjectId;
+
+public class MissingParentObjectException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  public MissingParentObjectException(
+      Project.NameKey project, String refName, ObjectId parentObjectId) {
+    super(
+        String.format(
+            "Missing parent object %s for project %s ref name: %s",
+            parentObjectId.getName(), project.get(), refName));
+  }
+
+  public MissingParentObjectException(Project.NameKey project, String refName, String targetName) {
+    super(
+        String.format(
+            "Missing parent object on %s for project %s ref name: %s",
+            targetName, project.get(), refName));
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/exception/RefUpdateException.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/exception/RefUpdateException.java
new file mode 100644
index 0000000..f02f9c5
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/exception/RefUpdateException.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api.exception;
+
+public class RefUpdateException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  public RefUpdateException(String msg) {
+    super(msg);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
index bdfc1c3..7b876df 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
@@ -14,17 +14,25 @@
 
 package com.googlesource.gerrit.plugins.replication.pull.client;
 
+import static com.google.gson.FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.Strings;
 import com.google.common.flogger.FluentLogger;
+import com.google.common.net.MediaType;
 import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.annotations.PluginName;
 import com.google.gerrit.extensions.restapi.Url;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
 import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionInput;
+import com.googlesource.gerrit.plugins.replication.pull.filter.SyncRefsFilter;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
@@ -48,6 +56,9 @@
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   static String GERRIT_ADMIN_PROTOCOL_PREFIX = "gerrit+";
 
+  private static final Gson GSON =
+      new GsonBuilder().setFieldNamingPolicy(LOWER_CASE_WITH_UNDERSCORES).create();
+
   public interface Factory {
     FetchRestApiClient create(Source source);
   }
@@ -56,16 +67,22 @@
   private final SourceHttpClient.Factory httpClientFactory;
   private final Source source;
   private final String instanceLabel;
+  private final String pluginName;
+  private final SyncRefsFilter syncRefsFilter;
 
   @Inject
   FetchRestApiClient(
       CredentialsFactory credentials,
       SourceHttpClient.Factory httpClientFactory,
       ReplicationConfig replicationConfig,
+      SyncRefsFilter syncRefsFilter,
+      @PluginName String pluginName,
       @Assisted Source source) {
     this.credentials = credentials;
     this.httpClientFactory = httpClientFactory;
     this.source = source;
+    this.pluginName = pluginName;
+    this.syncRefsFilter = syncRefsFilter;
     this.instanceLabel =
         Strings.nullToEmpty(
                 replicationConfig.getConfig().getString("replication", null, "instanceLabel"))
@@ -80,16 +97,35 @@
         String.format(
             "%s/a/projects/%s/pull-replication~fetch",
             targetUri.toString(), Url.encode(project.get()));
-
+    Boolean callAsync = !syncRefsFilter.match(refName);
     HttpPost post = new HttpPost(url);
     post.setEntity(
         new StringEntity(
-            String.format("{\"label\":\"%s\", \"ref_name\": \"%s\"}", instanceLabel, refName),
+            String.format(
+                "{\"label\":\"%s\", \"ref_name\": \"%s\", \"async\":%s}",
+                instanceLabel, refName, callAsync),
             StandardCharsets.UTF_8));
     post.addHeader(new BasicHeader("Content-Type", "application/json"));
     return httpClientFactory.create(source).execute(post, this, getContext(targetUri));
   }
 
+  public HttpResult callSendObject(
+      Project.NameKey project, String refName, RevisionData revisionData, URIish targetUri)
+      throws ClientProtocolException, IOException {
+
+    RevisionInput input = new RevisionInput(instanceLabel, refName, revisionData);
+
+    String url =
+        String.format(
+            "%s/a/projects/%s/%s~apply-object",
+            targetUri.toString(), Url.encode(project.get()), pluginName);
+
+    HttpPost post = new HttpPost(url);
+    post.setEntity(new StringEntity(GSON.toJson(input)));
+    post.addHeader(new BasicHeader("Content-Type", MediaType.JSON_UTF_8.toString()));
+    return httpClientFactory.create(source).execute(post, this, getContext(targetUri));
+  }
+
   @Override
   public HttpResult handleResponse(HttpResponse response) {
     Optional<String> responseBody = Optional.empty();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpResult.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpResult.java
index 8a2b66d..dc01295 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpResult.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpResult.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.replication.pull.client;
 
+import static javax.servlet.http.HttpServletResponse.SC_CONFLICT;
 import static javax.servlet.http.HttpServletResponse.SC_CREATED;
 import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT;
 import static javax.servlet.http.HttpServletResponse.SC_OK;
@@ -36,4 +37,8 @@
   public boolean isSuccessful() {
     return responseCode == SC_CREATED || responseCode == SC_NO_CONTENT || responseCode == SC_OK;
   }
+
+  public boolean isParentObjectMissing() {
+    return responseCode == SC_CONFLICT;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandler.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandler.java
new file mode 100644
index 0000000..a618e16
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandler.java
@@ -0,0 +1,67 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Change;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventListener;
+import com.google.gerrit.server.index.change.ChangeIndexer;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.pull.Context;
+import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
+
+public class FetchRefReplicatedEventHandler implements EventListener {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private ChangeIndexer changeIndexer;
+
+  @Inject
+  FetchRefReplicatedEventHandler(ChangeIndexer changeIndexer) {
+    this.changeIndexer = changeIndexer;
+  }
+
+  @Override
+  public void onEvent(Event event) {
+    if (event instanceof FetchRefReplicatedEvent && isLocalEvent()) {
+      FetchRefReplicatedEvent fetchRefReplicatedEvent = (FetchRefReplicatedEvent) event;
+      if (!RefNames.isNoteDbMetaRef(fetchRefReplicatedEvent.getRefName())
+          || !fetchRefReplicatedEvent
+              .getStatus()
+              .equals(ReplicationState.RefFetchResult.SUCCEEDED.toString())) {
+        return;
+      }
+
+      Project.NameKey projectNameKey = fetchRefReplicatedEvent.getProjectNameKey();
+      logger.atFine().log(
+          "Indexing ref '%s' for project %s",
+          fetchRefReplicatedEvent.getRefName(), projectNameKey.get());
+      Change.Id changeId = Change.Id.fromRef(fetchRefReplicatedEvent.getRefName());
+      if (changeId != null) {
+        changeIndexer.index(projectNameKey, changeId);
+      } else {
+        logger.atWarning().log(
+            "Couldn't get changeId from refName. Skipping indexing of change %s for project %s",
+            fetchRefReplicatedEvent.getRefName(), projectNameKey.get());
+      }
+    }
+  }
+
+  private boolean isLocalEvent() {
+    return Context.isLocalEvent();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventModule.java
new file mode 100644
index 0000000..675563a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventModule.java
@@ -0,0 +1,27 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.events.EventListener;
+
+public class FetchRefReplicatedEventModule extends LifecycleModule {
+
+  @Override
+  protected void configure() {
+    DynamicSet.bind(binder(), EventListener.class).to(FetchRefReplicatedEventHandler.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObject.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObject.java
new file mode 100644
index 0000000..03362bf
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObject.java
@@ -0,0 +1,75 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.fetch;
+
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.git.LocalDiskRepositoryManager;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
+import java.io.IOException;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectInserter;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.transport.RefSpec;
+
+public class ApplyObject {
+
+  private final LocalDiskRepositoryManager gitManager;
+
+  // NOTE: We do need specifically the LocalDiskRepositoryManager to make sure
+  // to be able to write onto the directly physical repository without any wrapper.
+  // Using for instance the multi-site wrapper injected by Guice would result
+  // in a split-brain because of the misalignment of local vs. global refs values.
+  @Inject
+  public ApplyObject(LocalDiskRepositoryManager gitManager) {
+    this.gitManager = gitManager;
+  }
+
+  public RefUpdateState apply(Project.NameKey name, RefSpec refSpec, RevisionData revisionData)
+      throws MissingParentObjectException, IOException {
+    try (Repository git = gitManager.openRepository(name)) {
+
+      ObjectId newObjectID = null;
+      try (ObjectInserter oi = git.newObjectInserter()) {
+        RevisionObjectData commitObject = revisionData.getCommitObject();
+        RevCommit commit = RevCommit.parse(commitObject.getContent());
+        for (RevCommit parent : commit.getParents()) {
+          if (!git.getObjectDatabase().has(parent.getId())) {
+            throw new MissingParentObjectException(name, refSpec.getSource(), parent.getId());
+          }
+        }
+        newObjectID = oi.insert(commitObject.getType(), commitObject.getContent());
+
+        RevisionObjectData treeObject = revisionData.getTreeObject();
+        oi.insert(treeObject.getType(), treeObject.getContent());
+
+        for (RevisionObjectData rev : revisionData.getBlobs()) {
+          oi.insert(rev.getType(), rev.getContent());
+        }
+
+        oi.flush();
+      }
+      RefUpdate ru = git.updateRef(refSpec.getSource());
+      ru.setNewObjectId(newObjectID);
+      RefUpdate.Result result = ru.update();
+
+      return new RefUpdateState(refSpec.getSource(), result);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/filter/ExcludedRefsFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/filter/ExcludedRefsFilter.java
new file mode 100644
index 0000000..f2e4ab3
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/filter/ExcludedRefsFilter.java
@@ -0,0 +1,51 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.filter;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.entities.RefNames;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
+import java.util.List;
+import org.eclipse.jgit.lib.Config;
+
+@Singleton
+public class ExcludedRefsFilter extends RefsFilter {
+  @Inject
+  public ExcludedRefsFilter(ReplicationConfig replicationConfig) {
+    super(replicationConfig);
+  }
+
+  @Override
+  protected List<String> getRefNamePatterns(Config cfg) {
+    return ImmutableList.<String>builder()
+        .addAll(getDefaultExcludeRefPatterns())
+        .addAll(ImmutableList.copyOf(cfg.getStringList("replication", null, "excludeRefs")))
+        .build();
+  }
+
+  private List<String> getDefaultExcludeRefPatterns() {
+    return ImmutableList.of(
+        RefNames.REFS_USERS + "*",
+        RefNames.REFS_CONFIG,
+        RefNames.REFS_SEQUENCES + "*",
+        RefNames.REFS_EXTERNAL_IDS,
+        RefNames.REFS_GROUPS + "*",
+        RefNames.REFS_GROUPNAMES,
+        RefNames.REFS_CACHE_AUTOMERGE + "*",
+        RefNames.REFS_STARRED_CHANGES + "*");
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/RefsFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/filter/RefsFilter.java
similarity index 70%
rename from src/main/java/com/googlesource/gerrit/plugins/replication/pull/RefsFilter.java
rename to src/main/java/com/googlesource/gerrit/plugins/replication/pull/filter/RefsFilter.java
index 0b9a691..7ec19ea 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/RefsFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/filter/RefsFilter.java
@@ -12,20 +12,15 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.googlesource.gerrit.plugins.replication.pull;
+package com.googlesource.gerrit.plugins.replication.pull.filter;
 
 import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
 import com.google.gerrit.entities.AccessSection;
-import com.google.gerrit.entities.RefNames;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
 import java.util.List;
 import org.eclipse.jgit.lib.Config;
 
-@Singleton
-public class RefsFilter {
+public abstract class RefsFilter {
   public enum PatternType {
     REGEX,
     WILDCARD,
@@ -44,7 +39,6 @@
 
   private final List<String> refsPatterns;
 
-  @Inject
   public RefsFilter(ReplicationConfig replicationConfig) {
     refsPatterns = getRefNamePatterns(replicationConfig.getConfig());
   }
@@ -66,12 +60,7 @@
     return false;
   }
 
-  private List<String> getRefNamePatterns(Config cfg) {
-    return ImmutableList.<String>builder()
-        .addAll(getDefaultExcludeRefPatterns())
-        .addAll(ImmutableList.copyOf(cfg.getStringList("replication", null, "excludeRefs")))
-        .build();
-  }
+  protected abstract List<String> getRefNamePatterns(Config cfg);
 
   private boolean matchesPattern(String refName, String pattern) {
     boolean match = false;
@@ -87,16 +76,4 @@
     }
     return match;
   }
-
-  private List<String> getDefaultExcludeRefPatterns() {
-    return ImmutableList.of(
-        RefNames.REFS_USERS + "*",
-        RefNames.REFS_CONFIG,
-        RefNames.REFS_SEQUENCES + "*",
-        RefNames.REFS_EXTERNAL_IDS,
-        RefNames.REFS_GROUPS + "*",
-        RefNames.REFS_GROUPNAMES,
-        RefNames.REFS_CACHE_AUTOMERGE + "*",
-        RefNames.REFS_STARRED_CHANGES + "*");
-  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/filter/SyncRefsFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/filter/SyncRefsFilter.java
new file mode 100644
index 0000000..a069935
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/filter/SyncRefsFilter.java
@@ -0,0 +1,35 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.filter;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
+import java.util.List;
+import org.eclipse.jgit.lib.Config;
+
+@Singleton
+public class SyncRefsFilter extends RefsFilter {
+  @Inject
+  public SyncRefsFilter(ReplicationConfig replicationConfig) {
+    super(replicationConfig);
+  }
+
+  @Override
+  protected List<String> getRefNamePatterns(Config cfg) {
+    return ImmutableList.copyOf(cfg.getStringList("replication", null, "syncRefs"));
+  }
+}
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index d921e0b..22b52e7 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -15,3 +15,8 @@
 To be allowed to trigger pull replication a user must be a member of a
 group that is granted the 'Pull Replication' capability (provided
 by this plugin) or the 'Administrate Server' capability.
+
+Change Indexing
+--------
+
+Changes will be automatically indexed upon replication.
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index ae2c2fd..126cd1b 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -181,6 +181,37 @@
 
     By default, all other refs are included.
 
+replication.syncRefs
+:   Specify for which refs git fetch calls should be executed synchronously.
+    It can be provided more than once, and supports three formats: regular expressions,
+    wildcard matching, and single ref matching. All three formats match are case-sensitive.
+
+    Values starting with a caret `^` are treated as regular
+    expressions. For the regular expressions details please follow
+    official [java documentation](https://docs.oracle.com/javase/tutorial/essential/regex/).
+
+    Please note that regular expressions could also be used
+    with inverse match.
+
+    Values that are not regular expressions and end in `*` are
+    treated as wildcard matches. Wildcards match refs whose
+    name agrees from the beginning until the trailing `*`. So
+    `foo/b*` would match the refs `foo/b`, `foo/bar`, and
+    `foo/baz`, but neither `foobar`, nor `bar/foo/baz`.
+
+    Values that are neither regular expressions nor wildcards are
+    treated as single ref matches. So `foo/bar` matches only
+    the ref `foo/bar`, but no other refs.
+
+    By default, set to '*' (all refs are replicated synchronously).
+
+replication.maxApiPayloadSize
+:	Maximum size in bytes of the ref to be sent as a REST Api call
+	payload. For refs larger than threshold git fetch operation
+	will be used.
+
+	Default: 10000
+
 remote.NAME.url
 :	Address of the remote server to fetch from. Single URL can be
 	specified within a single remote block. A remote node can request
@@ -261,13 +292,14 @@
 	Defaults to 0 seconds, wait indefinitely.
 
 remote.NAME.replicationDelay
-:	Time to wait before scheduling a remote fetch operation. Setting
-	the delay to 0 effectively disables the delay, causing the fetch
-	to start as soon as possible.
+:	Time to wait before scheduling an asynchronous remote fetch
+	operation. Setting the delay to 0 effectively disables the delay,
+	causing the fetch to start as soon as possible.
 
 	This is a Gerrit specific extension to the Git remote block.
 
-	By default, 0 seconds.
+	By default for asynchronous fetch, 4 seconds. For a synchronous fetch
+	replicationDelay is zero.
 
 remote.NAME.rescheduleDelay
 :	Delay when rescheduling a fetch operation due to an in-flight fetch
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchGitUpdateProcessingTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchGitUpdateProcessingTest.java
index f0f1c54..77dc947 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchGitUpdateProcessingTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchGitUpdateProcessingTest.java
@@ -21,6 +21,7 @@
 
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.replication.pull.FetchResultProcessing.CommandProcessing;
 import com.googlesource.gerrit.plugins.replication.pull.FetchResultProcessing.GitUpdateProcessing;
 import com.googlesource.gerrit.plugins.replication.pull.ReplicationState.RefFetchResult;
 import java.net.URISyntaxException;
@@ -32,15 +33,20 @@
 public class FetchGitUpdateProcessingTest {
   private EventDispatcher dispatcherMock;
   private GitUpdateProcessing gitUpdateProcessing;
+  private CommandProcessing commandProcessing;
+  private Command sshCommandMock;
 
   @Before
   public void setUp() throws Exception {
     dispatcherMock = mock(EventDispatcher.class);
     gitUpdateProcessing = new GitUpdateProcessing(dispatcherMock);
+    sshCommandMock = mock(Command.class);
+    commandProcessing = new CommandProcessing(sshCommandMock, dispatcherMock);
   }
 
   @Test
-  public void headRefReplicated() throws URISyntaxException, PermissionBackendException {
+  public void headRefReplicatedInGitUpdateProcessing()
+      throws URISyntaxException, PermissionBackendException {
     FetchRefReplicatedEvent expectedEvent =
         new FetchRefReplicatedEvent(
             "someProject",
@@ -59,6 +65,26 @@
   }
 
   @Test
+  public void headRefReplicatedInCommandProcessing()
+      throws URISyntaxException, PermissionBackendException {
+    FetchRefReplicatedEvent expectedEvent =
+        new FetchRefReplicatedEvent(
+            "someProject",
+            "refs/heads/master",
+            "someHost",
+            RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.NEW);
+
+    commandProcessing.onOneProjectReplicationDone(
+        "someProject",
+        "refs/heads/master",
+        new URIish("git://someHost/someProject.git"),
+        RefFetchResult.SUCCEEDED,
+        RefUpdate.Result.NEW);
+    verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent));
+  }
+
+  @Test
   public void changeRefReplicated() throws URISyntaxException, PermissionBackendException {
     FetchRefReplicatedEvent expectedEvent =
         new FetchRefReplicatedEvent(
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
index 61520f4..327c6ba 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
@@ -15,7 +15,14 @@
 package com.googlesource.gerrit.plugins.replication.pull;
 
 import static java.nio.file.Files.createTempDirectory;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.gerrit.extensions.api.changes.NotifyHandling;
 import com.google.gerrit.extensions.common.AccountInfo;
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
@@ -27,41 +34,130 @@
 import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
 import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.RefUpdateException;
 import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient;
+import com.googlesource.gerrit.plugins.replication.pull.client.HttpResult;
+import com.googlesource.gerrit.plugins.replication.pull.filter.ExcludedRefsFilter;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.Optional;
+import org.apache.http.client.ClientProtocolException;
+import org.eclipse.jgit.errors.LargeObjectException;
+import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.util.FS;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
-import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
 public class ReplicationQueueTest {
+  private static int CONNECTION_TIMEOUT = 1000000;
+
   @Mock private WorkQueue wq;
+  @Mock private Source source;
+  @Mock private SourcesCollection sourceCollection;
   @Mock private Provider<SourcesCollection> rd;
   @Mock private DynamicItem<EventDispatcher> dis;
   @Mock ReplicationStateListeners sl;
+  @Mock FetchRestApiClient fetchRestApiClient;
   @Mock FetchRestApiClient.Factory fetchClientFactory;
   @Mock AccountInfo accountInfo;
+  @Mock RevisionReader revReader;
+  @Mock RevisionData revisionData;
+  @Mock HttpResult httpResult;
 
-  RefsFilter refsFilter;
-
+  private ExcludedRefsFilter refsFilter;
   private ReplicationQueue objectUnderTest;
   private SitePaths sitePaths;
   private Path pluginDataPath;
 
   @Before
-  public void setup() throws IOException {
+  public void setup() throws IOException, LargeObjectException, RefUpdateException {
     Path sitePath = createTempPath("site");
     sitePaths = new SitePaths(sitePath);
     Path pluginDataPath = createTempPath("data");
     ReplicationConfig replicationConfig = new ReplicationFileBasedConfig(sitePaths, pluginDataPath);
-    refsFilter = new RefsFilter(replicationConfig);
-    objectUnderTest = new ReplicationQueue(wq, rd, dis, sl, fetchClientFactory, refsFilter);
+    refsFilter = new ExcludedRefsFilter(replicationConfig);
+    when(source.getConnectionTimeout()).thenReturn(CONNECTION_TIMEOUT);
+    when(source.wouldFetchProject(any())).thenReturn(true);
+    when(source.wouldFetchRef(anyString())).thenReturn(true);
+    ImmutableList<String> apis = ImmutableList.of("http://localhost:18080");
+    when(source.getApis()).thenReturn(apis);
+    when(sourceCollection.getAll()).thenReturn(Lists.newArrayList(source));
+    when(rd.get()).thenReturn(sourceCollection);
+    when(revReader.read(any(), anyString())).thenReturn(Optional.of(revisionData));
+    when(fetchClientFactory.create(any())).thenReturn(fetchRestApiClient);
+    when(fetchRestApiClient.callSendObject(any(), anyString(), any(), any()))
+        .thenReturn(httpResult);
+    when(fetchRestApiClient.callFetch(any(), anyString(), any())).thenReturn(httpResult);
+    when(httpResult.isSuccessful()).thenReturn(true);
+
+    objectUnderTest =
+        new ReplicationQueue(wq, rd, dis, sl, fetchClientFactory, refsFilter, revReader);
+  }
+
+  @Test
+  public void shouldCallSendObjectWhenMetaRef() throws ClientProtocolException, IOException {
+    Event event = new TestEvent("refs/changes/01/1/meta");
+    objectUnderTest.start();
+    objectUnderTest.onGitReferenceUpdated(event);
+
+    verify(fetchRestApiClient).callSendObject(any(), anyString(), any(), any());
+  }
+
+  @Test
+  public void shouldCallSendObjectWhenPatchSetRef() throws ClientProtocolException, IOException {
+    Event event = new TestEvent("refs/changes/01/1/1");
+    objectUnderTest.start();
+    objectUnderTest.onGitReferenceUpdated(event);
+
+    verify(fetchRestApiClient).callSendObject(any(), anyString(), any(), any());
+  }
+
+  @Test
+  public void shouldFallbackToCallFetchWhenIOException()
+      throws ClientProtocolException, IOException, LargeObjectException, RefUpdateException {
+    Event event = new TestEvent("refs/changes/01/1/meta");
+    objectUnderTest.start();
+
+    when(revReader.read(any(), anyString())).thenThrow(IOException.class);
+
+    objectUnderTest.onGitReferenceUpdated(event);
+
+    verify(fetchRestApiClient).callFetch(any(), anyString(), any());
+  }
+
+  @Test
+  public void shouldFallbackToCallFetchWhenLargeRef()
+      throws ClientProtocolException, IOException, LargeObjectException, RefUpdateException {
+    Event event = new TestEvent("refs/changes/01/1/1");
+    objectUnderTest.start();
+
+    when(revReader.read(any(), anyString())).thenReturn(Optional.empty());
+
+    objectUnderTest.onGitReferenceUpdated(event);
+
+    verify(fetchRestApiClient).callFetch(any(), anyString(), any());
+  }
+
+  @Test
+  public void shouldFallbackToCallFetchWhenParentObjectIsMissing()
+      throws ClientProtocolException, IOException {
+    Event event = new TestEvent("refs/changes/01/1/meta");
+    objectUnderTest.start();
+
+    when(httpResult.isSuccessful()).thenReturn(false);
+    when(httpResult.isParentObjectMissing()).thenReturn(true);
+    when(fetchRestApiClient.callSendObject(any(), anyString(), any(), any()))
+        .thenReturn(httpResult);
+
+    objectUnderTest.onGitReferenceUpdated(event);
+
+    verify(fetchRestApiClient).callFetch(any(), anyString(), any());
   }
 
   @Test
@@ -69,7 +165,7 @@
     Event event = new TestEvent("refs/users/00/1000000");
     objectUnderTest.onGitReferenceUpdated(event);
 
-    Mockito.verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
+    verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
   }
 
   @Test
@@ -77,7 +173,7 @@
     Event event = new TestEvent("refs/groups/a1/a16d5b33cc789d60b682c654f03f9cc2feb12975");
     objectUnderTest.onGitReferenceUpdated(event);
 
-    Mockito.verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
+    verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
   }
 
   @Test
@@ -85,7 +181,7 @@
     Event event = new TestEvent("refs/meta/group-names");
     objectUnderTest.onGitReferenceUpdated(event);
 
-    Mockito.verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
+    verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
   }
 
   @Test
@@ -93,7 +189,7 @@
     Event event = new TestEvent("refs/sequences/changes");
     objectUnderTest.onGitReferenceUpdated(event);
 
-    Mockito.verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
+    verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
   }
 
   @Test
@@ -103,12 +199,14 @@
     fileConfig.setString("replication", null, "excludeRefs", "refs/multi-site/version");
     fileConfig.save();
     ReplicationConfig replicationConfig = new ReplicationFileBasedConfig(sitePaths, pluginDataPath);
-    refsFilter = new RefsFilter(replicationConfig);
-    objectUnderTest = new ReplicationQueue(wq, rd, dis, sl, fetchClientFactory, refsFilter);
+    refsFilter = new ExcludedRefsFilter(replicationConfig);
+
+    objectUnderTest =
+        new ReplicationQueue(wq, rd, dis, sl, fetchClientFactory, refsFilter, revReader);
     Event event = new TestEvent("refs/multi-site/version");
     objectUnderTest.onGitReferenceUpdated(event);
 
-    Mockito.verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
+    verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
   }
 
   @Test
@@ -116,7 +214,7 @@
     Event event = new TestEvent("refs/starred-changes/41/2941/1000000");
     objectUnderTest.onGitReferenceUpdated(event);
 
-    Mockito.verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
+    verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
   }
 
   @Test
@@ -124,7 +222,7 @@
     Event event = new TestEvent("refs/meta/config");
     objectUnderTest.onGitReferenceUpdated(event);
 
-    Mockito.verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
+    verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
   }
 
   @Test
@@ -132,7 +230,7 @@
     Event event = new TestEvent("refs/meta/external-ids");
     objectUnderTest.onGitReferenceUpdated(event);
 
-    Mockito.verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
+    verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
   }
 
   protected static Path createTempPath(String prefix) throws IOException {
@@ -141,9 +239,17 @@
 
   private class TestEvent implements GitReferenceUpdatedListener.Event {
     private String refName;
+    private String projectName;
+    private ObjectId newObjectId;
 
     public TestEvent(String refName) {
+      this(refName, "defaultProject", ObjectId.zeroId());
+    }
+
+    public TestEvent(String refName, String projectName, ObjectId newObjectId) {
       this.refName = refName;
+      this.projectName = projectName;
+      this.newObjectId = newObjectId;
     }
 
     @Override
@@ -153,7 +259,7 @@
 
     @Override
     public String getProjectName() {
-      return null;
+      return projectName;
     }
 
     @Override
@@ -168,7 +274,7 @@
 
     @Override
     public String getNewObjectId() {
-      return null;
+      return newObjectId.getName();
     }
 
     @Override
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/RevisionReaderIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/RevisionReaderIT.java
new file mode 100644
index 0000000..d1a4c85
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/RevisionReaderIT.java
@@ -0,0 +1,129 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.entities.Change;
+import com.google.gerrit.entities.Patch;
+import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.extensions.api.changes.ReviewInput;
+import com.google.gerrit.extensions.api.changes.ReviewInput.CommentInput;
+import com.google.gerrit.extensions.client.Comment;
+import com.google.gerrit.extensions.config.FactoryModule;
+import com.google.inject.Scopes;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
+import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObject;
+import java.util.Optional;
+import org.eclipse.jgit.lib.Constants;
+import org.junit.Before;
+import org.junit.Test;
+
+@UseLocalDisk
+@TestPlugin(
+    name = "pull-replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.pull.RevisionReaderIT$TestModule")
+public class RevisionReaderIT extends LightweightPluginDaemonTest {
+  RevisionReader objectUnderTest;
+
+  @Before
+  public void setup() {
+    objectUnderTest = plugin.getSysInjector().getInstance(RevisionReader.class);
+  }
+
+  @Test
+  public void shouldReadRefMetaObject() throws Exception {
+    Result pushResult = createChange();
+    String refName = RefNames.changeMetaRef(pushResult.getChange().getId());
+
+    Optional<RevisionData> revisionDataOption = objectUnderTest.read(project, refName);
+
+    assertThat(revisionDataOption.isPresent()).isTrue();
+    RevisionData revisionData = revisionDataOption.get();
+    assertThat(revisionData).isNotNull();
+    assertThat(revisionData.getCommitObject()).isNotNull();
+    assertThat(revisionData.getCommitObject().getType()).isEqualTo(Constants.OBJ_COMMIT);
+    assertThat(revisionData.getCommitObject().getContent()).isNotEmpty();
+
+    assertThat(revisionData.getTreeObject()).isNotNull();
+    assertThat(revisionData.getTreeObject().getType()).isEqualTo(Constants.OBJ_TREE);
+    assertThat(revisionData.getTreeObject().getContent()).isEmpty();
+
+    assertThat(revisionData.getBlobs()).isEmpty();
+  }
+
+  @Test
+  public void shouldReadRefMetaObjectWithComments() throws Exception {
+    Result pushResult = createChange();
+    Change.Id changeId = pushResult.getChange().getId();
+    String refName = RefNames.changeMetaRef(changeId);
+
+    CommentInput comment = createCommentInput(1, 0, 1, 1, "Test comment");
+
+    ReviewInput reviewInput = new ReviewInput();
+    reviewInput.comments = ImmutableMap.of(Patch.COMMIT_MSG, ImmutableList.of(comment));
+    gApi.changes().id(changeId.get()).current().review(reviewInput);
+
+    Optional<RevisionData> revisionDataOption = objectUnderTest.read(project, refName);
+
+    assertThat(revisionDataOption.isPresent()).isTrue();
+    RevisionData revisionData = revisionDataOption.get();
+    assertThat(revisionData).isNotNull();
+    assertThat(revisionData.getCommitObject()).isNotNull();
+    assertThat(revisionData.getCommitObject().getType()).isEqualTo(Constants.OBJ_COMMIT);
+    assertThat(revisionData.getCommitObject().getContent()).isNotEmpty();
+
+    assertThat(revisionData.getTreeObject()).isNotNull();
+    assertThat(revisionData.getTreeObject().getType()).isEqualTo(Constants.OBJ_TREE);
+    assertThat(revisionData.getTreeObject().getContent()).isNotEmpty();
+
+    assertThat(revisionData.getBlobs()).hasSize(1);
+    RevisionObjectData blobObject = revisionData.getBlobs().get(0);
+    assertThat(blobObject.getType()).isEqualTo(Constants.OBJ_BLOB);
+    assertThat(blobObject.getContent()).isNotEmpty();
+  }
+
+  private CommentInput createCommentInput(
+      int startLine, int startCharacter, int endLine, int endCharacter, String message) {
+    CommentInput comment = new CommentInput();
+    comment.range = new Comment.Range();
+    comment.range.startLine = startLine;
+    comment.range.startCharacter = startCharacter;
+    comment.range.endLine = endLine;
+    comment.range.endCharacter = endCharacter;
+    comment.message = message;
+    comment.path = Patch.COMMIT_MSG;
+    return comment;
+  }
+
+  @SuppressWarnings("unused")
+  private static class TestModule extends FactoryModule {
+    @Override
+    protected void configure() {
+      bind(ReplicationConfig.class).to(ReplicationFileBasedConfig.class);
+      bind(RevisionReader.class).in(Scopes.SINGLETON);
+      bind(ApplyObject.class);
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionIT.java
new file mode 100644
index 0000000..75c31de
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionIT.java
@@ -0,0 +1,272 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.util.stream.Collectors.toList;
+
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.extensions.restapi.Url;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
+import com.googlesource.gerrit.plugins.replication.pull.RevisionReader;
+import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
+import com.googlesource.gerrit.plugins.replication.pull.client.SourceHttpClient;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.List;
+import java.util.Optional;
+import org.apache.http.HttpResponse;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.message.BasicHeader;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+import org.junit.Test;
+
+@SkipProjectClone
+@UseLocalDisk
+@TestPlugin(
+    name = "pull-replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule")
+public class ApplyObjectActionIT extends LightweightPluginDaemonTest {
+  private static final Optional<String> ALL_PROJECTS = Optional.empty();
+  private static final int TEST_REPLICATION_DELAY = 60;
+  private static final String TEST_REPLICATION_SUFFIX = "suffix1";
+  private static final String TEST_REPLICATION_REMOTE = "remote1";
+
+  private Path gitPath;
+  private FileBasedConfig config;
+  private FileBasedConfig secureConfig;
+  private RevisionReader revisionReader;
+
+  @Inject private SitePaths sitePaths;
+  @Inject private ProjectOperations projectOperations;
+  CredentialsFactory credentials;
+  Source source;
+  SourceHttpClient.Factory httpClientFactory;
+  String url;
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    gitPath = sitePaths.site_path.resolve("git");
+
+    config =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+    setReplicationSource(
+        TEST_REPLICATION_REMOTE,
+        TEST_REPLICATION_SUFFIX,
+        ALL_PROJECTS); // Simulates a full replication.config initialization
+    config.save();
+
+    secureConfig =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("secure.config").toFile(), FS.DETECTED);
+    setReplicationCredentials(TEST_REPLICATION_REMOTE, admin.username(), admin.httpPassword());
+    secureConfig.save();
+
+    super.setUpTestPlugin();
+
+    httpClientFactory = plugin.getSysInjector().getInstance(SourceHttpClient.Factory.class);
+    credentials = plugin.getSysInjector().getInstance(CredentialsFactory.class);
+    revisionReader = plugin.getSysInjector().getInstance(RevisionReader.class);
+    source = plugin.getSysInjector().getInstance(SourcesCollection.class).getAll().get(0);
+
+    url =
+        String.format(
+            "%s/a/projects/%s/pull-replication~apply-object",
+            adminRestSession.url(), Url.encode(project.get()));
+  }
+
+  @Test
+  public void shouldAcceptPayloadWithAsyncField() throws Exception {
+    String payloadWithAsyncFieldTemplate =
+        "{\"label\":\""
+            + TEST_REPLICATION_REMOTE
+            + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}, \"async\":true}";
+
+    String refName = createRef();
+    Optional<RevisionData> revisionDataOption = createRevisionData(refName);
+    assertThat(revisionDataOption.isPresent()).isTrue();
+
+    RevisionData revisionData = revisionDataOption.get();
+    String sendObjectPayload = createPayload(payloadWithAsyncFieldTemplate, refName, revisionData);
+
+    HttpPost post = createRequest(sendObjectPayload);
+    httpClientFactory.create(source).execute(post, assertHttpResponseCode(201), getContext());
+  }
+
+  @Test
+  public void shouldAcceptPayloadWithoutAsyncField() throws Exception {
+    String payloadWithoutAsyncFieldTemplate =
+        "{\"label\":\""
+            + TEST_REPLICATION_REMOTE
+            + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
+
+    String refName = createRef();
+    Optional<RevisionData> revisionDataOption = createRevisionData(refName);
+    assertThat(revisionDataOption.isPresent()).isTrue();
+
+    RevisionData revisionData = revisionDataOption.get();
+    String sendObjectPayload =
+        createPayload(payloadWithoutAsyncFieldTemplate, refName, revisionData);
+
+    HttpPost post = createRequest(sendObjectPayload);
+    httpClientFactory.create(source).execute(post, assertHttpResponseCode(201), getContext());
+  }
+
+  @Test
+  public void shouldReturnBadRequestCodeWhenMandatoryFieldLabelIsMissing() throws Exception {
+    String payloadWithoutLabelFieldTemplate =
+        "{\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}, \"async\":true}";
+
+    String refName = createRef();
+    Optional<RevisionData> revisionDataOption = createRevisionData(refName);
+    assertThat(revisionDataOption.isPresent()).isTrue();
+
+    RevisionData revisionData = revisionDataOption.get();
+    String sendObjectPayload =
+        createPayload(payloadWithoutLabelFieldTemplate, refName, revisionData);
+
+    HttpPost post = createRequest(sendObjectPayload);
+    httpClientFactory.create(source).execute(post, assertHttpResponseCode(400), getContext());
+  }
+
+  @Test
+  public void shouldReturnBadRequestCodeWhenPayloadIsNotAProperJSON() throws Exception {
+    String wrongPayloadTemplate =
+        "{\"label\":\""
+            + TEST_REPLICATION_REMOTE
+            + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}, \"async\":true,}";
+
+    String refName = createRef();
+    Optional<RevisionData> revisionDataOption = createRevisionData(refName);
+    assertThat(revisionDataOption.isPresent()).isTrue();
+
+    RevisionData revisionData = revisionDataOption.get();
+    String sendObjectPayload = createPayload(wrongPayloadTemplate, refName, revisionData);
+
+    HttpPost post = createRequest(sendObjectPayload);
+    httpClientFactory.create(source).execute(post, assertHttpResponseCode(400), getContext());
+  }
+
+  private String createPayload(
+      String wrongPayloadTemplate, String refName, RevisionData revisionData) {
+    String sendObjectPayload =
+        String.format(
+            wrongPayloadTemplate,
+            refName,
+            encode(revisionData.getCommitObject().getContent()),
+            encode(revisionData.getTreeObject().getContent()));
+    return sendObjectPayload;
+  }
+
+  private HttpPost createRequest(String sendObjectPayload) {
+    HttpPost post = new HttpPost(url);
+    post.setEntity(new StringEntity(sendObjectPayload, StandardCharsets.UTF_8));
+    post.addHeader(new BasicHeader("Content-Type", "application/json"));
+    return post;
+  }
+
+  private String createRef() throws Exception {
+    testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
+
+    Result pushResult = createChange("topic", "test.txt", "test_content");
+    return RefNames.changeMetaRef(pushResult.getChange().getId());
+  }
+
+  private Optional<RevisionData> createRevisionData(String refName) throws Exception {
+    return revisionReader.read(Project.nameKey(project + TEST_REPLICATION_SUFFIX), refName);
+  }
+
+  private Object encode(byte[] content) {
+    return Base64.getEncoder().encodeToString(content);
+  }
+
+  public ResponseHandler<Object> assertHttpResponseCode(int responseCode) {
+    return new ResponseHandler<Object>() {
+
+      @Override
+      public Object handleResponse(HttpResponse response)
+          throws ClientProtocolException, IOException {
+        assertThat(response.getStatusLine().getStatusCode()).isEqualTo(responseCode);
+        return null;
+      }
+    };
+  }
+
+  private HttpClientContext getContext() {
+    HttpClientContext ctx = HttpClientContext.create();
+    CredentialsProvider adapted = new BasicCredentialsProvider();
+    adapted.setCredentials(
+        AuthScope.ANY, new UsernamePasswordCredentials(admin.username(), admin.httpPassword()));
+    ctx.setCredentialsProvider(adapted);
+    return ctx;
+  }
+
+  private Project.NameKey createTestProject(String name) throws Exception {
+    return projectOperations.newProject().name(name).parent(project).create();
+  }
+
+  private void setReplicationSource(
+      String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
+    setReplicationSource(remoteName, Arrays.asList(replicaSuffix), project);
+  }
+
+  private void setReplicationSource(
+      String remoteName, List<String> replicaSuffixes, Optional<String> project)
+      throws IOException {
+
+    List<String> replicaUrls =
+        replicaSuffixes.stream()
+            .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
+            .collect(toList());
+    config.setString("replication", null, "instanceLabel", remoteName);
+    config.setStringList("remote", remoteName, "url", replicaUrls);
+    config.setString("remote", remoteName, "apiUrl", adminRestSession.url());
+    config.setString("remote", remoteName, "fetch", "+refs/tags/*:refs/tags/*");
+    config.setInt("remote", remoteName, "timeout", 600);
+    config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+    project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
+    config.setBoolean("gerrit", null, "autoReload", true);
+    config.save();
+  }
+
+  private void setReplicationCredentials(String remoteName, String username, String password)
+      throws IOException {
+    secureConfig.setString("remote", remoteName, "username", username);
+    secureConfig.setString("remote", remoteName, "password", password);
+    secureConfig.save();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionTest.java
new file mode 100644
index 0000000..8738046
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionTest.java
@@ -0,0 +1,193 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.apache.http.HttpStatus.SC_CREATED;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.restapi.AuthException;
+import com.google.gerrit.extensions.restapi.BadRequestException;
+import com.google.gerrit.extensions.restapi.ResourceConflictException;
+import com.google.gerrit.extensions.restapi.Response;
+import com.google.gerrit.extensions.restapi.RestApiException;
+import com.google.gerrit.server.project.ProjectResource;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionInput;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.RefUpdateException;
+import java.io.IOException;
+import org.eclipse.jgit.lib.Constants;
+import org.eclipse.jgit.lib.ObjectId;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ApplyObjectActionTest {
+  ApplyObjectAction applyObjectAction;
+  String label = "instance-2-label";
+  String url = "file:///gerrit-host/instance-1/git/${name}.git";
+  String refName = "refs/heads/master";
+  String location = "http://gerrit-host/a/config/server/tasks/08d173e9";
+  int taskId = 1234;
+
+  private String sampleCommitContent =
+      "tree 4b825dc642cb6eb9a060e54bf8d69288fbee4904\n"
+          + "parent 20eb48d28be86dc88fb4bef747f08de0fbefe12d\n"
+          + "author Gerrit User 1000000 <1000000@69ec38f0-350e-4d9c-96d4-bc956f2faaac> 1610471611 +0100\n"
+          + "committer Gerrit Code Review <root@maczech-XPS-15> 1610471611 +0100\n"
+          + "\n"
+          + "Update patch set 1\n"
+          + "\n"
+          + "Change has been successfully merged by Administrator\n"
+          + "\n"
+          + "Patch-set: 1\n"
+          + "Status: merged\n"
+          + "Tag: autogenerated:gerrit:merged\n"
+          + "Reviewer: Gerrit User 1000000 <1000000@69ec38f0-350e-4d9c-96d4-bc956f2faaac>\n"
+          + "Label: SUBM=+1\n"
+          + "Submission-id: 1904-1610471611558-783c0a2f\n"
+          + "Submitted-with: OK\n"
+          + "Submitted-with: OK: Code-Review: Gerrit User 1000000 <1000000@69ec38f0-350e-4d9c-96d4-bc956f2faaac>";
+
+  @Mock ApplyObjectCommand applyObjectCommand;
+  @Mock ProjectResource projectResource;
+  @Mock FetchPreconditions preConditions;
+
+  @Before
+  public void setup() {
+    when(preConditions.canCallFetchApi()).thenReturn(true);
+
+    applyObjectAction = new ApplyObjectAction(applyObjectCommand, preConditions);
+  }
+
+  @Test
+  public void shouldReturnCreatedResponseCode() throws RestApiException {
+    RevisionInput inputParams = new RevisionInput(label, refName, createSampleRevisionData());
+
+    Response<?> response = applyObjectAction.apply(projectResource, inputParams);
+
+    assertThat(response.statusCode()).isEqualTo(SC_CREATED);
+  }
+
+  @SuppressWarnings("cast")
+  @Test
+  public void shouldReturnSourceUrlAndrefNameAsAResponseBody() throws Exception {
+    RevisionInput inputParams = new RevisionInput(label, refName, createSampleRevisionData());
+    Response<?> response = applyObjectAction.apply(projectResource, inputParams);
+
+    assertThat((RevisionInput) response.value()).isEqualTo(inputParams);
+  }
+
+  @Test(expected = BadRequestException.class)
+  public void shouldThrowBadRequestExceptionWhenMissingLabel() throws Exception {
+    RevisionInput inputParams = new RevisionInput(null, refName, createSampleRevisionData());
+
+    applyObjectAction.apply(projectResource, inputParams);
+  }
+
+  @Test(expected = BadRequestException.class)
+  public void shouldThrowBadRequestExceptionWhenEmptyLabel() throws Exception {
+    RevisionInput inputParams = new RevisionInput("", refName, createSampleRevisionData());
+
+    applyObjectAction.apply(projectResource, inputParams);
+  }
+
+  @Test(expected = BadRequestException.class)
+  public void shouldThrowBadRequestExceptionWhenMissingRefName() throws Exception {
+    RevisionInput inputParams = new RevisionInput(label, null, createSampleRevisionData());
+
+    applyObjectAction.apply(projectResource, inputParams);
+  }
+
+  @Test(expected = BadRequestException.class)
+  public void shouldThrowBadRequestExceptionWhenEmptyRefName() throws Exception {
+    RevisionInput inputParams = new RevisionInput(label, "", createSampleRevisionData());
+
+    applyObjectAction.apply(projectResource, inputParams);
+  }
+
+  @Test(expected = BadRequestException.class)
+  public void shouldThrowBadRequestExceptionWhenMissingRevisionData() throws Exception {
+    RevisionInput inputParams = new RevisionInput(label, refName, null);
+
+    applyObjectAction.apply(projectResource, inputParams);
+  }
+
+  @Test(expected = BadRequestException.class)
+  public void shouldThrowBadRequestExceptionWhenMissingCommitObjectData() throws Exception {
+    RevisionObjectData commitData = new RevisionObjectData(Constants.OBJ_COMMIT, null);
+    RevisionObjectData treeData = new RevisionObjectData(Constants.OBJ_TREE, new byte[] {});
+    RevisionInput inputParams =
+        new RevisionInput(label, refName, createSampleRevisionData(commitData, treeData));
+
+    applyObjectAction.apply(projectResource, inputParams);
+  }
+
+  @Test(expected = BadRequestException.class)
+  public void shouldThrowBadRequestExceptionWhenMissingTreeObject() throws Exception {
+    RevisionObjectData commitData =
+        new RevisionObjectData(Constants.OBJ_COMMIT, sampleCommitContent.getBytes());
+    RevisionInput inputParams =
+        new RevisionInput(label, refName, createSampleRevisionData(commitData, null));
+
+    applyObjectAction.apply(projectResource, inputParams);
+  }
+
+  @Test(expected = AuthException.class)
+  public void shouldThrowAuthExceptionWhenCallFetchActionCapabilityNotAssigned()
+      throws RestApiException {
+    RevisionInput inputParams = new RevisionInput(label, refName, createSampleRevisionData());
+
+    when(preConditions.canCallFetchApi()).thenReturn(false);
+
+    applyObjectAction.apply(projectResource, inputParams);
+  }
+
+  @Test(expected = ResourceConflictException.class)
+  public void shouldThrowResourceConflictExceptionWhenMissingParentObject()
+      throws RestApiException, IOException, RefUpdateException, MissingParentObjectException {
+    RevisionInput inputParams = new RevisionInput(label, refName, createSampleRevisionData());
+
+    doThrow(
+            new MissingParentObjectException(
+                Project.nameKey("test_projects"), refName, ObjectId.zeroId()))
+        .when(applyObjectCommand)
+        .applyObject(any(), anyString(), any(), anyString());
+
+    applyObjectAction.apply(projectResource, inputParams);
+  }
+
+  private RevisionData createSampleRevisionData() {
+    RevisionObjectData commitData =
+        new RevisionObjectData(Constants.OBJ_COMMIT, sampleCommitContent.getBytes());
+    RevisionObjectData treeData = new RevisionObjectData(Constants.OBJ_TREE, new byte[] {});
+    return createSampleRevisionData(commitData, treeData);
+  }
+
+  private RevisionData createSampleRevisionData(
+      RevisionObjectData commitData, RevisionObjectData treeData) {
+    return new RevisionData(commitData, treeData, Lists.newArrayList());
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommandTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommandTest.java
new file mode 100644
index 0000000..51051c0
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommandTest.java
@@ -0,0 +1,100 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.Project.NameKey;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.metrics.Timer1;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.replication.pull.ApplyObjectMetrics;
+import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.pull.PullReplicationStateLogger;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.RefUpdateException;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObject;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.RefUpdateState;
+import java.io.IOException;
+import org.eclipse.jgit.lib.Constants;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ApplyObjectCommandTest {
+  private static final String TEST_SOURCE_LABEL = "test-source-label";
+  private static final String TEST_REF_NAME = "refs/changes/01/1/1";
+  private static final NameKey TEST_PROJECT_NAME = Project.nameKey("test-project");
+  private static final String TEST_REMOTE_NAME = "test-remote-name";
+
+  @Mock private PullReplicationStateLogger fetchStateLog;
+  @Mock private ApplyObject applyObject;
+  @Mock private ApplyObjectMetrics metrics;
+  @Mock private DynamicItem<EventDispatcher> eventDispatcherDataItem;
+  @Mock private EventDispatcher eventDispatcher;
+  @Mock private Timer1.Context<String> timetContext;
+  @Captor ArgumentCaptor<Event> eventCaptor;
+
+  private ApplyObjectCommand objectUnderTest;
+
+  @Before
+  public void setup() throws MissingParentObjectException, IOException {
+    RefUpdateState state = new RefUpdateState(TEST_REMOTE_NAME, RefUpdate.Result.NEW);
+    when(eventDispatcherDataItem.get()).thenReturn(eventDispatcher);
+    when(metrics.start(anyString())).thenReturn(timetContext);
+    when(timetContext.stop()).thenReturn(100L);
+    when(applyObject.apply(any(), any(), any())).thenReturn(state);
+
+    objectUnderTest =
+        new ApplyObjectCommand(fetchStateLog, applyObject, metrics, eventDispatcherDataItem);
+  }
+
+  @Test
+  public void shouldSendEventWhenApplyObject()
+      throws PermissionBackendException, IOException, RefUpdateException,
+          MissingParentObjectException {
+    objectUnderTest.applyObject(
+        TEST_PROJECT_NAME, TEST_REF_NAME, createSampleRevisionData(), TEST_SOURCE_LABEL);
+
+    verify(eventDispatcher).postEvent(eventCaptor.capture());
+    Event sentEvent = eventCaptor.getValue();
+    assertThat(sentEvent).isInstanceOf(FetchRefReplicatedEvent.class);
+    FetchRefReplicatedEvent fetchEvent = (FetchRefReplicatedEvent) sentEvent;
+    assertThat(fetchEvent.getProjectNameKey()).isEqualTo(TEST_PROJECT_NAME);
+    assertThat(fetchEvent.getRefName()).isEqualTo(TEST_REF_NAME);
+  }
+
+  private RevisionData createSampleRevisionData() {
+    RevisionObjectData commitData = new RevisionObjectData(Constants.OBJ_COMMIT, new byte[] {});
+    RevisionObjectData treeData = new RevisionObjectData(Constants.OBJ_TREE, new byte[] {});
+    return new RevisionData(commitData, treeData, Lists.newArrayList());
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java
index 2c888c8..fb7f3d1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java
@@ -147,7 +147,7 @@
     inputParams.label = label;
     inputParams.refName = refName;
 
-    doThrow(new InterruptedException()).when(fetchCommand).fetch(any(), any(), any());
+    doThrow(new InterruptedException()).when(fetchCommand).fetchSync(any(), any(), any());
 
     fetchAction.apply(projectResource, inputParams);
   }
@@ -162,7 +162,7 @@
 
     doThrow(new RemoteConfigurationMissingException(""))
         .when(fetchCommand)
-        .fetch(any(), any(), any());
+        .fetchSync(any(), any(), any());
 
     fetchAction.apply(projectResource, inputParams);
   }
@@ -177,7 +177,7 @@
 
     doThrow(new ExecutionException(new RuntimeException()))
         .when(fetchCommand)
-        .fetch(any(), any(), any());
+        .fetchSync(any(), any(), any());
 
     fetchAction.apply(projectResource, inputParams);
   }
@@ -190,7 +190,7 @@
     inputParams.label = label;
     inputParams.refName = refName;
 
-    doThrow(new IllegalStateException()).when(fetchCommand).fetch(any(), any(), any());
+    doThrow(new IllegalStateException()).when(fetchCommand).fetchSync(any(), any(), any());
 
     fetchAction.apply(projectResource, inputParams);
   }
@@ -203,7 +203,7 @@
     inputParams.label = label;
     inputParams.refName = refName;
 
-    doThrow(new TimeoutException()).when(fetchCommand).fetch(any(), any(), any());
+    doThrow(new TimeoutException()).when(fetchCommand).fetchSync(any(), any(), any());
 
     fetchAction.apply(projectResource, inputParams);
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java
index e625e0f..e1ad565 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java
@@ -15,6 +15,8 @@
 package com.googlesource.gerrit.plugins.replication.pull.api;
 
 import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static com.googlesource.gerrit.plugins.replication.pull.ReplicationType.ASYNC;
+import static com.googlesource.gerrit.plugins.replication.pull.ReplicationType.SYNC;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.anyString;
@@ -25,6 +27,8 @@
 
 import com.google.common.collect.Lists;
 import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.server.events.EventDispatcher;
 import com.googlesource.gerrit.plugins.replication.pull.PullReplicationStateLogger;
 import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
 import com.googlesource.gerrit.plugins.replication.pull.Source;
@@ -51,6 +55,7 @@
   @Mock PullReplicationStateLogger fetchStateLog;
   @Mock Source source;
   @Mock SourcesCollection sources;
+  @Mock DynamicItem<EventDispatcher> eventDispatcher;
 
   @SuppressWarnings("rawtypes")
   @Mock
@@ -71,27 +76,37 @@
     when(fetchReplicationStateFactory.create(any())).thenReturn(state);
     when(source.getRemoteConfigName()).thenReturn(label);
     when(sources.getAll()).thenReturn(Lists.newArrayList(source));
-    when(source.schedule(projectName, REF_NAME_TO_FETCH, state, true))
+    when(source.schedule(eq(projectName), eq(REF_NAME_TO_FETCH), eq(state), any()))
         .thenReturn(CompletableFuture.completedFuture(null));
-    objectUnderTest = new FetchCommand(fetchReplicationStateFactory, fetchStateLog, sources);
+    objectUnderTest =
+        new FetchCommand(fetchReplicationStateFactory, fetchStateLog, sources, eventDispatcher);
   }
 
   @Test
   public void shouldScheduleRefFetch()
       throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
           TimeoutException {
-    objectUnderTest.fetch(projectName, label, REF_NAME_TO_FETCH);
+    objectUnderTest.fetchSync(projectName, label, REF_NAME_TO_FETCH);
 
-    verify(source, times(1)).schedule(projectName, REF_NAME_TO_FETCH, state, true);
+    verify(source, times(1)).schedule(projectName, REF_NAME_TO_FETCH, state, SYNC);
+  }
+
+  @Test
+  public void shouldScheduleRefFetchWithDelay()
+      throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
+          TimeoutException {
+    objectUnderTest.fetchAsync(projectName, label, REF_NAME_TO_FETCH);
+
+    verify(source, times(1)).schedule(projectName, REF_NAME_TO_FETCH, state, ASYNC);
   }
 
   @Test
   public void shouldMarkAllFetchTasksScheduled()
       throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
           TimeoutException {
-    objectUnderTest.fetch(projectName, label, REF_NAME_TO_FETCH);
+    objectUnderTest.fetchSync(projectName, label, REF_NAME_TO_FETCH);
 
-    verify(source, times(1)).schedule(projectName, REF_NAME_TO_FETCH, state, true);
+    verify(source, times(1)).schedule(projectName, REF_NAME_TO_FETCH, state, SYNC);
     verify(state, times(1)).markAllFetchTasksScheduled();
   }
 
@@ -99,7 +114,7 @@
   public void shouldUpdateStateWhenRemoteConfigNameIsMissing() {
     assertThrows(
         RemoteConfigurationMissingException.class,
-        () -> objectUnderTest.fetch(projectName, "unknownLabel", REF_NAME_TO_FETCH));
+        () -> objectUnderTest.fetchSync(projectName, "unknownLabel", REF_NAME_TO_FETCH));
     verify(fetchStateLog, times(1)).error(anyString(), eq(state));
   }
 
@@ -108,12 +123,12 @@
   public void shouldUpdateStateWhenInterruptedException()
       throws InterruptedException, ExecutionException, TimeoutException {
     when(future.get(anyLong(), eq(TimeUnit.SECONDS))).thenThrow(new InterruptedException());
-    when(source.schedule(projectName, REF_NAME_TO_FETCH, state, true)).thenReturn(future);
+    when(source.schedule(projectName, REF_NAME_TO_FETCH, state, SYNC)).thenReturn(future);
 
     InterruptedException e =
         assertThrows(
             InterruptedException.class,
-            () -> objectUnderTest.fetch(projectName, label, REF_NAME_TO_FETCH));
+            () -> objectUnderTest.fetchSync(projectName, label, REF_NAME_TO_FETCH));
     verify(fetchStateLog, times(1)).error(anyString(), eq(e), eq(state));
   }
 
@@ -123,12 +138,12 @@
       throws InterruptedException, ExecutionException, TimeoutException {
     when(future.get(anyLong(), eq(TimeUnit.SECONDS)))
         .thenThrow(new ExecutionException(new Exception()));
-    when(source.schedule(projectName, REF_NAME_TO_FETCH, state, true)).thenReturn(future);
+    when(source.schedule(projectName, REF_NAME_TO_FETCH, state, SYNC)).thenReturn(future);
 
     ExecutionException e =
         assertThrows(
             ExecutionException.class,
-            () -> objectUnderTest.fetch(projectName, label, REF_NAME_TO_FETCH));
+            () -> objectUnderTest.fetchSync(projectName, label, REF_NAME_TO_FETCH));
     verify(fetchStateLog, times(1)).error(anyString(), eq(e), eq(state));
   }
 
@@ -137,12 +152,12 @@
   public void shouldUpdateStateWhenTimeoutException()
       throws InterruptedException, ExecutionException, TimeoutException {
     when(future.get(anyLong(), eq(TimeUnit.SECONDS))).thenThrow(new TimeoutException());
-    when(source.schedule(projectName, REF_NAME_TO_FETCH, state, true)).thenReturn(future);
+    when(source.schedule(projectName, REF_NAME_TO_FETCH, state, SYNC)).thenReturn(future);
 
     TimeoutException e =
         assertThrows(
             TimeoutException.class,
-            () -> objectUnderTest.fetch(projectName, label, REF_NAME_TO_FETCH));
+            () -> objectUnderTest.fetchSync(projectName, label, REF_NAME_TO_FETCH));
     verify(fetchStateLog, times(1)).error(anyString(), eq(e), eq(state));
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
index c62ddab..a3b0b02 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
@@ -23,11 +23,15 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.google.common.collect.Lists;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.RefNames;
 import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
 import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig;
 import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
+import com.googlesource.gerrit.plugins.replication.pull.filter.SyncRefsFilter;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
@@ -36,6 +40,7 @@
 import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.message.BasicHeader;
+import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.transport.CredentialItem;
 import org.eclipse.jgit.transport.CredentialsProvider;
@@ -64,11 +69,62 @@
   @Captor ArgumentCaptor<HttpPost> httpPostCaptor;
 
   String api = "http://gerrit-host";
+  String pluginName = "pull-replication";
   String label = "Replication";
   String refName = RefNames.REFS_HEADS + "master";
 
-  String expectedPayload = "{\"label\":\"Replication\", \"ref_name\": \"" + refName + "\"}";
+  String expectedPayload =
+      "{\"label\":\"Replication\", \"ref_name\": \"" + refName + "\", \"async\":false}";
+  String expectedAsyncPayload =
+      "{\"label\":\"Replication\", \"ref_name\": \"" + refName + "\", \"async\":true}";
   Header expectedHeader = new BasicHeader("Content-Type", "application/json");
+  SyncRefsFilter syncRefsFilter;
+
+  String expectedSendObjectPayload =
+      "{\"label\":\"Replication\",\"ref_name\":\"refs/heads/master\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"dHJlZSA3NzgxNGQyMTZhNmNhYjJkZGI5ZjI4NzdmYmJkMGZlYmRjMGZhNjA4CnBhcmVudCA5ODNmZjFhM2NmNzQ3MjVhNTNhNWRlYzhkMGMwNjEyMjEyOGY1YThkCmF1dGhvciBHZXJyaXQgVXNlciAxMDAwMDAwIDwxMDAwMDAwQDY5ZWMzOGYwLTM1MGUtNGQ5Yy05NmQ0LWJjOTU2ZjJmYWFhYz4gMTYxMDU3ODY0OCArMDEwMApjb21taXR0ZXIgR2Vycml0IENvZGUgUmV2aWV3IDxyb290QG1hY3plY2gtWFBTLTE1PiAxNjEwNTc4NjQ4ICswMTAwCgpVcGRhdGUgcGF0Y2ggc2V0IDEKClBhdGNoIFNldCAxOgoKKDEgY29tbWVudCkKClBhdGNoLXNldDogMQo\\u003d\"},\"tree_object\":{\"type\":2,\"content\":\"MTAwNjQ0IGJsb2IgYmIzODNmNTI0OWM2OGE0Y2M4YzgyYmRkMTIyOGI0YTg4ODNmZjZlOCAgICBmNzVhNjkwMDRhOTNiNGNjYzhjZTIxNWMxMjgwODYzNmMyYjc1Njc1\"},\"blobs\":[{\"type\":3,\"content\":\"ewogICJjb21tZW50cyI6IFsKICAgIHsKICAgICAgImtleSI6IHsKICAgICAgICAidXVpZCI6ICI5MGI1YWJmZl80ZjY3NTI2YSIsCiAgICAgICAgImZpbGVuYW1lIjogIi9DT01NSVRfTVNHIiwKICAgICAgICAicGF0Y2hTZXRJZCI6IDEKICAgICAgfSwKICAgICAgImxpbmVOYnIiOiA5LAogICAgICAiYXV0aG9yIjogewogICAgICAgICJpZCI6IDEwMDAwMDAKICAgICAgfSwKICAgICAgIndyaXR0ZW5PbiI6ICIyMDIxLTAxLTEzVDIyOjU3OjI4WiIsCiAgICAgICJzaWRlIjogMSwKICAgICAgIm1lc3NhZ2UiOiAidGVzdCBjb21tZW50IiwKICAgICAgInJhbmdlIjogewogICAgICAgICJzdGFydExpbmUiOiA5LAogICAgICAgICJzdGFydENoYXIiOiAyMSwKICAgICAgICAiZW5kTGluZSI6IDksCiAgICAgICAgImVuZENoYXIiOiAzNAogICAgICB9LAogICAgICAicmV2SWQiOiAiZjc1YTY5MDA0YTkzYjRjY2M4Y2UyMTVjMTI4MDg2MzZjMmI3NTY3NSIsCiAgICAgICJzZXJ2ZXJJZCI6ICI2OWVjMzhmMC0zNTBlLTRkOWMtOTZkNC1iYzk1NmYyZmFhYWMiLAogICAgICAidW5yZXNvbHZlZCI6IHRydWUKICAgIH0KICBdCn0\\u003d\"}]}}";
+  String commitObject =
+      "tree 77814d216a6cab2ddb9f2877fbbd0febdc0fa608\n"
+          + "parent 983ff1a3cf74725a53a5dec8d0c06122128f5a8d\n"
+          + "author Gerrit User 1000000 <1000000@69ec38f0-350e-4d9c-96d4-bc956f2faaac> 1610578648 +0100\n"
+          + "committer Gerrit Code Review <root@maczech-XPS-15> 1610578648 +0100\n"
+          + "\n"
+          + "Update patch set 1\n"
+          + "\n"
+          + "Patch Set 1:\n"
+          + "\n"
+          + "(1 comment)\n"
+          + "\n"
+          + "Patch-set: 1\n";
+  String treeObject =
+      "100644 blob bb383f5249c68a4cc8c82bdd1228b4a8883ff6e8    f75a69004a93b4ccc8ce215c12808636c2b75675";
+  String blobObject =
+      "{\n"
+          + "  \"comments\": [\n"
+          + "    {\n"
+          + "      \"key\": {\n"
+          + "        \"uuid\": \"90b5abff_4f67526a\",\n"
+          + "        \"filename\": \"/COMMIT_MSG\",\n"
+          + "        \"patchSetId\": 1\n"
+          + "      },\n"
+          + "      \"lineNbr\": 9,\n"
+          + "      \"author\": {\n"
+          + "        \"id\": 1000000\n"
+          + "      },\n"
+          + "      \"writtenOn\": \"2021-01-13T22:57:28Z\",\n"
+          + "      \"side\": 1,\n"
+          + "      \"message\": \"test comment\",\n"
+          + "      \"range\": {\n"
+          + "        \"startLine\": 9,\n"
+          + "        \"startChar\": 21,\n"
+          + "        \"endLine\": 9,\n"
+          + "        \"endChar\": 34\n"
+          + "      },\n"
+          + "      \"revId\": \"f75a69004a93b4ccc8ce215c12808636c2b75675\",\n"
+          + "      \"serverId\": \"69ec38f0-350e-4d9c-96d4-bc956f2faaac\",\n"
+          + "      \"unresolved\": true\n"
+          + "    }\n"
+          + "  ]\n"
+          + "}";
 
   FetchRestApiClient objectUnderTest;
 
@@ -92,14 +148,17 @@
     when(credentialProvider.get(any(), any(CredentialItem.class))).thenReturn(true);
     when(credentials.create(anyString())).thenReturn(credentialProvider);
     when(replicationConfig.getConfig()).thenReturn(config);
+    when(config.getStringList("replication", null, "syncRefs")).thenReturn(new String[0]);
     when(source.getRemoteConfigName()).thenReturn("Replication");
     when(config.getString("replication", null, "instanceLabel")).thenReturn(label);
 
     HttpResult httpResult = new HttpResult(SC_CREATED, Optional.of("result message"));
     when(httpClient.execute(any(HttpPost.class), any(), any())).thenReturn(httpResult);
     when(httpClientFactory.create(any())).thenReturn(httpClient);
+    syncRefsFilter = new SyncRefsFilter(replicationConfig);
     objectUnderTest =
-        new FetchRestApiClient(credentials, httpClientFactory, replicationConfig, source);
+        new FetchRestApiClient(
+            credentials, httpClientFactory, replicationConfig, syncRefsFilter, pluginName, source);
   }
 
   @Test
@@ -117,6 +176,67 @@
   }
 
   @Test
+  public void shouldByDefaultCallSyncFetchForAllRefs()
+      throws ClientProtocolException, IOException, URISyntaxException {
+
+    syncRefsFilter = new SyncRefsFilter(replicationConfig);
+    objectUnderTest =
+        new FetchRestApiClient(
+            credentials, httpClientFactory, replicationConfig, syncRefsFilter, pluginName, source);
+
+    objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
+
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any(), any());
+
+    HttpPost httpPost = httpPostCaptor.getValue();
+    assertThat(readPayload(httpPost)).isEqualTo(expectedPayload);
+  }
+
+  @Test
+  public void shouldCallAsyncFetchForAllRefs()
+      throws ClientProtocolException, IOException, URISyntaxException {
+
+    when(config.getStringList("replication", null, "syncRefs"))
+        .thenReturn(new String[] {"NO_SYNC_REFS"});
+    syncRefsFilter = new SyncRefsFilter(replicationConfig);
+    objectUnderTest =
+        new FetchRestApiClient(
+            credentials, httpClientFactory, replicationConfig, syncRefsFilter, pluginName, source);
+
+    objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
+
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any(), any());
+
+    HttpPost httpPost = httpPostCaptor.getValue();
+    assertThat(readPayload(httpPost)).isEqualTo(expectedAsyncPayload);
+  }
+
+  @Test
+  public void shouldCallSyncFetchOnlyForMetaRef()
+      throws ClientProtocolException, IOException, URISyntaxException {
+    String metaRefName = "refs/changes/01/101/meta";
+    String expectedMetaRefPayload =
+        "{\"label\":\"Replication\", \"ref_name\": \"" + metaRefName + "\", \"async\":false}";
+
+    when(config.getStringList("replication", null, "syncRefs"))
+        .thenReturn(new String[] {"^refs\\/changes\\/.*\\/meta"});
+    syncRefsFilter = new SyncRefsFilter(replicationConfig);
+    objectUnderTest =
+        new FetchRestApiClient(
+            credentials, httpClientFactory, replicationConfig, syncRefsFilter, pluginName, source);
+
+    objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any(), any());
+    HttpPost httpPost = httpPostCaptor.getValue();
+    assertThat(readPayload(httpPost)).isEqualTo(expectedAsyncPayload);
+
+    objectUnderTest.callFetch(Project.nameKey("test_repo"), metaRefName, new URIish(api));
+    verify(httpClient, times(2)).execute(httpPostCaptor.capture(), any(), any());
+    httpPost = httpPostCaptor.getValue();
+    assertThat(readPayload(httpPost)).isEqualTo(expectedMetaRefPayload);
+  }
+
+  @Test
   public void shouldCallFetchEndpointWithPayload()
       throws ClientProtocolException, IOException, URISyntaxException {
 
@@ -142,11 +262,59 @@
   }
 
   @Test
+  public void shouldCallSendObjectEndpoint()
+      throws ClientProtocolException, IOException, URISyntaxException {
+
+    objectUnderTest.callSendObject(
+        Project.nameKey("test_repo"), refName, createSampleRevisionData(), new URIish(api));
+
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any(), any());
+
+    HttpPost httpPost = httpPostCaptor.getValue();
+    assertThat(httpPost.getURI().getHost()).isEqualTo("gerrit-host");
+    assertThat(httpPost.getURI().getPath())
+        .isEqualTo("/a/projects/test_repo/pull-replication~apply-object");
+  }
+
+  @Test
+  public void shouldCallSendObjectEndpointWithPayload()
+      throws ClientProtocolException, IOException, URISyntaxException {
+
+    objectUnderTest.callSendObject(
+        Project.nameKey("test_repo"), refName, createSampleRevisionData(), new URIish(api));
+
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any(), any());
+
+    HttpPost httpPost = httpPostCaptor.getValue();
+    assertThat(readPayload(httpPost)).isEqualTo(expectedSendObjectPayload);
+  }
+
+  @Test
+  public void shouldSetContentTypeHeaderForSendObjectCall()
+      throws ClientProtocolException, IOException, URISyntaxException {
+
+    objectUnderTest.callFetch(Project.nameKey("test_repo"), refName, new URIish(api));
+
+    verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any(), any());
+
+    HttpPost httpPost = httpPostCaptor.getValue();
+    assertThat(httpPost.getLastHeader("Content-Type").getValue())
+        .isEqualTo(expectedHeader.getValue());
+  }
+
+  @Test
   public void shouldThrowExceptionWhenInstanceLabelIsNull() {
     when(config.getString("replication", null, "instanceLabel")).thenReturn(null);
     assertThrows(
         NullPointerException.class,
-        () -> new FetchRestApiClient(credentials, httpClientFactory, replicationConfig, source));
+        () ->
+            new FetchRestApiClient(
+                credentials,
+                httpClientFactory,
+                replicationConfig,
+                syncRefsFilter,
+                pluginName,
+                source));
   }
 
   @Test
@@ -154,7 +322,14 @@
     when(config.getString("replication", null, "instanceLabel")).thenReturn(" ");
     assertThrows(
         NullPointerException.class,
-        () -> new FetchRestApiClient(credentials, httpClientFactory, replicationConfig, source));
+        () ->
+            new FetchRestApiClient(
+                credentials,
+                httpClientFactory,
+                replicationConfig,
+                syncRefsFilter,
+                pluginName,
+                source));
   }
 
   @Test
@@ -162,11 +337,26 @@
     when(config.getString("replication", null, "instanceLabel")).thenReturn("");
     assertThrows(
         NullPointerException.class,
-        () -> new FetchRestApiClient(credentials, httpClientFactory, replicationConfig, source));
+        () ->
+            new FetchRestApiClient(
+                credentials,
+                httpClientFactory,
+                replicationConfig,
+                syncRefsFilter,
+                pluginName,
+                source));
   }
 
   public String readPayload(HttpPost entity) throws UnsupportedOperationException, IOException {
     ByteBuffer buf = IO.readWholeStream(entity.getEntity().getContent(), 1024);
     return RawParseUtils.decode(buf.array(), buf.arrayOffset(), buf.limit()).trim();
   }
+
+  private RevisionData createSampleRevisionData() {
+    RevisionObjectData commitData =
+        new RevisionObjectData(Constants.OBJ_COMMIT, commitObject.getBytes());
+    RevisionObjectData treeData = new RevisionObjectData(Constants.OBJ_TREE, treeObject.getBytes());
+    RevisionObjectData blobData = new RevisionObjectData(Constants.OBJ_BLOB, blobObject.getBytes());
+    return new RevisionData(commitData, treeData, Lists.newArrayList(blobData));
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandlerTest.java
new file mode 100644
index 0000000..e528eca
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/FetchRefReplicatedEventHandlerTest.java
@@ -0,0 +1,133 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.gerrit.entities.Change;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.index.change.ChangeIndexer;
+import com.googlesource.gerrit.plugins.replication.pull.Context;
+import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
+import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FetchRefReplicatedEventHandlerTest {
+  private ChangeIndexer changeIndexerMock;
+  private FetchRefReplicatedEventHandler fetchRefReplicatedEventHandler;
+
+  @Before
+  public void setUp() throws Exception {
+    changeIndexerMock = mock(ChangeIndexer.class);
+    fetchRefReplicatedEventHandler = new FetchRefReplicatedEventHandler(changeIndexerMock);
+  }
+
+  @Test
+  public void onEventShouldIndexExistingChange() {
+    Project.NameKey projectNameKey = Project.nameKey("testProject");
+    String ref = "refs/changes/41/41/meta";
+    Change.Id changeId = Change.Id.fromRef(ref);
+    try {
+      Context.setLocalEvent(true);
+      fetchRefReplicatedEventHandler.onEvent(
+          new FetchRefReplicatedEvent(
+              projectNameKey.get(),
+              ref,
+              "aSourceNode",
+              ReplicationState.RefFetchResult.SUCCEEDED,
+              RefUpdate.Result.FAST_FORWARD));
+      verify(changeIndexerMock, times(1)).index(eq(projectNameKey), eq(changeId));
+    } finally {
+      Context.unsetLocalEvent();
+    }
+  }
+
+  @Test
+  public void onEventShouldNotIndexIfNotLocalEvent() {
+    Project.NameKey projectNameKey = Project.nameKey("testProject");
+    String ref = "refs/changes/41/41/meta";
+    Change.Id changeId = Change.Id.fromRef(ref);
+    fetchRefReplicatedEventHandler.onEvent(
+        new FetchRefReplicatedEvent(
+            projectNameKey.get(),
+            ref,
+            "aSourceNode",
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.FAST_FORWARD));
+    verify(changeIndexerMock, never()).index(eq(projectNameKey), eq(changeId));
+  }
+
+  @Test
+  public void onEventShouldIndexOnlyMetaRef() {
+    Project.NameKey projectNameKey = Project.nameKey("testProject");
+    String ref = "refs/changes/41/41/1";
+    Change.Id changeId = Change.Id.fromRef(ref);
+    fetchRefReplicatedEventHandler.onEvent(
+        new FetchRefReplicatedEvent(
+            projectNameKey.get(),
+            ref,
+            "aSourceNode",
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.FAST_FORWARD));
+    verify(changeIndexerMock, never()).index(eq(projectNameKey), eq(changeId));
+  }
+
+  @Test
+  public void onEventShouldNotIndexMissingChange() {
+    fetchRefReplicatedEventHandler.onEvent(
+        new FetchRefReplicatedEvent(
+            Project.nameKey("testProject").get(),
+            "invalidRef",
+            "aSourceNode",
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.FAST_FORWARD));
+    verify(changeIndexerMock, never()).index(any(), any());
+  }
+
+  @Test
+  public void onEventShouldNotIndexFailingChange() {
+    Project.NameKey projectNameKey = Project.nameKey("testProject");
+    String ref = "refs/changes/41/41/meta";
+    fetchRefReplicatedEventHandler.onEvent(
+        new FetchRefReplicatedEvent(
+            projectNameKey.get(),
+            ref,
+            "aSourceNode",
+            ReplicationState.RefFetchResult.FAILED,
+            RefUpdate.Result.FAST_FORWARD));
+    verify(changeIndexerMock, never()).index(any(), any());
+  }
+
+  @Test
+  public void onEventShouldNotIndexNotAttemptedChange() {
+    Project.NameKey projectNameKey = Project.nameKey("testProject");
+    String ref = "refs/changes/41/41/meta";
+    fetchRefReplicatedEventHandler.onEvent(
+        new FetchRefReplicatedEvent(
+            projectNameKey.get(),
+            ref,
+            "aSourceNode",
+            ReplicationState.RefFetchResult.NOT_ATTEMPTED,
+            RefUpdate.Result.FAST_FORWARD));
+    verify(changeIndexerMock, never()).index(any(), any());
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObjectIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObjectIT.java
new file mode 100644
index 0000000..f6cd342
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/fetch/ApplyObjectIT.java
@@ -0,0 +1,200 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.fetch;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.primitives.Bytes;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
+import com.google.gerrit.entities.Change;
+import com.google.gerrit.entities.Patch;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.extensions.api.changes.ReviewInput;
+import com.google.gerrit.extensions.api.changes.ReviewInput.CommentInput;
+import com.google.gerrit.extensions.client.Comment;
+import com.google.gerrit.extensions.config.FactoryModule;
+import com.google.inject.Inject;
+import com.google.inject.Scopes;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
+import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig;
+import com.googlesource.gerrit.plugins.replication.pull.RevisionReader;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.eclipse.jgit.junit.TestRepository;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.RefSpec;
+import org.junit.Before;
+import org.junit.Test;
+
+@SkipProjectClone
+@UseLocalDisk
+@TestPlugin(
+    name = "pull-replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObjectIT$TestModule")
+public class ApplyObjectIT extends LightweightPluginDaemonTest {
+  private static final String TEST_REPLICATION_SUFFIX = "suffix1";
+
+  @Inject private ProjectOperations projectOperations;
+  @Inject ApplyObject objectUnderTest;
+  RevisionReader reader;
+
+  @Before
+  public void setup() {
+    reader = plugin.getSysInjector().getInstance(RevisionReader.class);
+  }
+
+  @Test
+  public void shouldApplyRefMetaObject() throws Exception {
+    String testRepoProjectName = project + TEST_REPLICATION_SUFFIX;
+    testRepo = cloneProject(createTestProject(testRepoProjectName));
+
+    Result pushResult = createChange();
+    String refName = RefNames.changeMetaRef(pushResult.getChange().getId());
+
+    Optional<RevisionData> revisionData =
+        reader.read(Project.nameKey(testRepoProjectName), refName);
+
+    RefSpec refSpec = new RefSpec(refName);
+    objectUnderTest.apply(project, refSpec, revisionData.get());
+    Optional<RevisionData> newRevisionData = reader.read(project, refName);
+
+    compareObjects(revisionData.get(), newRevisionData);
+
+    try (Repository repo = repoManager.openRepository(project);
+        TestRepository<Repository> testRepo = new TestRepository<>(repo); ) {
+      testRepo.fsck();
+    }
+  }
+
+  @Test
+  public void shouldApplyRefMetaObjectWithComments() throws Exception {
+    String testRepoProjectName = project + TEST_REPLICATION_SUFFIX;
+    testRepo = cloneProject(createTestProject(testRepoProjectName));
+
+    Result pushResult = createChange();
+    Change.Id changeId = pushResult.getChange().getId();
+    String refName = RefNames.changeMetaRef(changeId);
+    RefSpec refSpec = new RefSpec(refName);
+
+    Optional<RevisionData> revisionData =
+        reader.read(Project.nameKey(testRepoProjectName), refName);
+    objectUnderTest.apply(project, refSpec, revisionData.get());
+
+    ReviewInput reviewInput = new ReviewInput();
+    CommentInput comment = createCommentInput(1, 0, 1, 1, "Test comment");
+    reviewInput.comments = ImmutableMap.of(Patch.COMMIT_MSG, ImmutableList.of(comment));
+    gApi.changes().id(changeId.get()).current().review(reviewInput);
+
+    Optional<RevisionData> revisionDataWithComment =
+        reader.read(Project.nameKey(testRepoProjectName), refName);
+
+    objectUnderTest.apply(project, refSpec, revisionDataWithComment.get());
+
+    Optional<RevisionData> newRevisionData = reader.read(project, refName);
+
+    compareObjects(revisionDataWithComment.get(), newRevisionData);
+
+    try (Repository repo = repoManager.openRepository(project);
+        TestRepository<Repository> testRepo = new TestRepository<>(repo)) {
+      testRepo.fsck();
+    }
+  }
+
+  @Test
+  public void shouldThrowExceptionWhenParentCommitObjectIsMissing() throws Exception {
+    String testRepoProjectName = project + TEST_REPLICATION_SUFFIX;
+    testRepo = cloneProject(createTestProject(testRepoProjectName));
+
+    Result pushResult = createChange();
+    Change.Id changeId = pushResult.getChange().getId();
+    String refName = RefNames.changeMetaRef(changeId);
+
+    CommentInput comment = createCommentInput(1, 0, 1, 1, "Test comment");
+    ReviewInput reviewInput = new ReviewInput();
+    reviewInput.comments = ImmutableMap.of(Patch.COMMIT_MSG, ImmutableList.of(comment));
+    gApi.changes().id(changeId.get()).current().review(reviewInput);
+
+    Optional<RevisionData> revisionData =
+        reader.read(Project.nameKey(testRepoProjectName), refName);
+
+    RefSpec refSpec = new RefSpec(refName);
+    assertThrows(
+        MissingParentObjectException.class,
+        () -> objectUnderTest.apply(project, refSpec, revisionData.get()));
+  }
+
+  private void compareObjects(RevisionData expected, Optional<RevisionData> actualOption) {
+    assertThat(actualOption.isPresent()).isTrue();
+    RevisionData actual = actualOption.get();
+    compareContent(expected.getCommitObject(), actual.getCommitObject());
+    compareContent(expected.getTreeObject(), expected.getTreeObject());
+    List<List<Byte>> actualBlobs =
+        actual.getBlobs().stream()
+            .map(revision -> Bytes.asList(revision.getContent()))
+            .collect(Collectors.toList());
+    List<List<Byte>> expectedBlobs =
+        expected.getBlobs().stream()
+            .map(revision -> Bytes.asList(revision.getContent()))
+            .collect(Collectors.toList());
+    assertThat(actualBlobs).containsExactlyElementsIn(expectedBlobs);
+  }
+
+  private void compareContent(RevisionObjectData expected, RevisionObjectData actual) {
+    assertThat(actual.getType()).isEqualTo(expected.getType());
+    assertThat(Bytes.asList(actual.getContent()))
+        .containsExactlyElementsIn(Bytes.asList(expected.getContent()))
+        .inOrder();
+  }
+
+  private CommentInput createCommentInput(
+      int startLine, int startCharacter, int endLine, int endCharacter, String message) {
+    CommentInput comment = new CommentInput();
+    comment.range = new Comment.Range();
+    comment.range.startLine = startLine;
+    comment.range.startCharacter = startCharacter;
+    comment.range.endLine = endLine;
+    comment.range.endCharacter = endCharacter;
+    comment.message = message;
+    comment.path = Patch.COMMIT_MSG;
+    return comment;
+  }
+
+  private Project.NameKey createTestProject(String name) throws Exception {
+    return projectOperations.newProject().name(name).create();
+  }
+
+  @SuppressWarnings("unused")
+  private static class TestModule extends FactoryModule {
+    @Override
+    protected void configure() {
+      bind(ReplicationConfig.class).to(ReplicationFileBasedConfig.class);
+      bind(RevisionReader.class).in(Scopes.SINGLETON);
+      bind(ApplyObject.class);
+    }
+  }
+}