Merge "Add an interface to allow executing pending draft updates together"
diff --git a/java/com/google/gerrit/server/ChangeDraftUpdate.java b/java/com/google/gerrit/server/ChangeDraftUpdate.java
index eb33fb5..71e2da3 100644
--- a/java/com/google/gerrit/server/ChangeDraftUpdate.java
+++ b/java/com/google/gerrit/server/ChangeDraftUpdate.java
@@ -49,11 +49,7 @@
    * Marks a comment for deletion. Called when the comment is deleted because the user published it.
    *
    * <p>NOTE for implementers: The actual deletion of a published draft should only happen after the
-   * published comment is successfully updated. For more context, see {@link
-   * com.google.gerrit.server.notedb.NoteDbUpdateManager#execute(boolean)}.
-   *
-   * <p>TODO(nitzan) - add generalized support for the above sync issue. The implementation should
-   * support deletion of published drafts from multiple ChangeDraftUpdateFactory instances.
+   * published comment is successfully updated. Please use {@link ChangeDraftUpdateExecutor}.
    */
   void markDraftCommentAsPublished(HumanComment c);
 
@@ -67,4 +63,12 @@
    * comments storage and the drafts one.
    */
   void addAllDraftCommentsForDeletion(List<Comment> comments);
+
+  /** Returns whether all updates in this updater can run asynchronously. */
+  boolean canRunAsync();
+
+  /**
+   * Returns the storage system's unique identifier for this draft update, e.g. NoteDb's ref name.
+   */
+  String getStorageKey();
 }
diff --git a/java/com/google/gerrit/server/ChangeDraftUpdateExecutor.java b/java/com/google/gerrit/server/ChangeDraftUpdateExecutor.java
new file mode 100644
index 0000000..3ab3a13
--- /dev/null
+++ b/java/com/google/gerrit/server/ChangeDraftUpdateExecutor.java
@@ -0,0 +1,121 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.gerrit.server;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.MultimapBuilder;
+import com.google.gerrit.common.Nullable;
+import com.google.gerrit.entities.Change;
+import com.google.gerrit.server.update.BatchUpdateListener;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Function;
+import org.eclipse.jgit.lib.BatchRefUpdate;
+import org.eclipse.jgit.lib.PersonIdent;
+import org.eclipse.jgit.transport.PushCertificate;
+
+/**
+ * An interface for executing updates of multiple {@link ChangeDraftUpdate} instances.
+ *
+ * <p>Expected usage flow:
+ *
+ * <ol>
+ *   <li>Inject an instance of {@link AbstractFactory}.
+ *   <li>Create an instance of this interface using the factory.
+ *   <li>Call {@link #queueAllDraftUpdates} or {@link #queueDeletionForChangeDrafts} for all
+ *       expected updates. The changes are marked to be executed either synchronously or
+ *       asynchronously, based on {@link #canRunAsync}.
+ *   <li>Call both {@link #executeAllSyncUpdates} and {@link #executeAllAsyncUpdates} methods.
+ *       Running these methods with no pending updates is a no-op.
+ * </ol>
+ */
+public interface ChangeDraftUpdateExecutor {
+  interface AbstractFactory<T extends ChangeDraftUpdateExecutor> {
+    T create();
+  }
+
+  /**
+   * Queues all provided updates for later execution.
+   *
+   * <p>The updates are queued to either run synchronously just after change repositories updates,
+   * or to run asynchronously afterwards, based on {@link #canRunAsync}.
+   */
+  void queueAllDraftUpdates(ListMultimap<String, ChangeDraftUpdate> updates) throws IOException;
+
+  /**
+   * Extracts all drafts (of all authors) for the given change and queues their deletion.
+   *
+   * <p>See {@link #canRunAsync} for whether the deletions are scheduled as synchronous or
+   * asynchronous.
+   */
+  void queueDeletionForChangeDrafts(Change.Id id) throws IOException;
+
+  /**
+   * Executes all previously queued sync updates.
+   *
+   * <p>NOTE that {@link BatchUpdateListener#beforeUpdateRefs} events are not fired by this method.
+   * Post-update events can be fired by the caller only for implementations that return a valid
+   * {@link BatchRefUpdate}.
+   *
+   * @param dryRun whether this is a dry run - i.e. no updates should be made
+   * @param refLogIdent user to log as the update creator
+   * @param refLogMessage message to put in the updates log
+   * @return the executed update, if supported by the implementing class
+   * @throws IOException in case of an update failure.
+   */
+  Optional<BatchRefUpdate> executeAllSyncUpdates(
+      boolean dryRun, @Nullable PersonIdent refLogIdent, @Nullable String refLogMessage)
+      throws IOException;
+
+  /**
+   * Executes all previously queued async updates.
+   *
+   * @param refLogIdent user to log as the update creator
+   * @param refLogMessage message to put in the updates log
+   * @param pushCert to use for the update
+   */
+  void executeAllAsyncUpdates(
+      @Nullable PersonIdent refLogIdent,
+      @Nullable String refLogMessage,
+      @Nullable PushCertificate pushCert);
+
+  /** Returns whether any updates are queued. */
+  boolean isEmpty();
+
+  /** Returns the updates accepted by {@code isSubtype}, each converted with {@code toSubtype}. */
+  default <UpdateT extends ChangeDraftUpdate> ListMultimap<String, UpdateT> filterTypedUpdates(
+      ListMultimap<String, ChangeDraftUpdate> updates,
+      Function<ChangeDraftUpdate, Boolean> isSubtype,
+      Function<ChangeDraftUpdate, UpdateT> toSubtype) {
+    ListMultimap<String, UpdateT> res = MultimapBuilder.hashKeys().arrayListValues().build();
+    for (String key : updates.keySet()) {
+      res.putAll(
+          key,
+          updates.get(key).stream()
+              .filter(isSubtype::apply)
+              .map(toSubtype)
+              .collect(toImmutableList()));
+    }
+    return res;
+  }
+
+  /** Returns whether all provided updates can run asynchronously; true for an empty collection. */
+  default boolean canRunAsync(Collection<? extends ChangeDraftUpdate> updates) {
+    return updates.stream().allMatch(ChangeDraftUpdate::canRunAsync);
+  }
+}
diff --git a/java/com/google/gerrit/server/notedb/ChangeDraftNotesUpdate.java b/java/com/google/gerrit/server/notedb/ChangeDraftNotesUpdate.java
index 8faca67..b32158b 100644
--- a/java/com/google/gerrit/server/notedb/ChangeDraftNotesUpdate.java
+++ b/java/com/google/gerrit/server/notedb/ChangeDraftNotesUpdate.java
@@ -16,9 +16,12 @@
 
 import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.gerrit.server.logging.TraceContext.newTimer;
 import static org.eclipse.jgit.lib.Constants.OBJ_BLOB;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ListMultimap;
 import com.google.gerrit.common.Nullable;
 import com.google.gerrit.entities.Account;
 import com.google.gerrit.entities.Change;
@@ -28,8 +31,14 @@
 import com.google.gerrit.entities.RefNames;
 import com.google.gerrit.exceptions.StorageException;
 import com.google.gerrit.server.ChangeDraftUpdate;
+import com.google.gerrit.server.ChangeDraftUpdateExecutor;
 import com.google.gerrit.server.GerritPersonIdent;
 import com.google.gerrit.server.config.AllUsersName;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.gerrit.server.logging.Metadata;
+import com.google.gerrit.server.logging.TraceContext;
+import com.google.gerrit.server.update.BatchUpdateListener;
+import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.google.inject.assistedinject.AssistedInject;
 import java.io.IOException;
@@ -41,12 +50,16 @@
 import java.util.Map;
 import java.util.Optional;
 import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.lib.BatchRefUpdate;
 import org.eclipse.jgit.lib.CommitBuilder;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.ObjectInserter;
 import org.eclipse.jgit.lib.PersonIdent;
+import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.notes.NoteMap;
 import org.eclipse.jgit.revwalk.RevWalk;
+import org.eclipse.jgit.transport.PushCertificate;
+import org.eclipse.jgit.transport.ReceiveCommand;
 
 /**
  * A single delta to apply atomically to a change.
@@ -92,6 +105,114 @@
     return new AutoValue_ChangeDraftNotesUpdate_Key(c.getCommitId(), c.key);
   }
 
+  /**
+   * NoteDb-backed {@link ChangeDraftUpdateExecutor}; lazily opens the All-Users repo and holds it
+   * until {@link #close()}. NOTE(review): the interface is not AutoCloseable - verify owners close.
+   */
+  static class Executor implements ChangeDraftUpdateExecutor, AutoCloseable {
+    interface Factory extends ChangeDraftUpdateExecutor.AbstractFactory<Executor> {}
+
+    private final GitRepositoryManager repoManager;
+    private final AllUsersName allUsersName;
+    private final NoteDbUpdateExecutor noteDbUpdateExecutor;
+    private final AllUsersAsyncUpdate updateAllUsersAsync;
+    private OpenRepo allUsersRepo;
+    private boolean shouldAllowFastForward = false;
+
+    @Inject
+    Executor(
+        GitRepositoryManager repoManager,
+        AllUsersName allUsersName,
+        NoteDbUpdateExecutor noteDbUpdateExecutor,
+        AllUsersAsyncUpdate updateAllUsersAsync) {
+      this.updateAllUsersAsync = updateAllUsersAsync;
+      this.repoManager = repoManager;
+      this.allUsersName = allUsersName;
+      this.noteDbUpdateExecutor = noteDbUpdateExecutor;
+    }
+
+    @Override
+    public void queueAllDraftUpdates(ListMultimap<String, ChangeDraftUpdate> updaters)
+        throws IOException {
+      ListMultimap<String, ChangeDraftNotesUpdate> noteDbUpdaters =
+          filterTypedUpdates(
+              updaters, u -> u instanceof ChangeDraftNotesUpdate, u -> (ChangeDraftNotesUpdate) u);
+      if (canRunAsync(noteDbUpdaters.values())) {
+        updateAllUsersAsync.setDraftUpdates(noteDbUpdaters);
+      } else {
+        initAllUsersRepoIfNull();
+        shouldAllowFastForward = true;
+        allUsersRepo.addUpdatesNoLimits(noteDbUpdaters);
+      }
+    }
+
+    @Override
+    public void queueDeletionForChangeDrafts(Change.Id id) throws IOException {
+      initAllUsersRepoIfNull();
+      String draftRefsPrefix = RefNames.refsDraftCommentsPrefix(id);
+      // Just scan repo for ref names, but get "old" values from cmds.
+      for (Ref r : allUsersRepo.repo.getRefDatabase().getRefsByPrefix(draftRefsPrefix)) {
+        String refName = r.getName();
+        Optional<ObjectId> old = allUsersRepo.cmds.get(refName);
+        old.ifPresent(
+            oldId -> allUsersRepo.cmds.add(new ReceiveCommand(oldId, ObjectId.zeroId(), refName)));
+      }
+    }
+
+    /**
+     * Runs the queued All-Users commands, if any. Fires no {@link
+     * BatchUpdateListener#beforeUpdateRefs} events; since the {@link BatchRefUpdate} is returned,
+     * the caller can fire {@link BatchUpdateListener#afterUpdateRefs}.
+     */
+    @Override
+    public Optional<BatchRefUpdate> executeAllSyncUpdates(
+        boolean dryRun, @Nullable PersonIdent refLogIdent, @Nullable String refLogMessage)
+        throws IOException {
+      if (allUsersRepo == null) {
+        return Optional.empty();
+      }
+      try (TraceContext.TraceTimer ignored =
+          newTimer("ChangeDraftNotesUpdate#Executor#updateAllUsersSync", Metadata.empty())) {
+        return noteDbUpdateExecutor.execute(
+            allUsersRepo,
+            dryRun,
+            shouldAllowFastForward,
+            /* batchUpdateListeners= */ ImmutableList.of(),
+            /* pushCert= */ null,
+            refLogIdent,
+            refLogMessage);
+      }
+    }
+
+    @Override
+    public void executeAllAsyncUpdates(
+        @Nullable PersonIdent refLogIdent,
+        @Nullable String refLogMessage,
+        @Nullable PushCertificate pushCert) {
+      updateAllUsersAsync.execute(refLogIdent, refLogMessage, pushCert);
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return (allUsersRepo == null || allUsersRepo.cmds.isEmpty()) && updateAllUsersAsync.isEmpty();
+    }
+
+    @Override
+    public void close() {
+      if (allUsersRepo != null) {
+        OpenRepo r = allUsersRepo;
+        allUsersRepo = null; // Drop the reference first so a failing close() is never retried.
+        r.close();
+      }
+    }
+
+    private void initAllUsersRepoIfNull() throws IOException {
+      if (allUsersRepo == null) {
+        allUsersRepo = OpenRepo.open(repoManager, allUsersName);
+      }
+    }
+  }
+
   private final AllUsersName draftsProject;
 
   private List<HumanComment> put = new ArrayList<>();
@@ -157,6 +278,14 @@
         });
   }
 
+  /**
+   * Returns whether all the updates in this instance can run asynchronously.
+   *
+   * <p>An update can run asynchronously only if it contains nothing but {@code PUBLISHED} or {@code
+   * FIXED} draft deletions. User-initiated insertions/deletions must run synchronously in order to
+   * return status.
+   */
+  @Override
   public boolean canRunAsync() {
     return put.isEmpty()
         && delete.values().stream()
@@ -287,6 +416,11 @@
   }
 
   @Override
+  public String getStorageKey() {
+    return getRefName();
+  }
+
+  @Override
   protected void setParentCommit(CommitBuilder cb, ObjectId parentCommitId) {
     cb.setParentIds(); // Draft updates should not keep history of parent commits
   }
@@ -295,15 +429,4 @@
   public boolean isEmpty() {
     return delete.isEmpty() && put.isEmpty();
   }
-
-  public static Optional<ChangeDraftNotesUpdate> asChangeDraftNotesUpdate(
-      @Nullable ChangeDraftUpdate obj) {
-    if (obj == null) {
-      return Optional.empty();
-    }
-    if (obj instanceof ChangeDraftNotesUpdate) {
-      return Optional.of((ChangeDraftNotesUpdate) obj);
-    }
-    return Optional.empty();
-  }
 }
diff --git a/java/com/google/gerrit/server/notedb/NoteDbModule.java b/java/com/google/gerrit/server/notedb/NoteDbModule.java
index c7e16ed..8a73f7b 100644
--- a/java/com/google/gerrit/server/notedb/NoteDbModule.java
+++ b/java/com/google/gerrit/server/notedb/NoteDbModule.java
@@ -17,6 +17,7 @@
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.gerrit.extensions.config.FactoryModule;
+import com.google.gerrit.server.ChangeDraftUpdateExecutor;
 import com.google.gerrit.server.DraftCommentsReader;
 import com.google.gerrit.server.StarredChangesReader;
 import com.google.gerrit.server.StarredChangesWriter;
@@ -42,6 +43,7 @@
   @Override
   public void configure() {
     factory(ChangeDraftNotesUpdate.Factory.class);
+    factory(ChangeDraftNotesUpdate.Executor.Factory.class);
     factory(ChangeUpdate.Factory.class);
     factory(DeleteCommentRewriter.Factory.class);
     factory(DraftCommentNotes.Factory.class);
@@ -51,6 +53,9 @@
     bind(StarredChangesReader.class).to(StarredChangesUtilNoteDbImpl.class).in(Singleton.class);
     bind(StarredChangesWriter.class).to(StarredChangesUtilNoteDbImpl.class).in(Singleton.class);
     bind(DraftCommentsReader.class).to(DraftCommentsNotesReader.class).in(Singleton.class);
+    bind(ChangeDraftUpdateExecutor.AbstractFactory.class)
+        .to(ChangeDraftNotesUpdate.Executor.Factory.class)
+        .in(Singleton.class);
 
     if (!useTestBindings) {
       install(ChangeNotesCache.module());
diff --git a/java/com/google/gerrit/server/notedb/NoteDbUpdateManager.java b/java/com/google/gerrit/server/notedb/NoteDbUpdateManager.java
index def2763..ae18bb7 100644
--- a/java/com/google/gerrit/server/notedb/NoteDbUpdateManager.java
+++ b/java/com/google/gerrit/server/notedb/NoteDbUpdateManager.java
@@ -33,6 +33,8 @@
 import com.google.gerrit.entities.RefNames;
 import com.google.gerrit.exceptions.StorageException;
 import com.google.gerrit.metrics.Timer0;
+import com.google.gerrit.server.ChangeDraftUpdate;
+import com.google.gerrit.server.ChangeDraftUpdateExecutor;
 import com.google.gerrit.server.cancellation.RequestStateContext;
 import com.google.gerrit.server.cancellation.RequestStateContext.NonCancellableOperationContext;
 import com.google.gerrit.server.config.AllUsersName;
@@ -56,7 +58,6 @@
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.lib.ObjectInserter;
 import org.eclipse.jgit.lib.PersonIdent;
-import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.revwalk.RevWalk;
 import org.eclipse.jgit.transport.PushCertificate;
@@ -87,20 +88,20 @@
   private final int maxUpdates;
   private final int maxPatchSets;
   private final ListMultimap<String, ChangeUpdate> changeUpdates;
-  private final ListMultimap<String, ChangeDraftNotesUpdate> draftUpdates;
+  private final ListMultimap<String, ChangeDraftUpdate> draftUpdates;
+  private final NoteDbUpdateExecutor noteDbUpdateExecutor;
+  private final ChangeDraftUpdateExecutor.AbstractFactory draftUpdatesExecutorFactory;
   private final ListMultimap<String, RobotCommentUpdate> robotCommentUpdates;
   private final ListMultimap<String, NoteDbRewriter> rewriters;
   private final Set<Change.Id> changesToDelete;
-  private final NoteDbUpdateExecutor noteDbUpdateExecutor;
 
   private OpenRepo changeRepo;
-  private OpenRepo allUsersRepo;
-  private AllUsersAsyncUpdate updateAllUsersAsync;
   private boolean executed;
   private String refLogMessage;
   private PersonIdent refLogIdent;
   private PushCertificate pushCert;
   private ImmutableList<BatchUpdateListener> batchUpdateListeners;
+  private ChangeDraftUpdateExecutor draftUpdatesExecutor;
 
   @Inject
   NoteDbUpdateManager(
@@ -108,15 +109,15 @@
       GitRepositoryManager repoManager,
       AllUsersName allUsersName,
       NoteDbMetrics metrics,
-      AllUsersAsyncUpdate updateAllUsersAsync,
       @Assisted Project.NameKey projectName,
-      NoteDbUpdateExecutor noteDbUpdateExecutor) {
+      NoteDbUpdateExecutor noteDbUpdateExecutor,
+      ChangeDraftUpdateExecutor.AbstractFactory draftUpdatesExecutorFactory) {
     this.repoManager = repoManager;
     this.allUsersName = allUsersName;
     this.metrics = metrics;
-    this.updateAllUsersAsync = updateAllUsersAsync;
     this.projectName = projectName;
     this.noteDbUpdateExecutor = noteDbUpdateExecutor;
+    this.draftUpdatesExecutorFactory = draftUpdatesExecutorFactory;
     maxUpdates = cfg.getInt("change", null, "maxUpdates", MAX_UPDATES_DEFAULT);
     maxPatchSets = cfg.getInt("change", null, "maxPatchSets", MAX_PATCH_SETS_DEFAULT);
     changeUpdates = MultimapBuilder.hashKeys().arrayListValues().build();
@@ -129,18 +130,10 @@
 
   @Override
   public void close() {
-    try {
-      if (allUsersRepo != null) {
-        OpenRepo r = allUsersRepo;
-        allUsersRepo = null;
-        r.close();
-      }
-    } finally {
-      if (changeRepo != null) {
-        OpenRepo r = changeRepo;
-        changeRepo = null;
-        r.close();
-      }
+    if (changeRepo != null) {
+      OpenRepo r = changeRepo;
+      changeRepo = null;
+      r.close();
     }
   }
 
@@ -197,12 +190,6 @@
     }
   }
 
-  private void initAllUsersRepo() throws IOException {
-    if (allUsersRepo == null) {
-      allUsersRepo = OpenRepo.open(repoManager, allUsersName);
-    }
-  }
-
   private boolean isEmpty() {
     return changeUpdates.isEmpty()
         && draftUpdates.isEmpty()
@@ -210,8 +197,7 @@
         && rewriters.isEmpty()
         && changesToDelete.isEmpty()
         && !hasCommands(changeRepo)
-        && !hasCommands(allUsersRepo)
-        && updateAllUsersAsync.isEmpty();
+        && (draftUpdatesExecutor == null || draftUpdatesExecutor.isEmpty());
   }
 
   private static boolean hasCommands(@Nullable OpenRepo or) {
@@ -238,10 +224,9 @@
         "cannot update & rewrite ref %s in one BatchUpdate",
         update.getRefName());
 
-    Optional<ChangeDraftNotesUpdate> du =
-        ChangeDraftNotesUpdate.asChangeDraftNotesUpdate(update.getDraftUpdate());
-    if (du.isPresent()) {
-      draftUpdates.put(du.get().getRefName(), du.get());
+    ChangeDraftUpdate du = update.getDraftUpdate();
+    if (du != null) {
+      draftUpdates.put(du.getStorageKey(), du);
     }
     RobotCommentUpdate rcu = update.getRobotCommentUpdate();
     if (rcu != null) {
@@ -279,9 +264,9 @@
     changeUpdates.put(update.getRefName(), update);
   }
 
-  public void add(ChangeDraftNotesUpdate draftUpdate) {
+  public void add(ChangeDraftUpdate draftUpdate) {
     checkNotExecuted();
-    draftUpdates.put(draftUpdate.getRefName(), draftUpdate);
+    draftUpdates.put(draftUpdate.getStorageKey(), draftUpdate);
   }
 
   public void deleteChange(Change.Id id) {
@@ -302,7 +287,7 @@
 
       initChangeRepo();
       if (!draftUpdates.isEmpty() || !changesToDelete.isEmpty()) {
-        initAllUsersRepo();
+        draftUpdatesExecutor = draftUpdatesExecutorFactory.create();
       }
       addCommands();
     }
@@ -325,7 +310,7 @@
         NonCancellableOperationContext nonCancellableOperationContext =
             RequestStateContext.startNonCancellableOperation()) {
       stage();
-      // ChangeUpdates must execute before ChangeDraftNotesUpdates.
+      // ChangeUpdates must execute before ChangeDraftUpdates.
       //
       // ChangeUpdate will automatically delete draft comments for any published
       // comments, but the updates to the two repos don't happen atomically.
@@ -337,16 +322,18 @@
           newTimer("NoteDbUpdateManager#updateRepo", Metadata.empty())) {
         execute(changeRepo, dryrun, pushCert).ifPresent(bru -> resultBuilder.put(projectName, bru));
       }
-      try (TraceContext.TraceTimer ignored =
-          newTimer("NoteDbUpdateManager#updateAllUsersSync", Metadata.empty())) {
-        execute(allUsersRepo, dryrun, null).ifPresent(bru -> resultBuilder.put(allUsersName, bru));
-      }
-      if (!dryrun) {
-        // Only execute the asynchronous operation if we are not in dry-run mode: The dry run would
-        // have to run synchronous to be of any value at all. For the removal of draft comments from
-        // All-Users we don't care much of the operation succeeds, so we are skipping the dry run
-        // altogether.
-        updateAllUsersAsync.execute(refLogIdent, refLogMessage, pushCert);
+
+      if (draftUpdatesExecutor != null) {
+        draftUpdatesExecutor
+            .executeAllSyncUpdates(dryrun, refLogIdent, refLogMessage)
+            .ifPresent(bru -> resultBuilder.put(allUsersName, bru));
+        if (!dryrun) {
+          // Only execute the asynchronous operation if we are not in dry-run mode: The dry run
+          // would have to run synchronously to be of any value at all. For the removal of draft
+          // comments from All-Users we don't care much if the operation succeeds, so we are
+          // skipping the dry run altogether.
+          draftUpdatesExecutor.executeAllAsyncUpdates(refLogIdent, refLogMessage, pushCert);
+        }
       }
       executed = true;
       return resultBuilder.build();
@@ -378,13 +365,7 @@
   private void addCommands() throws IOException {
     changeRepo.addUpdates(changeUpdates, Optional.of(maxUpdates), Optional.of(maxPatchSets));
     if (!draftUpdates.isEmpty()) {
-      boolean publishOnly =
-          draftUpdates.values().stream().allMatch(ChangeDraftNotesUpdate::canRunAsync);
-      if (publishOnly) {
-        updateAllUsersAsync.setDraftUpdates(draftUpdates);
-      } else {
-        allUsersRepo.addUpdatesNoLimits(draftUpdates);
-      }
+      draftUpdatesExecutor.queueAllDraftUpdates(draftUpdates);
     }
     if (!robotCommentUpdates.isEmpty()) {
       changeRepo.addUpdatesNoLimits(robotCommentUpdates);
@@ -404,14 +385,7 @@
     old.ifPresent(
         objectId -> changeRepo.cmds.add(new ReceiveCommand(objectId, ObjectId.zeroId(), metaRef)));
 
-    // Just scan repo for ref names, but get "old" values from cmds.
-    for (Ref r :
-        allUsersRepo.repo.getRefDatabase().getRefsByPrefix(RefNames.refsDraftCommentsPrefix(id))) {
-      old = allUsersRepo.cmds.get(r.getName());
-      old.ifPresent(
-          objectId ->
-              allUsersRepo.cmds.add(new ReceiveCommand(objectId, ObjectId.zeroId(), r.getName())));
-    }
+    draftUpdatesExecutor.queueDeletionForChangeDrafts(id);
   }
 
   private void checkNotExecuted() {
diff --git a/javatests/com/google/gerrit/server/notedb/ChangeNotesTest.java b/javatests/com/google/gerrit/server/notedb/ChangeNotesTest.java
index a4846be..c456353 100644
--- a/javatests/com/google/gerrit/server/notedb/ChangeNotesTest.java
+++ b/javatests/com/google/gerrit/server/notedb/ChangeNotesTest.java
@@ -56,6 +56,7 @@
 import com.google.gerrit.entities.SubmissionId;
 import com.google.gerrit.entities.SubmitRecord;
 import com.google.gerrit.exceptions.StorageException;
+import com.google.gerrit.server.ChangeDraftUpdate;
 import com.google.gerrit.server.CurrentUser;
 import com.google.gerrit.server.DraftCommentsReader;
 import com.google.gerrit.server.IdentifiedUser;
@@ -3472,13 +3473,11 @@
     // Re-add draft version of comment2 back to draft ref without updating
     // change ref. Simulates the case where deleting the draft failed
     // non-atomically after adding the published comment succeeded.
-    Optional<ChangeDraftNotesUpdate> draftUpdate =
-        ChangeDraftNotesUpdate.asChangeDraftNotesUpdate(
-            newUpdate(c, otherUser).createDraftUpdateIfNull());
-    if (draftUpdate.isPresent()) {
-      draftUpdate.get().putDraftComment(comment2);
+    ChangeDraftUpdate draftUpdate = newUpdate(c, otherUser).createDraftUpdateIfNull();
+    if (draftUpdate != null) {
+      draftUpdate.putDraftComment(comment2);
       try (NoteDbUpdateManager manager = updateManagerFactory.create(c.getProject())) {
-        manager.add(draftUpdate.get());
+        manager.add(draftUpdate);
         testRefAction(() -> manager.execute());
       }
     }