blob: f05f87953b2fe630ecd07c6de163b378c69fe3b6 [file] [log] [blame]
// Copyright (C) 2013 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.lucene;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.index.Index;
import com.google.gerrit.server.index.IndexCollection;
import com.google.gerrit.server.index.IndexDefinition;
import com.google.gerrit.server.index.IndexDefinition.IndexFactory;
import com.google.gerrit.server.index.OnlineReindexer;
import com.google.gerrit.server.index.Schema;
import com.google.inject.Inject;
import com.google.inject.ProvisionException;
import com.google.inject.Singleton;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.lib.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@Singleton
public class LuceneVersionManager implements LifecycleListener {
private static final Logger log = LoggerFactory
.getLogger(LuceneVersionManager.class);
static final String CHANGES_PREFIX = "changes_";
private static class Version<V> {
private final Schema<V> schema;
private final int version;
private final boolean exists;
private final boolean ready;
private Version(Schema<V> schema, int version, boolean exists,
boolean ready) {
checkArgument(schema == null || schema.getVersion() == version);
this.schema = schema;
this.version = version;
this.exists = exists;
this.ready = ready;
}
}
static Path getDir(SitePaths sitePaths, String prefix, Schema<?> schema) {
return sitePaths.index_dir.resolve(String.format("%s%04d",
prefix, schema.getVersion()));
}
private final SitePaths sitePaths;
private final Map<String, IndexDefinition<?, ?, ?>> defs;
private final Map<String, OnlineReindexer<?, ?, ?>> reindexers;
private final boolean onlineUpgrade;
private final String runReindexMsg;
@Inject
LuceneVersionManager(
@GerritServerConfig Config cfg,
SitePaths sitePaths,
Collection<IndexDefinition<?, ?, ?>> defs) {
this.sitePaths = sitePaths;
this.defs = Maps.newHashMapWithExpectedSize(defs.size());
for (IndexDefinition<?, ?, ?> def : defs) {
this.defs.put(def.getName(), def);
}
reindexers = Maps.newHashMapWithExpectedSize(defs.size());
onlineUpgrade = cfg.getBoolean("index", null, "onlineUpgrade", true);
runReindexMsg =
"No index versions ready; run java -jar " +
sitePaths.gerrit_war.toAbsolutePath() +
" reindex";
}
@Override
public void start() {
GerritIndexStatus cfg;
try {
cfg = new GerritIndexStatus(sitePaths);
} catch (ConfigInvalidException | IOException e) {
throw fail(e);
}
if (!Files.exists(sitePaths.index_dir)) {
throw new ProvisionException(runReindexMsg);
} else if (!Files.exists(sitePaths.index_dir)) {
log.warn("Not a directory: {}", sitePaths.index_dir.toAbsolutePath());
throw new ProvisionException(runReindexMsg);
}
for (IndexDefinition<?, ?, ?> def : defs.values()) {
initIndex(def, cfg);
}
}
private <K, V, I extends Index<K, V>> void initIndex(
IndexDefinition<K, V, I> def, GerritIndexStatus cfg) {
TreeMap<Integer, Version<V>> versions = scanVersions(def, cfg);
// Search from the most recent ready version.
// Write to the most recent ready version and the most recent version.
Version<V> search = null;
List<Version<V>> write = Lists.newArrayListWithCapacity(2);
for (Version<V> v : versions.descendingMap().values()) {
if (v.schema == null) {
continue;
}
if (write.isEmpty() && onlineUpgrade) {
write.add(v);
}
if (v.ready) {
search = v;
if (!write.contains(v)) {
write.add(v);
}
break;
}
}
if (search == null) {
throw new ProvisionException(runReindexMsg);
}
IndexFactory<K, V, I> factory = def.getIndexFactory();
I searchIndex = factory.create(search.schema);
IndexCollection<K, V, I> indexes = def.getIndexCollection();
indexes.setSearchIndex(searchIndex);
for (Version<V> v : write) {
if (v.schema != null) {
if (v.version != search.version) {
indexes.addWriteIndex(factory.create(v.schema));
} else {
indexes.addWriteIndex(searchIndex);
}
}
}
markNotReady(cfg, def.getName(), versions.values(), write);
int latest = write.get(0).version;
OnlineReindexer<K, V, I> reindexer = new OnlineReindexer<>(def, latest);
synchronized (this) {
if (!reindexers.containsKey(def.getName())) {
reindexers.put(def.getName(), reindexer);
if (onlineUpgrade && latest != search.version) {
reindexer.start();
}
}
}
}
/**
* Start the online reindexer if the current index is not already the latest.
*
* @param force start re-index
* @return true if started, otherwise false.
* @throws ReindexerAlreadyRunningException
*/
public synchronized boolean startReindexer(String name, boolean force)
throws ReindexerAlreadyRunningException {
OnlineReindexer<?, ?, ?> reindexer = reindexers.get(name);
validateReindexerNotRunning(reindexer);
if (force || !isCurrentIndexVersionLatest(name, reindexer)) {
reindexer.start();
return true;
}
return false;
}
/**
* Activate the latest index if the current index is not already the latest.
*
* @return true if index was activate, otherwise false.
* @throws ReindexerAlreadyRunningException
*/
public synchronized boolean activateLatestIndex(String name)
throws ReindexerAlreadyRunningException {
OnlineReindexer<?, ?, ?> reindexer = reindexers.get(name);
validateReindexerNotRunning(reindexer);
if (!isCurrentIndexVersionLatest(name, reindexer)) {
reindexer.activateIndex();
return true;
}
return false;
}
private boolean isCurrentIndexVersionLatest(
String name, OnlineReindexer<?, ?, ?> reindexer) {
int readVersion = defs.get(name).getIndexCollection().getSearchIndex()
.getSchema().getVersion();
return reindexer == null
|| reindexer.getVersion() == readVersion;
}
private static void validateReindexerNotRunning(
OnlineReindexer<?, ?, ?> reindexer)
throws ReindexerAlreadyRunningException {
if (reindexer != null && reindexer.isRunning()) {
throw new ReindexerAlreadyRunningException();
}
}
private <K, V, I extends Index<K, V>> TreeMap<Integer, Version<V>>
scanVersions(IndexDefinition<K, V, I> def, GerritIndexStatus cfg) {
TreeMap<Integer, Version<V>> versions = new TreeMap<>();
for (Schema<V> schema : def.getSchemas().values()) {
// This part is Lucene-specific.
Path p = getDir(sitePaths, def.getName(), schema);
boolean isDir = Files.isDirectory(p);
if (Files.exists(p) && !isDir) {
log.warn("Not a directory: {}", p.toAbsolutePath());
}
int v = schema.getVersion();
versions.put(v, new Version<>(
schema, v, isDir, cfg.getReady(def.getName(), v)));
}
String prefix = def.getName() + "_";
try (DirectoryStream<Path> paths =
Files.newDirectoryStream(sitePaths.index_dir)) {
for (Path p : paths) {
String n = p.getFileName().toString();
if (!n.startsWith(prefix)) {
continue;
}
String versionStr = n.substring(prefix.length());
Integer v = Ints.tryParse(versionStr);
if (v == null || versionStr.length() != 4) {
log.warn("Unrecognized version in index directory: {}",
p.toAbsolutePath());
continue;
}
if (!versions.containsKey(v)) {
versions.put(v, new Version<V>(
null, v, true, cfg.getReady(def.getName(), v)));
}
}
} catch (IOException e) {
log.error("Error scanning index directory: " + sitePaths.index_dir, e);
}
return versions;
}
private <V> void markNotReady(GerritIndexStatus cfg, String name,
Iterable<Version<V>> versions, Collection<Version<V>> inUse) {
boolean dirty = false;
for (Version<V> v : versions) {
if (!inUse.contains(v) && v.exists) {
cfg.setReady(name, v.version, false);
dirty = true;
}
}
if (dirty) {
try {
cfg.save();
} catch (IOException e) {
throw fail(e);
}
}
}
private ProvisionException fail(Throwable t) {
ProvisionException e = new ProvisionException("Error scanning indexes");
e.initCause(t);
throw e;
}
@Override
public void stop() {
// Do nothing; indexes are closed on demand by IndexCollection.
}
}