| // Copyright (C) 2017 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package com.google.gerrit.server.notedb.rebuild; |
| |
| import static com.google.common.base.Preconditions.checkState; |
| import static com.google.gerrit.reviewdb.server.ReviewDbUtil.unwrapDb; |
| import static com.google.gerrit.server.notedb.NotesMigration.SECTION_NOTE_DB; |
| import static com.google.gerrit.server.notedb.NotesMigrationState.NOTE_DB; |
| import static com.google.gerrit.server.notedb.NotesMigrationState.READ_WRITE_NO_SEQUENCE; |
| import static com.google.gerrit.server.notedb.NotesMigrationState.READ_WRITE_WITH_SEQUENCE_NOTE_DB_PRIMARY; |
| import static com.google.gerrit.server.notedb.NotesMigrationState.READ_WRITE_WITH_SEQUENCE_REVIEW_DB_PRIMARY; |
| import static com.google.gerrit.server.notedb.NotesMigrationState.WRITE; |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| import static java.util.Comparator.comparing; |
| import static java.util.Objects.requireNonNull; |
| import static java.util.stream.Collectors.joining; |
| import static java.util.stream.Collectors.toList; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Stopwatch; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableListMultimap; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.MultimapBuilder; |
| import com.google.common.collect.Ordering; |
| import com.google.common.collect.SetMultimap; |
| import com.google.common.collect.Streams; |
| import com.google.common.flogger.FluentLogger; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.ListeningExecutorService; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.gerrit.common.FormatUtil; |
| import com.google.gerrit.common.Nullable; |
| import com.google.gerrit.extensions.registration.DynamicSet; |
| import com.google.gerrit.reviewdb.client.Change; |
| import com.google.gerrit.reviewdb.client.Change.Id; |
| import com.google.gerrit.reviewdb.client.Project; |
| import com.google.gerrit.reviewdb.server.ReviewDb; |
| import com.google.gerrit.reviewdb.server.ReviewDbWrapper; |
| import com.google.gerrit.server.GerritPersonIdent; |
| import com.google.gerrit.server.InternalUser; |
| import com.google.gerrit.server.Sequences; |
| import com.google.gerrit.server.config.AllProjectsName; |
| import com.google.gerrit.server.config.AllUsersName; |
| import com.google.gerrit.server.config.GerritServerConfigProvider; |
| import com.google.gerrit.server.config.SitePaths; |
| import com.google.gerrit.server.extensions.events.GitReferenceUpdated; |
| import com.google.gerrit.server.git.GitRepositoryManager; |
| import com.google.gerrit.server.git.LockFailureException; |
| import com.google.gerrit.server.git.WorkQueue; |
| import com.google.gerrit.server.notedb.ChangeBundleReader; |
| import com.google.gerrit.server.notedb.ChangeNotes; |
| import com.google.gerrit.server.notedb.MutableNotesMigration; |
| import com.google.gerrit.server.notedb.NoteDbTable; |
| import com.google.gerrit.server.notedb.NoteDbUpdateManager; |
| import com.google.gerrit.server.notedb.NotesMigrationState; |
| import com.google.gerrit.server.notedb.PrimaryStorageMigrator; |
| import com.google.gerrit.server.notedb.RepoSequence; |
| import com.google.gerrit.server.notedb.rebuild.ChangeRebuilder.NoPatchSetsException; |
| import com.google.gerrit.server.project.NoSuchChangeException; |
| import com.google.gerrit.server.project.ProjectCache; |
| import com.google.gerrit.server.update.ChainedReceiveCommands; |
| import com.google.gerrit.server.update.RefUpdateUtil; |
| import com.google.gerrit.server.util.ManualRequestContext; |
| import com.google.gerrit.server.util.ThreadLocalRequestContext; |
| import com.google.gwtorm.server.OrmException; |
| import com.google.gwtorm.server.SchemaFactory; |
| import com.google.inject.Inject; |
| import com.google.inject.Provider; |
| import java.io.BufferedWriter; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.io.OutputStreamWriter; |
| import java.io.PrintWriter; |
| import java.text.ParseException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Optional; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.function.Consumer; |
| import java.util.function.Predicate; |
| import org.eclipse.jgit.errors.ConfigInvalidException; |
| import org.eclipse.jgit.errors.RepositoryNotFoundException; |
| import org.eclipse.jgit.internal.storage.file.FileRepository; |
| import org.eclipse.jgit.internal.storage.file.GC; |
| import org.eclipse.jgit.internal.storage.file.PackInserter; |
| import org.eclipse.jgit.lib.BatchRefUpdate; |
| import org.eclipse.jgit.lib.Config; |
| import org.eclipse.jgit.lib.ObjectInserter; |
| import org.eclipse.jgit.lib.ObjectReader; |
| import org.eclipse.jgit.lib.PersonIdent; |
| import org.eclipse.jgit.lib.ProgressMonitor; |
| import org.eclipse.jgit.lib.Repository; |
| import org.eclipse.jgit.lib.TextProgressMonitor; |
| import org.eclipse.jgit.revwalk.RevWalk; |
| import org.eclipse.jgit.storage.file.FileBasedConfig; |
| import org.eclipse.jgit.storage.pack.PackConfig; |
| import org.eclipse.jgit.transport.ReceiveCommand; |
| import org.eclipse.jgit.util.FS; |
| |
| /** One stop shop for migrating a site's change storage from ReviewDb to NoteDb. */ |
| public class NoteDbMigrator implements AutoCloseable { |
| private static final FluentLogger logger = FluentLogger.forEnclosingClass(); |
| |
| private static final String AUTO_MIGRATE = "autoMigrate"; |
| private static final String TRIAL = "trial"; |
| |
| private static final int PROJECT_SLICE_MAX_REFS = 1000; |
| private static final int GC_INTERVAL = 10000; |
| |
| public static boolean getAutoMigrate(Config cfg) { |
| return cfg.getBoolean(SECTION_NOTE_DB, NoteDbTable.CHANGES.key(), AUTO_MIGRATE, false); |
| } |
| |
| private static void setAutoMigrate(Config cfg, boolean autoMigrate) { |
| cfg.setBoolean(SECTION_NOTE_DB, NoteDbTable.CHANGES.key(), AUTO_MIGRATE, autoMigrate); |
| } |
| |
| public static boolean getTrialMode(Config cfg) { |
| return cfg.getBoolean(SECTION_NOTE_DB, NoteDbTable.CHANGES.key(), TRIAL, false); |
| } |
| |
| public static void setTrialMode(Config cfg, boolean trial) { |
| cfg.setBoolean(SECTION_NOTE_DB, NoteDbTable.CHANGES.key(), TRIAL, trial); |
| } |
| |
| private static class NoteDbMigrationLoggerOut extends OutputStream { |
| private StringBuilder outputBuffer = new StringBuilder(); |
| |
| @Override |
| public synchronized void write(int b) throws IOException { |
| if (b == '\r' || b == '\n') { |
| logger.atInfo().log(outputBuffer.toString()); |
| outputBuffer = new StringBuilder(); |
| } else { |
| outputBuffer.append(Character.toChars(b)); |
| } |
| } |
| } |
| |
| public static class Builder { |
| private final Config cfg; |
| private final SitePaths sitePaths; |
| private final Provider<PersonIdent> serverIdent; |
| private final AllUsersName allUsers; |
| private final SchemaFactory<ReviewDb> schemaFactory; |
| private final GitRepositoryManager repoManager; |
| private final NoteDbUpdateManager.Factory updateManagerFactory; |
| private final ChangeBundleReader bundleReader; |
| private final AllProjectsName allProjects; |
| private final InternalUser.Factory userFactory; |
| private final ThreadLocalRequestContext requestContext; |
| private final ChangeRebuilderImpl rebuilder; |
| private final WorkQueue workQueue; |
| private final MutableNotesMigration globalNotesMigration; |
| private final PrimaryStorageMigrator primaryStorageMigrator; |
| private final DynamicSet<NotesMigrationStateListener> listeners; |
| private final ProjectCache projectCache; |
| |
| private int threads; |
| private ImmutableList<Project.NameKey> projects = ImmutableList.of(); |
| private ImmutableList<Project.NameKey> skipProjects = ImmutableList.of(); |
| private ImmutableList<Change.Id> changes = ImmutableList.of(); |
| private OutputStream progressOut = new NoteDbMigrationLoggerOut(); |
| private NotesMigrationState stopAtState; |
| private boolean trial; |
| private boolean forceRebuild; |
| private boolean forceStateChangeWithSkip; |
| private int sequenceGap = -1; |
| private boolean autoMigrate; |
| private boolean verbose; |
| |
| @Inject |
| Builder( |
| GerritServerConfigProvider configProvider, |
| SitePaths sitePaths, |
| @GerritPersonIdent Provider<PersonIdent> serverIdent, |
| AllUsersName allUsers, |
| SchemaFactory<ReviewDb> schemaFactory, |
| GitRepositoryManager repoManager, |
| NoteDbUpdateManager.Factory updateManagerFactory, |
| ChangeBundleReader bundleReader, |
| AllProjectsName allProjects, |
| ThreadLocalRequestContext requestContext, |
| InternalUser.Factory userFactory, |
| ChangeRebuilderImpl rebuilder, |
| WorkQueue workQueue, |
| MutableNotesMigration globalNotesMigration, |
| PrimaryStorageMigrator primaryStorageMigrator, |
| DynamicSet<NotesMigrationStateListener> listeners, |
| ProjectCache projectCache) { |
| // Reload gerrit.config/notedb.config on each migrator invocation, in case a previous |
| // migration in the same process modified the on-disk contents. This ensures the defaults for |
| // trial/autoMigrate get set correctly below. |
| this.cfg = configProvider.loadConfig(); |
| this.sitePaths = sitePaths; |
| this.serverIdent = serverIdent; |
| this.allUsers = allUsers; |
| this.schemaFactory = schemaFactory; |
| this.repoManager = repoManager; |
| this.updateManagerFactory = updateManagerFactory; |
| this.bundleReader = bundleReader; |
| this.allProjects = allProjects; |
| this.requestContext = requestContext; |
| this.userFactory = userFactory; |
| this.rebuilder = rebuilder; |
| this.workQueue = workQueue; |
| this.globalNotesMigration = globalNotesMigration; |
| this.primaryStorageMigrator = primaryStorageMigrator; |
| this.listeners = listeners; |
| this.projectCache = projectCache; |
| this.trial = getTrialMode(cfg); |
| this.autoMigrate = getAutoMigrate(cfg); |
| } |
| |
| /** |
| * Set the number of threads used by parallelizable phases of the migration, such as rebuilding |
| * all changes. |
| * |
| * <p>Not all phases are parallelizable, and calling {@link #rebuild()} directly will do |
| * substantial work in the calling thread regardless of the number of threads configured. |
| * |
| * <p>By default, all work is done in the calling thread. |
| * |
| * @param threads thread count; if less than 2, all work happens in the calling thread. |
| * @return this. |
| */ |
| public Builder setThreads(int threads) { |
| this.threads = threads; |
| return this; |
| } |
| |
| /** |
| * Limit the set of projects that are processed. |
| * |
| * <p>Incompatible with {@link #setChanges(Collection)}. |
| * |
| * <p>By default, all projects will be processed. |
| * |
| * @param projects set of projects; if null or empty, all projects will be processed. |
| * @return this. |
| */ |
| public Builder setProjects(@Nullable Collection<Project.NameKey> projects) { |
| this.projects = projects != null ? ImmutableList.copyOf(projects) : ImmutableList.of(); |
| return this; |
| } |
| |
| /** |
| * Process all projects except these |
| * |
| * <p>Incompatible with {@link #setProjects(Collection)} and {@link #setChanges(Collection)} |
| * |
| * <p>By default, all projects will be processed. |
| * |
| * @param skipProjects set of projects; if null or empty all project will be processed |
| * @return this. |
| */ |
| public Builder setSkipProjects(@Nullable Collection<Project.NameKey> skipProjects) { |
| this.skipProjects = |
| skipProjects != null ? ImmutableList.copyOf(skipProjects) : ImmutableList.of(); |
| return this; |
| } |
| |
| /** |
| * Limit the set of changes that are processed. |
| * |
| * <p>Incompatible with {@link #setProjects(Collection)}. |
| * |
| * <p>By default, all changes will be processed. |
| * |
| * @param changes set of changes; if null or empty, all changes will be processed. |
| * @return this. |
| */ |
| public Builder setChanges(@Nullable Collection<Change.Id> changes) { |
| this.changes = changes != null ? ImmutableList.copyOf(changes) : ImmutableList.of(); |
| return this; |
| } |
| |
| /** |
| * Set output stream for progress monitors. |
| * |
| * <p>By default, there is no progress monitor output (although there may be other logs). |
| * |
| * @param progressOut output stream. |
| * @return this. |
| */ |
| public Builder setProgressOut(OutputStream progressOut) { |
| this.progressOut = requireNonNull(progressOut); |
| return this; |
| } |
| |
| /** |
| * Stop at a specific migration state, for testing only. |
| * |
| * @param stopAtState state to stop at. |
| * @return this. |
| */ |
| @VisibleForTesting |
| public Builder setStopAtStateForTesting(NotesMigrationState stopAtState) { |
| this.stopAtState = stopAtState; |
| return this; |
| } |
| |
| /** |
| * Rebuild in "trial mode": configure Gerrit to write to and read from NoteDb, but leave |
| * ReviewDb as the source of truth for all changes. |
| * |
| * <p>By default, trial mode is off, and NoteDb is the source of truth for all changes following |
| * the migration. |
| * |
| * @param trial whether to rebuild in trial mode. |
| * @return this. |
| */ |
| public Builder setTrialMode(boolean trial) { |
| this.trial = trial; |
| return this; |
| } |
| |
| /** |
| * Rebuild all changes in NoteDb from ReviewDb, even if Gerrit is currently configured to read |
| * from NoteDb. |
| * |
| * <p>Only supported if ReviewDb is still the source of truth for all changes. |
| * |
| * <p>By default, force rebuilding is off. |
| * |
| * @param forceRebuild whether to force rebuilding. |
| * @return this. |
| */ |
| public Builder setForceRebuild(boolean forceRebuild) { |
| this.forceRebuild = forceRebuild; |
| return this; |
| } |
| |
| /** |
| * Force state change to next migration state if some projects were skipped. |
| * |
| * <p>This makes sense when the skipped projects are migrated in a copy of the site and migrated |
| * data will be transported using git fetch. |
| * |
| * @param forceStateChangeWithSkip whether state change to next migration state should be |
| * enforced if some projects were skipped. |
| * @return this. |
| */ |
| public Builder setForceStateChangeWithSkip(boolean forceStateChangeWithSkip) { |
| this.forceStateChangeWithSkip = forceStateChangeWithSkip; |
| return this; |
| } |
| |
| /** |
| * Gap between ReviewDb change sequence numbers and NoteDb. |
| * |
| * <p>If NoteDb sequences are enabled in a running server, there is a race between the migration |
| * step that calls {@code nextChangeId()} to seed the ref, and other threads that call {@code |
| * nextChangeId()} to create new changes. In order to prevent these operations stepping on one |
| * another, we use this value to skip some predefined sequence numbers. This is strongly |
| * recommended in a running server. |
| * |
| * <p>If the migration takes place offline, there is no race with other threads, and this option |
| * may be set to 0. However, admins may still choose to use a gap, for example to make it easier |
| * to distinguish changes that were created before and after the NoteDb migration. |
| * |
| * <p>By default, uses the value from {@code noteDb.changes.initialSequenceGap} in {@code |
| * gerrit.config}, which defaults to 1000. |
| * |
| * @param sequenceGap sequence gap size; if negative, use the default. |
| * @return this. |
| */ |
| public Builder setSequenceGap(int sequenceGap) { |
| this.sequenceGap = sequenceGap; |
| return this; |
| } |
| |
| /** |
| * Enable auto-migration on subsequent daemon launches. |
| * |
| * <p>If true, prior to running any migration steps, sets the necessary configuration in {@code |
| * gerrit.config} to make {@code gerrit.war daemon} retry the migration on next startup, if it |
| * fails. |
| * |
| * @param autoMigrate whether to set auto-migration config. |
| * @return this. |
| */ |
| public Builder setAutoMigrate(boolean autoMigrate) { |
| this.autoMigrate = autoMigrate; |
| return this; |
| } |
| |
| /** |
| * Enable verbose log output |
| * |
| * @param verbose enable verbose log output |
| * @return this. |
| */ |
| public Builder setVerbose(boolean verbose) { |
| this.verbose = verbose; |
| return this; |
| } |
| |
| public NoteDbMigrator build() throws MigrationException { |
| return new NoteDbMigrator( |
| sitePaths, |
| schemaFactory, |
| serverIdent, |
| allUsers, |
| repoManager, |
| updateManagerFactory, |
| bundleReader, |
| allProjects, |
| requestContext, |
| userFactory, |
| rebuilder, |
| globalNotesMigration, |
| primaryStorageMigrator, |
| listeners, |
| threads > 1 |
| ? MoreExecutors.listeningDecorator( |
| workQueue.createQueue(threads, "RebuildChange", true)) |
| : MoreExecutors.newDirectExecutorService(), |
| projects, |
| skipProjects, |
| changes, |
| progressOut, |
| stopAtState, |
| projectCache, |
| trial, |
| forceRebuild, |
| forceStateChangeWithSkip, |
| sequenceGap >= 0 ? sequenceGap : Sequences.getChangeSequenceGap(cfg), |
| autoMigrate, |
| verbose); |
| } |
| } |
| |
| private static class ProjectContext { |
| final ReentrantLock gcLock; |
| final Project.NameKey project; |
| int sliceCount; |
| int changeCount; |
| final AtomicLong changesMigratedCount; |
| |
| ProjectContext(Project.NameKey project, int sliceCount) { |
| this.gcLock = new ReentrantLock(); |
| this.project = project; |
| this.sliceCount = sliceCount; |
| this.changesMigratedCount = new AtomicLong(); |
| } |
| } |
| |
| private static class ProjectSlice { |
| final ProjectContext ctx; |
| final List<Id> changes; |
| int sliceNumber = 0; |
| |
| ProjectSlice(ProjectContext ctx, List<Id> changes, int sliceNumber) { |
| this.ctx = ctx; |
| this.changes = changes; |
| this.sliceNumber = sliceNumber; |
| } |
| |
| @Override |
| public String toString() { |
| return "Slice [project=" + ctx.project + "]"; |
| } |
| } |
| |
| private final FileBasedConfig gerritConfig; |
| private final FileBasedConfig noteDbConfig; |
| private final SchemaFactory<ReviewDb> schemaFactory; |
| private final Provider<PersonIdent> serverIdent; |
| private final AllUsersName allUsers; |
| private final GitRepositoryManager repoManager; |
| private final NoteDbUpdateManager.Factory updateManagerFactory; |
| private final ChangeBundleReader bundleReader; |
| private final AllProjectsName allProjects; |
| private final ThreadLocalRequestContext requestContext; |
| private final InternalUser.Factory userFactory; |
| private final ChangeRebuilderImpl rebuilder; |
| private final MutableNotesMigration globalNotesMigration; |
| private final PrimaryStorageMigrator primaryStorageMigrator; |
| private final DynamicSet<NotesMigrationStateListener> listeners; |
| |
| private final ListeningExecutorService executor; |
| private final ImmutableList<Project.NameKey> projects; |
| private final ImmutableList<Project.NameKey> skipProjects; |
| private final ImmutableList<Change.Id> changes; |
| private final OutputStream progressOut; |
| private final NotesMigrationState stopAtState; |
| private final boolean trial; |
| private final boolean forceRebuild; |
| private final boolean forceStateChangeWithSkip; |
| private final int sequenceGap; |
| private final boolean autoMigrate; |
| private final boolean verbose; |
| |
| private final AtomicLong globalChangeCounter = new AtomicLong(); |
| private long totalChangeCount; |
| |
| private NoteDbMigrator( |
| SitePaths sitePaths, |
| SchemaFactory<ReviewDb> schemaFactory, |
| Provider<PersonIdent> serverIdent, |
| AllUsersName allUsers, |
| GitRepositoryManager repoManager, |
| NoteDbUpdateManager.Factory updateManagerFactory, |
| ChangeBundleReader bundleReader, |
| AllProjectsName allProjects, |
| ThreadLocalRequestContext requestContext, |
| InternalUser.Factory userFactory, |
| ChangeRebuilderImpl rebuilder, |
| MutableNotesMigration globalNotesMigration, |
| PrimaryStorageMigrator primaryStorageMigrator, |
| DynamicSet<NotesMigrationStateListener> listeners, |
| ListeningExecutorService executor, |
| ImmutableList<Project.NameKey> projects, |
| ImmutableList<Project.NameKey> skipProjects, |
| ImmutableList<Change.Id> changes, |
| OutputStream progressOut, |
| NotesMigrationState stopAtState, |
| ProjectCache projectCache, |
| boolean trial, |
| boolean forceRebuild, |
| boolean forceStateChangeWithSkip, |
| int sequenceGap, |
| boolean autoMigrate, |
| boolean verbose) |
| throws MigrationException { |
| if (ImmutableList.of(!changes.isEmpty(), !projects.isEmpty(), !skipProjects.isEmpty()).stream() |
| .filter(e -> e) |
| .count() |
| > 1) { |
| throw new MigrationException("Cannot combine changes, projects and skipProjects"); |
| } |
| if (sequenceGap < 0) { |
| throw new MigrationException("Sequence gap must be non-negative: " + sequenceGap); |
| } |
| |
| this.schemaFactory = schemaFactory; |
| this.serverIdent = serverIdent; |
| this.allUsers = allUsers; |
| this.rebuilder = rebuilder; |
| this.repoManager = repoManager; |
| this.updateManagerFactory = updateManagerFactory; |
| this.bundleReader = bundleReader; |
| this.allProjects = allProjects; |
| this.requestContext = requestContext; |
| this.userFactory = userFactory; |
| this.globalNotesMigration = globalNotesMigration; |
| this.primaryStorageMigrator = primaryStorageMigrator; |
| this.listeners = listeners; |
| this.executor = executor; |
| this.projects = projects; |
| this.skipProjects = skipProjects; |
| this.changes = changes; |
| this.progressOut = progressOut; |
| this.stopAtState = stopAtState; |
| this.trial = trial; |
| this.forceRebuild = forceRebuild; |
| this.forceStateChangeWithSkip = forceStateChangeWithSkip; |
| this.sequenceGap = sequenceGap; |
| this.autoMigrate = autoMigrate; |
| this.verbose = verbose; |
| |
| // Stack notedb.config over gerrit.config, in the same way as GerritServerConfigProvider. |
| this.gerritConfig = new FileBasedConfig(sitePaths.gerrit_config.toFile(), FS.detect()); |
| this.noteDbConfig = |
| new FileBasedConfig(gerritConfig, sitePaths.notedb_config.toFile(), FS.detect()); |
| } |
| |
| @Override |
| public void close() { |
| executor.shutdownNow(); |
| } |
| |
| public void migrate() throws OrmException, IOException { |
| if (!changes.isEmpty() |
| || !projects.isEmpty() |
| || (!forceStateChangeWithSkip && !skipProjects.isEmpty())) { |
| throw new MigrationException( |
| "Cannot set changes or projects or skipProjects during full migration; call rebuild()" |
| + " instead"); |
| } |
| Optional<NotesMigrationState> maybeState = loadState(); |
| if (!maybeState.isPresent()) { |
| throw new MigrationException("Could not determine initial migration state"); |
| } |
| |
| NotesMigrationState state = maybeState.get(); |
| if (trial && state.compareTo(READ_WRITE_NO_SEQUENCE) > 0) { |
| throw new MigrationException( |
| "Migration has already progressed past the endpoint of the \"trial mode\" state;" |
| + " NoteDb is already the primary storage for some changes"); |
| } |
| if (forceRebuild && state.compareTo(READ_WRITE_WITH_SEQUENCE_REVIEW_DB_PRIMARY) > 0) { |
| throw new MigrationException( |
| "Cannot force rebuild changes; NoteDb is already the primary storage for some changes"); |
| } |
| setControlFlags(); |
| |
| boolean rebuilt = false; |
| while (state.compareTo(NOTE_DB) < 0) { |
| if (state.equals(stopAtState)) { |
| return; |
| } |
| boolean stillNeedsRebuild = forceRebuild && !rebuilt; |
| if (trial && state.compareTo(READ_WRITE_NO_SEQUENCE) >= 0) { |
| if (stillNeedsRebuild && state == READ_WRITE_NO_SEQUENCE) { |
| // We're at the end state of trial mode, but still need a rebuild due to forceRebuild. Let |
| // the loop go one more time. |
| } else { |
| return; |
| } |
| } |
| switch (state) { |
| case REVIEW_DB: |
| state = turnOnWrites(state); |
| break; |
| case WRITE: |
| state = rebuildAndEnableReads(state); |
| rebuilt = true; |
| break; |
| case READ_WRITE_NO_SEQUENCE: |
| if (stillNeedsRebuild) { |
| state = rebuildAndEnableReads(state); |
| rebuilt = true; |
| } else { |
| state = enableSequences(state); |
| } |
| break; |
| case READ_WRITE_WITH_SEQUENCE_REVIEW_DB_PRIMARY: |
| if (stillNeedsRebuild) { |
| state = rebuildAndEnableReads(state); |
| rebuilt = true; |
| } else { |
| state = setNoteDbPrimary(state); |
| } |
| break; |
| case READ_WRITE_WITH_SEQUENCE_NOTE_DB_PRIMARY: |
| // The only way we can get here is if there was a failure on a previous run of |
| // setNoteDbPrimary, since that method moves to NOTE_DB if it completes |
| // successfully. Assume that not all changes were converted and re-run the step. |
| // migrateToNoteDbPrimary is a relatively fast no-op for already-migrated changes, so this |
| // isn't actually repeating work. |
| state = setNoteDbPrimary(state); |
| break; |
| case NOTE_DB: |
| // Done! |
| break; |
| default: |
| throw new MigrationException( |
| "Migration out of the following state is not supported:\n" + state.toText()); |
| } |
| } |
| } |
| |
| private NotesMigrationState turnOnWrites(NotesMigrationState prev) throws IOException { |
| return saveState(prev, WRITE); |
| } |
| |
| private NotesMigrationState rebuildAndEnableReads(NotesMigrationState prev) |
| throws OrmException, IOException { |
| rebuild(); |
| return saveState(prev, READ_WRITE_NO_SEQUENCE); |
| } |
| |
| private NotesMigrationState enableSequences(NotesMigrationState prev) |
| throws OrmException, IOException { |
| try (ReviewDb db = schemaFactory.open()) { |
| @SuppressWarnings("deprecation") |
| final int nextChangeId = db.nextChangeId(); |
| |
| RepoSequence seq = |
| new RepoSequence( |
| repoManager, |
| GitReferenceUpdated.DISABLED, |
| allProjects, |
| Sequences.NAME_CHANGES, |
| // If sequenceGap is 0, this writes into the sequence ref the same ID that is returned |
| // by the call to seq.next() below. If we actually used this as a change ID, that |
| // would be a problem, but we just discard it, so this is safe. |
| () -> nextChangeId + sequenceGap - 1, |
| 1, |
| nextChangeId); |
| seq.next(); |
| } |
| return saveState(prev, READ_WRITE_WITH_SEQUENCE_REVIEW_DB_PRIMARY); |
| } |
| |
| private NotesMigrationState setNoteDbPrimary(NotesMigrationState prev) |
| throws MigrationException, OrmException, IOException { |
| checkState( |
| projects.isEmpty() |
| && changes.isEmpty() |
| && (forceStateChangeWithSkip || skipProjects.isEmpty()), |
| "Should not have attempted setNoteDbPrimary with a subset of changes"); |
| checkState( |
| prev == READ_WRITE_WITH_SEQUENCE_REVIEW_DB_PRIMARY |
| || prev == READ_WRITE_WITH_SEQUENCE_NOTE_DB_PRIMARY, |
| "Unexpected start state for setNoteDbPrimary: %s", |
| prev); |
| |
| // Before changing the primary storage of old changes, ensure new changes are created with |
| // NoteDb primary. |
| prev = saveState(prev, READ_WRITE_WITH_SEQUENCE_NOTE_DB_PRIMARY); |
| |
| Stopwatch sw = Stopwatch.createStarted(); |
| logger.atInfo().log("Setting primary storage to NoteDb"); |
| List<Change.Id> allChanges; |
| try (ReviewDb db = unwrapDb(schemaFactory.open())) { |
| if (forceStateChangeWithSkip) { |
| allChanges = |
| Streams.stream(db.changes().all()) |
| .filter(c -> !skipProjects.contains(c.getProject())) |
| .map(Change::getId) |
| .collect(toList()); |
| } else { |
| allChanges = Streams.stream(db.changes().all()).map(Change::getId).collect(toList()); |
| } |
| } |
| |
| try (ContextHelper contextHelper = new ContextHelper()) { |
| List<ListenableFuture<Boolean>> futures = |
| Lists.partition(allChanges, PROJECT_SLICE_MAX_REFS).stream() |
| .map( |
| partition -> |
| executor.submit( |
| () -> { |
| try (ManualRequestContext ctx = contextHelper.open()) { |
| return primaryStorageMigrator.migrateToNoteDbPrimary(partition); |
| } |
| })) |
| .collect(toList()); |
| boolean ok = futuresToBoolean(futures, "Error migrating primary storage"); |
| double t = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d; |
| logger.atInfo().log( |
| "Migrated primary storage of %d changes in %.01fs (%.01f/s)\n", |
| allChanges.size(), t, allChanges.size() / t); |
| if (!ok) { |
| throw new MigrationException("Migrating primary storage for some changes failed, see log"); |
| } |
| } |
| |
| return disableReviewDb(prev); |
| } |
| |
| private NotesMigrationState disableReviewDb(NotesMigrationState prev) throws IOException { |
| return saveState(prev, NOTE_DB, c -> setAutoMigrate(c, false)); |
| } |
| |
| private Optional<NotesMigrationState> loadState() throws IOException { |
| try { |
| gerritConfig.load(); |
| noteDbConfig.load(); |
| return NotesMigrationState.forConfig(noteDbConfig); |
| } catch (ConfigInvalidException | IllegalArgumentException e) { |
| logger.atWarning().withCause(e).log( |
| "error reading NoteDb migration options from %s", noteDbConfig.getFile()); |
| return Optional.empty(); |
| } |
| } |
| |
| private NotesMigrationState saveState( |
| NotesMigrationState expectedOldState, NotesMigrationState newState) throws IOException { |
| return saveState(expectedOldState, newState, c -> {}); |
| } |
| |
| private NotesMigrationState saveState( |
| NotesMigrationState expectedOldState, |
| NotesMigrationState newState, |
| Consumer<Config> additionalUpdates) |
| throws IOException { |
| synchronized (globalNotesMigration) { |
| // This read-modify-write is racy. We're counting on the fact that no other Gerrit operation |
| // modifies gerrit.config, and hoping that admins don't either. |
| Optional<NotesMigrationState> actualOldState = loadState(); |
| if (!actualOldState.equals(Optional.of(expectedOldState))) { |
| throw new MigrationException( |
| "Cannot move to new state:\n" |
| + newState.toText() |
| + "\n\n" |
| + "Expected this state in gerrit.config:\n" |
| + expectedOldState.toText() |
| + "\n\n" |
| + (actualOldState.isPresent() |
| ? "But found this state:\n" + actualOldState.get().toText() |
| : "But could not parse the current state")); |
| } |
| |
| preStateChange(expectedOldState, newState); |
| |
| newState.setConfigValues(noteDbConfig); |
| additionalUpdates.accept(noteDbConfig); |
| noteDbConfig.save(); |
| |
| // Only set in-memory state once it's been persisted to storage. |
| globalNotesMigration.setFrom(newState); |
| logger.atInfo().log("Migration state: %s => %s", expectedOldState, newState); |
| |
| return newState; |
| } |
| } |
| |
| private void preStateChange(NotesMigrationState oldState, NotesMigrationState newState) |
| throws IOException { |
| for (NotesMigrationStateListener listener : listeners) { |
| listener.preStateChange(oldState, newState); |
| } |
| } |
| |
| private void setControlFlags() throws MigrationException { |
| synchronized (globalNotesMigration) { |
| try { |
| noteDbConfig.load(); |
| setAutoMigrate(noteDbConfig, autoMigrate); |
| setTrialMode(noteDbConfig, trial); |
| noteDbConfig.save(); |
| } catch (ConfigInvalidException | IOException e) { |
| throw new MigrationException("Error saving auto-migration config", e); |
| } |
| } |
| } |
| |
| private List<ProjectSlice> slice() throws OrmException { |
| ImmutableListMultimap<Project.NameKey, Change.Id> changesByProject = getChangesByProject(); |
| List<Project.NameKey> projectNames = |
| Ordering.usingToString().sortedCopy(changesByProject.keySet()); |
| List<ProjectSlice> slices = Lists.newArrayList(); |
| for (Project.NameKey project : projectNames) { |
| int sliceNumber = 1; |
| List<List<Id>> projectSlices = |
| Lists.partition(changesByProject.get(project), PROJECT_SLICE_MAX_REFS); |
| ProjectContext ctx = new ProjectContext(project, projectSlices.size()); |
| ctx.changeCount = changesByProject.get(project).size(); |
| for (List<Id> s : projectSlices) { |
| ProjectSlice ps = new ProjectSlice(ctx, s, sliceNumber++); |
| slices.add(ps); |
| } |
| } |
| Collections.shuffle(slices); |
| totalChangeCount = changesByProject.size(); |
| return slices; |
| } |
| |
| public void rebuild() throws MigrationException, OrmException { |
| if (!globalNotesMigration.commitChangeWrites()) { |
| throw new MigrationException("Cannot rebuild without noteDb.changes.write=true"); |
| } |
| Stopwatch sw = Stopwatch.createStarted(); |
| logger.atInfo().log("Rebuilding changes in NoteDb"); |
| |
| List<ProjectSlice> slices = slice(); |
| List<ListenableFuture<Boolean>> futures = new ArrayList<>(); |
| for (ProjectSlice slice : slices) { |
| ListenableFuture<Boolean> future = |
| executor.submit( |
| () -> { |
| try (ContextHelper contextHelper = new ContextHelper()) { |
| return rebuildProjectSlice(contextHelper.getReviewDb(), slice); |
| } catch (Exception e) { |
| logger.atSevere().withCause(e).log("Error rebuilding project %s", slice.ctx); |
| return false; |
| } |
| }); |
| futures.add(future); |
| } |
| boolean ok = futuresToBoolean(futures, "Error rebuilding projects"); |
| double t = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d; |
| logger.atInfo().log( |
| "Rebuilt %d changes in %.01fs (%.01f/s)\n", totalChangeCount, t, totalChangeCount / t); |
| if (!ok) { |
| throw new MigrationException("Rebuilding some changes failed, see log"); |
| } |
| } |
| |
| private ImmutableListMultimap<Project.NameKey, Change.Id> getChangesByProject() |
| throws OrmException { |
| // Memoize all changes so we can close the db connection and allow other threads to use the full |
| // connection pool. |
| SetMultimap<Project.NameKey, Change.Id> out = |
| MultimapBuilder.treeKeys(comparing(Project.NameKey::get)) |
| .treeSetValues(comparing(Change.Id::get)) |
| .build(); |
| try (ReviewDb db = unwrapDb(schemaFactory.open())) { |
| if (!projects.isEmpty()) { |
| return byProject(db.changes().all(), c -> projects.contains(c.getProject()), out); |
| } |
| if (!skipProjects.isEmpty()) { |
| return byProject(db.changes().all(), c -> !skipProjects.contains(c.getProject()), out); |
| } |
| if (!changes.isEmpty()) { |
| return byProject(db.changes().get(changes), c -> true, out); |
| } |
| return byProject(db.changes().all(), c -> true, out); |
| } |
| } |
| |
| private static ImmutableListMultimap<Project.NameKey, Change.Id> byProject( |
| Iterable<Change> changes, |
| Predicate<Change> pred, |
| SetMultimap<Project.NameKey, Change.Id> out) { |
| Streams.stream(changes).filter(pred).forEach(c -> out.put(c.getProject(), c.getId())); |
| return ImmutableListMultimap.copyOf(out); |
| } |
| |
| private static ObjectInserter newPackInserter(Repository repo) { |
| if (!(repo instanceof FileRepository)) { |
| return repo.newObjectInserter(); |
| } |
| PackInserter ins = ((FileRepository) repo).getObjectDatabase().newPackInserter(); |
| ins.checkExisting(false); |
| return ins; |
| } |
| |
| private boolean rebuildProjectSlice(ReviewDb db, ProjectSlice slice) { |
| ProjectContext ctx = slice.ctx; |
| boolean ok = true; |
| ProgressMonitor pm = |
| new TextProgressMonitor( |
| new PrintWriter(new BufferedWriter(new OutputStreamWriter(progressOut, UTF_8)))); |
| Project.NameKey project = ctx.project; |
| String oldThreadName = Thread.currentThread().getName(); |
| Thread.currentThread() |
| .setName( |
| String.format( |
| "Rebuild %s (slice %d/%d)", ctx.project, slice.sliceNumber, ctx.sliceCount)); |
| try (Repository changeRepo = repoManager.openRepository(project); |
| // Only use a PackInserter for the change repo, not All-Users. |
| // |
| // It's not possible to share a single inserter for All-Users across all project tasks, and |
| // we don't want to add one pack per project to All-Users. Adding many loose objects is |
| // preferable to many packs. |
| // |
| // Anyway, the number of objects inserted into All-Users is proportional to the number |
| // of pending draft comments, which should not be high (relative to the total number of |
| // changes), so the number of loose objects shouldn't be too unreasonable. |
| ObjectInserter changeIns = newPackInserter(changeRepo); |
| ObjectReader changeReader = changeIns.newReader(); |
| RevWalk changeRw = new RevWalk(changeReader); |
| Repository allUsersRepo = repoManager.openRepository(allUsers); |
| ObjectInserter allUsersIns = allUsersRepo.newObjectInserter(); |
| ObjectReader allUsersReader = allUsersIns.newReader(); |
| RevWalk allUsersRw = new RevWalk(allUsersReader)) { |
| ChainedReceiveCommands changeCmds = new ChainedReceiveCommands(changeRepo); |
| ChainedReceiveCommands allUsersCmds = new ChainedReceiveCommands(allUsersRepo); |
| |
| pm.beginTask( |
| FormatUtil.elide( |
| String.format( |
| "Rebuilding project %s slice %d/%d", |
| project.get(), slice.sliceNumber, ctx.sliceCount), |
| 60), |
| slice.changes.size()); |
| int toSave = 0; |
| try { |
| logger.atInfo().log( |
| "Starting to rebuild changes from project %s slice %d/%d", |
| project.get(), slice.sliceNumber, ctx.sliceCount); |
| long pc = 0; |
| for (Change.Id changeId : slice.changes) { |
| // NoteDbUpdateManager assumes that all commands in its OpenRepo were added by itself, so |
| // we can't share the top-level ChainedReceiveCommands. Use a new set of commands sharing |
| // the same underlying repo, and copy commands back to the top-level |
| // ChainedReceiveCommands later. This also assumes that each ref in the final list of |
| // commands was only modified by a single NoteDbUpdateManager; since we use one manager |
| // per change, and each ref corresponds to exactly one change, this assumption should be |
| // safe. |
| ChainedReceiveCommands tmpChangeCmds = |
| new ChainedReceiveCommands(changeCmds.getRepoRefCache()); |
| ChainedReceiveCommands tmpAllUsersCmds = |
| new ChainedReceiveCommands(allUsersCmds.getRepoRefCache()); |
| |
| try (NoteDbUpdateManager manager = |
| updateManagerFactory |
| .create(project) |
| .setAtomicRefUpdates(false) |
| .setSaveObjects(false) |
| .setChangeRepo(changeRepo, changeRw, changeIns, tmpChangeCmds) |
| .setAllUsersRepo(allUsersRepo, allUsersRw, allUsersIns, tmpAllUsersCmds)) { |
| rebuild(db, changeId, manager); |
| |
| // Executing with dryRun=true writes all objects to the underlying inserters and adds |
| // commands to the ChainedReceiveCommands. Afterwards, we can discard the manager, so we |
| // don't keep using any memory beyond what may be buffered in the PackInserter. |
| manager.execute(true); |
| |
| tmpChangeCmds.getCommands().values().forEach(c -> addCommand(changeCmds, c)); |
| tmpAllUsersCmds.getCommands().values().forEach(c -> addCommand(allUsersCmds, c)); |
| |
| toSave++; |
| } catch (NoPatchSetsException e) { |
| logger.atWarning().log(e.getMessage()); |
| } catch (ConflictingUpdateException ex) { |
| logger.atWarning().log( |
| "Rebuilding detected a conflicting ReviewDb update for change %s;" |
| + " will be auto-rebuilt at runtime", |
| changeId); |
| } catch (Throwable t) { |
| logger.atSevere().withCause(t).log("Failed to rebuild change %s", changeId); |
| ok = false; |
| } |
| if (verbose) { |
| logger.atInfo().log("Rebuilt change %s", changeId.get()); |
| } |
| long c = globalChangeCounter.incrementAndGet(); |
| if (c % 1000 == 0) { |
| logger.atInfo().log( |
| "Total number of rebuilt changes %d/%d (%.01f%%)", |
| c, totalChangeCount, (100.0 * c) / totalChangeCount); |
| } |
| pc = ctx.changesMigratedCount.incrementAndGet(); |
| if (pc % GC_INTERVAL == 0) { |
| gc(project, changeRepo, ctx.gcLock); |
| } |
| pm.update(1); |
| } |
| logger.atInfo().log( |
| "Finished rebuilding changes of project %s, slice %d/%d, changes %d/%d)", |
| project.get(), slice.sliceNumber, ctx.sliceCount, pc, ctx.changeCount); |
| } finally { |
| pm.endTask(); |
| } |
| |
| pm.beginTask( |
| FormatUtil.elide("Saving noteDb refs for " + project.get(), 60), ProgressMonitor.UNKNOWN); |
| try { |
| save(changeRepo, changeRw, changeIns, changeCmds); |
| save(allUsersRepo, allUsersRw, allUsersIns, allUsersCmds); |
| // This isn't really useful progress. If we passed a real ProgressMonitor to |
| // BatchRefUpdate#execute we might get something more incremental, but that doesn't allow us |
| // to specify the repo name in the task text. |
| pm.update(toSave); |
| } catch (LockFailureException e) { |
| logger.atWarning().log( |
| "Rebuilding detected a conflicting NoteDb update for the following refs, which will" |
| + " be auto-rebuilt at runtime: %s", |
| e.getFailedRefs().stream().distinct().sorted().collect(joining(", "))); |
| } catch (IOException e) { |
| logger.atSevere().withCause(e).log("Failed to save NoteDb state for %s", project); |
| } finally { |
| pm.endTask(); |
| } |
| } catch (RepositoryNotFoundException e) { |
| logger.atWarning().log("Repository %s not found", project); |
| } catch (IOException e) { |
| logger.atSevere().withCause(e).log("Failed to rebuild project %s", project); |
| } finally { |
| Thread.currentThread().setName(oldThreadName); |
| } |
| return ok; |
| } |
| |
| private void gc(Project.NameKey project, Repository repo, ReentrantLock gcLock) { |
| if (repo instanceof FileRepository && gcLock.tryLock()) { |
| try { |
| FileRepository r = (FileRepository) repo; |
| GC gc = new GC(r); |
| // known limitation in jgit 5.1: bitmap index creation is slow due to bug 562740, |
| // see https://bugs.eclipse.org/bugs/show_bug.cgi?id=562740 |
| logger.atInfo().log("Running GC on project %s", project); |
| PackConfig pconfig = new PackConfig(repo); |
| pconfig.setBuildBitmaps(false); |
| // let auto gc decide when gc needs to really do something |
| gc.setAuto(true); |
| gc.setPackConfig(pconfig); |
| gc.gc(); |
| logger.atInfo().log("Finished GC on project %s", project); |
| } catch (IOException | ParseException e) { |
| logger.atSevere().withCause(e).log("GC of project %s failed", project); |
| } finally { |
| gcLock.unlock(); |
| logger.atFine().log("Released gc lock for project %s", project); |
| } |
| } |
| } |
| |
| private void rebuild(ReviewDb db, Change.Id changeId, NoteDbUpdateManager manager) |
| throws OrmException, IOException { |
| // Match ChangeRebuilderImpl#stage, but without calling manager.stage(), since that can only be |
| // called after building updates for all changes. |
| Change change = |
| ChangeRebuilderImpl.checkNoteDbState(ChangeNotes.readOneReviewDbChange(db, changeId)); |
| if (change == null) { |
| // Could log here instead, but this matches the behavior of ChangeRebuilderImpl#stage. |
| throw new NoSuchChangeException(changeId); |
| } |
| rebuilder.buildUpdates(manager, bundleReader.fromReviewDb(db, changeId)); |
| |
| rebuilder.execute(db, changeId, manager, true, false); |
| } |
| |
| private static void addCommand(ChainedReceiveCommands cmds, ReceiveCommand cmd) { |
| // ChainedReceiveCommands doesn't allow no-ops, but these occur when rebuilding a |
| // previously-rebuilt change. |
| if (!cmd.getOldId().equals(cmd.getNewId())) { |
| cmds.add(cmd); |
| } |
| } |
| |
| private void save(Repository repo, RevWalk rw, ObjectInserter ins, ChainedReceiveCommands cmds) |
| throws IOException { |
| if (cmds.isEmpty()) { |
| return; |
| } |
| ins.flush(); |
| BatchRefUpdate bru = repo.getRefDatabase().newBatchUpdate(); |
| bru.setRefLogMessage("Migrate changes to NoteDb", false); |
| bru.setRefLogIdent(serverIdent.get()); |
| bru.setAtomic(false); |
| bru.setAllowNonFastForwards(true); |
| cmds.addTo(bru); |
| RefUpdateUtil.executeChecked(bru, rw); |
| } |
| |
| private static boolean futuresToBoolean(List<ListenableFuture<Boolean>> futures, String errMsg) { |
| try { |
| return Futures.allAsList(futures).get().stream().allMatch(b -> b); |
| } catch (InterruptedException | ExecutionException e) { |
| logger.atSevere().withCause(e).log(errMsg); |
| return false; |
| } |
| } |
| |
| private class ContextHelper implements AutoCloseable { |
| private final Thread callingThread; |
| private ReviewDb db; |
| private Runnable closeDb; |
| |
| ContextHelper() { |
| callingThread = Thread.currentThread(); |
| } |
| |
| ManualRequestContext open() throws OrmException { |
| return new ManualRequestContext( |
| userFactory.create(), |
| // Reuse the same lazily-opened ReviewDb on the original calling thread, otherwise open |
| // SchemaFactory in the normal way. |
| Thread.currentThread().equals(callingThread) ? this::getReviewDb : schemaFactory, |
| requestContext); |
| } |
| |
| synchronized ReviewDb getReviewDb() throws OrmException { |
| if (db == null) { |
| ReviewDb actual = schemaFactory.open(); |
| closeDb = actual::close; |
| db = |
| new ReviewDbWrapper(unwrapDb(actual)) { |
| @Override |
| public void close() { |
| // Closed by ContextHelper#close. |
| } |
| }; |
| } |
| return db; |
| } |
| |
| @Override |
| public synchronized void close() { |
| if (db != null) { |
| closeDb.run(); |
| db = null; |
| closeDb = null; |
| } |
| } |
| } |
| } |