blob: eef394d66dd9207aa83c49b6ad5dc2707397daf2 [file] [log] [blame]
// Copyright (C) 2013 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.index;
import static java.util.Objects.requireNonNull;
import com.google.common.collect.Lists;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.index.Index;
import com.google.gerrit.index.IndexCollection;
import com.google.gerrit.index.IndexDefinition;
import com.google.gerrit.index.SiteIndexer;
import com.google.gerrit.server.plugincontext.PluginSetContext;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Background thread for running an index schema upgrade by reindexing all documents in an index
 * using the new version. Intended to be run while Gerrit is serving traffic to prepare for a
 * near-zero downtime upgrade.
 *
 * <p>Registered {@link OnlineUpgradeListener}s are told when a run starts and how it ends: every
 * run that reaches {@code onStart} is followed by exactly one of {@code onSuccess} or {@code
 * onFailure}, unless a listener callback itself throws.
 *
 * @param <K> first type argument of the {@link Index} being upgraded
 * @param <V> second type argument of the {@link Index} being upgraded
 * @param <I> concrete index type
 */
public class OnlineReindexer<K, V, I extends Index<K, V>> {
  private static final FluentLogger logger = FluentLogger.forEnclosingClass();

  private final String name;
  private final IndexCollection<K, V, I> indexes;
  private final SiteIndexer<K, V, I> batchIndexer;
  private final int oldVersion;
  private final int newVersion;
  private final PluginSetContext<OnlineUpgradeListener> listeners;

  /** Write index for {@code newVersion}; stays {@code null} until a reindex run resolves it. */
  private I index;

  /** Set while a background run is in progress, so that only one run is active at a time. */
  private final AtomicBoolean running = new AtomicBoolean();

  /**
   * Creates a reindexer for one index definition.
   *
   * @param def supplies the index name, its index collection and the site indexer
   * @param oldVersion schema version being upgraded from; reported to listeners
   * @param newVersion schema version to build; must be an active write version when the run starts
   * @param listeners listeners notified about start, success and failure of a run
   */
  public OnlineReindexer(
      IndexDefinition<K, V, I> def,
      int oldVersion,
      int newVersion,
      PluginSetContext<OnlineUpgradeListener> listeners) {
    this.name = def.getName();
    this.indexes = def.getIndexCollection();
    this.batchIndexer = def.getSiteIndexer();
    this.oldVersion = oldVersion;
    this.newVersion = newVersion;
    this.listeners = listeners;
  }

  /**
   * Starts the background process. Does nothing if a run is already in progress.
   *
   * <p>If the worker thread cannot be created, named or started, the running flag is cleared again
   * before the exception is propagated, so a later call can retry.
   */
  public void start() {
    if (!running.compareAndSet(false, true)) {
      return;
    }
    try {
      Thread t = new Thread(this::runInBackground);
      t.setName(
          String.format("Reindex %s v%d-v%d", name, version(indexes.getSearchIndex()), newVersion));
      t.start();
    } catch (RuntimeException | Error e) {
      // The worker never ran, so nothing else would ever reset the flag.
      running.set(false);
      throw e;
    }
  }

  /** Returns {@code true} if the background indexer is currently running. */
  public boolean isRunning() {
    return running.get();
  }

  /** Returns the index version that this indexer is creating documents for. */
  public int getVersion() {
    return newVersion;
  }

  private static int version(Index<?, ?> i) {
    return i.getSchema().getVersion();
  }

  /** Body of the worker thread: runs one reindex and reports a failure to the listeners. */
  private void runInBackground() {
    boolean ok = false;
    try {
      ok = reindex();
    } catch (RuntimeException e) {
      // Log newVersion rather than version(index): index is still null when the failure happened
      // before the write index was resolved, and dereferencing it here would replace the real
      // cause with a NullPointerException thrown from this handler.
      logger.atSevere().withCause(e).log(
          "Online reindex of %s schema version %s failed", name, newVersion);
    } finally {
      running.set(false);
      if (!ok) {
        listeners.runEach(listener -> listener.onFailure(name, oldVersion, newVersion));
      }
    }
  }

  /**
   * Rebuilds the write index for {@code newVersion} and, if that worked, activates it and notifies
   * listeners of the success.
   *
   * @return {@code true} if the index was rebuilt and activated; {@code false} if the batch indexer
   *     reported an unsuccessful result (already logged here)
   * @throws NullPointerException if {@code newVersion} is not an active write schema version
   */
  private boolean reindex() {
    listeners.runEach(listener -> listener.onStart(name, oldVersion, newVersion));
    index =
        requireNonNull(
            indexes.getWriteIndex(newVersion),
            () -> String.format("not an active write schema version: %s %s", name, newVersion));
    logger.atInfo().log(
        "Starting online reindex of %s from schema version %s to %s",
        name, version(indexes.getSearchIndex()), version(index));

    // Only wipe the target when it is a different version from the one being upgraded from.
    if (oldVersion != newVersion) {
      index.deleteAll();
    }

    SiteIndexer.Result result = batchIndexer.indexAll(index);
    if (!result.success()) {
      logger.atSevere().log(
          "Online reindex of %s schema version %s failed. Successfully"
              + " indexed %s, failed to index %s",
          name, version(index), result.doneCount(), result.failedCount());
      // Report the failure to the caller so that listeners receive onFailure instead of being
      // left without any completion callback.
      return false;
    }

    logger.atInfo().log("Reindex %s to version %s complete", name, version(index));
    activateIndex();
    listeners.runEach(listener -> listener.onSuccess(name, oldVersion, newVersion));
    return true;
  }

  /**
   * Switches the search index from the old version to the new version. This method should be called
   * when the new version is fully ready.
   *
   * <p>After the switch, every other write index is marked not ready and removed from the
   * collection.
   *
   * @throws NullPointerException if no reindex run has resolved the new write index yet; in that
   *     case the index collection is left untouched
   */
  public void activateIndex() {
    // Check before mutating anything: otherwise a null would be handed to setSearchIndex and the
    // failure would only surface afterwards.
    I target =
        requireNonNull(
            index,
            () ->
                String.format(
                    "write index not resolved yet, cannot activate: %s %s", name, newVersion));
    int targetVersion = version(target);

    indexes.setSearchIndex(target);
    logger.atInfo().log("Using %s schema version %s", name, targetVersion);
    target.markReady(true);

    // Collect first, remove afterwards, so the write-index collection is not modified while it is
    // being iterated.
    List<I> toRemove = Lists.newArrayListWithExpectedSize(1);
    for (I i : indexes.getWriteIndexes()) {
      if (version(i) != targetVersion) {
        toRemove.add(i);
      }
    }
    for (I i : toRemove) {
      i.markReady(false);
      indexes.removeWriteIndex(version(i));
    }
  }
}