Add a helper for retrying BatchUpdates under NoteDb
With a SQL or SQL-like backend for ReviewDb, two transactions on the
same Change entity initiated around the same time will generally both
succeed, due to the low-level implementation waiting for a lock or
retrying. NoteDb, being Git-backed, has no notion of locking, and the
only atomic operation is a compare-and-swap. This means that concurrent
writes carry a higher risk of exceptions in the Gerrit level when
compared with ReviewDb, and it will be worth it to implement a retrying
mechanism in Gerrit.
The question becomes: what is the appropriate unit of work to retry? The
implementation in this change encourages retrying at the highest level
of an entire end-user operation, like a REST API handler.
The main reason not to limit retrying to a lower level, like a single
BatchUpdate or its Ops, is that the op implementations may depend on
repository state that was read prior to entering the retry loop. This
potentially includes pretty much any caller of
BatchUpdate#setRepository, but the most notable is MergeOp: the initial
branch tips, which are ultimately used as old IDs in the final ref
updates, are read outside of the BatchUpdate. If we retried the
BatchUpdate on LOCK_FAILURE but not the outer code, retrying would be
guaranteed to fail.
The next question is: under what conditions should we retry? The safest
approach, implemented here, is to look specifically for LOCK_FAILUREs
only in the disabled-ReviewDb case, and only when the underlying ref
backend performs atomic multi-ref transactions. If transactions are not
atomic, then it is infeasible to find out which portions of the code
would need to be retried; if they are atomic, then we can assume that a
failed transaction means the operation had no side effects, so retrying
is safe.
There is certainly an argument to be made that it may be worth retrying
even after non-atomic partially-successful operations, under the
assumption that if an error propagates back to the user, probably the
next thing they were going to anyway is just retry manually. But
decisions about when to loosen up our initially tight safety assumptions
can be deferred.
Change-Id: Ic7a9df9ba1bfdb01784cd1fce2b2ce82511e1068
diff --git a/gerrit-acceptance-tests/src/test/java/com/google/gerrit/acceptance/server/notedb/NoteDbOnlyIT.java b/gerrit-acceptance-tests/src/test/java/com/google/gerrit/acceptance/server/notedb/NoteDbOnlyIT.java
index 8b5c68a..2c61e64 100644
--- a/gerrit-acceptance-tests/src/test/java/com/google/gerrit/acceptance/server/notedb/NoteDbOnlyIT.java
+++ b/gerrit-acceptance-tests/src/test/java/com/google/gerrit/acceptance/server/notedb/NoteDbOnlyIT.java
@@ -20,28 +20,46 @@
import static com.google.common.truth.TruthJUnit.assume;
import static java.util.stream.Collectors.toList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Streams;
import com.google.gerrit.acceptance.AbstractDaemonTest;
import com.google.gerrit.acceptance.PushOneCommit;
import com.google.gerrit.common.TimeUtil;
import com.google.gerrit.extensions.api.changes.ReviewInput;
import com.google.gerrit.extensions.client.ListChangesOption;
import com.google.gerrit.extensions.restapi.ResourceConflictException;
+import com.google.gerrit.extensions.restapi.RestApiException;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.server.update.BatchUpdate;
+import com.google.gerrit.server.update.BatchUpdateListener;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
import com.google.gerrit.server.update.RepoContext;
+import com.google.gerrit.server.update.RetryHelper;
+import com.google.inject.Inject;
import java.io.IOException;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.eclipse.jgit.internal.storage.dfs.InMemoryRepository;
+import org.eclipse.jgit.lib.CommitBuilder;
+import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectInserter;
+import org.eclipse.jgit.lib.PersonIdent;
import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefUpdate;
import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevSort;
+import org.eclipse.jgit.revwalk.RevWalk;
import org.junit.Before;
import org.junit.Test;
public class NoteDbOnlyIT extends AbstractDaemonTest {
+ @Inject private RetryHelper retryHelper;
+
@Before
public void setUp() throws Exception {
assume().that(notesMigration.disableChangeReviewDb()).isTrue();
@@ -78,7 +96,7 @@
}
};
- try (BatchUpdate bu = newBatchUpdate()) {
+ try (BatchUpdate bu = newBatchUpdate(batchUpdateFactory)) {
bu.addOp(id, backupMasterOp);
bu.execute();
}
@@ -94,7 +112,7 @@
assertThat(master2).isNotEqualTo(master1);
int msgCount = getMessages(id).size();
- try (BatchUpdate bu = newBatchUpdate()) {
+ try (BatchUpdate bu = newBatchUpdate(batchUpdateFactory)) {
// This time, we attempt to back up master, but we fail during updateChange.
bu.addOp(id, backupMasterOp);
String msg = "Change is bad";
@@ -119,9 +137,175 @@
assertThat(getMessages(id)).hasSize(msgCount);
}
- private BatchUpdate newBatchUpdate() {
- return batchUpdateFactory.create(
- db, project, identifiedUserFactory.create(user.getId()), TimeUtil.nowTs());
+ @Test
+ public void retryOnLockFailureWithAtomicUpdates() throws Exception {
+ assume().that(notesMigration.fuseUpdates()).isTrue();
+ PushOneCommit.Result r = createChange();
+ Change.Id id = r.getChange().getId();
+ String master = "refs/heads/master";
+ ObjectId initial;
+ try (Repository repo = repoManager.openRepository(project)) {
+ ((InMemoryRepository) repo).setPerformsAtomicTransactions(true);
+ initial = repo.exactRef(master).getObjectId();
+ }
+
+ AtomicInteger updateRepoCalledCount = new AtomicInteger();
+ AtomicInteger updateChangeCalledCount = new AtomicInteger();
+ AtomicInteger afterUpdateReposCalledCount = new AtomicInteger();
+
+ String result =
+ retryHelper.execute(
+ batchUpdateFactory -> {
+ try (BatchUpdate bu = newBatchUpdate(batchUpdateFactory)) {
+ bu.addOp(
+ id,
+ new UpdateRefAndAddMessageOp(updateRepoCalledCount, updateChangeCalledCount));
+ bu.execute(new ConcurrentWritingListener(afterUpdateReposCalledCount));
+ }
+ return "Done";
+ });
+
+ assertThat(result).isEqualTo("Done");
+ assertThat(updateRepoCalledCount.get()).isEqualTo(2);
+ assertThat(afterUpdateReposCalledCount.get()).isEqualTo(2);
+ assertThat(updateChangeCalledCount.get()).isEqualTo(2);
+
+ List<String> messages = getMessages(id);
+ assertThat(Iterables.getLast(messages)).isEqualTo(UpdateRefAndAddMessageOp.CHANGE_MESSAGE);
+ assertThat(Collections.frequency(messages, UpdateRefAndAddMessageOp.CHANGE_MESSAGE))
+ .isEqualTo(1);
+
+ try (Repository repo = repoManager.openRepository(project)) {
+ // Op lost the race, so the other writer's commit happened first. Then op retried and wrote
+ // its commit with the other writer's commit as parent.
+ assertThat(commitMessages(repo, initial, repo.exactRef(master).getObjectId()))
+ .containsExactly(
+ ConcurrentWritingListener.MSG_PREFIX + "1", UpdateRefAndAddMessageOp.COMMIT_MESSAGE)
+ .inOrder();
+ }
+ }
+
+ @Test
+ public void noRetryOnLockFailureWithoutAtomicUpdates() throws Exception {
+ assume().that(notesMigration.fuseUpdates()).isFalse();
+
+ PushOneCommit.Result r = createChange();
+ Change.Id id = r.getChange().getId();
+ String master = "refs/heads/master";
+ ObjectId initial;
+ try (Repository repo = repoManager.openRepository(project)) {
+ initial = repo.exactRef(master).getObjectId();
+ }
+
+ AtomicInteger updateRepoCalledCount = new AtomicInteger();
+ AtomicInteger updateChangeCalledCount = new AtomicInteger();
+ AtomicInteger afterUpdateReposCalledCount = new AtomicInteger();
+
+ try {
+ retryHelper.execute(
+ batchUpdateFactory -> {
+ try (BatchUpdate bu = newBatchUpdate(batchUpdateFactory)) {
+ bu.addOp(
+ id, new UpdateRefAndAddMessageOp(updateRepoCalledCount, updateChangeCalledCount));
+ bu.execute(new ConcurrentWritingListener(afterUpdateReposCalledCount));
+ }
+ return null;
+ });
+ assert_().fail("expected RestApiException");
+ } catch (RestApiException e) {
+ // Expected.
+ }
+
+ assertThat(updateRepoCalledCount.get()).isEqualTo(1);
+ assertThat(afterUpdateReposCalledCount.get()).isEqualTo(1);
+ assertThat(updateChangeCalledCount.get()).isEqualTo(0);
+
+ // updateChange was never called, so no message was ever added.
+ assertThat(getMessages(id)).doesNotContain(UpdateRefAndAddMessageOp.CHANGE_MESSAGE);
+
+ try (Repository repo = repoManager.openRepository(project)) {
+ // Op lost the race, so the other writer's commit happened first. Op didn't retry, because the
+ // ref updates weren't atomic, so it didn't throw LockFailureException on failure.
+ assertThat(commitMessages(repo, initial, repo.exactRef(master).getObjectId()))
+ .containsExactly(ConcurrentWritingListener.MSG_PREFIX + "1");
+ }
+ }
+
+ private class ConcurrentWritingListener implements BatchUpdateListener {
+ static final String MSG_PREFIX = "Other writer ";
+
+ private final AtomicInteger calledCount;
+
+ private ConcurrentWritingListener(AtomicInteger calledCount) {
+ this.calledCount = calledCount;
+ }
+
+ @Override
+ public void afterUpdateRepos() throws Exception {
+ // Reopen repo and update ref, to simulate a concurrent write in another
+ // thread. Only do this the first time the listener is called.
+ if (calledCount.getAndIncrement() > 0) {
+ return;
+ }
+ try (Repository repo = repoManager.openRepository(project);
+ RevWalk rw = new RevWalk(repo);
+ ObjectInserter ins = repo.newObjectInserter()) {
+ String master = "refs/heads/master";
+ ObjectId oldId = repo.exactRef(master).getObjectId();
+ ObjectId newId = newCommit(rw, ins, oldId, MSG_PREFIX + calledCount.get());
+ ins.flush();
+ RefUpdate ru = repo.updateRef(master);
+ ru.setExpectedOldObjectId(oldId);
+ ru.setNewObjectId(newId);
+ assertThat(ru.update(rw)).isEqualTo(RefUpdate.Result.FAST_FORWARD);
+ }
+ }
+ }
+
+ private class UpdateRefAndAddMessageOp implements BatchUpdateOp {
+ static final String COMMIT_MESSAGE = "A commit";
+ static final String CHANGE_MESSAGE = "A change message";
+
+ private final AtomicInteger updateRepoCalledCount;
+ private final AtomicInteger updateChangeCalledCount;
+
+ private UpdateRefAndAddMessageOp(
+ AtomicInteger updateRepoCalledCount, AtomicInteger updateChangeCalledCount) {
+ this.updateRepoCalledCount = updateRepoCalledCount;
+ this.updateChangeCalledCount = updateChangeCalledCount;
+ }
+
+ @Override
+ public void updateRepo(RepoContext ctx) throws Exception {
+ String master = "refs/heads/master";
+ ObjectId oldId = ctx.getRepoView().getRef(master).get();
+ ObjectId newId = newCommit(ctx.getRevWalk(), ctx.getInserter(), oldId, COMMIT_MESSAGE);
+ ctx.addRefUpdate(oldId, newId, master);
+ updateRepoCalledCount.incrementAndGet();
+ }
+
+ @Override
+ public boolean updateChange(ChangeContext ctx) throws Exception {
+ ctx.getUpdate(ctx.getChange().currentPatchSetId()).setChangeMessage(CHANGE_MESSAGE);
+ updateChangeCalledCount.incrementAndGet();
+ return true;
+ }
+ }
+
+ private ObjectId newCommit(RevWalk rw, ObjectInserter ins, ObjectId parent, String msg)
+ throws IOException {
+ PersonIdent ident = serverIdent.get();
+ CommitBuilder cb = new CommitBuilder();
+ cb.setParentId(parent);
+ cb.setTreeId(rw.parseCommit(parent).getTree());
+ cb.setMessage(msg);
+ cb.setAuthor(ident);
+ cb.setCommitter(ident);
+ return ins.insert(Constants.OBJ_COMMIT, cb.build());
+ }
+
+ private BatchUpdate newBatchUpdate(BatchUpdate.Factory buf) {
+ return buf.create(db, project, identifiedUserFactory.create(user.getId()), TimeUtil.nowTs());
}
private Optional<ObjectId> getRef(String name) throws Exception {
@@ -139,4 +323,15 @@
.map(m -> m.message)
.collect(toList());
}
+
+ private static List<String> commitMessages(
+ Repository repo, ObjectId fromExclusive, ObjectId toInclusive) throws Exception {
+ try (RevWalk rw = new RevWalk(repo)) {
+ rw.markStart(rw.parseCommit(toInclusive));
+ rw.markUninteresting(rw.parseCommit(fromExclusive));
+ rw.sort(RevSort.REVERSE);
+ rw.setRetainBody(true);
+ return Streams.stream(rw).map(c -> c.getShortMessage()).collect(toList());
+ }
+ }
}
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/update/BatchUpdate.java b/gerrit-server/src/main/java/com/google/gerrit/server/update/BatchUpdate.java
index 22329fd..1d420bf 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/update/BatchUpdate.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/update/BatchUpdate.java
@@ -107,6 +107,7 @@
private final FusedNoteDbBatchUpdate.AssistedFactory fusedNoteDbBatchUpdateFactory;
private final UnfusedNoteDbBatchUpdate.AssistedFactory unfusedNoteDbBatchUpdateFactory;
+ // TODO(dborowitz): Make this non-injectable to force all callers to use RetryHelper.
@Inject
Factory(
NotesMigration migration,
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/update/RetryHelper.java b/gerrit-server/src/main/java/com/google/gerrit/server/update/RetryHelper.java
new file mode 100644
index 0000000..dbbce2b
--- /dev/null
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/update/RetryHelper.java
@@ -0,0 +1,79 @@
+// Copyright (C) 2017 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.gerrit.server.update;
+
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
+import com.google.common.base.Throwables;
+import com.google.gerrit.extensions.restapi.RestApiException;
+import com.google.gerrit.server.git.LockFailureException;
+import com.google.gerrit.server.notedb.NotesMigration;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+@Singleton
+public class RetryHelper {
+ public interface Action<T> {
+ T call(BatchUpdate.Factory updateFactory) throws Exception;
+ }
+
+ private final BatchUpdate.Factory updateFactory;
+
+ @Inject
+ RetryHelper(
+ NotesMigration migration,
+ ReviewDbBatchUpdate.AssistedFactory reviewDbBatchUpdateFactory,
+ FusedNoteDbBatchUpdate.AssistedFactory fusedNoteDbBatchUpdateFactory,
+ UnfusedNoteDbBatchUpdate.AssistedFactory unfusedNoteDbBatchUpdateFactory) {
+ this.updateFactory =
+ new BatchUpdate.Factory(
+ migration,
+ reviewDbBatchUpdateFactory,
+ fusedNoteDbBatchUpdateFactory,
+ unfusedNoteDbBatchUpdateFactory);
+ }
+
+ public <T> T execute(Action<T> action) throws RestApiException, UpdateException {
+ try {
+ // TODO(dborowitz): Make configurable.
+ return RetryerBuilder.<T>newBuilder()
+ .withStopStrategy(StopStrategies.stopAfterDelay(20, TimeUnit.SECONDS))
+ .withWaitStrategy(
+ WaitStrategies.join(
+ WaitStrategies.exponentialWait(5, TimeUnit.SECONDS),
+ WaitStrategies.randomWait(50, TimeUnit.MILLISECONDS)))
+ .retryIfException(RetryHelper::isLockFailure)
+ .build()
+ .call(() -> action.call(updateFactory));
+ } catch (ExecutionException | RetryException e) {
+ if (e.getCause() != null) {
+ Throwables.throwIfInstanceOf(e.getCause(), UpdateException.class);
+ Throwables.throwIfInstanceOf(e.getCause(), RestApiException.class);
+ }
+ throw new UpdateException(e);
+ }
+ }
+
+ private static boolean isLockFailure(Throwable t) {
+ if (t instanceof UpdateException) {
+ t = t.getCause();
+ }
+ return t instanceof LockFailureException;
+ }
+}