Introduce `apply-batch-object` REST API endpoint Add `batch-apply-object` REST API endpoint which allows to send multiple refs as a single call. This allows to preserve the refs update order and improves the performance. Bug: Issue 40015567 Change-Id: I843902c639829e04c4964b1f6c86602f1e554a6c
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java index bcbd73d..54cc6f4 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
@@ -37,6 +37,7 @@ import com.google.inject.Provider; import com.googlesource.gerrit.plugins.replication.ObservableQueue; import com.googlesource.gerrit.plugins.replication.pull.FetchResultProcessing.GitUpdateProcessing; +import com.googlesource.gerrit.plugins.replication.pull.api.data.BatchApplyObjectData; import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData; import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException; import com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient; @@ -44,8 +45,8 @@ import com.googlesource.gerrit.plugins.replication.pull.filter.ApplyObjectsRefsFilter; import com.googlesource.gerrit.plugins.replication.pull.filter.ExcludedRefsFilter; import java.io.IOException; +import java.io.UncheckedIOException; import java.net.URISyntaxException; -import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -57,9 +58,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.eclipse.jgit.errors.CorruptObjectException; import org.eclipse.jgit.errors.IncorrectObjectTypeException; -import org.eclipse.jgit.errors.InvalidObjectIdException; import org.eclipse.jgit.errors.MissingObjectException; import org.eclipse.jgit.errors.RepositoryNotFoundException; import org.eclipse.jgit.lib.Config; @@ -89,7 +90,7 @@ private final Provider<SourcesCollection> sources; // For Guice circular dependency private volatile boolean running; private volatile boolean replaying; - private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue; + private final Queue<ReferenceBatchUpdatedEvent> beforeStartupEventsQueue; private FetchApiClient.Factory fetchClientFactory; private Integer fetchCallsTimeout; private ExcludedRefsFilter refsFilter; @@ -175,14 +176,30 @@ if (e.type.equals(BATCH_REF_UPDATED_EVENT_TYPE)) { BatchRefUpdateEvent event = (BatchRefUpdateEvent) e; - event.refUpdates.get().stream() - .sorted(ReplicationQueue::sortByMetaRefAsLast) - .forEachOrdered( - refUpdateAttribute -> { - if (isRefToBeReplicated(refUpdateAttribute.refName)) { - fireRefUpdate(refUpdateAttribute, e.eventCreatedOn); - } - }); + long eventCreatedOn = e.eventCreatedOn; + List<ReferenceUpdatedEvent> refs = + event.refUpdates.get().stream() + .filter(u -> isRefToBeReplicated(u.refName)) + .map( + u -> { + repLog.info( + "Ref event received: {} on project {}:{} - {} => {}", + refUpdateType(u.oldRev, u.newRev), + event.getProjectNameKey().get(), + u.refName, + u.oldRev, + u.newRev); + return ReferenceUpdatedEvent.from(u, eventCreatedOn); + }) + .sorted(ReplicationQueue::sortByMetaRefAsLast) + .collect(Collectors.toList()); + + if (!refs.isEmpty()) { + ReferenceBatchUpdatedEvent referenceBatchUpdatedEvent = + ReferenceBatchUpdatedEvent.create( + event.getProjectNameKey().get(), refs, eventCreatedOn); + fire(referenceBatchUpdatedEvent); + } } return; } @@ -191,22 +208,25 @@ RefUpdatedEvent event = (RefUpdatedEvent) e; if (isRefToBeReplicated(event.getRefName())) { - fireRefUpdate(event.refUpdate.get(), event.eventCreatedOn); + RefUpdateAttribute refUpdateAttribute = event.refUpdate.get(); + repLog.info( + "Ref event received: {} on project {}:{} - {} => {}", + refUpdateType(refUpdateAttribute.oldRev, refUpdateAttribute.newRev), + event.getProjectNameKey().get(), + refUpdateAttribute.refName, + refUpdateAttribute.oldRev, + refUpdateAttribute.newRev); + + ReferenceBatchUpdatedEvent referenceBatchUpdatedEvent = + ReferenceBatchUpdatedEvent.create( + event.getProjectNameKey().get(), + List.of(ReferenceUpdatedEvent.from(refUpdateAttribute, e.eventCreatedOn)), + e.eventCreatedOn); + fire(referenceBatchUpdatedEvent); } } } - private void fireRefUpdate(RefUpdateAttribute refUpdate, long eventCreatedOn) { - repLog.info( - "Ref event received: {} on project {}:{} - {} => {}", - refUpdateType(refUpdate.oldRev, refUpdate.newRev), - refUpdate.project, - refUpdate.refName, - refUpdate.oldRev, - refUpdate.newRev); - fire(ReferenceUpdatedEvent.from(refUpdate, eventCreatedOn)); - } - @Override public void onProjectDeleted(ProjectDeletedListener.Event event) { Project.NameKey project = Project.nameKey(event.getProjectName()); @@ -217,10 +237,10 @@ source.getApis().forEach(apiUrl -> source.scheduleDeleteProject(apiUrl, project))); } - private static int sortByMetaRefAsLast(RefUpdateAttribute a, RefUpdateAttribute b) { - repLog.debug("sortByMetaRefAsLast({} <=> {})", a.refName, b.refName); + private static int sortByMetaRefAsLast(ReferenceUpdatedEvent a, ReferenceUpdatedEvent b) { + repLog.debug("sortByMetaRefAsLast({} <=> {})", a.refName(), b.refName()); return Boolean.compare( - RefNames.isNoteDbMetaRef(a.refName), RefNames.isNoteDbMetaRef(b.refName)); + RefNames.isNoteDbMetaRef(a.refName()), RefNames.isNoteDbMetaRef(b.refName())); } private static String refUpdateType(String oldRev, String newRev) { @@ -237,13 +257,13 @@ return !refsFilter.match(refName); } - private void fire(ReferenceUpdatedEvent event) { + private void fire(ReferenceBatchUpdatedEvent event) { ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get())); fire(event, state); state.markAllFetchTasksScheduled(); } - private void fire(ReferenceUpdatedEvent event, ReplicationState state) { + private void fire(ReferenceBatchUpdatedEvent event, ReplicationState state) { if (!running) { stateLog.warn( "Replication plugin did not finish startup before event, event replication is postponed", @@ -263,12 +283,7 @@ final Consumer<Source> callFunction = callFunction( - Project.nameKey(event.projectName()), - event.objectId(), - event.refName(), - event.eventCreatedOn(), - event.isDelete(), - state); + Project.nameKey(event.projectName()), event.refs(), event.eventCreatedOn(), state); fetchCallsPool .submit(() -> allSources.parallelStream().forEach(callFunction)) .get(fetchCallsTimeout, TimeUnit.MILLISECONDS); @@ -288,13 +303,10 @@ private Consumer<Source> callFunction( NameKey project, - ObjectId objectId, - String refName, + List<ReferenceUpdatedEvent> refs, long eventCreatedOn, - boolean isDelete, ReplicationState state) { - CallFunction call = - getCallFunction(project, objectId, refName, eventCreatedOn, isDelete, state); + CallFunction call = getCallFunction(project, refs, eventCreatedOn, state); return (source) -> { boolean callSuccessful; @@ -303,60 +315,65 @@ } catch (Exception e) { repLog.warn( String.format( - "Failed to apply object %s on project %s:%s, falling back to git fetch", - objectId.name(), project, refName), + "Failed to batch apply object %s on project %s, falling back to git fetch", + refs.stream() + .map(event -> String.format("%s:%s", event.refName(), event.objectId())) + .collect(Collectors.joining(",")), + project), e); callSuccessful = false; } if (!callSuccessful) { - callFetch(source, project, refName, state); + callFetch(source, project, refs, state); } }; } private CallFunction getCallFunction( NameKey project, - ObjectId objectId, - String refName, + List<ReferenceUpdatedEvent> refs, long eventCreatedOn, - boolean isDelete, ReplicationState state) { - if (isDelete) { - return ((source) -> - callSendObject(source, project, refName, eventCreatedOn, isDelete, null, state)); - } try { - Optional<RevisionData> revisionData = - revReaderProvider.get().read(project, objectId, refName, 0); - repLog.info( - "RevisionData is {} for {}:{}", - revisionData.map(RevisionData::toString).orElse("ABSENT"), - project, - refName); + List<BatchApplyObjectData> refsBatch = + refs.stream() + .map(ref -> toBatchApplyObject(project, ref, state)) + .collect(Collectors.toList()); - if (revisionData.isPresent()) { - return ((source) -> - callSendObject( - source, - project, - refName, - eventCreatedOn, - isDelete, - Arrays.asList(revisionData.get()), - state)); + if (!containsLargeRef(refsBatch)) { + return ((source) -> callBatchSendObject(source, project, refsBatch, eventCreatedOn, state)); } - } catch (InvalidObjectIdException | IOException e) { + } catch (UncheckedIOException e) { + stateLog.error("Falling back to calling fetch", e, state); + } + return ((source) -> callFetch(source, project, refs, state)); + } + + private BatchApplyObjectData toBatchApplyObject( + NameKey project, ReferenceUpdatedEvent event, ReplicationState state) { + if (event.isDelete()) { + Optional<RevisionData> noRevisionData = Optional.empty(); + return BatchApplyObjectData.create(event.refName(), noRevisionData, event.isDelete()); + } + try { + Optional<RevisionData> maybeRevisionData = + revReaderProvider.get().read(project, event.objectId(), event.refName(), 0); + return BatchApplyObjectData.create(event.refName(), maybeRevisionData, event.isDelete()); + } catch (IOException e) { stateLog.error( String.format( "Exception during reading ref: %s, project:%s, message: %s", - refName, project.get(), e.getMessage()), + event.refName(), project.get(), e.getMessage()), e, state); + throw new UncheckedIOException(e); } + } - return (source) -> callFetch(source, project, refName, state); + private boolean containsLargeRef(List<BatchApplyObjectData> batchApplyObjectData) { + return batchApplyObjectData.stream().anyMatch(e -> e.revisionData().isEmpty() && !e.isDelete()); } private boolean callSendObject( @@ -460,6 +477,126 @@ return resultIsSuccessful; } + private boolean callBatchSendObject( + Source source, + NameKey project, + List<BatchApplyObjectData> refsBatch, + long eventCreatedOn, + ReplicationState state) + throws MissingParentObjectException { + boolean batchResultSuccessful = true; + + List<BatchApplyObjectData> filteredRefsBatch = + refsBatch.stream() + .filter(r -> source.wouldFetchProject(project) && source.wouldFetchRef(r.refName())) + .collect(Collectors.toList()); + + String batchApplyObjectStr = + filteredRefsBatch.stream() + .map(BatchApplyObjectData::toString) + .collect(Collectors.joining(",")); + FetchApiClient fetchClient = fetchClientFactory.create(source); + + for (String apiUrl : source.getApis()) { + try { + URIish uri = new URIish(apiUrl); + repLog.info( + "Pull replication REST API batch apply object to {} for {}:[{}]", + apiUrl, + project, + batchApplyObjectStr); + Context<String> apiTimer = applyObjectMetrics.startEnd2End(source.getRemoteConfigName()); + HttpResult result = + fetchClient.callBatchSendObject(project, filteredRefsBatch, eventCreatedOn, uri); + boolean resultSuccessful = result.isSuccessful(); + repLog.info( + "Pull replication REST API batch apply object to {} COMPLETED for {}:[{}], HTTP Result:" + + " {} - time:{} ms", + apiUrl, + project, + batchApplyObjectStr, + result, + apiTimer.stop() / 1000000.0); + + if (!resultSuccessful + && result.isProjectMissing(project) + && source.isCreateMissingRepositories()) { + result = initProject(project, uri, fetchClient, result); + repLog.info("Missing project {} created, HTTP Result:{}", project, result); + } + + if (!resultSuccessful && result.isParentObjectMissing()) { + resultSuccessful = true; + for (BatchApplyObjectData batchApplyObject : filteredRefsBatch) { + String refName = batchApplyObject.refName(); + if ((RefNames.isNoteDbMetaRef(refName) || applyObjectsRefsFilter.match(refName)) + && batchApplyObject.revisionData().isPresent()) { + + Optional<RevisionData> maybeRevisionData = batchApplyObject.revisionData(); + List<RevisionData> allRevisions = + fetchWholeMetaHistory(project, refName, maybeRevisionData.get()); + + resultSuccessful &= + callSendObject( + source, + project, + refName, + eventCreatedOn, + batchApplyObject.isDelete(), + allRevisions, + state); + } else { + throw new MissingParentObjectException( + project, refName, source.getRemoteConfigName()); + } + } + } + + if (!resultSuccessful && !result.isSendBatchObjectAvailable()) { + resultSuccessful = true; + for (BatchApplyObjectData batchApplyObjectData : filteredRefsBatch) { + resultSuccessful &= + callSendObject( + source, + project, + batchApplyObjectData.refName(), + eventCreatedOn, + batchApplyObjectData.isDelete(), + batchApplyObjectData.revisionData().map(ImmutableList::of).orElse(null), + state); + } + } + + batchResultSuccessful &= resultSuccessful; + } catch (URISyntaxException e) { + repLog.warn( + "Pull replication REST API batch apply object to {} *FAILED* for {}:[{}]", + apiUrl, + project, + batchApplyObjectStr, + e); + stateLog.error(String.format("Cannot parse pull replication api url:%s", apiUrl), state); + batchResultSuccessful = false; + } catch (IOException | IllegalArgumentException e) { + repLog.warn( + "Pull replication REST API batch apply object to {} *FAILED* for {}:[{}]", + apiUrl, + project, + batchApplyObjectStr, + e); + stateLog.error( + String.format( + "Exception during the pull replication fetch rest api call. Endpoint url:%s," + + " message:%s", + apiUrl, e.getMessage()), + e, + state); + batchResultSuccessful = false; + } + } + return batchResultSuccessful; + } + private List<RevisionData> fetchWholeMetaHistory( NameKey project, String refName, RevisionData revision) throws RepositoryNotFoundException, MissingObjectException, IncorrectObjectTypeException, @@ -483,52 +620,60 @@ } private boolean callFetch( - Source source, Project.NameKey project, String refName, ReplicationState state) { + Source source, + Project.NameKey project, + List<ReferenceUpdatedEvent> refs, + ReplicationState state) { boolean resultIsSuccessful = true; - if (source.wouldFetchProject(project) && source.wouldFetchRef(refName)) { - for (String apiUrl : source.getApis()) { - try { - URIish uri = new URIish(apiUrl); - FetchApiClient fetchClient = fetchClientFactory.create(source); - repLog.info("Pull replication REST API fetch to {} for {}:{}", apiUrl, project, refName); - Context<String> timer = fetchMetrics.startEnd2End(source.getRemoteConfigName()); - HttpResult result = fetchClient.callFetch(project, refName, uri); - long elapsedMs = TimeUnit.NANOSECONDS.toMillis(timer.stop()); - boolean resultSuccessful = result.isSuccessful(); - repLog.info( - "Pull replication REST API fetch to {} COMPLETED for {}:{}, HTTP Result:" - + " {} - time:{} ms", - apiUrl, - project, - refName, - result, - elapsedMs); - if (!resultSuccessful - && result.isProjectMissing(project) - && source.isCreateMissingRepositories()) { - result = initProject(project, uri, fetchClient, result); - } - if (!resultSuccessful) { - stateLog.warn( - String.format( - "Pull replication rest api fetch call failed. Endpoint url: %s, reason:%s", - apiUrl, result.getMessage().orElse("unknown")), - state); - } + for (ReferenceUpdatedEvent refEvent : refs) { + String refName = refEvent.refName(); + if (source.wouldFetchProject(project) && source.wouldFetchRef(refName)) { + for (String apiUrl : source.getApis()) { + try { + URIish uri = new URIish(apiUrl); + FetchApiClient fetchClient = fetchClientFactory.create(source); + repLog.info( + "Pull replication REST API fetch to {} for {}:{}", apiUrl, project, refName); + Context<String> timer = fetchMetrics.startEnd2End(source.getRemoteConfigName()); + HttpResult result = fetchClient.callFetch(project, refName, uri); + long elapsedMs = TimeUnit.NANOSECONDS.toMillis(timer.stop()); + boolean resultSuccessful = result.isSuccessful(); + repLog.info( + "Pull replication REST API fetch to {} COMPLETED for {}:{}, HTTP Result:" + + " {} - time:{} ms", + apiUrl, + project, + refName, + result, + elapsedMs); + if (!resultSuccessful + && result.isProjectMissing(project) + && source.isCreateMissingRepositories()) { + result = initProject(project, uri, fetchClient, result); + } + if (!resultSuccessful) { + stateLog.warn( + String.format( + "Pull replication rest api fetch call failed. Endpoint url: %s, reason:%s", + apiUrl, result.getMessage().orElse("unknown")), + state); + } - resultIsSuccessful &= result.isSuccessful(); - } catch (URISyntaxException e) { - stateLog.error(String.format("Cannot parse pull replication api url:%s", apiUrl), state); - resultIsSuccessful = false; - } catch (Exception e) { - stateLog.error( - String.format( - "Exception during the pull replication fetch rest api call. Endpoint url:%s," - + " message:%s", - apiUrl, e.getMessage()), - e, - state); - resultIsSuccessful = false; + resultIsSuccessful &= result.isSuccessful(); + } catch (URISyntaxException e) { + stateLog.error( + String.format("Cannot parse pull replication api url:%s", apiUrl), state); + resultIsSuccessful = false; + } catch (Exception e) { + stateLog.error( + String.format( + "Exception during the pull replication fetch rest api call. Endpoint url:%s," + + " message:%s", + apiUrl, e.getMessage()), + e, + state); + resultIsSuccessful = false; + } } } } @@ -555,8 +700,14 @@ private void fireBeforeStartupEvents() { Set<String> eventsReplayed = new HashSet<>(); - for (ReferenceUpdatedEvent event : beforeStartupEventsQueue) { - String eventKey = String.format("%s:%s", event.projectName(), event.refName()); + for (ReferenceBatchUpdatedEvent event : beforeStartupEventsQueue) { + String eventKey = + String.format( + "%s:%s", + event.projectName(), + event.refs().stream() + .map(ReferenceUpdatedEvent::refName) + .collect(Collectors.joining())); if (!eventsReplayed.contains(eventKey)) { repLog.info("Firing pending task {}", event); fire(event); @@ -577,6 +728,22 @@ } @AutoValue + abstract static class ReferenceBatchUpdatedEvent { + + static ReferenceBatchUpdatedEvent create( + String projectName, List<ReferenceUpdatedEvent> refs, long eventCreatedOn) { + return new AutoValue_ReplicationQueue_ReferenceBatchUpdatedEvent( + projectName, refs, eventCreatedOn); + } + + public abstract String projectName(); + + public abstract List<ReferenceUpdatedEvent> refs(); + + public abstract long eventCreatedOn(); + } + + @AutoValue abstract static class ReferenceUpdatedEvent { static ReferenceUpdatedEvent create(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchApplyObjectAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchApplyObjectAction.java new file mode 100644 index 0000000..c67ceec --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchApplyObjectAction.java
@@ -0,0 +1,49 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.pull.api; + +import com.google.gerrit.extensions.restapi.Response; +import com.google.gerrit.extensions.restapi.RestApiException; +import com.google.gerrit.extensions.restapi.RestModifyView; +import com.google.gerrit.server.project.ProjectResource; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionInput; +import java.util.ArrayList; +import java.util.List; + +@Singleton +class BatchApplyObjectAction implements RestModifyView<ProjectResource, List<RevisionInput>> { + + private final ApplyObjectAction applyObjectAction; + + @Inject + BatchApplyObjectAction(ApplyObjectAction applyObjectAction) { + this.applyObjectAction = applyObjectAction; + } + + @Override + public Response<?> apply(ProjectResource resource, List<RevisionInput> inputs) + throws RestApiException { + + List<Response<?>> allResponses = new ArrayList<>(); + for (RevisionInput input : inputs) { + Response<?> individualResponse = applyObjectAction.apply(resource, input); + allResponses.add(individualResponse); + } + + return Response.ok(allResponses); + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java index be71946..b6b8fd0 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java
@@ -125,6 +125,7 @@ return (requestURI.contains(pluginName) && (requestURI.endsWith(String.format("/%s~apply-object", pluginName)) || requestURI.endsWith(String.format("/%s~apply-objects", pluginName)) + || requestURI.endsWith(String.format("/%s~batch-apply-object", pluginName)) || requestURI.endsWith(String.format("/%s~fetch", pluginName)) || requestURI.endsWith(String.format("/%s~delete-project", pluginName)) || requestURI.contains(String.format("/%s/init-project/", pluginName))))
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/data/BatchApplyObjectData.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/data/BatchApplyObjectData.java new file mode 100644 index 0000000..7a613c0 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/data/BatchApplyObjectData.java
@@ -0,0 +1,45 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.pull.api.data; + +import com.google.auto.value.AutoValue; +import java.util.Optional; + +@AutoValue +public abstract class BatchApplyObjectData { + + public static BatchApplyObjectData create( + String refName, Optional<RevisionData> revisionData, boolean isDelete) + throws IllegalArgumentException { + if (isDelete && revisionData.isPresent()) { + throw new IllegalArgumentException( + "DELETE ref-updates cannot be associated with a RevisionData"); + } + return new AutoValue_BatchApplyObjectData(refName, revisionData, isDelete); + } + + public abstract String refName(); + + public abstract Optional<RevisionData> revisionData(); + + public abstract boolean isDelete(); + + @Override + public String toString() { + return String.format( + "%s:%s isDelete=%s", + refName(), revisionData().map(RevisionData::toString).orElse("ABSENT"), isDelete()); + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java index 1b3dc43..5f39811 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
@@ -19,6 +19,7 @@ import com.google.gerrit.entities.Project; import com.google.gerrit.entities.Project.NameKey; import com.googlesource.gerrit.plugins.replication.pull.Source; +import com.googlesource.gerrit.plugins.replication.pull.api.data.BatchApplyObjectData; import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData; import java.io.IOException; import java.util.List; @@ -54,6 +55,13 @@ URIish targetUri) throws IOException; + HttpResult callBatchSendObject( + NameKey project, + List<BatchApplyObjectData> batchApplyObjects, + long eventCreatedOn, + URIish targetUri) + throws IOException; + HttpResult callSendObjects( NameKey project, String refName,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java index 09148fe..2b9ef09 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
@@ -36,6 +36,7 @@ import com.googlesource.gerrit.plugins.replication.pull.BearerTokenProvider; import com.googlesource.gerrit.plugins.replication.pull.Source; import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiRequestMetrics; +import com.googlesource.gerrit.plugins.replication.pull.api.data.BatchApplyObjectData; import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData; import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionInput; import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionsInput; @@ -44,6 +45,7 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; import org.apache.http.HttpHeaders; import org.apache.http.HttpResponse; import org.apache.http.ParseException; @@ -196,6 +198,32 @@ } @Override + public HttpResult callBatchSendObject( + NameKey project, + List<BatchApplyObjectData> batchedRefs, + long eventCreatedOn, + URIish targetUri) + throws IOException { + List<RevisionInput> inputs = + batchedRefs.stream() + .map( + batchApplyObject -> + new RevisionInput( + instanceId, + batchApplyObject.refName(), + eventCreatedOn, + batchApplyObject.revisionData().orElse(null))) + .collect(Collectors.toList()); + + String url = formatUrl(targetUri.toString(), project, "batch-apply-object"); + + HttpPost post = new HttpPost(url); + post.setEntity(new StringEntity(GSON.toJson(inputs))); + post.addHeader(new BasicHeader(CONTENT_TYPE, MediaType.JSON_UTF_8.toString())); + return executeRequest(post, bearerTokenProvider.get(), targetUri); + } + + @Override public HttpResult callSendObjects( NameKey project, String refName,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpResult.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpResult.java index ec9d65f..6428ece 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpResult.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpResult.java
@@ -15,6 +15,7 @@ package com.googlesource.gerrit.plugins.replication.pull.client; import static javax.servlet.http.HttpServletResponse.SC_CONFLICT; +import static javax.servlet.http.HttpServletResponse.SC_NOT_FOUND; import com.google.gerrit.entities.Project; import java.util.Optional; @@ -51,4 +52,8 @@ ? "OK" : "FAILED" + ", status=" + responseCode + message.map(s -> " '" + s + "'").orElse(""); } + + public boolean isSendBatchObjectAvailable() { + return responseCode != SC_NOT_FOUND; + } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java index 4f853a2..7131513 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
@@ -22,7 +22,6 @@ import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -49,6 +48,7 @@ import com.google.inject.Provider; import com.googlesource.gerrit.plugins.replication.ReplicationConfig; import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig; +import com.googlesource.gerrit.plugins.replication.pull.api.data.BatchApplyObjectData; import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData; import com.googlesource.gerrit.plugins.replication.pull.api.exception.RefUpdateException; import com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient; @@ -57,6 +57,7 @@ import com.googlesource.gerrit.plugins.replication.pull.filter.ApplyObjectsRefsFilter; import com.googlesource.gerrit.plugins.replication.pull.filter.ExcludedRefsFilter; import java.io.IOException; +import java.net.URISyntaxException; import java.nio.file.Path; import java.util.Arrays; import java.util.List; @@ -66,13 +67,13 @@ import org.eclipse.jgit.lib.Config; import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.storage.file.FileBasedConfig; +import org.eclipse.jgit.transport.URIish; import org.eclipse.jgit.util.FS; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Captor; -import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @@ -103,6 +104,7 @@ @Mock RevisionData revisionDataWithParents; List<ObjectId> revisionDataParentObjectIds; @Mock HttpResult httpResult; + @Mock HttpResult batchHttpResult; @Mock ApplyObjectsRefsFilter applyObjectsRefsFilter; @Mock Config config; @@ -112,6 +114,7 @@ @Captor ArgumentCaptor<String> stringCaptor; @Captor ArgumentCaptor<Project.NameKey> projectNameKeyCaptor; @Captor ArgumentCaptor<List<RevisionData>> revisionsDataCaptor; + @Captor ArgumentCaptor<List<BatchApplyObjectData>> batchRefsCaptor; private ExcludedRefsFilter refsFilter; private ReplicationQueue objectUnderTest; @@ -159,12 +162,17 @@ lenient() .when(fetchRestApiClient.callSendObjects(any(), anyString(), anyLong(), any(), any())) .thenReturn(httpResult); + lenient() + .when(fetchRestApiClient.callBatchSendObject(any(), any(), anyLong(), any())) + .thenReturn(batchHttpResult); when(fetchRestApiClient.callFetch(any(), anyString(), any())).thenReturn(fetchHttpResult); when(fetchRestApiClient.initProject(any(), any())).thenReturn(successfulHttpResult); when(successfulHttpResult.isSuccessful()).thenReturn(true); when(httpResult.isSuccessful()).thenReturn(true); + when(batchHttpResult.isSuccessful()).thenReturn(true); when(fetchHttpResult.isSuccessful()).thenReturn(true); when(httpResult.isProjectMissing(any())).thenReturn(false); + when(batchHttpResult.isProjectMissing(any())).thenReturn(false); when(applyObjectsRefsFilter.match(any())).thenReturn(false); applyObjectMetrics = new ApplyObjectMetrics("pull-replication", new DisabledMetricMaker()); @@ -187,12 +195,12 @@ } @Test - public void shouldCallSendObjectWhenMetaRef() throws IOException { + public void shouldCallBatchSendObjectWhenMetaRef() throws IOException { Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta"); objectUnderTest.start(); objectUnderTest.onEvent(event); - verify(fetchRestApiClient).callSendObjects(any(), anyString(), anyLong(), any(), any()); + verify(fetchRestApiClient).callBatchSendObject(any(), any(), anyLong(), any()); } @Test @@ -219,7 +227,7 @@ objectUnderTest.start(); objectUnderTest.onEvent(event); - verify(fetchRestApiClient).callSendObjects(any(), anyString(), anyLong(), any(), any()); + verify(fetchRestApiClient).callBatchSendObject(any(), any(), anyLong(), any()); } @Test @@ -229,15 +237,14 @@ objectUnderTest.start(); objectUnderTest.onEvent(event); - verify(fetchRestApiClient, never()) - .callSendObjects(any(), anyString(), anyLong(), any(), any()); + verify(fetchRestApiClient, never()).callBatchSendObject(any(), any(), anyLong(), any()); } @Test public void shouldCallInitProjectWhenProjectIsMissing() throws IOException { Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta"); - when(httpResult.isSuccessful()).thenReturn(false); - when(httpResult.isProjectMissing(any())).thenReturn(true); + when(batchHttpResult.isSuccessful()).thenReturn(false); + when(batchHttpResult.isProjectMissing(any())).thenReturn(true); when(source.isCreateMissingRepositories()).thenReturn(true); objectUnderTest.start(); @@ -265,8 +272,10 @@ @Test public void shouldNotCallInitProjectWhenReplicateNewRepositoriesNotSet() throws IOException { Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta"); - when(httpResult.isSuccessful()).thenReturn(false); - when(httpResult.isProjectMissing(any())).thenReturn(true); + when(batchHttpResult.isSuccessful()).thenReturn(false); + when(batchHttpResult.isProjectMissing(any())).thenReturn(true); + lenient().when(httpResult.isSuccessful()).thenReturn(false); + lenient().when(httpResult.isProjectMissing(any())).thenReturn(true); when(source.isCreateMissingRepositories()).thenReturn(false); objectUnderTest.start(); @@ -276,7 +285,7 @@ } @Test - public void shouldCallSendObjectWhenPatchSetRefAndRefUpdateEvent() throws IOException { + public void shouldCallBatchSendObjectWhenPatchSetRefAndRefUpdateEvent() throws IOException { when(config.getBoolean("event", "stream-events", "enableBatchRefUpdatedEvents", false)) .thenReturn(false); @@ -299,16 +308,16 @@ objectUnderTest.start(); objectUnderTest.onEvent(event); - verify(fetchRestApiClient).callSendObjects(any(), anyString(), anyLong(), any(), any()); + verify(fetchRestApiClient).callBatchSendObject(any(), any(), anyLong(), any()); } @Test - public void shouldCallSendObjectWhenPatchSetRef() throws IOException { + public void shouldCallBatchSendObjectWhenPatchSetRef() throws IOException { Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1"); objectUnderTest.start(); objectUnderTest.onEvent(event); - verify(fetchRestApiClient).callSendObjects(any(), anyString(), anyLong(), any(), any()); + verify(fetchRestApiClient).callBatchSendObject(any(), any(), anyLong(), any()); } @Test @@ -338,14 +347,49 @@ } @Test - public void shouldFallbackToCallFetchWhenParentObjectIsMissing() throws IOException { + public void + shouldFallbackToCallFetchWhenParentObjectIsMissingAndRefDoesntMatchApplyObjectsRefsFilter() + throws IOException { Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1"); objectUnderTest.start(); + when(batchHttpResult.isSuccessful()).thenReturn(false); + when(batchHttpResult.isParentObjectMissing()).thenReturn(true); + + objectUnderTest.onEvent(event); + + verify(fetchRestApiClient).callFetch(any(), anyString(), any()); + } + + @Test + public void + shouldFallbackToApplyObjectsForEachRefWhenParentObjectIsMissingAndRefMatchesApplyObjectsRefFilter() + throws IOException { + Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1", "refs/changes/02/1/1"); + objectUnderTest.start(); + + when(batchHttpResult.isSuccessful()).thenReturn(false); + when(batchHttpResult.isParentObjectMissing()).thenReturn(true); + when(applyObjectsRefsFilter.match(any())).thenReturn(true); + + objectUnderTest.onEvent(event); + + verify(fetchRestApiClient, times(2)) + .callSendObjects(any(), anyString(), anyLong(), any(), any()); + verify(fetchRestApiClient, never()).callFetch(any(), anyString(), any()); + } + + @Test + public void shouldFallbackToCallFetchWhenSendBatchObjectNotAvailableAndApplyObjectFails() + throws IOException { + Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1"); + objectUnderTest.start(); + + when(batchHttpResult.isSuccessful()).thenReturn(false); + when(batchHttpResult.isParentObjectMissing()).thenReturn(false); + when(batchHttpResult.isSendBatchObjectAvailable()).thenReturn(false); when(httpResult.isSuccessful()).thenReturn(false); - when(httpResult.isParentObjectMissing()).thenReturn(true); - when(fetchRestApiClient.callSendObjects(any(), anyString(), anyLong(), any(), any())) - .thenReturn(httpResult); + when(httpResult.isParentObjectMissing()).thenReturn(false); objectUnderTest.onEvent(event); @@ -358,10 +402,56 @@ Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta"); objectUnderTest.start(); + when(batchHttpResult.isSuccessful()).thenReturn(false); + when(batchHttpResult.isParentObjectMissing()).thenReturn(true); + + objectUnderTest.onEvent(event); + + verify(fetchRestApiClient, times(1)) + .callSendObjects(any(), anyString(), anyLong(), revisionsDataCaptor.capture(), any()); + List<List<RevisionData>> revisionsDataValues = revisionsDataCaptor.getAllValues(); + assertThat(revisionsDataValues).hasSize(1); + + List<RevisionData> firstRevisionsValues = revisionsDataValues.get(0); + assertThat(firstRevisionsValues).hasSize(1 + revisionDataParentObjectIds.size()); + assertThat(firstRevisionsValues).contains(revisionData); + } + + @Test + public void shouldFallbackToApplyAllParentObjectsWhenParentObjectIsMissingOnAllowedRefs() + throws IOException { + String refName = "refs/tags/test-tag"; + Event event = generateBatchRefUpdateEvent(refName); + objectUnderTest.start(); + + when(batchHttpResult.isSuccessful()).thenReturn(false); + when(batchHttpResult.isParentObjectMissing()).thenReturn(true); + when(applyObjectsRefsFilter.match(refName)).thenReturn(true); + + objectUnderTest.onEvent(event); + + verify(fetchRestApiClient, times(1)) + .callSendObjects(any(), anyString(), anyLong(), revisionsDataCaptor.capture(), any()); + List<List<RevisionData>> revisionsDataValues = revisionsDataCaptor.getAllValues(); + assertThat(revisionsDataValues).hasSize(1); + + List<RevisionData> firstRevisionsValues = revisionsDataValues.get(0); + assertThat(firstRevisionsValues).hasSize(1 + revisionDataParentObjectIds.size()); + assertThat(firstRevisionsValues).contains(revisionData); + } + + @Test + public void + shouldFallbackToApplyAllParentObjectsWhenSendBatchObjectNotAvailableAndParentObjectIsMissingOnMetaRef() + throws IOException { + Event event = generateBatchRefUpdateEvent("refs/changes/01/1/meta"); + objectUnderTest.start(); + + when(batchHttpResult.isSuccessful()).thenReturn(false); + when(batchHttpResult.isParentObjectMissing()).thenReturn(false); + when(batchHttpResult.isSendBatchObjectAvailable()).thenReturn(false); when(httpResult.isSuccessful()).thenReturn(false, true); when(httpResult.isParentObjectMissing()).thenReturn(true, false); - when(fetchRestApiClient.callSendObjects(any(), anyString(), anyLong(), any(), any())) - .thenReturn(httpResult); objectUnderTest.onEvent(event); @@ -379,16 +469,18 @@ } @Test - public void shouldFallbackToApplyAllParentObjectsWhenParentObjectIsMissingOnAllowedRefs() - throws IOException { + public void + shouldFallbackToApplyAllParentObjectsWhenSendBatchObjectNotAvailableAndParentObjectIsMissingOnAllowedRefs() + throws IOException { String refName = "refs/tags/test-tag"; Event event = generateBatchRefUpdateEvent(refName); objectUnderTest.start(); + when(batchHttpResult.isSuccessful()).thenReturn(false); + when(batchHttpResult.isParentObjectMissing()).thenReturn(false); + when(batchHttpResult.isSendBatchObjectAvailable()).thenReturn(false); when(httpResult.isSuccessful()).thenReturn(false, true); when(httpResult.isParentObjectMissing()).thenReturn(true, false); - when(fetchRestApiClient.callSendObjects(any(), anyString(), anyLong(), any(), any())) - .thenReturn(httpResult); when(applyObjectsRefsFilter.match(refName)).thenReturn(true); objectUnderTest.onEvent(event); @@ -407,6 +499,27 @@ } @Test + public void shouldCallBatchFetchForAllTheRefsInTheBatchIfApplyObjectFails() + throws IOException, URISyntaxException { + Event event = generateBatchRefUpdateEvent("refs/changes/01/1/1", "refs/changes/02/1/1"); + when(batchHttpResult.isSuccessful()).thenReturn(false); + when(batchHttpResult.isParentObjectMissing()).thenReturn(true); + when(applyObjectsRefsFilter.match(any())).thenReturn(true, true); + when(httpResult.isSuccessful()).thenReturn(true, false); + when(httpResult.isParentObjectMissing()).thenReturn(true); + + objectUnderTest.start(); + objectUnderTest.onEvent(event); + + verify(fetchRestApiClient, times(2)) + .callSendObjects(any(), anyString(), anyLong(), any(), any()); + verify(fetchRestApiClient) + .callFetch(PROJECT, "refs/changes/01/1/1", new URIish("http://localhost:18080")); + verify(fetchRestApiClient) + .callFetch(PROJECT, "refs/changes/02/1/1", new URIish("http://localhost:18080")); + } + + @Test public void shouldSkipEventWhenMultiSiteVersionRef() throws IOException { FileBasedConfig fileConfig = new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED); @@ -525,14 +638,12 @@ } private void verifySendObjectOrdering(String firstRef, String secondRef) throws IOException { - InOrder inOrder = inOrder(fetchRestApiClient); + verify(fetchRestApiClient) + .callBatchSendObject(any(), batchRefsCaptor.capture(), anyLong(), any()); + List<BatchApplyObjectData> batchRefs = batchRefsCaptor.getValue(); - inOrder - .verify(fetchRestApiClient) - .callSendObjects(any(), eq(firstRef), anyLong(), any(), any()); - inOrder - .verify(fetchRestApiClient) - .callSendObjects(any(), eq(secondRef), anyLong(), any(), any()); + assertThat(batchRefs.get(0).refName()).isEqualTo(firstRef); + assertThat(batchRefs.get(1).refName()).isEqualTo(secondRef); } private class TestEvent extends RefUpdatedEvent {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchApplyObjectActionTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchApplyObjectActionTest.java new file mode 100644 index 0000000..d8dabf1 --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchApplyObjectActionTest.java
@@ -0,0 +1,214 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.pull.api; + +import static com.google.common.truth.Truth.assertThat; +import static org.apache.http.HttpStatus.SC_OK; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.Lists; +import com.google.gerrit.extensions.restapi.MergeConflictException; +import com.google.gerrit.extensions.restapi.Response; +import com.google.gerrit.extensions.restapi.RestApiException; +import com.google.gerrit.server.project.ProjectResource; +import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData; +import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionInput; +import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData; +import java.util.Collections; +import java.util.List; +import javax.servlet.http.HttpServletResponse; +import org.eclipse.jgit.lib.Constants; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +@RunWith(MockitoJUnitRunner.class) +public class BatchApplyObjectActionTest { + + private static final long DUMMY_EVENT_TIMESTAMP = 1684875939; + + private BatchApplyObjectAction batchApplyObjectAction; + private static final String LABEL = "instance-2-label"; + private static final String REF_NAME = "refs/heads/master"; + private static final String REF_META_NAME = "refs/meta/version"; + private static final String SAMPLE_COMMIT_OBJECT_ID = "9f8d52853089a3cf00c02ff7bd0817bd4353a95a"; + private static final String SAMPLE_TREE_OBJECT_ID = "4b825dc642cb6eb9a060e54bf8d69288fbee4904"; + + private static final String SAMPLE_COMMIT_CONTENT = + "tree " + + SAMPLE_TREE_OBJECT_ID + + "\n" + + "parent 20eb48d28be86dc88fb4bef747f08de0fbefe12d\n" + + "author Gerrit User 1000000 <1000000@69ec38f0-350e-4d9c-96d4-bc956f2faaac> 1610471611 +0100\n" + + "committer Gerrit Code Review <root@maczech-XPS-15> 1610471611 +0100\n" + + "\n" + + "Update patch set 1\n" + + "\n" + + "Change has been successfully merged by Administrator\n" + + "\n" + + "Patch-set: 1\n" + + "Status: merged\n" + + "Tag: autogenerated:gerrit:merged\n" + + "Reviewer: Gerrit User 1000000 <1000000@69ec38f0-350e-4d9c-96d4-bc956f2faaac>\n" + + "Label: SUBM=+1\n" + + "Submission-id: 1904-1610471611558-783c0a2f\n" + + "Submitted-with: OK\n" + + "Submitted-with: OK: Code-Review: Gerrit User 1000000 <1000000@69ec38f0-350e-4d9c-96d4-bc956f2faaac>"; + + @Mock private ApplyObjectAction applyObjectAction; + @Mock private ProjectResource projectResource; + + @Before + public void setup() { + batchApplyObjectAction = new BatchApplyObjectAction(applyObjectAction); + } + + @Test + public void shouldDelegateToApplyObjectActionForEveryRevision() throws RestApiException { + RevisionInput first = + new RevisionInput(LABEL, REF_NAME, DUMMY_EVENT_TIMESTAMP, createSampleRevisionData()); + RevisionInput second = + new RevisionInput(LABEL, "foo", DUMMY_EVENT_TIMESTAMP, createSampleRevisionData()); + + batchApplyObjectAction.apply(projectResource, List.of(first, second)); + + verify(applyObjectAction).apply(projectResource, first); + verify(applyObjectAction).apply(projectResource, second); + } + + @Test + public void shouldReturnOkResponseCodeWhenAllRevisionsAreProcessedSuccessfully() + throws RestApiException { + RevisionInput first = + new RevisionInput(LABEL, REF_NAME, DUMMY_EVENT_TIMESTAMP, createSampleRevisionData()); + RevisionInput second = + new RevisionInput(LABEL, "foo", DUMMY_EVENT_TIMESTAMP, createSampleRevisionData()); + + when(applyObjectAction.apply(projectResource, first)) + .thenAnswer((Answer<Response<?>>) invocation -> Response.created(first)); + when(applyObjectAction.apply(projectResource, second)) + .thenAnswer((Answer<Response<?>>) invocation -> Response.created(second)); + + Response<?> response = batchApplyObjectAction.apply(projectResource, List.of(first, second)); + + assertThat(response.statusCode()).isEqualTo(SC_OK); + } + + @Test + public void shouldReturnAListWithAllTheRevisionsInResponseBodyOnSuccess() + throws RestApiException { + RevisionInput first = + new RevisionInput(LABEL, REF_NAME, DUMMY_EVENT_TIMESTAMP, createSampleRevisionData()); + RevisionInput second = + new RevisionInput(LABEL, "foo", DUMMY_EVENT_TIMESTAMP, createSampleRevisionData()); + Response<?> firstResponse = Response.created(first); + Response<?> secondResponse = Response.created(second); + + when(applyObjectAction.apply(projectResource, first)) + .thenAnswer((Answer<Response<?>>) invocation -> firstResponse); + when(applyObjectAction.apply(projectResource, second)) + .thenAnswer((Answer<Response<?>>) invocation -> secondResponse); + + Response<?> response = batchApplyObjectAction.apply(projectResource, List.of(first, second)); + + assertThat((List<Response<?>>) response.value()) + .isEqualTo(List.of(firstResponse, secondResponse)); + } + + @Test + public void shouldAcceptAMixOfCreatesAndDeletes() throws RestApiException { + RevisionInput delete = new RevisionInput(LABEL, REF_NAME, DUMMY_EVENT_TIMESTAMP, null); + RevisionInput create = + new RevisionInput(LABEL, "foo", DUMMY_EVENT_TIMESTAMP, createSampleRevisionData()); + Response<?> deleteResponse = Response.withStatusCode(HttpServletResponse.SC_NO_CONTENT, ""); + Response<?> createResponse = Response.created(create); + + when(applyObjectAction.apply(projectResource, delete)) + .thenAnswer((Answer<Response<?>>) invocation -> deleteResponse); + when(applyObjectAction.apply(projectResource, create)) + .thenAnswer((Answer<Response<?>>) invocation -> createResponse); + + Response<?> response = batchApplyObjectAction.apply(projectResource, List.of(delete, create)); + + assertThat((List<Response<?>>) response.value()) + .isEqualTo(List.of(deleteResponse, createResponse)); + } + + @Test + public void shouldReturnOneOkCodeEvenIfInputContainsBothCreatesAndDeletes() + throws RestApiException { + RevisionInput create = + new RevisionInput(LABEL, REF_NAME, DUMMY_EVENT_TIMESTAMP, createSampleRevisionData()); + RevisionInput delete = new RevisionInput(LABEL, REF_META_NAME, DUMMY_EVENT_TIMESTAMP + 1, null); + + List<RevisionInput> inputs = List.of(create, delete); + + Response<?> response = batchApplyObjectAction.apply(projectResource, inputs); + + assertThat(response.statusCode()).isEqualTo(SC_OK); + } + + @Test(expected = RestApiException.class) + public void shouldThrowARestApiExceptionIfProcessingFailsForAnyOfTheRevisions() + throws RestApiException { + RevisionInput good = + new RevisionInput(LABEL, REF_NAME, DUMMY_EVENT_TIMESTAMP, createSampleRevisionData()); + RevisionInput bad = + new RevisionInput(LABEL, "bad", DUMMY_EVENT_TIMESTAMP, createSampleRevisionData()); + + when(applyObjectAction.apply(projectResource, good)) + .thenAnswer((Answer<Response<?>>) invocation -> Response.created(good)); + when(applyObjectAction.apply(projectResource, bad)) + .thenThrow(new MergeConflictException("BOOM")); + + batchApplyObjectAction.apply(projectResource, List.of(good, bad)); + } + + @Test + public void shouldStopProcessingWhenAFailureOccurs() throws RestApiException { + RevisionInput good = + new RevisionInput(LABEL, REF_NAME, DUMMY_EVENT_TIMESTAMP, createSampleRevisionData()); + RevisionInput bad = + new RevisionInput(LABEL, "bad", DUMMY_EVENT_TIMESTAMP, createSampleRevisionData()); + + when(applyObjectAction.apply(projectResource, bad)) + .thenThrow(new MergeConflictException("BOOM")); + + try { + batchApplyObjectAction.apply(projectResource, List.of(bad, good)); + } catch (MergeConflictException e) { + verify(applyObjectAction, never()).apply(projectResource, good); + } + } + + private RevisionData createSampleRevisionData() { + RevisionObjectData commitData = + new RevisionObjectData( + SAMPLE_COMMIT_OBJECT_ID, Constants.OBJ_COMMIT, SAMPLE_COMMIT_CONTENT.getBytes()); + RevisionObjectData treeData = + new RevisionObjectData(SAMPLE_TREE_OBJECT_ID, Constants.OBJ_TREE, new byte[] {}); + return createSampleRevisionData(commitData, treeData); + } + + private RevisionData createSampleRevisionData( + RevisionObjectData commitData, RevisionObjectData treeData) { + return new RevisionData(Collections.emptyList(), commitData, treeData, Lists.newArrayList()); + } +}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java index ca69f06..9b0106c 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java
@@ -91,6 +91,11 @@ } @Test + public void shouldAuthenticateWhenBatchApplyObject() throws ServletException, IOException { + authenticateAndFilter("any-prefix/pull-replication~batch-apply-object", NO_QUERY_PARAMETERS); + } + + @Test public void shouldAuthenticateWhenDeleteProject() throws ServletException, IOException { authenticateAndFilter("any-prefix/pull-replication~delete-project", NO_QUERY_PARAMETERS); }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/data/BatchApplyObjectDataTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/data/BatchApplyObjectDataTest.java new file mode 100644 index 0000000..bf74b56 --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/data/BatchApplyObjectDataTest.java
@@ -0,0 +1,32 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.pull.api.data; + +import java.util.Optional; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class BatchApplyObjectDataTest { + + @Mock private RevisionData revisionData; + + @Test(expected = IllegalArgumentException.class) + public void shouldFailIfRevisionDataIsPresentForADelete() { + BatchApplyObjectData.create("foo", Optional.of(revisionData), true); + } +}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientBase.java index 5de7e7d..e64bc6f 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientBase.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientBase.java
@@ -30,6 +30,7 @@ import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig; import com.googlesource.gerrit.plugins.replication.pull.BearerTokenProvider; import com.googlesource.gerrit.plugins.replication.pull.Source; +import com.googlesource.gerrit.plugins.replication.pull.api.data.BatchApplyObjectData; import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData; import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData; import com.googlesource.gerrit.plugins.replication.pull.filter.SyncRefsFilter; @@ -37,7 +38,10 @@ import java.io.InputStreamReader; import java.net.URISyntaxException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; +import java.util.Optional; import org.apache.http.Header; import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpPost; @@ -430,19 +434,141 @@ assertAuthentication(httpPut); } + @Test + public void shouldCallBatchSendObjectEndpoint() throws IOException, URISyntaxException { + + List<BatchApplyObjectData> batchApplyObjects = new ArrayList<>(); + batchApplyObjects.add( + BatchApplyObjectData.create(refName, Optional.of(createSampleRevisionData("a")), false)); + + objectUnderTest.callBatchSendObject( + Project.nameKey("test_repo"), batchApplyObjects, eventCreatedOn, new URIish(api)); + + verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any()); + + HttpPost httpPost = httpPostCaptor.getValue(); + assertThat(httpPost.getURI().getHost()).isEqualTo("gerrit-host"); + assertThat(httpPost.getURI().getPath()) + .isEqualTo( + String.format( + "%s/projects/test_repo/pull-replication~batch-apply-object", + urlAuthenticationPrefix())); + assertAuthentication(httpPost); + } + + @Test + public void shouldCallBatchApplyObjectEndpointWithAListOfRefsInPayload() + throws IOException, URISyntaxException { + List<BatchApplyObjectData> batchApplyObjects = new ArrayList<>(); + RevisionData revisionA = createSampleRevisionData("a"); + RevisionData revisionB = createSampleRevisionData("b"); + String refNameB = "refs/heads/b"; + batchApplyObjects.add(BatchApplyObjectData.create(refName, Optional.of(revisionA), false)); + batchApplyObjects.add(BatchApplyObjectData.create(refNameB, Optional.of(revisionB), false)); + + objectUnderTest.callBatchSendObject( + Project.nameKey("test_repo"), batchApplyObjects, eventCreatedOn, new URIish(api)); + + verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any()); + + HttpPost httpPost = httpPostCaptor.getValue(); + + String expectedSendObjectsPayload = + "[{\"label\":\"Replication\",\"ref_name\":\"" + + refName + + "\",\"event_created_on\":" + + eventCreatedOn + + ",\"revision_data\":{\"commit_object\":{\"sha1\":\"" + + revisionA.getCommitObject().getSha1() + + "\",\"type\":1,\"content\":\"Y29tbWl0YWNvbnRlbnQ\\u003d\"},\"tree_object\":{\"sha1\":\"" + + revisionA.getTreeObject().getSha1() + + "\",\"type\":2,\"content\":\"dHJlZWFjb250ZW50\"},\"blobs\":[{\"sha1\":\"" + + revisionA.getBlobs().get(0).getSha1() + + "\",\"type\":3,\"content\":\"YmxvYmFjb250ZW50\"}]}}," + + "{\"label\":\"Replication\",\"ref_name\":\"" + + refNameB + + "\",\"event_created_on\":" + + eventCreatedOn + + ",\"revision_data\":{\"commit_object\":{\"sha1\":\"" + + revisionB.getCommitObject().getSha1() + + "\",\"type\":1,\"content\":\"Y29tbWl0YmNvbnRlbnQ\\u003d\"},\"tree_object\":{\"sha1\":\"" + + revisionB.getTreeObject().getSha1() + + "\",\"type\":2,\"content\":\"dHJlZWJjb250ZW50\"},\"blobs\":[{\"sha1\":\"" + + revisionB.getBlobs().get(0).getSha1() + + "\",\"type\":3,\"content\":\"YmxvYmJjb250ZW50\"}]}}]"; + assertThat(readPayload(httpPost)).isEqualTo(expectedSendObjectsPayload); + } + + @Test + public void shouldCallBatchApplyObjectEndpointWithNoRevisionDataForDeletes() + throws IOException, URISyntaxException { + List<BatchApplyObjectData> batchApplyObjects = new ArrayList<>(); + batchApplyObjects.add(BatchApplyObjectData.create(refName, Optional.empty(), true)); + + objectUnderTest.callBatchSendObject( + Project.nameKey("test_repo"), batchApplyObjects, eventCreatedOn, new URIish(api)); + + verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any()); + + HttpPost httpPost = httpPostCaptor.getValue(); + + String expectedSendObjectsPayload = + "[{\"label\":\"Replication\",\"ref_name\":\"" + + refName + + "\",\"event_created_on\":" + + eventCreatedOn + + "}]"; + assertThat(readPayload(httpPost)).isEqualTo(expectedSendObjectsPayload); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowExceptionIfDeleteFlagIsSetButRevisionDataIsPresentForBatchSendEndpoint() + throws IOException, URISyntaxException { + List<BatchApplyObjectData> batchApplyObjects = new ArrayList<>(); + batchApplyObjects.add( + BatchApplyObjectData.create(refName, Optional.of(createSampleRevisionData()), true)); + + objectUnderTest.callBatchSendObject( + Project.nameKey("test_repo"), batchApplyObjects, eventCreatedOn, new URIish(api)); + } + public String readPayload(HttpPost entity) throws UnsupportedOperationException, IOException { ByteBuffer buf = IO.readWholeStream(entity.getEntity().getContent(), 1024); return RawParseUtils.decode(buf.array(), buf.arrayOffset(), buf.limit()).trim(); } - private RevisionData createSampleRevisionData() { + private RevisionData createSampleRevisionData(String prefix) { + String commitPrefix = "commit" + prefix; + String treePrefix = "tree" + prefix; + String blobPrefix = "blob" + prefix; + return createSampleRevisionData( + commitPrefix, + commitPrefix + "content", + treePrefix, + treePrefix + "content", + blobPrefix, + blobPrefix + "content"); + } + + private RevisionData createSampleRevisionData( + String commitObjectId, + String commitContent, + String treeObjectId, + String treeContent, + String blobObjectId, + String blobContent) { RevisionObjectData commitData = - new RevisionObjectData(commitObjectId, Constants.OBJ_COMMIT, commitObject.getBytes()); + new RevisionObjectData(commitObjectId, Constants.OBJ_COMMIT, commitContent.getBytes()); RevisionObjectData treeData = - new RevisionObjectData(treeObjectId, Constants.OBJ_TREE, treeObject.getBytes()); + new RevisionObjectData(treeObjectId, Constants.OBJ_TREE, treeContent.getBytes()); RevisionObjectData blobData = - new RevisionObjectData(blobObjectId, Constants.OBJ_BLOB, blobObject.getBytes()); + new RevisionObjectData(blobObjectId, Constants.OBJ_BLOB, blobContent.getBytes()); return new RevisionData( Collections.emptyList(), commitData, treeData, Lists.newArrayList(blobData)); } + + private RevisionData createSampleRevisionData() { + return createSampleRevisionData( + commitObjectId, commitObject, treeObjectId, treeObject, blobObjectId, blobObject); + } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpResultTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpResultTest.java index d82f3a5..900723e 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpResultTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpResultTest.java
@@ -30,21 +30,25 @@ public static Iterable<Object[]> data() { return Arrays.asList( new Object[][] { - {HttpServletResponse.SC_OK, true}, - {HttpServletResponse.SC_CREATED, true}, - {HttpServletResponse.SC_ACCEPTED, true}, - {HttpServletResponse.SC_NO_CONTENT, true}, - {HttpServletResponse.SC_BAD_REQUEST, false}, - {HttpServletResponse.SC_CONFLICT, false} + {HttpServletResponse.SC_OK, true, true}, + {HttpServletResponse.SC_CREATED, true, true}, + {HttpServletResponse.SC_ACCEPTED, true, true}, + {HttpServletResponse.SC_NO_CONTENT, true, true}, + {HttpServletResponse.SC_BAD_REQUEST, false, true}, + {HttpServletResponse.SC_NOT_FOUND, false, false}, + {HttpServletResponse.SC_CONFLICT, false, true} }); } private Integer httpStatus; private boolean isSuccessful; + private boolean isSendBatchObjectAvailable; - public HttpResultTest(Integer httpStatus, Boolean isSuccessful) { + public HttpResultTest( + Integer httpStatus, Boolean isSuccessful, Boolean isSendBatchObjectAvailable) { this.httpStatus = httpStatus; this.isSuccessful = isSuccessful; + this.isSendBatchObjectAvailable = isSendBatchObjectAvailable; } @Test @@ -52,4 +56,10 @@ HttpResult httpResult = new HttpResult(httpStatus, Optional.empty()); assertThat(httpResult.isSuccessful()).isEqualTo(isSuccessful); } + + @Test + public void httpResultIsSendBatchObjectAvailable() { + HttpResult httpResult = new HttpResult(httpStatus, Optional.empty()); + assertThat(httpResult.isSendBatchObjectAvailable()).isEqualTo(isSendBatchObjectAvailable); + } }