Stop creating redundant replication tasks from stream-events
Currently the broker-based ref-update triggers a replication
task once the event is received from the topic, even if the
apply-object was executed successfully beforehand.
To avoid this problem, an apply-objects cache has been introduced to
store temporally the apply-objects already successfully applied.
Bug: Issue 16885
Change-Id: Ib864bd0ae337f6b93308dfea155a22e1955409c3
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ApplyObjectCacheModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ApplyObjectCacheModule.java
new file mode 100644
index 0000000..944326e
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ApplyObjectCacheModule.java
@@ -0,0 +1,29 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.server.cache.CacheModule;
+import java.time.Duration;
+
+public class ApplyObjectCacheModule extends CacheModule {
+ public static final String APPLY_OBJECTS_CACHE = "apply_objects";
+ public static final Duration APPLY_OBJECTS_CACHE_MAX_AGE = Duration.ofMinutes(1);
+
+ @Override
+ protected void configure() {
+ cache(APPLY_OBJECTS_CACHE, ApplyObjectsCacheKey.class, Long.class)
+ .expireAfterWrite(APPLY_OBJECTS_CACHE_MAX_AGE);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ApplyObjectsCacheKey.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ApplyObjectsCacheKey.java
new file mode 100644
index 0000000..9e83b4d
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ApplyObjectsCacheKey.java
@@ -0,0 +1,31 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.auto.value.AutoValue;
+
+@AutoValue
+public abstract class ApplyObjectsCacheKey {
+
+ public static ApplyObjectsCacheKey create(String objectId, String refName, String project) {
+ return new AutoValue_ApplyObjectsCacheKey(objectId, refName, project);
+ }
+
+ public abstract String objectId();
+
+ public abstract String refName();
+
+ public abstract String project();
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index ed3aecb..d1c5ca1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -80,6 +80,7 @@
bind(RevisionReader.class).in(Scopes.SINGLETON);
bind(ApplyObject.class);
install(new FactoryModuleBuilder().build(FetchJob.Factory.class));
+ install(new ApplyObjectCacheModule());
install(new PullReplicationApiModule());
install(new FetchRefReplicatedEventModule());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectAction.java
index 04cc9e8..b3379f2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectAction.java
@@ -97,7 +97,11 @@
}
applyObjectCommand.applyObject(
- resource.getNameKey(), input.getRefName(), input.getRevisionData(), input.getLabel());
+ resource.getNameKey(),
+ input.getRefName(),
+ input.getRevisionData(),
+ input.getLabel(),
+ input.getEventCreatedOn());
return Response.created(input);
} catch (MissingParentObjectException e) {
repLog.error(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java
index b27d8b7..5f368c5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommand.java
@@ -14,9 +14,11 @@
package com.googlesource.gerrit.plugins.replication.pull.api;
+import static com.googlesource.gerrit.plugins.replication.pull.ApplyObjectCacheModule.APPLY_OBJECTS_CACHE;
import static com.googlesource.gerrit.plugins.replication.pull.PullReplicationLogger.repLog;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import com.google.common.cache.Cache;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.entities.Project;
@@ -25,19 +27,18 @@
import com.google.gerrit.server.events.EventDispatcher;
import com.google.gerrit.server.permissions.PermissionBackendException;
import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.replication.pull.ApplyObjectMetrics;
-import com.googlesource.gerrit.plugins.replication.pull.Context;
-import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
-import com.googlesource.gerrit.plugins.replication.pull.PullReplicationStateLogger;
-import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
+import com.google.inject.name.Named;
+import com.googlesource.gerrit.plugins.replication.pull.*;
import com.googlesource.gerrit.plugins.replication.pull.ReplicationState.RefFetchResult;
import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
+import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
import com.googlesource.gerrit.plugins.replication.pull.api.exception.RefUpdateException;
import com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObject;
import com.googlesource.gerrit.plugins.replication.pull.fetch.RefUpdateState;
import java.io.IOException;
import java.util.Arrays;
+import java.util.List;
import java.util.Set;
import org.eclipse.jgit.lib.RefUpdate;
import org.eclipse.jgit.transport.RefSpec;
@@ -45,7 +46,7 @@
public class ApplyObjectCommand {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
-
+ private final Cache<ApplyObjectsCacheKey, Long> refUpdatesSucceededCache;
private static final Set<RefUpdate.Result> SUCCESSFUL_RESULTS =
ImmutableSet.of(
RefUpdate.Result.NEW,
@@ -63,21 +64,31 @@
PullReplicationStateLogger fetchStateLog,
ApplyObject applyObject,
ApplyObjectMetrics metrics,
- DynamicItem<EventDispatcher> eventDispatcher) {
+ DynamicItem<EventDispatcher> eventDispatcher,
+ @Named(APPLY_OBJECTS_CACHE) Cache<ApplyObjectsCacheKey, Long> refUpdatesSucceededCache) {
this.fetchStateLog = fetchStateLog;
this.applyObject = applyObject;
this.metrics = metrics;
this.eventDispatcher = eventDispatcher;
+ this.refUpdatesSucceededCache = refUpdatesSucceededCache;
}
public void applyObject(
- Project.NameKey name, String refName, RevisionData revisionsData, String sourceLabel)
+ Project.NameKey name,
+ String refName,
+ RevisionData revisionsData,
+ String sourceLabel,
+ long eventCreatedOn)
throws IOException, RefUpdateException, MissingParentObjectException {
- applyObjects(name, refName, new RevisionData[] {revisionsData}, sourceLabel);
+ applyObjects(name, refName, new RevisionData[] {revisionsData}, sourceLabel, eventCreatedOn);
}
public void applyObjects(
- Project.NameKey name, String refName, RevisionData[] revisionsData, String sourceLabel)
+ Project.NameKey name,
+ String refName,
+ RevisionData[] revisionsData,
+ String sourceLabel,
+ long eventCreatedOn)
throws IOException, RefUpdateException, MissingParentObjectException {
repLog.info(
@@ -89,7 +100,26 @@
Timer1.Context<String> context = metrics.start(sourceLabel);
RefUpdateState refUpdateState = applyObject.apply(name, new RefSpec(refName), revisionsData);
+ Boolean isRefUpdateSuccessful = isSuccessful(refUpdateState.getResult());
+ if (isRefUpdateSuccessful) {
+ for (RevisionData revisionData : revisionsData) {
+ RevisionObjectData commitObj = revisionData.getCommitObject();
+ List<RevisionObjectData> blobs = revisionData.getBlobs();
+
+ if (commitObj != null) {
+ refUpdatesSucceededCache.put(
+ ApplyObjectsCacheKey.create(
+ revisionData.getCommitObject().getSha1(), refName, name.get()),
+ eventCreatedOn);
+ } else if (blobs != null) {
+ for (RevisionObjectData blob : blobs) {
+ refUpdatesSucceededCache.put(
+ ApplyObjectsCacheKey.create(blob.getSha1(), refName, name.get()), eventCreatedOn);
+ }
+ }
+ }
+ }
long elapsed = NANOSECONDS.toMillis(context.stop());
try {
@@ -110,7 +140,7 @@
Context.unsetLocalEvent();
}
- if (!isSuccessful(refUpdateState.getResult())) {
+ if (!isRefUpdateSuccessful) {
String message =
String.format(
"RefUpdate failed with result %s for: sourceLcabel=%s, project=%s, refName=%s",
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectsAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectsAction.java
index a1e1f5b..dd155d4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectsAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectsAction.java
@@ -97,7 +97,11 @@
}
command.applyObjects(
- resource.getNameKey(), input.getRefName(), input.getRevisionsData(), input.getLabel());
+ resource.getNameKey(),
+ input.getRefName(),
+ input.getRevisionsData(),
+ input.getLabel(),
+ input.getEventCreatedOn());
return Response.created(input);
} catch (MissingParentObjectException e) {
repLog.error(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
index 2ea8a33..b14d4a1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
@@ -14,9 +14,11 @@
package com.googlesource.gerrit.plugins.replication.pull.event;
+import static com.googlesource.gerrit.plugins.replication.pull.ApplyObjectCacheModule.APPLY_OBJECTS_CACHE;
import static java.util.Objects.requireNonNull;
import com.google.common.base.Strings;
+import com.google.common.cache.Cache;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.entities.Project.NameKey;
@@ -24,6 +26,7 @@
import com.google.gerrit.extensions.restapi.AuthException;
import com.google.gerrit.extensions.restapi.RestApiException;
import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.data.RefUpdateAttribute;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventListener;
import com.google.gerrit.server.events.ProjectCreatedEvent;
@@ -33,6 +36,8 @@
import com.google.gerrit.server.permissions.PermissionBackendException;
import com.google.inject.Inject;
import com.google.inject.Provider;
+import com.google.inject.name.Named;
+import com.googlesource.gerrit.plugins.replication.pull.ApplyObjectsCacheKey;
import com.googlesource.gerrit.plugins.replication.pull.FetchOne;
import com.googlesource.gerrit.plugins.replication.pull.Source;
import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
@@ -59,6 +64,7 @@
private final SourcesCollection sources;
private final String instanceId;
private final WorkQueue workQueue;
+ private final Cache<ApplyObjectsCacheKey, Long> refUpdatesSucceededCache;
@Inject
public StreamEventListener(
@@ -69,7 +75,8 @@
FetchJob.Factory fetchJobFactory,
Provider<PullReplicationApiRequestMetrics> metricsProvider,
SourcesCollection sources,
- ExcludedRefsFilter excludedRefsFilter) {
+ ExcludedRefsFilter excludedRefsFilter,
+ @Named(APPLY_OBJECTS_CACHE) Cache<ApplyObjectsCacheKey, Long> refUpdatesSucceededCache) {
this.instanceId = instanceId;
this.deleteCommand = deleteCommand;
this.projectInitializationAction = projectInitializationAction;
@@ -78,6 +85,7 @@
this.metricsProvider = metricsProvider;
this.sources = sources;
this.refsFilter = excludedRefsFilter;
+ this.refUpdatesSucceededCache = refUpdatesSucceededCache;
requireNonNull(
Strings.emptyToNull(this.instanceId), "gerrit.instanceId cannot be null or empty");
@@ -120,6 +128,17 @@
return;
}
+ if (isApplyObjectsCacheHit(refUpdatedEvent)) {
+ logger.atFine().log(
+ "Skipping refupdate '%s' '%s'=>'%s' (eventCreatedOn=%d) for project '%s' because has been already replicated via apply-object",
+ refUpdatedEvent.getRefName(),
+ refUpdatedEvent.refUpdate.get().oldRev,
+ refUpdatedEvent.refUpdate.get().newRev,
+ refUpdatedEvent.eventCreatedOn,
+ refUpdatedEvent.getProjectNameKey());
+ return;
+ }
+
fetchRefsAsync(
refUpdatedEvent.getRefName(),
refUpdatedEvent.instanceId,
@@ -207,4 +226,15 @@
private String getProjectRepositoryName(ProjectCreatedEvent projectCreatedEvent) {
return String.format("%s.git", projectCreatedEvent.projectName);
}
+
+ private boolean isApplyObjectsCacheHit(RefUpdatedEvent refUpdateEvent) {
+ RefUpdateAttribute refUpdateAttribute = refUpdateEvent.refUpdate.get();
+ Long refUpdateSuccededTimestamp =
+ refUpdatesSucceededCache.getIfPresent(
+ ApplyObjectsCacheKey.create(
+ refUpdateAttribute.newRev, refUpdateAttribute.refName, refUpdateAttribute.project));
+
+ return refUpdateSuccededTimestamp != null
+ && refUpdateEvent.eventCreatedOn <= refUpdateSuccededTimestamp;
+ }
}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index eab5fe6..13b16ea 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -49,6 +49,37 @@
To manually trigger replication at runtime, see
SSH command [start](cmd-start.md).
+File `gerrit.config`
+-------------------------
+
+cache.@PLUGIN@-apply_objects.maxAge
+: Maximum age to keep history of the latest successful apply-object refs.
+ Values should use common unit suffixes to express their setting:
+
+ s, sec, second, seconds
+
+ m, min, minute, minutes
+
+ h, hr, hour, hours
+
+ d, day, days
+
+ w, week, weeks (1 week is treated as 7 days)
+
+ mon, month, months (1 month is treated as 30 days)
+
+ y, year, years (1 year is treated as 365 days)
+
+ If a unit suffix is not specified, seconds is assumed. If 0 is supplied, the maximum age
+ is infinite and items are never purged except when the cache is full.
+
+ Default is 60s.
+
+cache.@PLUGIN@-apply_objects.memoryLimit
+: The maximum number of apply-object refs retained in memory.
+
+ Default is 1024.
+
File `@PLUGIN@.config`
-------------------------
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionIT.java
index 784515e..453a6b0 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionIT.java
@@ -32,7 +32,7 @@
String payloadWithAsyncFieldTemplate =
"{\"label\":\""
+ TEST_REPLICATION_REMOTE
- + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}, \"async\":true}";
+ + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}, \"async\":true}";
String refName = createRef();
Optional<RevisionData> revisionDataOption = createRevisionData(refName);
@@ -54,7 +54,7 @@
String payloadWithoutAsyncFieldTemplate =
"{\"label\":\""
+ TEST_REPLICATION_REMOTE
- + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
+ + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
String refName = createRef();
Optional<RevisionData> revisionDataOption = createRevisionData(refName);
@@ -78,7 +78,7 @@
String payloadWithoutAsyncFieldTemplate =
"{\"label\":\""
+ TEST_REPLICATION_REMOTE
- + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
+ + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
String refName = createRef();
Optional<RevisionData> revisionDataOption = createRevisionData(refName);
@@ -102,7 +102,7 @@
String payloadWithoutAsyncFieldTemplate =
"{\"label\":\""
+ TEST_REPLICATION_REMOTE
- + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
+ + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
NameKey projectName = Project.nameKey("test/repo");
String refName = createRef(projectName);
Optional<RevisionData> revisionDataOption = createRevisionData(projectName, refName);
@@ -129,7 +129,7 @@
String payloadWithoutAsyncFieldTemplate =
"{\"label\":\""
+ TEST_REPLICATION_REMOTE
- + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
+ + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
String refName = createRef();
Optional<RevisionData> revisionDataOption = createRevisionData(refName);
@@ -148,7 +148,7 @@
@GerritConfig(name = "gerrit.instanceId", value = "testInstanceId")
public void shouldReturnBadRequestCodeWhenMandatoryFieldLabelIsMissing() throws Exception {
String payloadWithoutLabelFieldTemplate =
- "{\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}, \"async\":true}";
+ "{\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}, \"async\":true}";
String refName = createRef();
Optional<RevisionData> revisionDataOption = createRevisionData(refName);
@@ -171,7 +171,7 @@
String wrongPayloadTemplate =
"{\"label\":\""
+ TEST_REPLICATION_REMOTE
- + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}, \"async\":true,}";
+ + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}, \"async\":true,}";
String refName = createRef();
Optional<RevisionData> revisionDataOption = createRevisionData(refName);
@@ -196,7 +196,7 @@
String payloadWithoutAsyncFieldTemplate =
"{\"label\":\""
+ TEST_REPLICATION_REMOTE
- + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
+ + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
String refName = createRef();
Optional<RevisionData> revisionDataOption = createRevisionData(refName);
@@ -222,7 +222,7 @@
String payloadWithoutAsyncFieldTemplate =
"{\"label\":\""
+ TEST_REPLICATION_REMOTE
- + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
+ + "\",\"ref_name\":\"%s\",\"revision_data\":{\"commit_object\":{\"sha1\":\"%s\",\"type\":1,\"content\":\"%s\"},\"tree_object\":{\"type\":2,\"content\":\"%s\"},\"blobs\":[]}}";
String refName = createRef();
Optional<RevisionData> revisionDataOption = createRevisionData(refName);
@@ -245,6 +245,7 @@
String.format(
wrongPayloadTemplate,
refName,
+ revisionData.getCommitObject().getSha1(),
encode(revisionData.getCommitObject().getContent()),
encode(revisionData.getTreeObject().getContent()));
return sendObjectPayload;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionTest.java
index e575eb9..6ef5b2a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectActionTest.java
@@ -17,6 +17,7 @@
import static com.google.common.truth.Truth.assertThat;
import static org.apache.http.HttpStatus.SC_CREATED;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
@@ -209,7 +210,7 @@
new MissingParentObjectException(
Project.nameKey("test_projects"), refName, ObjectId.zeroId()))
.when(applyObjectCommand)
- .applyObject(any(), anyString(), any(), anyString());
+ .applyObject(any(), anyString(), any(), anyString(), anyLong());
applyObjectAction.apply(projectResource, inputParams);
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommandTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommandTest.java
index d73a6e7..ee49aee 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommandTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/ApplyObjectCommandTest.java
@@ -20,6 +20,8 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.gerrit.entities.Project;
import com.google.gerrit.entities.Project.NameKey;
@@ -29,6 +31,7 @@
import com.google.gerrit.server.events.EventDispatcher;
import com.google.gerrit.server.permissions.PermissionBackendException;
import com.googlesource.gerrit.plugins.replication.pull.ApplyObjectMetrics;
+import com.googlesource.gerrit.plugins.replication.pull.ApplyObjectsCacheKey;
import com.googlesource.gerrit.plugins.replication.pull.FetchRefReplicatedEvent;
import com.googlesource.gerrit.plugins.replication.pull.PullReplicationStateLogger;
import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
@@ -55,10 +58,13 @@
private static final String TEST_REF_NAME = "refs/changes/01/1/1";
private static final NameKey TEST_PROJECT_NAME = Project.nameKey("test-project");
private static final String TEST_REMOTE_NAME = "test-remote-name";
+ private static final long TEST_EVENT_TIMESTAMP = 1L;
private String sampleCommitObjectId = "9f8d52853089a3cf00c02ff7bd0817bd4353a95a";
private String sampleTreeObjectId = "4b825dc642cb6eb9a060e54bf8d69288fbee4904";
private String sampleBlobObjectId = "b5d7bcf1d1c5b0f0726d10a16c8315f06f900bfb";
+ private String sampleCommitObjectId2 = "9f8d52853089a3cf00c02ff7bd0817bd4353a95b";
+ private String sampleTreeObjectId2 = "4b825dc642cb6eb9a060e54bf8d69288fbee4905";
@Mock private PullReplicationStateLogger fetchStateLog;
@Mock private ApplyObject applyObject;
@@ -67,11 +73,13 @@
@Mock private EventDispatcher eventDispatcher;
@Mock private Timer1.Context<String> timetContext;
@Captor ArgumentCaptor<Event> eventCaptor;
+ private Cache<ApplyObjectsCacheKey, Long> cache;
private ApplyObjectCommand objectUnderTest;
@Before
public void setup() throws MissingParentObjectException, IOException {
+ cache = CacheBuilder.newBuilder().build();
RefUpdateState state = new RefUpdateState(TEST_REMOTE_NAME, RefUpdate.Result.NEW);
when(eventDispatcherDataItem.get()).thenReturn(eventDispatcher);
when(metrics.start(anyString())).thenReturn(timetContext);
@@ -79,15 +87,21 @@
when(applyObject.apply(any(), any(), any())).thenReturn(state);
objectUnderTest =
- new ApplyObjectCommand(fetchStateLog, applyObject, metrics, eventDispatcherDataItem);
+ new ApplyObjectCommand(fetchStateLog, applyObject, metrics, eventDispatcherDataItem, cache);
}
@Test
public void shouldSendEventWhenApplyObject()
throws PermissionBackendException, IOException, RefUpdateException,
MissingParentObjectException {
+ RevisionData sampleRevisionData =
+ createSampleRevisionData(sampleCommitObjectId, sampleTreeObjectId);
objectUnderTest.applyObject(
- TEST_PROJECT_NAME, TEST_REF_NAME, createSampleRevisionData(), TEST_SOURCE_LABEL);
+ TEST_PROJECT_NAME,
+ TEST_REF_NAME,
+ sampleRevisionData,
+ TEST_SOURCE_LABEL,
+ TEST_EVENT_TIMESTAMP);
verify(eventDispatcher).postEvent(eventCaptor.capture());
Event sentEvent = eventCaptor.getValue();
@@ -97,11 +111,64 @@
assertThat(fetchEvent.getRefName()).isEqualTo(TEST_REF_NAME);
}
- private RevisionData createSampleRevisionData() {
+ @Test
+ public void shouldInsertIntoApplyObjectsCacheWhenApplyObjectIsSuccessful()
+ throws IOException, RefUpdateException, MissingParentObjectException {
+ RevisionData sampleRevisionData =
+ createSampleRevisionData(sampleCommitObjectId, sampleTreeObjectId);
+ RevisionData sampleRevisionData2 =
+ createSampleRevisionData(sampleCommitObjectId2, sampleTreeObjectId2);
+ objectUnderTest.applyObjects(
+ TEST_PROJECT_NAME,
+ TEST_REF_NAME,
+ new RevisionData[] {sampleRevisionData, sampleRevisionData2},
+ TEST_SOURCE_LABEL,
+ TEST_EVENT_TIMESTAMP);
+
+ assertThat(
+ cache.getIfPresent(
+ ApplyObjectsCacheKey.create(
+ sampleRevisionData.getCommitObject().getSha1(),
+ TEST_REF_NAME,
+ TEST_PROJECT_NAME.get())))
+ .isEqualTo(TEST_EVENT_TIMESTAMP);
+ assertThat(
+ cache.getIfPresent(
+ ApplyObjectsCacheKey.create(
+ sampleRevisionData2.getCommitObject().getSha1(),
+ TEST_REF_NAME,
+ TEST_PROJECT_NAME.get())))
+ .isEqualTo(TEST_EVENT_TIMESTAMP);
+ }
+
+ @Test(expected = RefUpdateException.class)
+ public void shouldNotInsertIntoApplyObjectsCacheWhenApplyObjectIsFailure()
+ throws IOException, RefUpdateException, MissingParentObjectException {
+ RevisionData sampleRevisionData =
+ createSampleRevisionData(sampleCommitObjectId, sampleTreeObjectId);
+ RefUpdateState failureState = new RefUpdateState(TEST_REMOTE_NAME, RefUpdate.Result.IO_FAILURE);
+ when(applyObject.apply(any(), any(), any())).thenReturn(failureState);
+ objectUnderTest.applyObject(
+ TEST_PROJECT_NAME,
+ TEST_REF_NAME,
+ sampleRevisionData,
+ TEST_SOURCE_LABEL,
+ TEST_EVENT_TIMESTAMP);
+
+ assertThat(
+ cache.getIfPresent(
+ ApplyObjectsCacheKey.create(
+ sampleRevisionData.getCommitObject().getSha1(),
+ TEST_REF_NAME,
+ TEST_PROJECT_NAME.get())))
+ .isNull();
+ }
+
+ private RevisionData createSampleRevisionData(String commitObjectId, String treeObjectId) {
RevisionObjectData commitData =
- new RevisionObjectData(sampleCommitObjectId, Constants.OBJ_COMMIT, new byte[] {});
+ new RevisionObjectData(commitObjectId, Constants.OBJ_COMMIT, new byte[] {});
RevisionObjectData treeData =
- new RevisionObjectData(sampleTreeObjectId, Constants.OBJ_TREE, new byte[] {});
+ new RevisionObjectData(treeObjectId, Constants.OBJ_TREE, new byte[] {});
return new RevisionData(Collections.emptyList(), commitData, treeData, Lists.newArrayList());
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java
index 92314a4..8dc0359 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java
@@ -21,6 +21,8 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.gerrit.entities.Project;
import com.google.gerrit.entities.RefNames;
@@ -32,6 +34,7 @@
import com.google.gerrit.server.events.RefUpdatedEvent;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.replication.pull.ApplyObjectsCacheKey;
import com.googlesource.gerrit.plugins.replication.pull.FetchOne;
import com.googlesource.gerrit.plugins.replication.pull.Source;
import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
@@ -60,6 +63,7 @@
private static final String INSTANCE_ID = "node_instance_id";
private static final String NEW_REV = "0000000000000000000000000000000000000001";
private static final String REMOTE_INSTANCE_ID = "remote_node_instance_id";
+ private static final long TEST_EVENT_TIMESTAMP = 1684879097024L;
@Mock private ProjectInitializationAction projectInitializationAction;
@Mock private WorkQueue workQueue;
@@ -72,11 +76,13 @@
@Mock private SourcesCollection sources;
@Mock private Source source;
@Mock private ExcludedRefsFilter refsFilter;
+ private Cache<ApplyObjectsCacheKey, Long> cache;
private StreamEventListener objectUnderTest;
@Before
public void setup() {
+ cache = CacheBuilder.newBuilder().build();
when(workQueue.getDefaultQueue()).thenReturn(executor);
when(fetchJobFactory.create(eq(Project.nameKey(TEST_PROJECT)), any(), any()))
.thenReturn(fetchJob);
@@ -95,7 +101,8 @@
fetchJobFactory,
() -> metrics,
sources,
- refsFilter);
+ refsFilter,
+ cache);
}
@Test
@@ -257,4 +264,58 @@
verify(executor).submit(any(FetchJob.class));
}
+
+ @Test
+ public void shouldSkipEventWhenFoundInApplyObjectsCacheWithTheSameTimestamp() {
+ sendRefUpdateEventWithTimestamp(TEST_EVENT_TIMESTAMP, TEST_EVENT_TIMESTAMP);
+ verify(executor, never()).submit(any(Runnable.class));
+ }
+
+ @Test
+ public void shouldSkipEventWhenFoundInApplyObjectsCacheWithOlderTimestamp() {
+ sendRefUpdateEventWithTimestamp(TEST_EVENT_TIMESTAMP - 1, TEST_EVENT_TIMESTAMP);
+ verify(executor, never()).submit(any(Runnable.class));
+ }
+
+ @Test
+ public void shouldProcessEventWhenFoundInApplyObjectsCacheWithNewerTimestamp() {
+ sendRefUpdateEventWithTimestamp(TEST_EVENT_TIMESTAMP + 1, TEST_EVENT_TIMESTAMP);
+ verify(executor).submit(any(Runnable.class));
+ }
+
+ private void sendRefUpdateEventWithTimestamp(long eventTimestamp, long cachedTimestamp) {
+ RefUpdatedEvent event = new RefUpdatedEvent();
+ RefUpdateAttribute refUpdate = new RefUpdateAttribute();
+ refUpdate.refName = TEST_REF_NAME;
+ refUpdate.project = TEST_PROJECT;
+ refUpdate.oldRev = ObjectId.zeroId().getName();
+ refUpdate.newRev = NEW_REV;
+ event.eventCreatedOn = eventTimestamp;
+
+ cache.put(
+ ApplyObjectsCacheKey.create(refUpdate.newRev, refUpdate.refName, refUpdate.project),
+ cachedTimestamp);
+
+ event.instanceId = REMOTE_INSTANCE_ID;
+ event.refUpdate = () -> refUpdate;
+
+ objectUnderTest.onEvent(event);
+ }
+
+ @Test
+ public void shouldScheduleAllRefsFetchWhenNotFoundInApplyObjectsCache() {
+ RefUpdatedEvent event = new RefUpdatedEvent();
+ RefUpdateAttribute refUpdate = new RefUpdateAttribute();
+ refUpdate.refName = TEST_REF_NAME;
+ refUpdate.project = TEST_PROJECT;
+ refUpdate.oldRev = ObjectId.zeroId().getName();
+ refUpdate.newRev = NEW_REV;
+
+ event.instanceId = REMOTE_INSTANCE_ID;
+ event.refUpdate = () -> refUpdate;
+
+ objectUnderTest.onEvent(event);
+
+ verify(executor).submit(any(FetchJob.class));
+ }
}