| // Copyright (C) 2015 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package com.google.gerrit.server.update; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static com.google.common.base.Preconditions.checkState; |
| import static java.util.Comparator.comparing; |
| import static java.util.concurrent.TimeUnit.NANOSECONDS; |
| |
| import com.google.common.base.Stopwatch; |
| import com.google.common.base.Throwables; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.util.concurrent.CheckedFuture; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.ListeningExecutorService; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.gerrit.common.Nullable; |
| import com.google.gerrit.extensions.restapi.ResourceConflictException; |
| import com.google.gerrit.extensions.restapi.ResourceNotFoundException; |
| import com.google.gerrit.extensions.restapi.RestApiException; |
| import com.google.gerrit.metrics.Description; |
| import com.google.gerrit.metrics.Description.Units; |
| import com.google.gerrit.metrics.Field; |
| import com.google.gerrit.metrics.MetricMaker; |
| import com.google.gerrit.metrics.Timer1; |
| import com.google.gerrit.reviewdb.client.Change; |
| import com.google.gerrit.reviewdb.client.PatchSet; |
| import com.google.gerrit.reviewdb.client.Project; |
| import com.google.gerrit.reviewdb.server.ReviewDb; |
| import com.google.gerrit.reviewdb.server.ReviewDbWrapper; |
| import com.google.gerrit.server.CurrentUser; |
| import com.google.gerrit.server.GerritPersonIdent; |
| import com.google.gerrit.server.config.AllUsersName; |
| import com.google.gerrit.server.config.GerritServerConfig; |
| import com.google.gerrit.server.extensions.events.GitReferenceUpdated; |
| import com.google.gerrit.server.git.GitRepositoryManager; |
| import com.google.gerrit.server.git.InsertedObject; |
| import com.google.gerrit.server.git.LockFailureException; |
| import com.google.gerrit.server.index.change.ChangeIndexer; |
| import com.google.gerrit.server.notedb.ChangeNotes; |
| import com.google.gerrit.server.notedb.ChangeUpdate; |
| import com.google.gerrit.server.notedb.NoteDbChangeState; |
| import com.google.gerrit.server.notedb.NoteDbChangeState.PrimaryStorage; |
| import com.google.gerrit.server.notedb.NoteDbUpdateManager; |
| import com.google.gerrit.server.notedb.NoteDbUpdateManager.MismatchedStateException; |
| import com.google.gerrit.server.notedb.NotesMigration; |
| import com.google.gerrit.server.project.ChangeControl; |
| import com.google.gerrit.server.project.InvalidChangeOperationException; |
| import com.google.gerrit.server.project.NoSuchChangeException; |
| import com.google.gerrit.server.project.NoSuchProjectException; |
| import com.google.gerrit.server.project.NoSuchRefException; |
| import com.google.gerrit.server.util.RequestId; |
| import com.google.gwtorm.server.OrmException; |
| import com.google.gwtorm.server.SchemaFactory; |
| import com.google.inject.Inject; |
| import com.google.inject.Singleton; |
| import com.google.inject.assistedinject.Assisted; |
| import com.google.inject.assistedinject.AssistedInject; |
| import java.io.IOException; |
| import java.sql.Timestamp; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.TimeZone; |
| import java.util.TreeMap; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import org.eclipse.jgit.lib.BatchRefUpdate; |
| import org.eclipse.jgit.lib.Config; |
| import org.eclipse.jgit.lib.NullProgressMonitor; |
| import org.eclipse.jgit.lib.ObjectInserter; |
| import org.eclipse.jgit.lib.ObjectReader; |
| import org.eclipse.jgit.lib.PersonIdent; |
| import org.eclipse.jgit.lib.Repository; |
| import org.eclipse.jgit.revwalk.RevWalk; |
| import org.eclipse.jgit.transport.ReceiveCommand; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * {@link BatchUpdate} implementation that supports mixed ReviewDb/NoteDb operations, depending on |
| * the migration state specified in {@link NotesMigration}. |
| * |
| * <p>When performing change updates in a mixed ReviewDb/NoteDb environment with ReviewDb primary, |
| * the order of operations is very subtle: |
| * |
| * <ol> |
| * <li>Stage NoteDb updates to get the new NoteDb state, but do not write to the repo. |
| * <li>Write the new state in the Change entity, and commit this to ReviewDb. |
| * <li>Update NoteDb, ignoring any write failures. |
| * </ol> |
| * |
| * The implementation in this class is well-tested, and it is strongly recommended that you not |
| * attempt to reimplement this logic. Use {@code BatchUpdate} if at all possible. |
| */ |
| class ReviewDbBatchUpdate extends BatchUpdate { |
| private static final Logger log = LoggerFactory.getLogger(ReviewDbBatchUpdate.class); |
| |
| interface AssistedFactory { |
| ReviewDbBatchUpdate create( |
| ReviewDb db, Project.NameKey project, CurrentUser user, Timestamp when); |
| } |
| |
| class ContextImpl implements Context { |
| private Repository repoWrapper; |
| |
| @Override |
| public Repository getRepository() throws IOException { |
| if (repoWrapper == null) { |
| repoWrapper = new ReadOnlyRepository(ReviewDbBatchUpdate.this.getRepository()); |
| } |
| return repoWrapper; |
| } |
| |
| @Override |
| public RevWalk getRevWalk() throws IOException { |
| return ReviewDbBatchUpdate.this.getRevWalk(); |
| } |
| |
| @Override |
| public Project.NameKey getProject() { |
| return project; |
| } |
| |
| @Override |
| public Timestamp getWhen() { |
| return when; |
| } |
| |
| @Override |
| public TimeZone getTimeZone() { |
| return tz; |
| } |
| |
| @Override |
| public ReviewDb getDb() { |
| return db; |
| } |
| |
| @Override |
| public CurrentUser getUser() { |
| return user; |
| } |
| |
| @Override |
| public Order getOrder() { |
| return order; |
| } |
| } |
| |
| private class RepoContextImpl extends ContextImpl implements RepoContext { |
| @Override |
| public Repository getRepository() throws IOException { |
| return ReviewDbBatchUpdate.this.getRepository(); |
| } |
| |
| @Override |
| public ObjectInserter getInserter() throws IOException { |
| return ReviewDbBatchUpdate.this.getObjectInserter(); |
| } |
| |
| @Override |
| public void addRefUpdate(ReceiveCommand cmd) throws IOException { |
| initRepository(); |
| commands.add(cmd); |
| } |
| } |
| |
| private class ChangeContextImpl extends ContextImpl implements ChangeContext { |
| private final ChangeControl ctl; |
| private final Map<PatchSet.Id, ChangeUpdate> updates; |
| private final ReviewDbWrapper dbWrapper; |
| private final Repository threadLocalRepo; |
| private final RevWalk threadLocalRevWalk; |
| |
| private boolean deleted; |
| private boolean bumpLastUpdatedOn = true; |
| |
| protected ChangeContextImpl( |
| ChangeControl ctl, ReviewDbWrapper dbWrapper, Repository repo, RevWalk rw) { |
| this.ctl = ctl; |
| this.dbWrapper = dbWrapper; |
| this.threadLocalRepo = repo; |
| this.threadLocalRevWalk = rw; |
| updates = new TreeMap<>(comparing(PatchSet.Id::get)); |
| } |
| |
| @Override |
| public ReviewDb getDb() { |
| checkNotNull(dbWrapper); |
| return dbWrapper; |
| } |
| |
| @Override |
| public Repository getRepository() { |
| return threadLocalRepo; |
| } |
| |
| @Override |
| public RevWalk getRevWalk() { |
| return threadLocalRevWalk; |
| } |
| |
| @Override |
| public ChangeUpdate getUpdate(PatchSet.Id psId) { |
| ChangeUpdate u = updates.get(psId); |
| if (u == null) { |
| u = changeUpdateFactory.create(ctl, when); |
| if (newChanges.containsKey(ctl.getId())) { |
| u.setAllowWriteToNewRef(true); |
| } |
| u.setPatchSetId(psId); |
| updates.put(psId, u); |
| } |
| return u; |
| } |
| |
| @Override |
| public ChangeControl getControl() { |
| checkNotNull(ctl); |
| return ctl; |
| } |
| |
| @Override |
| public void bumpLastUpdatedOn(boolean bump) { |
| bumpLastUpdatedOn = bump; |
| } |
| |
| @Override |
| public void deleteChange() { |
| deleted = true; |
| } |
| } |
| |
| @Singleton |
| private static class Metrics { |
| final Timer1<Boolean> executeChangeOpsLatency; |
| |
| @Inject |
| Metrics(MetricMaker metricMaker) { |
| executeChangeOpsLatency = |
| metricMaker.newTimer( |
| "batch_update/execute_change_ops", |
| new Description("BatchUpdate change update latency, excluding reindexing") |
| .setCumulative() |
| .setUnit(Units.MILLISECONDS), |
| Field.ofBoolean("success")); |
| } |
| } |
| |
| static void execute( |
| ImmutableList<ReviewDbBatchUpdate> updates, |
| BatchUpdateListener listener, |
| @Nullable RequestId requestId, |
| boolean dryrun) |
| throws UpdateException, RestApiException { |
| if (updates.isEmpty()) { |
| return; |
| } |
| if (requestId != null) { |
| for (BatchUpdate u : updates) { |
| checkArgument( |
| u.requestId == null || u.requestId == requestId, |
| "refusing to overwrite RequestId %s in update with %s", |
| u.requestId, |
| requestId); |
| u.setRequestId(requestId); |
| } |
| } |
| try { |
| Order order = getOrder(updates); |
| boolean updateChangesInParallel = getUpdateChangesInParallel(updates); |
| switch (order) { |
| case REPO_BEFORE_DB: |
| for (ReviewDbBatchUpdate u : updates) { |
| u.executeUpdateRepo(); |
| } |
| listener.afterUpdateRepos(); |
| for (ReviewDbBatchUpdate u : updates) { |
| u.executeRefUpdates(dryrun); |
| } |
| listener.afterUpdateRefs(); |
| for (ReviewDbBatchUpdate u : updates) { |
| u.reindexChanges(u.executeChangeOps(updateChangesInParallel, dryrun)); |
| } |
| listener.afterUpdateChanges(); |
| break; |
| case DB_BEFORE_REPO: |
| for (ReviewDbBatchUpdate u : updates) { |
| u.reindexChanges(u.executeChangeOps(updateChangesInParallel, dryrun)); |
| } |
| listener.afterUpdateChanges(); |
| for (ReviewDbBatchUpdate u : updates) { |
| u.executeUpdateRepo(); |
| } |
| listener.afterUpdateRepos(); |
| for (ReviewDbBatchUpdate u : updates) { |
| u.executeRefUpdates(dryrun); |
| } |
| listener.afterUpdateRefs(); |
| break; |
| default: |
| throw new IllegalStateException("invalid execution order: " + order); |
| } |
| |
| List<CheckedFuture<?, IOException>> indexFutures = new ArrayList<>(); |
| for (ReviewDbBatchUpdate u : updates) { |
| indexFutures.addAll(u.indexFutures); |
| } |
| ChangeIndexer.allAsList(indexFutures).get(); |
| |
| for (ReviewDbBatchUpdate u : updates) { |
| if (u.batchRefUpdate != null) { |
| // Fire ref update events only after all mutations are finished, since |
| // callers may assume a patch set ref being created means the change |
| // was created, or a branch advancing meaning some changes were |
| // closed. |
| u.gitRefUpdated.fire( |
| u.project, |
| u.batchRefUpdate, |
| u.getUser().isIdentifiedUser() ? u.getUser().asIdentifiedUser().getAccount() : null); |
| } |
| } |
| if (!dryrun) { |
| for (ReviewDbBatchUpdate u : updates) { |
| u.executePostOps(); |
| } |
| } |
| } catch (UpdateException | RestApiException e) { |
| // Propagate REST API exceptions thrown by operations; they commonly throw |
| // exceptions like ResourceConflictException to indicate an atomic update |
| // failure. |
| throw e; |
| |
| // Convert other common non-REST exception types with user-visible |
| // messages to corresponding REST exception types |
| } catch (InvalidChangeOperationException e) { |
| throw new ResourceConflictException(e.getMessage(), e); |
| } catch (NoSuchChangeException | NoSuchRefException | NoSuchProjectException e) { |
| throw new ResourceNotFoundException(e.getMessage(), e); |
| |
| } catch (Exception e) { |
| Throwables.throwIfUnchecked(e); |
| throw new UpdateException(e); |
| } |
| } |
| |
| private final AllUsersName allUsers; |
| private final ChangeControl.GenericFactory changeControlFactory; |
| private final ChangeIndexer indexer; |
| private final ChangeNotes.Factory changeNotesFactory; |
| private final ChangeUpdate.Factory changeUpdateFactory; |
| private final GitReferenceUpdated gitRefUpdated; |
| private final ListeningExecutorService changeUpdateExector; |
| private final Metrics metrics; |
| private final NoteDbUpdateManager.Factory updateManagerFactory; |
| private final NotesMigration notesMigration; |
| private final ReviewDb db; |
| private final SchemaFactory<ReviewDb> schemaFactory; |
| private final long skewMs; |
| private final List<CheckedFuture<?, IOException>> indexFutures = new ArrayList<>(); |
| |
| @AssistedInject |
| ReviewDbBatchUpdate( |
| @GerritServerConfig Config cfg, |
| AllUsersName allUsers, |
| ChangeControl.GenericFactory changeControlFactory, |
| ChangeIndexer indexer, |
| ChangeNotes.Factory changeNotesFactory, |
| @ChangeUpdateExecutor ListeningExecutorService changeUpdateExector, |
| ChangeUpdate.Factory changeUpdateFactory, |
| @GerritPersonIdent PersonIdent serverIdent, |
| GitReferenceUpdated gitRefUpdated, |
| GitRepositoryManager repoManager, |
| Metrics metrics, |
| NoteDbUpdateManager.Factory updateManagerFactory, |
| NotesMigration notesMigration, |
| SchemaFactory<ReviewDb> schemaFactory, |
| @Assisted ReviewDb db, |
| @Assisted Project.NameKey project, |
| @Assisted CurrentUser user, |
| @Assisted Timestamp when) { |
| super(repoManager, serverIdent, project, user, when); |
| this.allUsers = allUsers; |
| this.changeControlFactory = changeControlFactory; |
| this.changeNotesFactory = changeNotesFactory; |
| this.changeUpdateExector = changeUpdateExector; |
| this.changeUpdateFactory = changeUpdateFactory; |
| this.gitRefUpdated = gitRefUpdated; |
| this.indexer = indexer; |
| this.metrics = metrics; |
| this.notesMigration = notesMigration; |
| this.schemaFactory = schemaFactory; |
| this.updateManagerFactory = updateManagerFactory; |
| this.db = db; |
| skewMs = NoteDbChangeState.getReadOnlySkew(cfg); |
| } |
| |
| @Override |
| public void execute() throws UpdateException, RestApiException { |
| execute(BatchUpdateListener.NONE); |
| } |
| |
| @Override |
| public void execute(BatchUpdateListener listener) throws UpdateException, RestApiException { |
| execute(ImmutableList.of(this), listener, requestId, false); |
| } |
| |
| @Override |
| protected Context newContext() { |
| return new ContextImpl(); |
| } |
| |
| private void executeUpdateRepo() throws UpdateException, RestApiException { |
| try { |
| logDebug("Executing updateRepo on {} ops", ops.size()); |
| RepoContextImpl ctx = new RepoContextImpl(); |
| for (BatchUpdateOp op : ops.values()) { |
| op.updateRepo(ctx); |
| } |
| |
| logDebug("Executing updateRepo on {} RepoOnlyOps", repoOnlyOps.size()); |
| for (RepoOnlyOp op : repoOnlyOps) { |
| op.updateRepo(ctx); |
| } |
| |
| if (onSubmitValidators != null && commands != null && !commands.isEmpty()) { |
| try (ObjectReader reader = ctx.getInserter().newReader()) { |
| // Validation of refs has to take place here and not at the beginning |
| // executeRefUpdates. Otherwise failing validation in a second BatchUpdate object will |
| // happen *after* first object's executeRefUpdates has finished, hence after first repo's |
| // refs have been updated, which is too late. |
| onSubmitValidators.validate( |
| project, new ReadOnlyRepository(getRepository()), reader, commands.getCommands()); |
| } |
| } |
| |
| if (inserter != null) { |
| logDebug("Flushing inserter"); |
| inserter.flush(); |
| } else { |
| logDebug("No objects to flush"); |
| } |
| } catch (Exception e) { |
| Throwables.throwIfInstanceOf(e, RestApiException.class); |
| throw new UpdateException(e); |
| } |
| } |
| |
| private void executeRefUpdates(boolean dryrun) throws IOException, RestApiException { |
| if (commands == null || commands.isEmpty()) { |
| logDebug("No ref updates to execute"); |
| return; |
| } |
| // May not be opened if the caller added ref updates but no new objects. |
| initRepository(); |
| batchRefUpdate = repo.getRefDatabase().newBatchUpdate(); |
| batchRefUpdate.setRefLogMessage(refLogMessage, true); |
| if (user.isIdentifiedUser()) { |
| batchRefUpdate.setRefLogIdent(user.asIdentifiedUser().newRefLogIdent(when, tz)); |
| } |
| commands.addTo(batchRefUpdate); |
| logDebug("Executing batch of {} ref updates", batchRefUpdate.getCommands().size()); |
| if (dryrun) { |
| return; |
| } |
| |
| batchRefUpdate.execute(revWalk, NullProgressMonitor.INSTANCE); |
| boolean ok = true; |
| for (ReceiveCommand cmd : batchRefUpdate.getCommands()) { |
| if (cmd.getResult() != ReceiveCommand.Result.OK) { |
| ok = false; |
| break; |
| } |
| } |
| if (!ok) { |
| throw new RestApiException("BatchRefUpdate failed: " + batchRefUpdate); |
| } |
| } |
| |
| private List<ChangeTask> executeChangeOps(boolean parallel, boolean dryrun) |
| throws UpdateException, RestApiException { |
| List<ChangeTask> tasks; |
| boolean success = false; |
| Stopwatch sw = Stopwatch.createStarted(); |
| try { |
| logDebug("Executing change ops (parallel? {})", parallel); |
| ListeningExecutorService executor = |
| parallel ? changeUpdateExector : MoreExecutors.newDirectExecutorService(); |
| |
| tasks = new ArrayList<>(ops.keySet().size()); |
| try { |
| if (notesMigration.commitChangeWrites() && repo != null) { |
| // A NoteDb change may have been rebuilt since the repo was originally |
| // opened, so make sure we see that. |
| logDebug("Preemptively scanning for repo changes"); |
| repo.scanForRepoChanges(); |
| } |
| if (!ops.isEmpty() && notesMigration.failChangeWrites()) { |
| // Fail fast before attempting any writes if changes are read-only, as |
| // this is a programmer error. |
| logDebug("Failing early due to read-only Changes table"); |
| throw new OrmException(NoteDbUpdateManager.CHANGES_READ_ONLY); |
| } |
| List<ListenableFuture<?>> futures = new ArrayList<>(ops.keySet().size()); |
| for (Map.Entry<Change.Id, Collection<BatchUpdateOp>> e : ops.asMap().entrySet()) { |
| ChangeTask task = |
| new ChangeTask(e.getKey(), e.getValue(), Thread.currentThread(), dryrun); |
| tasks.add(task); |
| if (!parallel) { |
| logDebug("Direct execution of task for ops: {}", ops); |
| } |
| futures.add(executor.submit(task)); |
| } |
| if (parallel) { |
| logDebug( |
| "Waiting on futures for {} ops spanning {} changes", ops.size(), ops.keySet().size()); |
| } |
| Futures.allAsList(futures).get(); |
| |
| if (notesMigration.commitChangeWrites()) { |
| if (!dryrun) { |
| executeNoteDbUpdates(tasks); |
| } |
| } |
| success = true; |
| } catch (ExecutionException | InterruptedException e) { |
| Throwables.throwIfInstanceOf(e.getCause(), UpdateException.class); |
| Throwables.throwIfInstanceOf(e.getCause(), RestApiException.class); |
| throw new UpdateException(e); |
| } catch (OrmException | IOException e) { |
| throw new UpdateException(e); |
| } |
| } finally { |
| metrics.executeChangeOpsLatency.record(success, sw.elapsed(NANOSECONDS), NANOSECONDS); |
| } |
| return tasks; |
| } |
| |
| private void reindexChanges(List<ChangeTask> tasks) { |
| // Reindex changes. |
| for (ChangeTask task : tasks) { |
| if (task.deleted) { |
| indexFutures.add(indexer.deleteAsync(task.id)); |
| } else if (task.dirty) { |
| indexFutures.add(indexer.indexAsync(project, task.id)); |
| } |
| } |
| } |
| |
| private void executeNoteDbUpdates(List<ChangeTask> tasks) |
| throws ResourceConflictException, IOException { |
| // Aggregate together all NoteDb ref updates from the ops we executed, |
| // possibly in parallel. Each task had its own NoteDbUpdateManager instance |
| // with its own thread-local copy of the repo(s), but each of those was just |
| // used for staging updates and was never executed. |
| // |
| // Use a new BatchRefUpdate as the original batchRefUpdate field is intended |
| // for use only by the updateRepo phase. |
| // |
| // See the comments in NoteDbUpdateManager#execute() for why we execute the |
| // updates on the change repo first. |
| logDebug("Executing NoteDb updates for {} changes", tasks.size()); |
| try { |
| BatchRefUpdate changeRefUpdate = getRepository().getRefDatabase().newBatchUpdate(); |
| boolean hasAllUsersCommands = false; |
| try (ObjectInserter ins = getRepository().newObjectInserter()) { |
| int objs = 0; |
| for (ChangeTask task : tasks) { |
| if (task.noteDbResult == null) { |
| logDebug("No-op update to {}", task.id); |
| continue; |
| } |
| for (ReceiveCommand cmd : task.noteDbResult.changeCommands()) { |
| changeRefUpdate.addCommand(cmd); |
| } |
| for (InsertedObject obj : task.noteDbResult.changeObjects()) { |
| objs++; |
| ins.insert(obj.type(), obj.data().toByteArray()); |
| } |
| hasAllUsersCommands |= !task.noteDbResult.allUsersCommands().isEmpty(); |
| } |
| logDebug( |
| "Collected {} objects and {} ref updates to change repo", |
| objs, |
| changeRefUpdate.getCommands().size()); |
| executeNoteDbUpdate(getRevWalk(), ins, changeRefUpdate); |
| } |
| |
| if (hasAllUsersCommands) { |
| try (Repository allUsersRepo = repoManager.openRepository(allUsers); |
| RevWalk allUsersRw = new RevWalk(allUsersRepo); |
| ObjectInserter allUsersIns = allUsersRepo.newObjectInserter()) { |
| int objs = 0; |
| BatchRefUpdate allUsersRefUpdate = allUsersRepo.getRefDatabase().newBatchUpdate(); |
| for (ChangeTask task : tasks) { |
| for (ReceiveCommand cmd : task.noteDbResult.allUsersCommands()) { |
| allUsersRefUpdate.addCommand(cmd); |
| } |
| for (InsertedObject obj : task.noteDbResult.allUsersObjects()) { |
| allUsersIns.insert(obj.type(), obj.data().toByteArray()); |
| } |
| } |
| logDebug( |
| "Collected {} objects and {} ref updates to All-Users", |
| objs, |
| allUsersRefUpdate.getCommands().size()); |
| executeNoteDbUpdate(allUsersRw, allUsersIns, allUsersRefUpdate); |
| } |
| } else { |
| logDebug("No All-Users updates"); |
| } |
| } catch (IOException e) { |
| if (tasks.stream().allMatch(t -> t.storage == PrimaryStorage.REVIEW_DB)) { |
| // Ignore all errors trying to update NoteDb at this point. We've already written the |
| // NoteDbChangeStates to ReviewDb, which means if any state is out of date it will be |
| // rebuilt the next time it is needed. |
| // |
| // Always log even without RequestId. |
| log.debug("Ignoring NoteDb update error after ReviewDb write", e); |
| |
| // Otherwise, we can't prove it's safe to ignore the error, either because some change had |
| // NOTE_DB primary, or a task failed before determining the primary storage. |
| } else if (e instanceof LockFailureException) { |
| // LOCK_FAILURE is a special case indicating there was a conflicting write to a meta ref, |
| // although it happened too late for us to produce anything but a generic error message. |
| throw new ResourceConflictException("Updating change failed due to conflicting write", e); |
| } |
| throw e; |
| } |
| } |
| |
| private void executeNoteDbUpdate(RevWalk rw, ObjectInserter ins, BatchRefUpdate bru) |
| throws IOException { |
| if (bru.getCommands().isEmpty()) { |
| logDebug("No commands, skipping flush and ref update"); |
| return; |
| } |
| ins.flush(); |
| bru.setAllowNonFastForwards(true); |
| bru.execute(rw, NullProgressMonitor.INSTANCE); |
| for (ReceiveCommand cmd : bru.getCommands()) { |
| // TODO(dborowitz): LOCK_FAILURE for NoteDb primary should be retried. |
| if (cmd.getResult() != ReceiveCommand.Result.OK) { |
| throw new IOException("Update failed: " + bru); |
| } |
| } |
| } |
| |
| private class ChangeTask implements Callable<Void> { |
| final Change.Id id; |
| private final Collection<BatchUpdateOp> changeOps; |
| private final Thread mainThread; |
| private final boolean dryrun; |
| |
| PrimaryStorage storage; |
| NoteDbUpdateManager.StagedResult noteDbResult; |
| boolean dirty; |
| boolean deleted; |
| private String taskId; |
| |
| private ChangeTask( |
| Change.Id id, Collection<BatchUpdateOp> changeOps, Thread mainThread, boolean dryrun) { |
| this.id = id; |
| this.changeOps = changeOps; |
| this.mainThread = mainThread; |
| this.dryrun = dryrun; |
| } |
| |
| @Override |
| public Void call() throws Exception { |
| taskId = id.toString() + "-" + Thread.currentThread().getId(); |
| if (Thread.currentThread() == mainThread) { |
| Repository repo = getRepository(); |
| try (RevWalk rw = new RevWalk(repo)) { |
| call(ReviewDbBatchUpdate.this.db, repo, rw); |
| } |
| } else { |
| // Possible optimization: allow Ops to declare whether they need to |
| // access the repo from updateChange, and don't open in this thread |
| // unless we need it. However, as of this writing the only operations |
| // that are executed in parallel are during ReceiveCommits, and they |
| // all need the repo open anyway. (The non-parallel case above does not |
| // reopen the repo.) |
| try (ReviewDb threadLocalDb = schemaFactory.open(); |
| Repository repo = repoManager.openRepository(project); |
| RevWalk rw = new RevWalk(repo)) { |
| call(threadLocalDb, repo, rw); |
| } |
| } |
| return null; |
| } |
| |
| private void call(ReviewDb db, Repository repo, RevWalk rw) throws Exception { |
| @SuppressWarnings("resource") // Not always opened. |
| NoteDbUpdateManager updateManager = null; |
| try { |
| db.changes().beginTransaction(id); |
| try { |
| ChangeContextImpl ctx = newChangeContext(db, repo, rw, id); |
| NoteDbChangeState oldState = NoteDbChangeState.parse(ctx.getChange()); |
| NoteDbChangeState.checkNotReadOnly(oldState, skewMs); |
| |
| storage = PrimaryStorage.of(oldState); |
| if (storage == PrimaryStorage.NOTE_DB && !notesMigration.readChanges()) { |
| throw new OrmException("must have NoteDb enabled to update change " + id); |
| } |
| |
| // Call updateChange on each op. |
| logDebug("Calling updateChange on {} ops", changeOps.size()); |
| for (BatchUpdateOp op : changeOps) { |
| dirty |= op.updateChange(ctx); |
| } |
| if (!dirty) { |
| logDebug("No ops reported dirty, short-circuiting"); |
| return; |
| } |
| deleted = ctx.deleted; |
| if (deleted) { |
| logDebug("Change was deleted"); |
| } |
| |
| // Stage the NoteDb update and store its state in the Change. |
| if (notesMigration.commitChangeWrites()) { |
| updateManager = stageNoteDbUpdate(ctx, deleted); |
| } |
| |
| if (storage == PrimaryStorage.REVIEW_DB) { |
| // If primary storage of this change is in ReviewDb, bump |
| // lastUpdatedOn or rowVersion and commit. Otherwise, don't waste |
| // time updating ReviewDb at all. |
| Iterable<Change> cs = changesToUpdate(ctx); |
| if (isNewChange(id)) { |
| // Insert rather than upsert in case of a race on change IDs. |
| logDebug("Inserting change"); |
| db.changes().insert(cs); |
| } else if (deleted) { |
| logDebug("Deleting change"); |
| db.changes().delete(cs); |
| } else { |
| logDebug("Updating change"); |
| db.changes().update(cs); |
| } |
| if (!dryrun) { |
| db.commit(); |
| } |
| } else { |
| logDebug("Skipping ReviewDb write since primary storage is {}", storage); |
| } |
| } finally { |
| db.rollback(); |
| } |
| |
| // Do not execute the NoteDbUpdateManager, as we don't want too much |
| // contention on the underlying repo, and we would rather use a single |
| // ObjectInserter/BatchRefUpdate later. |
| // |
| // TODO(dborowitz): May or may not be worth trying to batch together |
| // flushed inserters as well. |
| if (storage == PrimaryStorage.NOTE_DB) { |
| // Should have failed above if NoteDb is disabled. |
| checkState(notesMigration.commitChangeWrites()); |
| noteDbResult = updateManager.stage().get(id); |
| } else if (notesMigration.commitChangeWrites()) { |
| try { |
| noteDbResult = updateManager.stage().get(id); |
| } catch (IOException ex) { |
| // Ignore all errors trying to update NoteDb at this point. We've |
| // already written the NoteDbChangeState to ReviewDb, which means |
| // if the state is out of date it will be rebuilt the next time it |
| // is needed. |
| log.debug("Ignoring NoteDb update error after ReviewDb write", ex); |
| } |
| } |
| } catch (Exception e) { |
| logDebug("Error updating change (should be rethrown)", e); |
| Throwables.propagateIfPossible(e, RestApiException.class); |
| throw new UpdateException(e); |
| } finally { |
| if (updateManager != null) { |
| updateManager.close(); |
| } |
| } |
| } |
| |
| private ChangeContextImpl newChangeContext( |
| ReviewDb db, Repository repo, RevWalk rw, Change.Id id) throws OrmException { |
| Change c = newChanges.get(id); |
| boolean isNew = c != null; |
| if (isNew) { |
| // New change: populate noteDbState. |
| checkState(c.getNoteDbState() == null, "noteDbState should not be filled in by callers"); |
| if (notesMigration.changePrimaryStorage() == PrimaryStorage.NOTE_DB) { |
| c.setNoteDbState(NoteDbChangeState.NOTE_DB_PRIMARY_STATE); |
| } |
| } else { |
| // Existing change. |
| c = ChangeNotes.readOneReviewDbChange(db, id); |
| if (c == null) { |
| // Not in ReviewDb, but new changes are created with default primary |
| // storage as NOTE_DB, so we can assume that a missing change is |
| // NoteDb primary. Pass a synthetic change into ChangeNotes.Factory, |
| // which lets ChangeNotes take care of the existence check. |
| // |
| // TODO(dborowitz): This assumption is potentially risky, because |
| // it means once we turn this option on and start creating changes |
| // without writing anything to ReviewDb, we can't turn this option |
| // back off without making those changes inaccessible. The problem |
| // is we have no way of distinguishing a change that only exists in |
| // NoteDb because it only ever existed in NoteDb, from a change that |
| // only exists in NoteDb because it used to exist in ReviewDb and |
| // deleting from ReviewDb succeeded but deleting from NoteDb failed. |
| // |
| // TODO(dborowitz): We actually still have that problem anyway. Maybe |
| // we need a cutoff timestamp? Or maybe we need to start leaving |
| // tombstones in ReviewDb? |
| c = ChangeNotes.Factory.newNoteDbOnlyChange(project, id); |
| } |
| NoteDbChangeState.checkNotReadOnly(c, skewMs); |
| } |
| ChangeNotes notes = changeNotesFactory.createForBatchUpdate(c, !isNew); |
| ChangeControl ctl = changeControlFactory.controlFor(notes, user); |
| return new ChangeContextImpl(ctl, new BatchUpdateReviewDb(db), repo, rw); |
| } |
| |
| private NoteDbUpdateManager stageNoteDbUpdate(ChangeContextImpl ctx, boolean deleted) |
| throws OrmException, IOException { |
| logDebug("Staging NoteDb update"); |
| NoteDbUpdateManager updateManager = |
| updateManagerFactory |
| .create(ctx.getProject()) |
| .setChangeRepo( |
| ctx.getRepository(), ctx.getRevWalk(), null, new ChainedReceiveCommands(repo)); |
| if (ctx.getUser().isIdentifiedUser()) { |
| updateManager.setRefLogIdent( |
| ctx.getUser().asIdentifiedUser().newRefLogIdent(ctx.getWhen(), tz)); |
| } |
| for (ChangeUpdate u : ctx.updates.values()) { |
| updateManager.add(u); |
| } |
| |
| Change c = ctx.getChange(); |
| if (deleted) { |
| updateManager.deleteChange(c.getId()); |
| } |
| try { |
| updateManager.stageAndApplyDelta(c); |
| } catch (MismatchedStateException ex) { |
| // Refused to apply update because NoteDb was out of sync, which can |
| // only happen if ReviewDb is the primary storage for this change. |
| // |
| // Go ahead with this ReviewDb update; it's still out of sync, but this |
| // is no worse than before, and it will eventually get rebuilt. |
| logDebug("Ignoring MismatchedStateException while staging"); |
| } |
| |
| return updateManager; |
| } |
| |
| private boolean isNewChange(Change.Id id) { |
| return newChanges.containsKey(id); |
| } |
| |
| private void logDebug(String msg, Throwable t) { |
| if (log.isDebugEnabled()) { |
| ReviewDbBatchUpdate.this.logDebug("[" + taskId + "]" + msg, t); |
| } |
| } |
| |
| private void logDebug(String msg, Object... args) { |
| if (log.isDebugEnabled()) { |
| ReviewDbBatchUpdate.this.logDebug("[" + taskId + "]" + msg, args); |
| } |
| } |
| } |
| |
| private static Iterable<Change> changesToUpdate(ChangeContextImpl ctx) { |
| Change c = ctx.getChange(); |
| if (ctx.bumpLastUpdatedOn && c.getLastUpdatedOn().before(ctx.getWhen())) { |
| c.setLastUpdatedOn(ctx.getWhen()); |
| } |
| return Collections.singleton(c); |
| } |
| |
| private void executePostOps() throws Exception { |
| ContextImpl ctx = new ContextImpl(); |
| for (BatchUpdateOp op : ops.values()) { |
| op.postUpdate(ctx); |
| } |
| |
| for (RepoOnlyOp op : repoOnlyOps) { |
| op.postUpdate(ctx); |
| } |
| } |
| } |