blob: 3aec4be6dad4bca024c78775a298185ae4427979 [file] [log] [blame]
// Copyright (C) 2013 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.pgm;
import static com.google.gerrit.lucene.IndexVersionCheck.SCHEMA_VERSIONS;
import static com.google.gerrit.lucene.IndexVersionCheck.gerritIndexConfig;
import static com.google.gerrit.lucene.LuceneChangeIndex.LUCENE_VERSION;
import static com.google.gerrit.server.schema.DataSourceProvider.Context.MULTI_USER;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleManager;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gerrit.lucene.LuceneIndexModule;
import com.google.gerrit.pgm.util.SiteProgram;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.cache.CacheRemovalListener;
import com.google.gerrit.server.cache.h2.DefaultCacheFactory;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.MultiProgressMonitor;
import com.google.gerrit.server.git.MultiProgressMonitor.Task;
import com.google.gerrit.server.index.ChangeIndexer;
import com.google.gerrit.server.index.IndexExecutor;
import com.google.gerrit.server.index.IndexModule;
import com.google.gerrit.server.patch.PatchListCacheImpl;
import com.google.gerrit.server.patch.PatchListLoader;
import com.google.gerrit.server.query.change.ChangeData;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provider;
import com.google.inject.ProvisionException;
import com.google.inject.TypeLiteral;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.eclipse.jgit.diff.DiffEntry;
import org.eclipse.jgit.diff.DiffFormatter;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.lib.RepositoryCache;
import org.eclipse.jgit.lib.TextProgressMonitor;
import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.revwalk.RevObject;
import org.eclipse.jgit.revwalk.RevTree;
import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.storage.file.FileBasedConfig;
import org.eclipse.jgit.util.FS;
import org.eclipse.jgit.util.io.DisabledOutputStream;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class Reindex extends SiteProgram {
private static final Logger log = LoggerFactory.getLogger(Reindex.class);
@Option(name = "--threads", usage = "Number of threads to use for indexing")
private int threads = Runtime.getRuntime().availableProcessors();
@Option(name = "--dry-run", usage = "Dry run: don't write anything to index")
private boolean dryRun;
@Option(name = "--verbose", usage = "Output debug information for each change")
private boolean verbose;
private Injector dbInjector;
private Injector sysInjector;
private SitePaths sitePaths;
@Override
public int run() throws Exception {
mustHaveValidSite();
dbInjector = createDbInjector(MULTI_USER);
if (!IndexModule.isEnabled(dbInjector)) {
throw die("Secondary index not enabled");
}
LifecycleManager dbManager = new LifecycleManager();
dbManager.add(dbInjector);
dbManager.start();
sitePaths = dbInjector.getInstance(SitePaths.class);
// Delete before any index may be created depending on this data.
deleteAll();
sysInjector = createSysInjector();
LifecycleManager sysManager = new LifecycleManager();
sysManager.add(sysInjector);
sysManager.start();
int result = indexAll();
writeVersion();
sysManager.stop();
dbManager.stop();
return result;
}
private Injector createSysInjector() {
List<Module> modules = Lists.newArrayList();
modules.add(PatchListCacheImpl.module());
modules.add(new LuceneIndexModule(false, threads, dryRun));
modules.add(new ReviewDbModule());
modules.add(new AbstractModule() {
@SuppressWarnings("rawtypes")
@Override
protected void configure() {
// Plugins are not loaded and we're just running through each change
// once, so don't worry about cache removal.
bind(new TypeLiteral<DynamicSet<CacheRemovalListener>>() {})
.toInstance(DynamicSet.<CacheRemovalListener> emptySet());
install(new DefaultCacheFactory.Module());
}
});
return dbInjector.createChildInjector(modules);
}
private class ReviewDbModule extends LifecycleModule {
@Override
protected void configure() {
final SchemaFactory<ReviewDb> schema = dbInjector.getInstance(
Key.get(new TypeLiteral<SchemaFactory<ReviewDb>>() {}));
final List<ReviewDb> dbs = Collections.synchronizedList(
Lists.<ReviewDb> newArrayListWithCapacity(threads + 1));
final ThreadLocal<ReviewDb> localDb = new ThreadLocal<ReviewDb>();
bind(ReviewDb.class).toProvider(new Provider<ReviewDb>() {
@Override
public ReviewDb get() {
ReviewDb db = localDb.get();
if (db == null) {
try {
db = schema.open();
dbs.add(db);
localDb.set(db);
} catch (OrmException e) {
throw new ProvisionException("unable to open ReviewDb", e);
}
}
return db;
}
});
listener().toInstance(new LifecycleListener() {
@Override
public void start() {
// Do nothing.
}
@Override
public void stop() {
for (ReviewDb db : dbs) {
db.close();
}
}
});
}
}
private void deleteAll() throws IOException {
if (dryRun) {
return;
}
for (String index : SCHEMA_VERSIONS.keySet()) {
File file = new File(sitePaths.index_dir, index);
if (file.exists()) {
Directory dir = FSDirectory.open(file);
try {
for (String name : dir.listAll()) {
dir.deleteFile(name);
}
} finally {
dir.close();
}
}
}
}
private int indexAll() throws Exception {
ReviewDb db = sysInjector.getInstance(ReviewDb.class);
ListeningScheduledExecutorService executor = sysInjector.getInstance(
Key.get(ListeningScheduledExecutorService.class, IndexExecutor.class));
ProgressMonitor pm = new TextProgressMonitor();
pm.start(1);
pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN);
Set<Project.NameKey> projects = Sets.newTreeSet();
int changeCount = 0;
try {
for (Change change : db.changes().all()) {
changeCount++;
if (projects.add(change.getProject())) {
pm.update(1);
}
}
} finally {
db.close();
}
pm.endTask();
final MultiProgressMonitor mpm =
new MultiProgressMonitor(System.err, "Reindexing changes");
final Task projTask = mpm.beginSubTask("projects", projects.size());
final Task doneTask = mpm.beginSubTask(null, changeCount);
final Task failedTask = mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
Stopwatch sw = new Stopwatch().start();
final List<ListenableFuture<?>> futures =
Lists.newArrayListWithCapacity(projects.size());
final AtomicBoolean ok = new AtomicBoolean(true);
for (final Project.NameKey project : projects) {
final ListenableFuture<?> future = executor.submit(
new ReindexProject(project, doneTask, failedTask));
futures.add(future);
future.addListener(new Runnable() {
@Override
public void run() {
try {
future.get();
} catch (InterruptedException e) {
fail(project, e);
} catch (ExecutionException e) {
ok.set(false); // Logged by indexer.
} catch (RuntimeException e) {
failAndThrow(project, e);
} catch (Error e) {
failAndThrow(project, e);
} finally {
projTask.update(1);
}
}
private void fail(Project.NameKey project, Throwable t) {
log.error("Failed to index project " + project, t);
ok.set(false);
}
private void failAndThrow(Project.NameKey project, RuntimeException e) {
fail(project, e);
throw e;
}
private void failAndThrow(Project.NameKey project, Error e) {
fail(project, e);
throw e;
}
}, MoreExecutors.sameThreadExecutor());
}
mpm.waitFor(Futures.transform(Futures.successfulAsList(futures),
new AsyncFunction<List<?>, Void>() {
@Override
public ListenableFuture<Void> apply(List<?> input) throws Exception {
mpm.end();
return Futures.immediateFuture(null);
}
}));
double elapsed = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d;
int n = doneTask.getCount() + failedTask.getCount();
System.out.format("Reindexed %d changes in %.01fs (%.01f/s)\n",
n, elapsed, n/elapsed);
return ok.get() ? 0 : 1;
}
private class ReindexProject implements Callable<Void> {
private final ChangeIndexer indexer;
private final Project.NameKey project;
private final ListMultimap<ObjectId, ChangeData> byId;
private final Task done;
private final Task failed;
private Repository repo;
private RevWalk walk;
private ReindexProject(Project.NameKey project, Task done, Task failed) {
this.indexer = sysInjector.getInstance(ChangeIndexer.class);
this.project = project;
this.byId = ArrayListMultimap.create();
this.done = done;
this.failed = failed;
}
@Override
public Void call() throws Exception {
ReviewDb db = sysInjector.getInstance(ReviewDb.class);
GitRepositoryManager mgr = sysInjector.getInstance(GitRepositoryManager.class);
repo = mgr.openRepository(project);
try {
Map<String, Ref> refs = repo.getAllRefs();
for (Change c : db.changes().byProject(project)) {
Ref r = refs.get(c.currentPatchSetId().toRefName());
if (r != null) {
byId.put(r.getObjectId(), new ChangeData(c));
}
}
walk();
} finally {
repo.close();
RepositoryCache.close(repo); // Only used once per Reindex call.
}
return null;
}
private void walk() throws Exception {
walk = new RevWalk(repo);
try {
// Walk only refs first to cover as many changes as we can without having
// to mark every single change.
for (Ref ref : repo.getRefDatabase().getRefs(Constants.R_HEADS).values()) {
RevObject o = walk.parseAny(ref.getObjectId());
if (o instanceof RevCommit) {
walk.markStart((RevCommit) o);
}
}
RevCommit bCommit;
while ((bCommit = walk.next()) != null && !byId.isEmpty()) {
if (byId.containsKey(bCommit)) {
getPathsAndIndex(bCommit);
byId.removeAll(bCommit);
}
}
for (ObjectId id : byId.keySet()) {
getPathsAndIndex(walk.parseCommit(id));
}
} finally {
walk.release();
}
}
private void getPathsAndIndex(RevCommit bCommit) throws Exception {
RevTree bTree = bCommit.getTree();
try {
RevTree aTree = aFor(bCommit, walk);
if (aTree == null) {
return;
}
DiffFormatter df = new DiffFormatter(DisabledOutputStream.INSTANCE);
try {
df.setRepository(repo);
List<ChangeData> cds = byId.get(bCommit);
if (!cds.isEmpty()) {
List<String> paths = getPaths(df.scan(aTree, bTree));
for (ChangeData cd : cds) {
cd.setCurrentFilePaths(paths);
indexer.indexTask(cd).call();
done.update(1);
if (verbose) {
System.out.println("Reindexed change " + cd.getId());
}
}
}
} finally {
df.release();
}
} catch (Exception e) {
fail("Failed to index commit " + bCommit.name(), e);
}
}
private List<String> getPaths(List<DiffEntry> filenames) {
Set<String> paths = Sets.newTreeSet();
for (DiffEntry e : filenames) {
if (e.getOldPath() != null) {
paths.add(e.getOldPath());
}
if (e.getNewPath() != null) {
paths.add(e.getNewPath());
}
}
return ImmutableList.copyOf(paths);
}
private RevTree aFor(RevCommit b, RevWalk walk) throws IOException {
switch (b.getParentCount()) {
case 0:
return walk.parseTree(emptyTree());
case 1:
RevCommit a = b.getParent(0);
walk.parseBody(a);
return walk.parseTree(a.getTree());
case 2:
return PatchListLoader.automerge(repo, walk, b);
default:
return null;
}
}
private ObjectId emptyTree() throws IOException {
ObjectInserter oi = repo.newObjectInserter();
try {
ObjectId id = oi.insert(Constants.OBJ_TREE, new byte[] {});
oi.flush();
return id;
} finally {
oi.release();
}
}
private void fail(String error, Exception e) {
log.warn(error, e);
if (verbose) {
System.out.println(error);
}
}
}
private void writeVersion() throws IOException,
ConfigInvalidException {
if (dryRun) {
return;
}
FileBasedConfig cfg =
new FileBasedConfig(gerritIndexConfig(sitePaths), FS.detect());
cfg.load();
for (Map.Entry<String, Integer> e : SCHEMA_VERSIONS.entrySet()) {
cfg.setInt("index", e.getKey(), "schemaVersion", e.getValue());
}
cfg.setEnum("lucene", null, "version", LUCENE_VERSION);
cfg.save();
}
}