Replace SearcherManager with NRTManager
The NRTManager allows Gerrit to wait for a specific document mutation
to be visible to searchers before trying to run a new search. The
NRTManager comes with its own background thread to manage reopens,
replacing the thread that reopened the index every 100 ms.
The change index API now returns a ListenableFuture the caller can
wait on to learn when new queries will return the updates.
Change-Id: I1b3c5ba036241ffd54c88a16ee8b2ffb3d3bf5f2
diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java
index b9e03b3..4aced04 100644
--- a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java
+++ b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java
@@ -20,10 +20,12 @@
import static org.apache.lucene.search.BooleanClause.Occur.MUST_NOT;
import static org.apache.lucene.search.BooleanClause.Occur.SHOULD;
+import com.google.common.base.Function;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.reviewdb.client.Change;
@@ -118,7 +120,6 @@
return writerConfig;
}
- private final RefreshThread refreshThread;
private final FillArgs fillArgs;
private final ExecutorService executor;
private final boolean readOnly;
@@ -128,7 +129,6 @@
LuceneChangeIndex(Config cfg, SitePaths sitePaths,
ListeningScheduledExecutorService executor, FillArgs fillArgs,
boolean readOnly) throws IOException {
- this.refreshThread = new RefreshThread();
this.fillArgs = fillArgs;
this.executor = executor;
this.readOnly = readOnly;
@@ -140,12 +140,10 @@
@Override
public void start() {
- refreshThread.start();
}
@Override
public void stop() {
- refreshThread.halt();
List<Future<?>> closeFutures = Lists.newArrayListWithCapacity(2);
closeFutures.add(executor.submit(new Runnable() {
@Override
@@ -164,49 +162,66 @@
}
}
+ @SuppressWarnings("unchecked")
@Override
- public void insert(ChangeData cd) throws IOException {
+ public ListenableFuture<Void> insert(ChangeData cd) throws IOException {
Term id = idTerm(cd);
Document doc = toDocument(cd);
if (readOnly) {
- return;
+ return Futures.immediateFuture(null);
}
+
if (cd.getChange().getStatus().isOpen()) {
- closedIndex.delete(id);
- openIndex.insert(doc);
+ return allOf(
+ closedIndex.delete(id),
+ openIndex.insert(doc));
} else {
- openIndex.delete(id);
- closedIndex.insert(doc);
+ return allOf(
+ openIndex.delete(id),
+ closedIndex.insert(doc));
}
}
+ @SuppressWarnings("unchecked")
@Override
- public void replace(ChangeData cd) throws IOException {
+ public ListenableFuture<Void> replace(ChangeData cd) throws IOException {
Term id = idTerm(cd);
Document doc = toDocument(cd);
if (readOnly) {
- return;
+ return Futures.immediateFuture(null);
}
if (cd.getChange().getStatus().isOpen()) {
- closedIndex.delete(id);
- openIndex.replace(id, doc);
+ return allOf(
+ closedIndex.delete(id),
+ openIndex.replace(id, doc));
} else {
- openIndex.delete(id);
- closedIndex.replace(id, doc);
+ return allOf(
+ openIndex.delete(id),
+ closedIndex.replace(id, doc));
}
}
+ @SuppressWarnings("unchecked")
@Override
- public void delete(ChangeData cd) throws IOException {
+ public ListenableFuture<Void> delete(ChangeData cd) throws IOException {
Term id = idTerm(cd);
if (readOnly) {
- return;
+ return Futures.immediateFuture(null);
}
- if (cd.getChange().getStatus().isOpen()) {
- openIndex.delete(id);
- } else {
- closedIndex.delete(id);
- }
+ return allOf(
+ openIndex.delete(id),
+ closedIndex.delete(id));
+ }
+
+ private static <V> ListenableFuture<Void> allOf(ListenableFuture<V>... f) {
+ return Futures.transform(
+ Futures.allAsList(f),
+ new Function<List<V>, Void>() {
+ @Override
+ public Void apply(List<V> input) {
+ return null;
+ }
+ });
}
@Override
@@ -485,35 +500,4 @@
private static IllegalArgumentException badFieldType(FieldType<?> t) {
return new IllegalArgumentException("unknown index field type " + t);
}
-
- private class RefreshThread extends Thread {
- private boolean stop;
-
- @Override
- public void run() {
- while (!stop) {
- openIndex.maybeRefresh();
- closedIndex.maybeRefresh();
- synchronized (this) {
- try {
- wait(100);
- } catch (InterruptedException e) {
- log.warn("error refreshing index searchers", e);
- }
- }
- }
- }
-
- void halt() {
- synchronized (this) {
- stop = true;
- notify();
- }
- try {
- join();
- } catch (InterruptedException e) {
- log.warn("error stopping refresh thread", e);
- }
- }
- }
}
diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java
index d909f7a..2ae8d3a 100644
--- a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java
+++ b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java
@@ -14,12 +14,20 @@
package com.google.gerrit.lucene;
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gwt.thirdparty.guava.common.collect.Maps;
+
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.SearcherManager;
+import org.apache.lucene.search.NRTManager;
+import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
+import org.apache.lucene.search.NRTManagerReopenThread;
+import org.apache.lucene.search.ReferenceManager.RefreshListener;
+import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.slf4j.Logger;
@@ -27,29 +35,63 @@
import java.io.File;
import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
/** Piece of the change index that is implemented as a separate Lucene index. */
class SubIndex {
private static final Logger log = LoggerFactory.getLogger(SubIndex.class);
private final Directory dir;
- private final IndexWriter writer;
- private final SearcherManager searcherManager;
+ private final TrackingIndexWriter writer;
+ private final NRTManager nrtManager;
+ private final NRTManagerReopenThread reopenThread;
+ private final ConcurrentMap<RefreshListener, Boolean> refreshListeners;
SubIndex(File file, IndexWriterConfig writerConfig) throws IOException {
dir = FSDirectory.open(file);
- writer = new IndexWriter(dir, writerConfig);
- searcherManager = new SearcherManager(writer, true, null);
+ writer = new NRTManager.TrackingIndexWriter(new IndexWriter(dir, writerConfig));
+ nrtManager = new NRTManager(writer, new SearcherFactory());
+
+ refreshListeners = Maps.newConcurrentMap();
+ nrtManager.addListener(new RefreshListener() {
+ @Override
+ public void beforeRefresh() throws IOException {
+ }
+
+ @Override
+ public void afterRefresh(boolean didRefresh) throws IOException {
+ for (RefreshListener l : refreshListeners.keySet()) {
+ l.afterRefresh(didRefresh);
+ }
+ }
+ });
+
+ reopenThread = new NRTManagerReopenThread(
+ nrtManager,
+ 0.500 /* maximum stale age (seconds) */,
+ 0.010 /* minimum stale age (seconds) */);
+ reopenThread.setName("NRT " + file.getName());
+ reopenThread.setPriority(Math.min(
+ Thread.currentThread().getPriority() + 2,
+ Thread.MAX_PRIORITY));
+ reopenThread.setDaemon(true);
+ reopenThread.start();
}
void close() {
+ reopenThread.close();
try {
- searcherManager.close();
+ nrtManager.close();
} catch (IOException e) {
log.warn("error closing Lucene searcher", e);
}
try {
- writer.close();
+ writer.getIndexWriter().close();
} catch (IOException e) {
log.warn("error closing Lucene writer", e);
}
@@ -60,31 +102,91 @@
}
}
- void insert(Document doc) throws IOException {
- writer.addDocument(doc);
+ ListenableFuture<Void> insert(Document doc) throws IOException {
+ return new NrtFuture(writer.addDocument(doc));
}
- void replace(Term term, Document doc) throws IOException {
- writer.updateDocument(term, doc);
+ ListenableFuture<Void> replace(Term term, Document doc) throws IOException {
+ return new NrtFuture(writer.updateDocument(term, doc));
}
- void delete(Term term) throws IOException {
- writer.deleteDocuments(term);
+ ListenableFuture<Void> delete(Term term) throws IOException {
+ return new NrtFuture(writer.deleteDocuments(term));
}
IndexSearcher acquire() throws IOException {
- return searcherManager.acquire();
+ return nrtManager.acquire();
}
void release(IndexSearcher searcher) throws IOException {
- searcherManager.release(searcher);
+ nrtManager.release(searcher);
}
- void maybeRefresh() {
- try {
- searcherManager.maybeRefresh();
- } catch (IOException e) {
- log.warn("error refreshing indexer", e);
+ private final class NrtFuture extends AbstractFuture<Void>
+ implements RefreshListener {
+ private final long gen;
+ private final AtomicBoolean hasListeners = new AtomicBoolean();
+
+ NrtFuture(long gen) {
+ this.gen = gen;
+ }
+
+ @Override
+ public Void get() throws InterruptedException, ExecutionException {
+ if (!isDone()) {
+ nrtManager.waitForGeneration(gen);
+ set(null);
+ }
+ return super.get();
+ }
+
+ @Override
+ public Void get(long timeout, TimeUnit unit) throws InterruptedException,
+ TimeoutException, ExecutionException {
+ if (!isDone()) {
+ nrtManager.waitForGeneration(gen, timeout, unit);
+ set(null);
+ }
+ return super.get(timeout, unit);
+ }
+
+ @Override
+ public boolean isDone() {
+ if (super.isDone()) {
+ return true;
+ } else if (gen <= nrtManager.getCurrentSearchingGen()) {
+ set(null);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void addListener(Runnable listener, Executor executor) {
+ if (hasListeners.compareAndSet(false, true) && !isDone()) {
+ nrtManager.addListener(this);
+ }
+ super.addListener(listener, executor);
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ if (hasListeners.get()) {
+ refreshListeners.put(this, true);
+ }
+ return super.cancel(mayInterruptIfRunning);
+ }
+
+ @Override
+ public void beforeRefresh() throws IOException {
+ }
+
+ @Override
+ public void afterRefresh(boolean didRefresh) throws IOException {
+ if (gen <= nrtManager.getCurrentSearchingGen()) {
+ refreshListeners.remove(this);
+ set(null);
+ }
}
}
}
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndex.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndex.java
index 1b1eead..cae5d7d 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndex.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndex.java
@@ -14,6 +14,8 @@
package com.google.gerrit.server.index;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.gerrit.server.query.Predicate;
import com.google.gerrit.server.query.QueryParseException;
import com.google.gerrit.server.query.change.ChangeData;
@@ -35,18 +37,18 @@
/** Instance indicating secondary index is disabled. */
public static final ChangeIndex DISABLED = new ChangeIndex() {
@Override
- public void insert(ChangeData cd) throws IOException {
- // Do nothing.
+ public ListenableFuture<Void> insert(ChangeData cd) throws IOException {
+ return Futures.immediateFuture(null);
}
@Override
- public void replace(ChangeData cd) throws IOException {
- // Do nothing.
+ public ListenableFuture<Void> replace(ChangeData cd) throws IOException {
+ return Futures.immediateFuture(null);
}
@Override
- public void delete(ChangeData cd) throws IOException {
- // Do nothing.
+ public ListenableFuture<Void> delete(ChangeData cd) throws IOException {
+ return Futures.immediateFuture(null);
}
@Override
@@ -67,7 +69,7 @@
*
* @throws IOException if the change could not be inserted.
*/
- public void insert(ChangeData cd) throws IOException;
+ public ListenableFuture<Void> insert(ChangeData cd) throws IOException;
/**
* Update a change document in the index.
@@ -81,7 +83,7 @@
*
* @throws IOException
*/
- public void replace(ChangeData cd) throws IOException;
+ public ListenableFuture<Void> replace(ChangeData cd) throws IOException;
/**
* Delete a change document from the index.
@@ -90,7 +92,7 @@
*
* @throws IOException
*/
- public void delete(ChangeData cd) throws IOException;
+ public ListenableFuture<Void> delete(ChangeData cd) throws IOException;
/**
* Convert the given operator predicate into a source searching the index and
diff --git a/gerrit-server/src/test/java/com/google/gerrit/server/query/change/IndexRewriteTest.java b/gerrit-server/src/test/java/com/google/gerrit/server/query/change/IndexRewriteTest.java
index 295c1cb..5e60533 100644
--- a/gerrit-server/src/test/java/com/google/gerrit/server/query/change/IndexRewriteTest.java
+++ b/gerrit-server/src/test/java/com/google/gerrit/server/query/change/IndexRewriteTest.java
@@ -21,6 +21,7 @@
import static com.google.gerrit.reviewdb.client.Change.Status.SUBMITTED;
import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.server.index.ChangeIndex;
import com.google.gerrit.server.index.PredicateWrapper;
@@ -34,7 +35,6 @@
import junit.framework.TestCase;
-import java.io.IOException;
import java.util.EnumSet;
import java.util.Set;
@@ -42,17 +42,17 @@
public class IndexRewriteTest extends TestCase {
private static class DummyIndex implements ChangeIndex {
@Override
- public void insert(ChangeData cd) throws IOException {
+ public ListenableFuture<Void> insert(ChangeData cd) {
throw new UnsupportedOperationException();
}
@Override
- public void replace(ChangeData cd) throws IOException {
+ public ListenableFuture<Void> replace(ChangeData cd) {
throw new UnsupportedOperationException();
}
@Override
- public void delete(ChangeData cd) throws IOException {
+ public ListenableFuture<Void> delete(ChangeData cd) {
throw new UnsupportedOperationException();
}