Merge "PaginatingSource: Stop matching changes after desired limit is reached" into stable-3.5
diff --git a/java/com/google/gerrit/server/git/receive/ReceiveCommits.java b/java/com/google/gerrit/server/git/receive/ReceiveCommits.java
index 6be072a..951eb55 100644
--- a/java/com/google/gerrit/server/git/receive/ReceiveCommits.java
+++ b/java/com/google/gerrit/server/git/receive/ReceiveCommits.java
@@ -1019,111 +1019,29 @@
         return;
       }
       try {
-        retryHelper
-            .changeUpdate(
-                "insertChangesAndPatchSets",
-                updateFactory -> {
-                  try (BatchUpdate bu =
-                          updateFactory.create(
-                              project.getNameKey(), user.materializedCopy(), TimeUtil.nowTs());
-                      ObjectInserter ins = repo.newObjectInserter();
-                      ObjectReader reader = ins.newReader();
-                      RevWalk rw = new RevWalk(reader)) {
-                    bu.setRepository(repo, rw, ins);
-                    bu.setRefLogMessage("push");
-                    if (magicBranch != null) {
-                      bu.setNotify(magicBranch.getNotifyForNewChange());
-                    }
-
-                    logger.atFine().log("Adding %d replace requests", newChanges.size());
-                    for (ReplaceRequest replace : replaceByChange.values()) {
-                      replace.addOps(bu, replaceProgress);
-                      if (magicBranch != null) {
-                        bu.setNotifyHandling(
-                            replace.ontoChange, magicBranch.getNotifyHandling(replace.notes));
-                        if (magicBranch.shouldPublishComments()) {
-                          bu.addOp(
-                              replace.notes.getChangeId(),
-                              publishCommentsOp.create(replace.psId, project.getNameKey()));
-                          Optional<ChangeNotes> changeNotes =
-                              getChangeNotes(replace.notes.getChangeId());
-                          if (!changeNotes.isPresent()) {
-                            // If not present, no need to update attention set here since this is a
-                            // new change.
-                            continue;
-                          }
-                          List<HumanComment> drafts =
-                              commentsUtil.draftByChangeAuthor(
-                                  changeNotes.get(), user.getAccountId());
-                          if (drafts.isEmpty()) {
-                            // If no comments, attention set shouldn't update since the user didn't
-                            // reply.
-                            continue;
-                          }
-                          replyAttentionSetUpdates.processAutomaticAttentionSetRulesOnReply(
-                              bu,
-                              changeNotes.get(),
-                              isReadyForReview(changeNotes.get()),
-                              user,
-                              drafts);
-                        }
-                      }
-                    }
-
-                    logger.atFine().log("Adding %d create requests", newChanges.size());
-                    for (CreateRequest create : newChanges) {
-                      create.addOps(bu);
-                    }
-
-                    logger.atFine().log("Adding %d group update requests", newChanges.size());
-                    updateGroups.forEach(r -> r.addOps(bu));
-
-                    logger.atFine().log("Executing batch");
-                    try {
-                      bu.execute();
-                    } catch (UpdateException e) {
-                      throw asRestApiException(e);
-                    }
-
-                    replaceByChange.values().stream()
-                        .forEach(
-                            req ->
-                                result.addChange(
-                                    ReceiveCommitsResult.ChangeStatus.REPLACED, req.ontoChange));
-                    newChanges.stream()
-                        .forEach(
-                            req ->
-                                result.addChange(
-                                    ReceiveCommitsResult.ChangeStatus.CREATED, req.changeId));
-
-                    if (magicBranchCmd != null) {
-                      magicBranchCmd.setResult(OK);
-                    }
-                    for (ReplaceRequest replace : replaceByChange.values()) {
-                      String rejectMessage = replace.getRejectMessage();
-                      if (rejectMessage == null) {
-                        if (replace.inputCommand.getResult() == NOT_ATTEMPTED) {
-                          // Not necessarily the magic branch, so need to set OK on the original
-                          // value.
-                          replace.inputCommand.setResult(OK);
-                        }
-                      } else {
-                        logger.atFine().log("Rejecting due to message from ReplaceOp");
-                        reject(replace.inputCommand, rejectMessage);
-                      }
-                    }
-                  }
-                  return null;
-                })
-            .defaultTimeoutMultiplier(5)
-            .call();
+        if (!newChanges.isEmpty()) {
+          // TODO: Retry lock failures on new change insertions. The retry will
+          //  likely have to move to a higher layer to be able to achieve that
+          //  due to state that needs to be reset with each retry attempt.
+          insertChangesAndPatchSets(magicBranchCmd, newChanges, replaceProgress);
+        } else {
+          retryHelper
+              .changeUpdate(
+                  "insertPatchSets",
+                  updateFactory -> {
+                    insertChangesAndPatchSets(magicBranchCmd, newChanges, replaceProgress);
+                    return null;
+                  })
+              .defaultTimeoutMultiplier(5)
+              .call();
+        }
       } catch (ResourceConflictException e) {
         addError(e.getMessage());
         reject(magicBranchCmd, "conflict");
       } catch (BadRequestException | UnprocessableEntityException | AuthException e) {
         logger.atFine().withCause(e).log("Rejecting due to client error");
         reject(magicBranchCmd, e.getMessage());
-      } catch (RestApiException | UpdateException e) {
+      } catch (RestApiException | IOException | UpdateException e) {
         throw new StorageException("Can't insert change/patch set for " + project.getName(), e);
       }
 
@@ -1146,6 +1064,101 @@
     }
   }
 
+  /**
+   * Builds and executes one {@link BatchUpdate} for all pending replace requests, the given
+   * create requests and all pending group updates, then records the outcome on {@code result}
+   * and on the affected receive commands.
+   *
+   * <p>No retry happens in here; the caller decides whether to wrap this call in a retry.
+   *
+   * @param magicBranchCmd magic branch command to mark as OK after execution, may be null
+   * @param newChanges create requests for the changes to insert, may be empty
+   * @param replaceProgress progress task handed to each replace request
+   * @throws RestApiException if the batch update fails; an {@code UpdateException} thrown by
+   *     {@code execute()} is rethrown via {@code asRestApiException}
+   * @throws IOException if one of the underlying repository or storage operations fails
+   */
+  private void insertChangesAndPatchSets(
+      ReceiveCommand magicBranchCmd, List<CreateRequest> newChanges, Task replaceProgress)
+      throws RestApiException, IOException {
+    try (BatchUpdate bu =
+            batchUpdateFactory.create(
+                project.getNameKey(), user.materializedCopy(), TimeUtil.nowTs());
+        ObjectInserter ins = repo.newObjectInserter();
+        ObjectReader reader = ins.newReader();
+        RevWalk rw = new RevWalk(reader)) {
+      bu.setRepository(repo, rw, ins);
+      bu.setRefLogMessage("push");
+      if (magicBranch != null) {
+        bu.setNotify(magicBranch.getNotifyForNewChange());
+      }
+
+      logger.atFine().log("Adding %d replace requests", replaceByChange.size());
+      for (ReplaceRequest replace : replaceByChange.values()) {
+        replace.addOps(bu, replaceProgress);
+        if (magicBranch != null) {
+          bu.setNotifyHandling(replace.ontoChange, magicBranch.getNotifyHandling(replace.notes));
+          if (magicBranch.shouldPublishComments()) {
+            bu.addOp(
+                replace.notes.getChangeId(),
+                publishCommentsOp.create(replace.psId, project.getNameKey()));
+            Optional<ChangeNotes> changeNotes = getChangeNotes(replace.notes.getChangeId());
+            if (!changeNotes.isPresent()) {
+              // If not present, no need to update attention set here since this is a new change.
+              continue;
+            }
+            List<HumanComment> drafts =
+                commentsUtil.draftByChangeAuthor(changeNotes.get(), user.getAccountId());
+            if (drafts.isEmpty()) {
+              // If no comments, attention set shouldn't update since the user didn't reply.
+              continue;
+            }
+            replyAttentionSetUpdates.processAutomaticAttentionSetRulesOnReply(
+                bu, changeNotes.get(), isReadyForReview(changeNotes.get()), user, drafts);
+          }
+        }
+      }
+
+      logger.atFine().log("Adding %d create requests", newChanges.size());
+      for (CreateRequest create : newChanges) {
+        create.addOps(bu);
+      }
+
+      logger.atFine().log("Adding %d group update requests", updateGroups.size());
+      updateGroups.forEach(r -> r.addOps(bu));
+
+      logger.atFine().log("Executing batch");
+      try {
+        bu.execute();
+      } catch (UpdateException e) {
+        throw asRestApiException(e);
+      }
+
+      for (ReplaceRequest replaced : replaceByChange.values()) {
+        result.addChange(ReceiveCommitsResult.ChangeStatus.REPLACED, replaced.ontoChange);
+      }
+      for (CreateRequest created : newChanges) {
+        result.addChange(ReceiveCommitsResult.ChangeStatus.CREATED, created.changeId);
+      }
+
+      if (magicBranchCmd != null) {
+        magicBranchCmd.setResult(OK);
+      }
+      for (ReplaceRequest replace : replaceByChange.values()) {
+        String rejectMessage = replace.getRejectMessage();
+        if (rejectMessage == null) {
+          if (replace.inputCommand.getResult() == NOT_ATTEMPTED) {
+            // Not necessarily the magic branch, so need to set OK on the original value.
+            replace.inputCommand.setResult(OK);
+          }
+        } else {
+          logger.atFine().log("Rejecting due to message from ReplaceOp");
+          reject(replace.inputCommand, rejectMessage);
+        }
+      }
+    }
+  }
+
   private boolean isReadyForReview(ChangeNotes changeNotes) {
     return (!changeNotes.getChange().isWorkInProgress() && !magicBranch.workInProgress)
         || magicBranch.ready;