Allow batch-fetch endpoint to delete refs

Currently ref deletion is only wired to apply-object.

If the apply-object call fails for any reason, when the client falls
back to the fetch call, the receiving end would just attept to *fetch*
the deleted ref.

This is obviously destined to fail, since the remote ref does not exist
anymore and thus cannot be fetched.

Change this behaviour by hydrating the payload request with the
additional information of the ref deletion status.

The receving side can then opportunely schedule a deletion (rather than
a fetch) for the refs that have been deleted.

Note that this change does not change the payload of the single fetch
endpoint, so that back-compatibility can be provided during the rollout
phase by temporarily switching off batch mode until all nodes have been
migrated to this version.

Bug: Issue 288965464
Change-Id: I0c8591ee01caa80927ac16375ba8bd98d434ceb4
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/AutoValueTypeAdapterFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/AutoValueTypeAdapterFactory.java
new file mode 100644
index 0000000..264b87e
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/AutoValueTypeAdapterFactory.java
@@ -0,0 +1,26 @@
+// Copyright (C) 2024 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gson.TypeAdapterFactory;
+import com.ryanharter.auto.value.gson.GsonTypeAdapterFactory;
+
+@GsonTypeAdapterFactory
+public abstract class AutoValueTypeAdapterFactory implements TypeAdapterFactory {
+
+  public static TypeAdapterFactory create() {
+    return new AutoValueGson_AutoValueTypeAdapterFactory();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index a058426..ddc6258 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -40,6 +40,7 @@
 import com.googlesource.gerrit.plugins.replication.ObservableQueue;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfigModule;
 import com.googlesource.gerrit.plugins.replication.StartReplicationCapability;
+import com.googlesource.gerrit.plugins.replication.pull.api.DeleteRefJob;
 import com.googlesource.gerrit.plugins.replication.pull.api.FetchApiCapability;
 import com.googlesource.gerrit.plugins.replication.pull.api.FetchJob;
 import com.googlesource.gerrit.plugins.replication.pull.auth.PullReplicationGroupModule;
@@ -82,6 +83,7 @@
     bind(RevisionReader.class).in(Scopes.SINGLETON);
     bind(ApplyObject.class);
     install(new FactoryModuleBuilder().build(FetchJob.Factory.class));
+    install(new FactoryModuleBuilder().build(DeleteRefJob.Factory.class));
     install(new ApplyObjectCacheModule());
 
     install(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
index baeb330..d9b47d3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
@@ -40,6 +40,7 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.replication.ObservableQueue;
 import com.googlesource.gerrit.plugins.replication.pull.FetchResultProcessing.GitUpdateProcessing;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.RefInput;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.BatchApplyObjectData;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.MissingParentObjectException;
@@ -611,13 +612,13 @@
 
     boolean resultIsSuccessful = true;
 
-    List<String> filteredRefs =
+    List<RefInput> filteredRefs =
         refs.stream()
-            .map(ReferenceUpdatedEvent::refName)
-            .filter(refName -> source.wouldFetchProject(project) && source.wouldFetchRef(refName))
+            .map(ref -> RefInput.create(ref.refName(), ref.isDelete()))
+            .filter(ref -> source.wouldFetchProject(project) && source.wouldFetchRef(ref.refName()))
             .collect(Collectors.toList());
 
-    String refsStr = String.join(",", filteredRefs);
+    String refsStr = filteredRefs.stream().map(RefInput::refName).collect(Collectors.joining(","));
     FetchApiClient fetchClient = fetchClientFactory.create(source);
 
     for (String apiUrl : source.getApis()) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefCommand.java
index 3a7d0ff..df13944 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefCommand.java
@@ -38,6 +38,7 @@
 import com.googlesource.gerrit.plugins.replication.pull.fetch.RefUpdateState;
 import java.io.IOException;
 import java.util.Optional;
+import java.util.Set;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.RefUpdate;
@@ -67,6 +68,18 @@
     this.gitManager = gitManagerProvider.get();
   }
 
+  public void deleteRefsSync(
+      Project.NameKey name, Set<String> deletedRefNames, String sourceLabel) {
+    deletedRefNames.forEach(
+        r -> {
+          try {
+            deleteRef(name, r, sourceLabel);
+          } catch (RestApiException | IOException e) {
+            repLog.error("Could not delete ref {}:{} from source {}", name.get(), r, sourceLabel);
+          }
+        });
+  }
+
   public void deleteRef(Project.NameKey name, String refName, String sourceLabel)
       throws IOException, RestApiException {
     Source source =
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefJob.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefJob.java
new file mode 100644
index 0000000..acc40cb
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/DeleteRefJob.java
@@ -0,0 +1,45 @@
+// Copyright (C) 2024 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import com.google.gerrit.entities.Project;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.BatchInput;
+
+public class DeleteRefJob implements Runnable {
+  public interface Factory {
+    DeleteRefJob create(Project.NameKey project, BatchInput input);
+  }
+
+  private final DeleteRefCommand command;
+  private final Project.NameKey project;
+  private final BatchInput batchInput;
+
+  @Inject
+  public DeleteRefJob(
+      DeleteRefCommand command,
+      @Assisted Project.NameKey project,
+      @Assisted BatchInput batchInput) {
+    this.command = command;
+    this.project = project;
+    this.batchInput = batchInput;
+  }
+
+  @Override
+  public void run() {
+    command.deleteRefsSync(project, batchInput.getDeletedRefNames(), batchInput.label);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
index 38d0cdd..9cb9285 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
@@ -16,7 +16,9 @@
 
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.base.Strings;
+import com.google.gerrit.common.Nullable;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
@@ -30,6 +32,9 @@
 import com.google.gerrit.server.git.WorkQueue.Task;
 import com.google.gerrit.server.ioutil.HexFormat;
 import com.google.gerrit.server.project.ProjectResource;
+import com.google.gson.Gson;
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.SerializedName;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.Input;
@@ -39,6 +44,7 @@
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.eclipse.jgit.errors.TransportException;
@@ -46,43 +52,92 @@
 @Singleton
 public class FetchAction implements RestModifyView<ProjectResource, Input> {
   private final FetchCommand command;
+  private final DeleteRefCommand deleteRefCommand;
   private final WorkQueue workQueue;
   private final DynamicItem<UrlFormatter> urlFormatter;
   private final FetchPreconditions preConditions;
   private final Factory fetchJobFactory;
+  private final DeleteRefJob.Factory deleteJobFactory;
 
   @Inject
   public FetchAction(
       FetchCommand command,
+      DeleteRefCommand deleteRefCommand,
       WorkQueue workQueue,
       DynamicItem<UrlFormatter> urlFormatter,
       FetchPreconditions preConditions,
-      FetchJob.Factory fetchJobFactory) {
+      FetchJob.Factory fetchJobFactory,
+      DeleteRefJob.Factory deleteJobFactory) {
     this.command = command;
+    this.deleteRefCommand = deleteRefCommand;
     this.workQueue = workQueue;
     this.urlFormatter = urlFormatter;
     this.preConditions = preConditions;
     this.fetchJobFactory = fetchJobFactory;
+    this.deleteJobFactory = deleteJobFactory;
   }
 
   public static class Input {
     public String label;
     public String refName;
     public boolean async;
+    public boolean isDelete;
+  }
+
+  @AutoValue
+  public abstract static class RefInput {
+    public static final Predicate<RefInput> IS_DELETE = RefInput::isDelete;
+
+    @Nullable
+    @SerializedName("ref_name")
+    public abstract String refName();
+
+    @SerializedName("is_delete")
+    public abstract boolean isDelete();
+
+    public static RefInput create(@Nullable String refName, boolean isDelete) {
+      return new AutoValue_FetchAction_RefInput(refName, isDelete);
+    }
+
+    public static RefInput create(@Nullable String refName) {
+      return new AutoValue_FetchAction_RefInput(refName, false);
+    }
+
+    public static TypeAdapter<RefInput> typeAdapter(Gson gson) {
+      return new AutoValue_FetchAction_RefInput.GsonTypeAdapter(gson);
+    }
   }
 
   public static class BatchInput {
     public String label;
-    public Set<String> refsNames;
+    public Set<RefInput> refInputs;
     public boolean async;
 
     public static BatchInput fromInput(Input... input) {
       BatchInput batchInput = new BatchInput();
       batchInput.async = input[0].async;
       batchInput.label = input[0].label;
-      batchInput.refsNames = Stream.of(input).map(i -> i.refName).collect(Collectors.toSet());
+      batchInput.refInputs =
+          Stream.of(input)
+              .map(i -> RefInput.create(i.refName, i.isDelete))
+              .collect(Collectors.toSet());
       return batchInput;
     }
+
+    private Set<String> getFilteredRefNames(Predicate<RefInput> filterFunc) {
+      return refInputs.stream()
+          .filter(filterFunc)
+          .map(RefInput::refName)
+          .collect(Collectors.toSet());
+    }
+
+    public Set<String> getNonDeletedRefNames() {
+      return getFilteredRefNames(RefInput.IS_DELETE.negate());
+    }
+
+    public Set<String> getDeletedRefNames() {
+      return getFilteredRefNames(RefInput.IS_DELETE);
+    }
   }
 
   @Override
@@ -101,12 +156,12 @@
         throw new BadRequestException("Source label cannot be null or empty");
       }
 
-      if (batchInput.refsNames.isEmpty()) {
+      if (batchInput.refInputs.isEmpty()) {
         throw new BadRequestException("Ref-update refname cannot be null or empty");
       }
 
-      for (String refName : batchInput.refsNames) {
-        if (Strings.isNullOrEmpty(refName)) {
+      for (RefInput input : batchInput.refInputs) {
+        if (Strings.isNullOrEmpty(input.refName())) {
           throw new BadRequestException("Ref-update refname cannot be null or empty");
         }
       }
@@ -129,7 +184,16 @@
   private Response<?> applySync(Project.NameKey project, BatchInput input)
       throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
           TimeoutException, TransportException {
-    command.fetchSync(project, input.label, input.refsNames);
+    command.fetchSync(project, input.label, input.getNonDeletedRefNames());
+
+    /* git fetches and deletes cannot be handled atomically within the same transaction.
+    Here we choose to handle fetches first and then deletes:
+    - If the fetch fails delete is not even attempted.
+    - If the delete fails after the fetch then the client is left with some extra refs.
+    */
+    if (!input.getDeletedRefNames().isEmpty()) {
+      deleteRefCommand.deleteRefsSync(project, input.getDeletedRefNames(), input.label);
+    }
     return Response.created(input);
   }
 
@@ -146,6 +210,10 @@
         urlFormatter
             .get()
             .getRestUrl("a/config/server/tasks/" + HexFormat.fromInt(task.getTaskId()));
+
+    if (!batchInput.getDeletedRefNames().isEmpty()) {
+      workQueue.getDefaultQueue().submit(deleteJobFactory.create(project, batchInput));
+    }
     // We're in a HTTP handler, so must be present.
     checkState(url.isPresent());
     return Response.accepted(url.get());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchJob.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchJob.java
index 0975045..d30ba74 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchJob.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchJob.java
@@ -52,7 +52,7 @@
   @Override
   public void run() {
     try {
-      command.fetchAsync(project, batchInput.label, batchInput.refsNames, metrics);
+      command.fetchAsync(project, batchInput.label, batchInput.getNonDeletedRefNames(), metrics);
     } catch (InterruptedException
         | ExecutionException
         | RemoteConfigurationMissingException
@@ -60,7 +60,7 @@
         | TransportException e) {
       log.atSevere().withCause(e).log(
           "Exception during the async fetch call for project %s, label %s and ref(s) name(s) %s",
-          project.get(), batchInput.label, batchInput.refsNames);
+          project.get(), batchInput.label, batchInput.getNonDeletedRefNames());
     }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/HttpPayloadGsonProvider.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/HttpPayloadGsonProvider.java
new file mode 100644
index 0000000..cf1bd86
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/HttpPayloadGsonProvider.java
@@ -0,0 +1,29 @@
+// Copyright (C) 2024 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import com.google.gerrit.json.OutputFormat;
+import com.google.gson.Gson;
+import com.googlesource.gerrit.plugins.replication.pull.AutoValueTypeAdapterFactory;
+
+public class HttpPayloadGsonProvider {
+
+  public static Gson get() {
+    return OutputFormat.JSON
+        .newGsonBuilder()
+        .registerTypeAdapterFactory(AutoValueTypeAdapterFactory.create())
+        .create();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilter.java
index 368e61a..9618168 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilter.java
@@ -38,7 +38,6 @@
 import com.google.gerrit.extensions.restapi.UnprocessableEntityException;
 import com.google.gerrit.httpd.AllRequestFilter;
 import com.google.gerrit.httpd.restapi.RestApiServlet;
-import com.google.gerrit.json.OutputFormat;
 import com.google.gerrit.server.AnonymousUser;
 import com.google.gerrit.server.CurrentUser;
 import com.google.gerrit.server.permissions.PermissionBackendException;
@@ -117,7 +116,7 @@
     this.projectDeletionAction = projectDeletionAction;
     this.projectCache = projectCache;
     this.pluginName = pluginName;
-    this.gson = OutputFormat.JSON.newGsonBuilder().create();
+    this.gson = HttpPayloadGsonProvider.get();
     this.currentUserProvider = currentUserProvider;
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/util/PayloadSerDes.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/util/PayloadSerDes.java
index 414af37..d188247 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/util/PayloadSerDes.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/util/PayloadSerDes.java
@@ -22,11 +22,11 @@
 import com.google.gerrit.extensions.restapi.BadRequestException;
 import com.google.gerrit.extensions.restapi.Response;
 import com.google.gerrit.httpd.restapi.RestApiServlet;
-import com.google.gerrit.json.OutputFormat;
 import com.google.gson.Gson;
 import com.google.gson.stream.JsonReader;
 import com.google.inject.TypeLiteral;
 import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction;
+import com.googlesource.gerrit.plugins.replication.pull.api.HttpPayloadGsonProvider;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionInput;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionsInput;
 import java.io.BufferedReader;
@@ -38,7 +38,7 @@
 
 public class PayloadSerDes {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
-  private static final Gson gson = OutputFormat.JSON.newGsonBuilder().create();
+  private static final Gson gson = HttpPayloadGsonProvider.get();
 
   public static RevisionInput parseRevisionInput(HttpServletRequest httpRequest)
       throws BadRequestException, IOException {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
index 28c74c9..b908435 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchApiClient.java
@@ -19,6 +19,7 @@
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.Project.NameKey;
 import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.RefInput;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.BatchApplyObjectData;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
 import java.io.IOException;
@@ -46,11 +47,11 @@
   }
 
   HttpResult callBatchFetch(
-      Project.NameKey project, List<String> refsInBatch, URIish targetUri, long startTimeNanos)
+      Project.NameKey project, List<RefInput> refsInBatch, URIish targetUri, long startTimeNanos)
       throws IOException;
 
   default HttpResult callBatchFetch(
-      Project.NameKey project, List<String> refsInBatch, URIish targetUri) throws IOException {
+      Project.NameKey project, List<RefInput> refsInBatch, URIish targetUri) throws IOException {
     return callBatchFetch(
         project, refsInBatch, targetUri, MILLISECONDS.toNanos(System.currentTimeMillis()));
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
index 614774d..cb03606 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
@@ -36,6 +36,7 @@
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
 import com.googlesource.gerrit.plugins.replication.pull.BearerTokenProvider;
 import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.RefInput;
 import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiRequestMetrics;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.BatchApplyObjectData;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
@@ -151,19 +152,22 @@
     return executeRequest(post, bearerTokenProvider.get(), targetUri);
   }
 
-  private Boolean containsSyncFetchRef(List<String> refsInBatch) {
-    return refsInBatch.stream().anyMatch(syncRefsFilter::match);
+  private Boolean containsSyncFetchRef(List<RefInput> refsInBatch) {
+    return refsInBatch.stream().anyMatch(r -> syncRefsFilter.match(r.refName()));
   }
 
   @Override
   public HttpResult callBatchFetch(
-      NameKey project, List<String> refsInBatch, URIish targetUri, long startTimeNanos)
+      NameKey project, List<RefInput> refsInBatch, URIish targetUri, long startTimeNanos)
       throws IOException {
     boolean callAsync = !containsSyncFetchRef(refsInBatch);
-    String refsNamesBody = refsInBatch.stream().collect(Collectors.joining("\",\"", "\"", "\""));
+    String refsNamesBody =
+        refsInBatch.stream()
+            .map(r -> "{\"ref_name\":\"" + r.refName() + "\", \"is_delete\":" + r.isDelete() + "}")
+            .collect(Collectors.joining(","));
     String msgBody =
         String.format(
-            "{\"label\":\"%s\", \"refs_names\": [ %s ], \"async\":%s}",
+            "{\"label\":\"%s\", \"ref_inputs\": [ %s ], \"async\":%s}",
             instanceId, refsNamesBody, callAsync);
 
     String url = formatUrl(targetUri.toString(), project, "batch-fetch");
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index abc96ca..3b76be9 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -609,6 +609,10 @@
 >	*NOTE*: if any ref from a single batch matches `replication.syncRefs`
 >	filter, all refs in that batch are going to be fetched synchronously as
 >	a single git fetch operation.
+
+>	*NOTE*: Should ref deletions over apply-object/HTTP, they will
+>	be attempted over fetch/HTTP endpoint only when `enableBatchedRefs` is
+>	enabled.
 >
 >	By default, true.
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
index 84fee55..f494c2f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
@@ -51,6 +51,7 @@
 import com.googlesource.gerrit.plugins.replication.MergedConfigResource;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfigImpl;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.RefInput;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.BatchApplyObjectData;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
 import com.googlesource.gerrit.plugins.replication.pull.client.FetchApiClient;
@@ -65,6 +66,7 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
@@ -503,7 +505,7 @@
     verify(fetchRestApiClient)
         .callBatchFetch(
             PROJECT,
-            List.of("refs/changes/01/1/1", "refs/changes/02/1/1"),
+            Stream.of("refs/changes/01/1/1", "refs/changes/02/1/1").map(RefInput::create).toList(),
             new URIish("http://localhost:18080"));
   }
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchFetchActionTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchFetchActionTest.java
index 9dc736f..23b0fe1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchFetchActionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BatchFetchActionTest.java
@@ -22,11 +22,14 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.gerrit.extensions.restapi.MergeConflictException;
 import com.google.gerrit.extensions.restapi.Response;
 import com.google.gerrit.extensions.restapi.RestApiException;
 import com.google.gerrit.server.project.ProjectResource;
-import java.util.Set;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.RefInput;
+import java.util.Arrays;
+import java.util.stream.Collectors;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -103,10 +106,12 @@
     return input;
   }
 
+  @VisibleForTesting
   private FetchAction.BatchInput createBatchInput(String... refNames) {
     FetchAction.BatchInput batchInput = new FetchAction.BatchInput();
     batchInput.label = label;
-    batchInput.refsNames = Set.of(refNames);
+    batchInput.refInputs =
+        Arrays.stream(refNames).map(RefInput::create).collect(Collectors.toSet());
     return batchInput;
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java
index 9653209..a4642ba 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java
@@ -19,7 +19,9 @@
 import static org.apache.http.HttpStatus.SC_CREATED;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import com.google.gerrit.extensions.registration.DynamicItem;
@@ -32,6 +34,8 @@
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.gerrit.server.git.WorkQueue.Task;
 import com.google.gerrit.server.project.ProjectResource;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.BatchInput;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.RefInput;
 import com.googlesource.gerrit.plugins.replication.pull.api.exception.RemoteConfigurationMissingException;
 import java.util.Optional;
 import java.util.Set;
@@ -57,8 +61,11 @@
   int taskId = 1234;
 
   @Mock FetchCommand fetchCommand;
+  @Mock DeleteRefCommand deleteRefCommand;
   @Mock FetchJob fetchJob;
+  @Mock DeleteRefJob deleteRefJob;
   @Mock FetchJob.Factory fetchJobFactory;
+  @Mock DeleteRefJob.Factory deleteRefJobFactory;
   @Mock ProjectResource projectResource;
   @Mock WorkQueue workQueue;
   @Mock ScheduledExecutorService exceutorService;
@@ -70,6 +77,7 @@
   @Before
   public void setup() throws Exception {
     when(fetchJobFactory.create(any(), any(), any())).thenReturn(fetchJob);
+    when(deleteRefJobFactory.create(any(), any())).thenReturn(deleteRefJob);
     when(workQueue.getDefaultQueue()).thenReturn(exceutorService);
     when(urlFormatter.getRestUrl(anyString())).thenReturn(Optional.of(location));
     when(exceutorService.submit(any(Runnable.class)))
@@ -86,7 +94,13 @@
 
     fetchAction =
         new FetchAction(
-            fetchCommand, workQueue, urlFormatterDynamicItem, preConditions, fetchJobFactory);
+            fetchCommand,
+            deleteRefCommand,
+            workQueue,
+            urlFormatterDynamicItem,
+            preConditions,
+            fetchJobFactory,
+            deleteRefJobFactory);
   }
 
   @Test
@@ -104,13 +118,38 @@
   public void shouldReturnCreatedResponseCodeForBatchRefFetchAction() throws Exception {
     FetchAction.BatchInput batchInputParams = new FetchAction.BatchInput();
     batchInputParams.label = label;
-    batchInputParams.refsNames = Set.of(refName, altRefName);
+    batchInputParams.refInputs = Set.of(RefInput.create(refName), RefInput.create(altRefName));
 
     Response<?> response = fetchAction.apply(projectResource, batchInputParams);
 
     assertThat(response.statusCode()).isEqualTo(SC_CREATED);
   }
 
+  @Test
+  public void shouldDeleteRefSync() throws Exception {
+    FetchAction.BatchInput batchInputParams = new FetchAction.BatchInput();
+    batchInputParams.label = label;
+    batchInputParams.refInputs = Set.of(RefInput.create(refName, true));
+
+    Response<?> response = fetchAction.apply(projectResource, batchInputParams);
+    verify(deleteRefCommand).deleteRefsSync(any(), eq(Set.of(refName)), eq(label));
+
+    assertThat(response.statusCode()).isEqualTo(SC_CREATED);
+  }
+
+  @Test
+  public void shouldDeleteRefAsync() throws Exception {
+    FetchAction.BatchInput batchInputParams = new FetchAction.BatchInput();
+    batchInputParams.label = label;
+    batchInputParams.async = true;
+    batchInputParams.refInputs = Set.of(RefInput.create(refName, true));
+
+    Response<?> response = fetchAction.apply(projectResource, batchInputParams);
+    verify(deleteRefJobFactory).create(any(), eq(batchInputParams));
+
+    assertThat(response.statusCode()).isEqualTo(SC_ACCEPTED);
+  }
+
   @SuppressWarnings("cast")
   @Test
   public void shouldReturnSourceUrlAndrefNameAsAResponseBody() throws Exception {
@@ -120,11 +159,11 @@
 
     Response<?> response = fetchAction.apply(projectResource, inputParams);
 
-    FetchAction.BatchInput responseBatchInput = (FetchAction.BatchInput) response.value();
+    BatchInput responseBatchInput = (BatchInput) response.value();
 
     assertThat(responseBatchInput.label).isEqualTo(inputParams.label);
     assertThat(responseBatchInput.async).isEqualTo(inputParams.async);
-    assertThat(responseBatchInput.refsNames).containsExactly(inputParams.refName);
+    assertThat(responseBatchInput.refInputs).containsExactly(RefInput.create(inputParams.refName));
   }
 
   @Test(expected = BadRequestException.class)
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilterTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilterTest.java
index b193cd7..bc9e870 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationFilterTest.java
@@ -145,7 +145,7 @@
     byte[] payloadBatchFetch =
         ("{"
                 + "\"label\":\"Replication\", "
-                + "\"refs_names\": [ \"refs/heads/master\" , \"refs/heads/test\" ], "
+                + "\"ref_inputs\": [ {\"ref_name\":\"refs/heads/master\", \"is_delete\":false}, {\"ref_name\":\"refs/heads/test\", \"is_delete\":false} ], "
                 + "\"async\":false"
                 + "}")
             .getBytes(StandardCharsets.UTF_8);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientBase.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientBase.java
index 3aa5b5a..1c7b553 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientBase.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientBase.java
@@ -31,6 +31,7 @@
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
 import com.googlesource.gerrit.plugins.replication.pull.BearerTokenProvider;
 import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.RefInput;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.BatchApplyObjectData;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionData;
 import com.googlesource.gerrit.plugins.replication.pull.api.data.RevisionObjectData;
@@ -42,6 +43,7 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.http.Header;
 import org.apache.http.HttpHeaders;
 import org.apache.http.client.methods.HttpDelete;
@@ -185,7 +187,7 @@
 
     objectUnderTest.callBatchFetch(
         Project.nameKey("test_repo"),
-        List.of(refName, RefNames.REFS_HEADS + "test"),
+        List.of(RefInput.create(refName), RefInput.create(RefNames.REFS_HEADS + "test")),
         new URIish(api));
 
     verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
@@ -253,19 +255,21 @@
             source);
 
     String testRef = RefNames.REFS_HEADS + "test";
-    List<String> refs = List.of(refName, testRef);
+    List<RefInput> refs = refInputs(refName, testRef);
     objectUnderTest.callBatchFetch(Project.nameKey("test_repo"), refs, new URIish(api));
 
     verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
 
     HttpPost httpPost = httpPostCaptor.getValue();
     String expectedPayload =
-        "{\"label\":\"Replication\", \"refs_names\": [ "
-            + '"'
+        "{\"label\":\"Replication\", \"ref_inputs\": ["
+            + " {\"ref_name\":\""
             + refName
-            + "\",\""
+            + "\", \"is_delete\":false}"
+            + ",{\"ref_name\":\""
             + testRef
-            + "\" ]"
+            + "\", \"is_delete\":false}"
+            + " ]"
             + ", \"async\":true}";
     assertThat(readPayload(httpPost)).isEqualTo(expectedPayload);
   }
@@ -305,7 +309,10 @@
   public void shouldCallSyncBatchFetchOnlyForMetaRef() throws Exception {
     String metaRefName = "refs/changes/01/101/meta";
     String expectedMetaRefPayload =
-        "{\"label\":\"Replication\", \"refs_names\": [ \"" + metaRefName + "\" ], \"async\":false}";
+        "{\"label\":\"Replication\", \"ref_inputs\": [ "
+            + "{\"ref_name\":\""
+            + metaRefName
+            + "\", \"is_delete\":false} ], \"async\":false}";
 
     when(config.getStringList("replication", null, "syncRefs"))
         .thenReturn(new String[] {"^refs\\/changes\\/.*\\/meta"});
@@ -322,7 +329,7 @@
             source);
 
     objectUnderTest.callBatchFetch(
-        Project.nameKey("test_repo"), List.of(metaRefName), new URIish(api));
+        Project.nameKey("test_repo"), List.of(RefInput.create(metaRefName)), new URIish(api));
     verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
     HttpPost httpPost = httpPostCaptor.getValue();
     assertThat(readPayload(httpPost)).isEqualTo(expectedMetaRefPayload);
@@ -343,19 +350,23 @@
   public void shouldCallBatchFetchEndpointWithPayload() throws Exception {
 
     String testRef = RefNames.REFS_HEADS + "test";
-    List<String> refs = List.of(refName, testRef);
+    List<RefInput> refs = refInputs(refName, testRef);
     objectUnderTest.callBatchFetch(Project.nameKey("test_repo"), refs, new URIish(api));
 
     verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
 
     HttpPost httpPost = httpPostCaptor.getValue();
     String expectedPayload =
-        "{\"label\":\"Replication\", \"refs_names\": [ "
-            + '"'
+        "{\"label\":\"Replication\", \"ref_inputs\": [ "
+            + "{\"ref_name\":\""
             + refName
-            + "\",\""
-            + refs.get(1)
-            + "\" ], \"async\":false}";
+            + "\", \"is_delete\":false}"
+            + ",{\"ref_name\":\""
+            + refs.get(1).refName()
+            + "\", \"is_delete\":"
+            + refs.get(1).isDelete()
+            + "}"
+            + " ], \"async\":false}";
     assertThat(readPayload(httpPost)).isEqualTo(expectedPayload);
   }
 
@@ -366,7 +377,7 @@
         .thenReturn(new String[] {"^refs\\/heads\\/test"});
     syncRefsFilter = new SyncRefsFilter(replicationConfig);
     String testRef = RefNames.REFS_HEADS + "test";
-    List<String> refs = List.of(refName, testRef);
+    List<RefInput> refs = refInputs(refName, testRef);
     objectUnderTest =
         new FetchRestApiClient(
             credentials,
@@ -383,8 +394,16 @@
 
     HttpPost httpPosts = httpPostCaptor.getValue();
     String expectedSyncPayload =
-        "{\"label\":\"Replication\", \"refs_names\": [ "
-            + refs.stream().map(r -> '"' + r + '"').collect(Collectors.joining(","))
+        "{\"label\":\"Replication\", \"ref_inputs\": [ "
+            + refs.stream()
+                .map(
+                    r ->
+                        "{\"ref_name\":\""
+                            + r.refName()
+                            + "\", \"is_delete\":"
+                            + r.isDelete()
+                            + "}")
+                .collect(Collectors.joining(","))
             + " ], \"async\":false}";
 
     assertThat(readPayload(httpPosts)).isEqualTo(expectedSyncPayload);
@@ -405,7 +424,8 @@
   @Test
   public void shouldSetContentTypeHeaderInBatchFetch() throws Exception {
 
-    objectUnderTest.callBatchFetch(Project.nameKey("test_repo"), List.of(refName), new URIish(api));
+    objectUnderTest.callBatchFetch(
+        Project.nameKey("test_repo"), refInputs(refName), new URIish(api));
 
     verify(httpClient, times(1)).execute(httpPostCaptor.capture(), any());
 
@@ -733,4 +753,8 @@
     return createSampleRevisionData(
         commitObjectId, commitObject, treeObjectId, treeObject, blobObjectId, blobObject);
   }
+
+  private List<RefInput> refInputs(String... refs) {
+    return Stream.of(refs).map(RefInput::create).collect(Collectors.toList());
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java
index 0f4bd33..c81820e 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListenerTest.java
@@ -186,7 +186,7 @@
 
     FetchAction.BatchInput batchInput = batchInputCaptor.getValue();
     assertThat(batchInput.label).isEqualTo(REMOTE_INSTANCE_ID);
-    assertThat(batchInput.refsNames).contains(TEST_REF_NAME);
+    assertThat(batchInput.refInputs).contains(FetchAction.RefInput.create(TEST_REF_NAME));
 
     verify(executor).submit(any(FetchJob.class));
   }
@@ -259,7 +259,7 @@
 
     FetchAction.BatchInput input = batchInputCaptor.getValue();
     assertThat(input.label).isEqualTo(REMOTE_INSTANCE_ID);
-    assertThat(input.refsNames).contains(FetchOne.ALL_REFS);
+    assertThat(input.refInputs).contains(FetchAction.RefInput.create(FetchOne.ALL_REFS));
 
     verify(executor).submit(any(FetchJob.class));
   }