Stop creating redundant replication tasks from stream-events

Currently the broker-based ref-update triggers a replication
task once the event is received from the topic, even if the
apply-object was executed successfully beforehand.

To avoid this problem, an apply-objects cache has been introduced to
store temporally the apply-objects already successfully applied.

Bug: Issue 16885
Change-Id: Ib864bd0ae337f6b93308dfea155a22e1955409c3
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ApplyObjectCacheModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ApplyObjectCacheModule.java
new file mode 100644
index 0000000..944326e
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ApplyObjectCacheModule.java
@@ -0,0 +1,29 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.server.cache.CacheModule;
+import java.time.Duration;
+
+public class ApplyObjectCacheModule extends CacheModule {
+  public static final String APPLY_OBJECTS_CACHE = "apply_objects";
+  public static final Duration APPLY_OBJECTS_CACHE_MAX_AGE = Duration.ofMinutes(1);
+
+  @Override
+  protected void configure() {
+    cache(APPLY_OBJECTS_CACHE, ApplyObjectsCacheKey.class, Long.class)
+        .expireAfterWrite(APPLY_OBJECTS_CACHE_MAX_AGE);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ApplyObjectsCacheKey.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ApplyObjectsCacheKey.java
new file mode 100644
index 0000000..9e83b4d
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ApplyObjectsCacheKey.java
@@ -0,0 +1,31 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.auto.value.AutoValue;
+
+@AutoValue
+public abstract class ApplyObjectsCacheKey {
+
+  public static ApplyObjectsCacheKey create(String objectId, String refName, String project) {
+    return new AutoValue_ApplyObjectsCacheKey(objectId, refName, project);
+  }
+
+  public abstract String objectId();
+
+  public abstract String refName();
+
+  public abstract String project();
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index ed3aecb..d1c5ca1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -80,6 +80,7 @@
     bind(RevisionReader.class).in(Scopes.SINGLETON);
     bind(ApplyObject.class);
     install(new FactoryModuleBuilder().build(FetchJob.Factory.class));
+    install(new ApplyObjectCacheModule());
     install(new PullReplicationApiModule());
 
     install(new FetchRefReplicatedEventModule());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectAction.java
index 04cc9e8..b3379f2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectAction.java
@@ -97,7 +97,11 @@
       }
 
       applyObjectCommand.applyObject(
-          resource.getNameKey(), input.getRefName(), input.getRevisionData(), input.getLabel());
+          resource.getNameKey(),
+          input.getRefName(),
+          input.getRevisionData(),
+          input.getLabel(),
+          input.getEventCreatedOn());
       return Response.created(input);
     } catch (MissingParentObjectException e) {
       repLog.error(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java
index b27d8b7..5f368c5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java
@@ -14,9 +14,11 @@
 
 package com.googlesource.gerrit.plugins.replication.pull.api;
 
+import static com.googlesource.gerrit.plugins.replication.pull.ApplyObjectCacheModule.APPLY_OBJECTS_CACHE;
 import static com.googlesource.gerrit.plugins.replication.pull.PullReplicationLogger.repLog;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
+import com.google.common.cache.Cache;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.entities.Project;
@@ -25,19 +27,18 @@
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.replication.pull.ApplyObjectMetrics;
-import com.googlesource.gerrit.plugins.replication.pull.Context;
-import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
-import com.googlesource.gerrit.plugins.replication.pull.PullReplicationStateLogger;
-import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
+import com.google.inject.name.Named;
+import com.googlesource.gerrit.plugins.replication.pull.*;
 import com.googlesource.gerrit.plugins.replication.pull.ReplicationState.RefFetchResult;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.RefUpdateException;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObject;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.RefUpdateState;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Set;
 import org.eclipse.jgit.lib.RefUpdate;
 import org.eclipse.jgit.transport.RefSpec;
@@ -45,7 +46,7 @@
 public class ApplyObjectCommand {
 
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
-
+  private final Cache<ApplyObjectsCacheKey, Long> refUpdatesSucceededCache;
   private static final Set<RefUpdate.Result> SUCCESSFUL_RESULTS =
       ImmutableSet.of(
           RefUpdate.Result.NEW,
@@ -63,21 +64,31 @@
       PullReplicationStateLogger fetchStateLog,
       ApplyObject applyObject,
       ApplyObjectMetrics metrics,
-      DynamicItem<EventDispatcher> eventDispatcher) {
+      DynamicItem<EventDispatcher> eventDispatcher,
+      @Named(APPLY_OBJECTS_CACHE) Cache<ApplyObjectsCacheKey, Long> refUpdatesSucceededCache) {
     this.fetchStateLog = fetchStateLog;
     this.applyObject = applyObject;
     this.metrics = metrics;
     this.eventDispatcher = eventDispatcher;
+    this.refUpdatesSucceededCache = refUpdatesSucceededCache;
   }
 
   public void applyObject(
-      Project.NameKey name, String refName, RevisionData revisionsData, String sourceLabel)
+      Project.NameKey name,
+      String refName,
+      RevisionData revisionsData,
+      String sourceLabel,
+      long eventCreatedOn)
       throws IOException, RefUpdateException, MissingParentObjectException {
-    applyObjects(name, refName, new RevisionData[] {revisionsData}, sourceLabel);
+    applyObjects(name, refName, new RevisionData[] {revisionsData}, sourceLabel, eventCreatedOn);
   }
 
   public void applyObjects(
-      Project.NameKey name, String refName, RevisionData[] revisionsData, String sourceLabel)
+      Project.NameKey name,
+      String refName,
+      RevisionData[] revisionsData,
+      String sourceLabel,
+      long eventCreatedOn)
       throws IOException, RefUpdateException, MissingParentObjectException {
 
     repLog.info(
@@ -89,7 +100,26 @@
     Timer1.Context<String> context = metrics.start(sourceLabel);
 
     RefUpdateState refUpdateState = applyObject.apply(name, new RefSpec(refName), revisionsData);
+    Boolean isRefUpdateSuccessful = isSuccessful(refUpdateState.getResult());
 
+    if (isRefUpdateSuccessful) {
+      for (RevisionData revisionData : revisionsData) {
+        RevisionObjectData commitObj = revisionData.getCommitObject();
+        List<RevisionObjectData> blobs = revisionData.getBlobs();
+
+        if (commitObj != null) {
+          refUpdatesSucceededCache.put(
+              ApplyObjectsCacheKey.create(
+                  revisionData.getCommitObject().getSha1(), refName, name.get()),
+              eventCreatedOn);
+        } else if (blobs != null) {
+          for (RevisionObjectData blob : blobs) {
+            refUpdatesSucceededCache.put(
+                ApplyObjectsCacheKey.create(blob.getSha1(), refName, name.get()), eventCreatedOn);
+          }
+        }
+      }
+    }
     long elapsed = NANOSECONDS.toMillis(context.stop());
 
     try {
@@ -110,7 +140,7 @@
       Context.unsetLocalEvent();
     }
 
-    if (!isSuccessful(refUpdateState.getResult())) {
+    if (!isRefUpdateSuccessful) {
       String message =
           String.format(
               "RefUpdate failed with result %s for: sourceLcabel=%s, project=%s, refName=%s",
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectsAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectsAction.java
index a1e1f5b..dd155d4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectsAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectsAction.java
@@ -97,7 +97,11 @@
       }
 
       command.applyObjects(
-          resource.getNameKey(), input.getRefName(), input.getRevisionsData(), input.getLabel());
+          resource.getNameKey(),
+          input.getRefName(),
+          input.getRevisionsData(),
+          input.getLabel(),
+          input.getEventCreatedOn());
       return Response.created(input);
     } catch (MissingParentObjectException e) {
       repLog.error(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
index 2ea8a33..b14d4a1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
@@ -14,9 +14,11 @@
 
 package com.googlesource.gerrit.plugins.replication.pull.event;
 
+import static com.googlesource.gerrit.plugins.replication.pull.ApplyObjectCacheModule.APPLY_OBJECTS_CACHE;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.Strings;
+import com.google.common.cache.Cache;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.common.Nullable;
 import com.google.gerrit.entities.Project.NameKey;
@@ -24,6 +26,7 @@
 import com.google.gerrit.extensions.restapi.AuthException;
 import com.google.gerrit.extensions.restapi.RestApiException;
 import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.data.RefUpdateAttribute;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventListener;
 import com.google.gerrit.server.events.ProjectCreatedEvent;
@@ -33,6 +36,8 @@
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
+import com.google.inject.name.Named;
+import com.googlesource.gerrit.plugins.replication.pull.ApplyObjectsCacheKey;
 import com.googlesource.gerrit.plugins.replication.pull.FetchOne;
 import com.googlesource.gerrit.plugins.replication.pull.Source;
 import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
@@ -59,6 +64,7 @@
   private final SourcesCollection sources;
   private final String instanceId;
   private final WorkQueue workQueue;
+  private final Cache<ApplyObjectsCacheKey, Long> refUpdatesSucceededCache;
 
   @Inject
   public StreamEventListener(
@@ -69,7 +75,8 @@
       FetchJob.Factory fetchJobFactory,
       Provider<PullReplicationApiRequestMetrics> metricsProvider,
       SourcesCollection sources,
-      ExcludedRefsFilter excludedRefsFilter) {
+      ExcludedRefsFilter excludedRefsFilter,
+      @Named(APPLY_OBJECTS_CACHE) Cache<ApplyObjectsCacheKey, Long> refUpdatesSucceededCache) {
     this.instanceId = instanceId;
     this.deleteCommand = deleteCommand;
     this.projectInitializationAction = projectInitializationAction;
@@ -78,6 +85,7 @@
     this.metricsProvider = metricsProvider;
     this.sources = sources;
     this.refsFilter = excludedRefsFilter;
+    this.refUpdatesSucceededCache = refUpdatesSucceededCache;
 
     requireNonNull(
         Strings.emptyToNull(this.instanceId), "gerrit.instanceId cannot be null or empty");
@@ -120,6 +128,17 @@
         return;
       }
 
+      if (isApplyObjectsCacheHit(refUpdatedEvent)) {
+        logger.atFine().log(
+            "Skipping refupdate '%s'  '%s'=>'%s' (eventCreatedOn=%d) for project '%s' because has been already replicated via apply-object",
+            refUpdatedEvent.getRefName(),
+            refUpdatedEvent.refUpdate.get().oldRev,
+            refUpdatedEvent.refUpdate.get().newRev,
+            refUpdatedEvent.eventCreatedOn,
+            refUpdatedEvent.getProjectNameKey());
+        return;
+      }
+
       fetchRefsAsync(
           refUpdatedEvent.getRefName(),
           refUpdatedEvent.instanceId,
@@ -207,4 +226,15 @@
   private String getProjectRepositoryName(ProjectCreatedEvent projectCreatedEvent) {
     return String.format("%s.git", projectCreatedEvent.projectName);
   }
+
+  private boolean isApplyObjectsCacheHit(RefUpdatedEvent refUpdateEvent) {
+    RefUpdateAttribute refUpdateAttribute = refUpdateEvent.refUpdate.get();
+    Long refUpdateSuccededTimestamp =
+        refUpdatesSucceededCache.getIfPresent(
+            ApplyObjectsCacheKey.create(
+                refUpdateAttribute.newRev, refUpdateAttribute.refName, refUpdateAttribute.project));
+
+    return refUpdateSuccededTimestamp != null
+        && refUpdateEvent.eventCreatedOn <= refUpdateSuccededTimestamp;
+  }
 }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index eab5fe6..13b16ea 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -49,6 +49,37 @@
 To manually trigger replication at runtime, see
 SSH command [start](cmd-start.md).
 
+File `gerrit.config`
+-------------------------
+
+cache.@PLUGIN@-apply_objects.maxAge
+:	Maximum age to keep history of the latest successful apply-object refs.
+	Values should use common unit suffixes to express their setting:
+
+	s, sec, second, seconds
+
+	m, min, minute, minutes
+
+	h, hr, hour, hours
+
+	d, day, days
+
+	w, week, weeks (1 week is treated as 7 days)
+
+	mon, month, months (1 month is treated as 30 days)
+
+	y, year, years (1 year is treated as 365 days)
+
+	If a unit suffix is not specified, seconds is assumed. If 0 is supplied, the maximum age
+	is infinite and items are never purged except when the cache is full.
+
+	Default is 60s.
+
+cache.@PLUGIN@-apply_objects.memoryLimit
+:	The maximum number of apply-object refs retained in memory.
+
+	Default is 1024.
+
 File `@PLUGIN@.config`
 -------------------------
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionIT.java
index 784515e..453a6b0 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionIT.java
@@ -32,7 +32,7 @@
     String payloadWithAsyncFieldTemplate =
         "{\"label\":\""
             + TEST_REPLICATION_REMOTE
-            + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}, \"async\":true}";
+            + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}, \"async\":true}";
 
     String refName = createRef();
     Optional<RevisionData> revisionDataOption = createRevisionData(refName);
@@ -54,7 +54,7 @@
     String payloadWithoutAsyncFieldTemplate =
         "{\"label\":\""
             + TEST_REPLICATION_REMOTE
-            + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
+            + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
 
     String refName = createRef();
     Optional<RevisionData> revisionDataOption = createRevisionData(refName);
@@ -78,7 +78,7 @@
     String payloadWithoutAsyncFieldTemplate =
         "{\"label\":\""
             + TEST_REPLICATION_REMOTE
-            + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
+            + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
 
     String refName = createRef();
     Optional<RevisionData> revisionDataOption = createRevisionData(refName);
@@ -102,7 +102,7 @@
     String payloadWithoutAsyncFieldTemplate =
         "{\"label\":\""
             + TEST_REPLICATION_REMOTE
-            + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
+            + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
     NameKey projectName = Project.nameKey("test/repo");
     String refName = createRef(projectName);
     Optional<RevisionData> revisionDataOption = createRevisionData(projectName, refName);
@@ -129,7 +129,7 @@
     String payloadWithoutAsyncFieldTemplate =
         "{\"label\":\""
             + TEST_REPLICATION_REMOTE
-            + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
+            + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
 
     String refName = createRef();
     Optional<RevisionData> revisionDataOption = createRevisionData(refName);
@@ -148,7 +148,7 @@
   @GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
   public void shouldReturnBadRequestCodeWhenMandatoryFieldLabelIsMissing() throws Exception {
     String payloadWithoutLabelFieldTemplate =
-        "{\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}, \"async\":true}";
+        "{\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}, \"async\":true}";
 
     String refName = createRef();
     Optional<RevisionData> revisionDataOption = createRevisionData(refName);
@@ -171,7 +171,7 @@
     String wrongPayloadTemplate =
         "{\"label\":\""
             + TEST_REPLICATION_REMOTE
-            + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}, \"async\":true,}";
+            + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}, \"async\":true,}";
 
     String refName = createRef();
     Optional<RevisionData> revisionDataOption = createRevisionData(refName);
@@ -196,7 +196,7 @@
     String payloadWithoutAsyncFieldTemplate =
         "{\"label\":\""
             + TEST_REPLICATION_REMOTE
-            + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
+            + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
 
     String refName = createRef();
     Optional<RevisionData> revisionDataOption = createRevisionData(refName);
@@ -222,7 +222,7 @@
     String payloadWithoutAsyncFieldTemplate =
         "{\"label\":\""
             + TEST_REPLICATION_REMOTE
-            + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
+            + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
 
     String refName = createRef();
     Optional<RevisionData> revisionDataOption = createRevisionData(refName);
@@ -245,6 +245,7 @@
         String.format(
             wrongPayloadTemplate,
             refName,
+            revisionData.getCommitObject().getSha1(),
             encode(revisionData.getCommitObject().getContent()),
             encode(revisionData.getTreeObject().getContent()));
     return sendObjectPayload;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionTest.java
index e575eb9..6ef5b2a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionTest.java
@@ -17,6 +17,7 @@
 import static com.google.common.truth.Truth.assertThat;
 import static org.apache.http.HttpStatus.SC_CREATED;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.when;
@@ -209,7 +210,7 @@
             new MissingParentObjectException(
                 Project.nameKey("test_projects"), refName, ObjectId.zeroId()))
         .when(applyObjectCommand)
-        .applyObject(any(), anyString(), any(), anyString());
+        .applyObject(any(), anyString(), any(), anyString(), anyLong());
 
     applyObjectAction.apply(projectResource, inputParams);
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommandTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommandTest.java
index d73a6e7..ee49aee 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommandTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommandTest.java
@@ -20,6 +20,8 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.Project.NameKey;
@@ -29,6 +31,7 @@
 import com.google.gerrit.server.events.EventDispatcher;
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.googlesource.gerrit.plugins.replication.pull.ApplyObjectMetrics;
+import com.googlesource.gerrit.plugins.replication.pull.ApplyObjectsCacheKey;
 import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
 import com.googlesource.gerrit.plugins.replication.pull.PullReplicationStateLogger;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
@@ -55,10 +58,13 @@
   private static final String TEST_REF_NAME = "refs/changes/01/1/1";
   private static final NameKey TEST_PROJECT_NAME = Project.nameKey("test-project");
   private static final String TEST_REMOTE_NAME = "test-remote-name";
+  private static final long TEST_EVENT_TIMESTAMP = 1L;
 
   private String sampleCommitObjectId = "9f8d52853089a3cf00c02ff7bd0817bd4353a95a";
   private String sampleTreeObjectId = "4b825dc642cb6eb9a060e54bf8d69288fbee4904";
   private String sampleBlobObjectId = "b5d7bcf1d1c5b0f0726d10a16c8315f06f900bfb";
+  private String sampleCommitObjectId2 = "9f8d52853089a3cf00c02ff7bd0817bd4353a95b";
+  private String sampleTreeObjectId2 = "4b825dc642cb6eb9a060e54bf8d69288fbee4905";
 
   @Mock private PullReplicationStateLogger fetchStateLog;
   @Mock private ApplyObject applyObject;
@@ -67,11 +73,13 @@
   @Mock private EventDispatcher eventDispatcher;
   @Mock private Timer1.Context<String> timetContext;
   @Captor ArgumentCaptor<Event> eventCaptor;
+  private Cache<ApplyObjectsCacheKey, Long> cache;
 
   private ApplyObjectCommand objectUnderTest;
 
   @Before
   public void setup() throws MissingParentObjectException, IOException {
+    cache = CacheBuilder.newBuilder().build();
     RefUpdateState state = new RefUpdateState(TEST_REMOTE_NAME, RefUpdate.Result.NEW);
     when(eventDispatcherDataItem.get()).thenReturn(eventDispatcher);
     when(metrics.start(anyString())).thenReturn(timetContext);
@@ -79,15 +87,21 @@
     when(applyObject.apply(any(), any(), any())).thenReturn(state);
 
     objectUnderTest =
-        new ApplyObjectCommand(fetchStateLog, applyObject, metrics, eventDispatcherDataItem);
+        new ApplyObjectCommand(fetchStateLog, applyObject, metrics, eventDispatcherDataItem, cache);
   }
 
   @Test
   public void shouldSendEventWhenApplyObject()
       throws PermissionBackendException, IOException, RefUpdateException,
           MissingParentObjectException {
+    RevisionData sampleRevisionData =
+        createSampleRevisionData(sampleCommitObjectId, sampleTreeObjectId);
     objectUnderTest.applyObject(
-        TEST_PROJECT_NAME, TEST_REF_NAME, createSampleRevisionData(), TEST_SOURCE_LABEL);
+        TEST_PROJECT_NAME,
+        TEST_REF_NAME,
+        sampleRevisionData,
+        TEST_SOURCE_LABEL,
+        TEST_EVENT_TIMESTAMP);
 
     verify(eventDispatcher).postEvent(eventCaptor.capture());
     Event sentEvent = eventCaptor.getValue();
@@ -97,11 +111,64 @@
     assertThat(fetchEvent.getRefName()).isEqualTo(TEST_REF_NAME);
   }
 
-  private RevisionData createSampleRevisionData() {
+  @Test
+  public void shouldInsertIntoApplyObjectsCacheWhenApplyObjectIsSuccessful()
+      throws IOException, RefUpdateException, MissingParentObjectException {
+    RevisionData sampleRevisionData =
+        createSampleRevisionData(sampleCommitObjectId, sampleTreeObjectId);
+    RevisionData sampleRevisionData2 =
+        createSampleRevisionData(sampleCommitObjectId2, sampleTreeObjectId2);
+    objectUnderTest.applyObjects(
+        TEST_PROJECT_NAME,
+        TEST_REF_NAME,
+        new RevisionData[] {sampleRevisionData, sampleRevisionData2},
+        TEST_SOURCE_LABEL,
+        TEST_EVENT_TIMESTAMP);
+
+    assertThat(
+            cache.getIfPresent(
+                ApplyObjectsCacheKey.create(
+                    sampleRevisionData.getCommitObject().getSha1(),
+                    TEST_REF_NAME,
+                    TEST_PROJECT_NAME.get())))
+        .isEqualTo(TEST_EVENT_TIMESTAMP);
+    assertThat(
+            cache.getIfPresent(
+                ApplyObjectsCacheKey.create(
+                    sampleRevisionData2.getCommitObject().getSha1(),
+                    TEST_REF_NAME,
+                    TEST_PROJECT_NAME.get())))
+        .isEqualTo(TEST_EVENT_TIMESTAMP);
+  }
+
+  @Test(expected = RefUpdateException.class)
+  public void shouldNotInsertIntoApplyObjectsCacheWhenApplyObjectIsFailure()
+      throws IOException, RefUpdateException, MissingParentObjectException {
+    RevisionData sampleRevisionData =
+        createSampleRevisionData(sampleCommitObjectId, sampleTreeObjectId);
+    RefUpdateState failureState = new RefUpdateState(TEST_REMOTE_NAME, RefUpdate.Result.IO_FAILURE);
+    when(applyObject.apply(any(), any(), any())).thenReturn(failureState);
+    objectUnderTest.applyObject(
+        TEST_PROJECT_NAME,
+        TEST_REF_NAME,
+        sampleRevisionData,
+        TEST_SOURCE_LABEL,
+        TEST_EVENT_TIMESTAMP);
+
+    assertThat(
+            cache.getIfPresent(
+                ApplyObjectsCacheKey.create(
+                    sampleRevisionData.getCommitObject().getSha1(),
+                    TEST_REF_NAME,
+                    TEST_PROJECT_NAME.get())))
+        .isNull();
+  }
+
+  private RevisionData createSampleRevisionData(String commitObjectId, String treeObjectId) {
     RevisionObjectData commitData =
-        new RevisionObjectData(sampleCommitObjectId, Constants.OBJ_COMMIT, new byte[] {});
+        new RevisionObjectData(commitObjectId, Constants.OBJ_COMMIT, new byte[] {});
     RevisionObjectData treeData =
-        new RevisionObjectData(sampleTreeObjectId, Constants.OBJ_TREE, new byte[] {});
+        new RevisionObjectData(treeObjectId, Constants.OBJ_TREE, new byte[] {});
     return new RevisionData(Collections.emptyList(), commitData, treeData, Lists.newArrayList());
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java
index 92314a4..8dc0359 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java
@@ -21,6 +21,8 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.RefNames;
@@ -32,6 +34,7 @@
 import com.google.gerrit.server.events.RefUpdatedEvent;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.replication.pull.ApplyObjectsCacheKey;
 import com.googlesource.gerrit.plugins.replication.pull.FetchOne;
 import com.googlesource.gerrit.plugins.replication.pull.Source;
 import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
@@ -60,6 +63,7 @@
   private static final String INSTANCE_ID = "node_instance_id";
   private static final String NEW_REV = "0000000000000000000000000000000000000001";
   private static final String REMOTE_INSTANCE_ID = "remote_node_instance_id";
+  private static final long TEST_EVENT_TIMESTAMP = 1684879097024L;
 
   @Mock private ProjectInitializationAction projectInitializationAction;
   @Mock private WorkQueue workQueue;
@@ -72,11 +76,13 @@
   @Mock private SourcesCollection sources;
   @Mock private Source source;
   @Mock private ExcludedRefsFilter refsFilter;
+  private Cache<ApplyObjectsCacheKey, Long> cache;
 
   private StreamEventListener objectUnderTest;
 
   @Before
   public void setup() {
+    cache = CacheBuilder.newBuilder().build();
     when(workQueue.getDefaultQueue()).thenReturn(executor);
     when(fetchJobFactory.create(eq(Project.nameKey(TEST_PROJECT)), any(), any()))
         .thenReturn(fetchJob);
@@ -95,7 +101,8 @@
             fetchJobFactory,
             () -> metrics,
             sources,
-            refsFilter);
+            refsFilter,
+            cache);
   }
 
   @Test
@@ -257,4 +264,58 @@
 
     verify(executor).submit(any(FetchJob.class));
   }
+
+  @Test
+  public void shouldSkipEventWhenFoundInApplyObjectsCacheWithTheSameTimestamp() {
+    sendRefUpdateEventWithTimestamp(TEST_EVENT_TIMESTAMP, TEST_EVENT_TIMESTAMP);
+    verify(executor, never()).submit(any(Runnable.class));
+  }
+
+  @Test
+  public void shouldSkipEventWhenFoundInApplyObjectsCacheWithOlderTimestamp() {
+    sendRefUpdateEventWithTimestamp(TEST_EVENT_TIMESTAMP - 1, TEST_EVENT_TIMESTAMP);
+    verify(executor, never()).submit(any(Runnable.class));
+  }
+
+  @Test
+  public void shouldProcessEventWhenFoundInApplyObjectsCacheWithNewerTimestamp() {
+    sendRefUpdateEventWithTimestamp(TEST_EVENT_TIMESTAMP + 1, TEST_EVENT_TIMESTAMP);
+    verify(executor).submit(any(Runnable.class));
+  }
+
+  private void sendRefUpdateEventWithTimestamp(long eventTimestamp, long cachedTimestamp) {
+    RefUpdatedEvent event = new RefUpdatedEvent();
+    RefUpdateAttribute refUpdate = new RefUpdateAttribute();
+    refUpdate.refName = TEST_REF_NAME;
+    refUpdate.project = TEST_PROJECT;
+    refUpdate.oldRev = ObjectId.zeroId().getName();
+    refUpdate.newRev = NEW_REV;
+    event.eventCreatedOn = eventTimestamp;
+
+    cache.put(
+        ApplyObjectsCacheKey.create(refUpdate.newRev, refUpdate.refName, refUpdate.project),
+        cachedTimestamp);
+
+    event.instanceId = REMOTE_INSTANCE_ID;
+    event.refUpdate = () -> refUpdate;
+
+    objectUnderTest.onEvent(event);
+  }
+
+  @Test
+  public void shouldScheduleAllRefsFetchWhenNotFoundInApplyObjectsCache() {
+    RefUpdatedEvent event = new RefUpdatedEvent();
+    RefUpdateAttribute refUpdate = new RefUpdateAttribute();
+    refUpdate.refName = TEST_REF_NAME;
+    refUpdate.project = TEST_PROJECT;
+    refUpdate.oldRev = ObjectId.zeroId().getName();
+    refUpdate.newRev = NEW_REV;
+
+    event.instanceId = REMOTE_INSTANCE_ID;
+    event.refUpdate = () -> refUpdate;
+
+    objectUnderTest.onEvent(event);
+
+    verify(executor).submit(any(FetchJob.class));
+  }
 }