Replace SearcherManager with NRTManager

The NRTManager allows Gerrit to wait for a specific document mutation
to be visible to searchers before trying to run a new search.  The
NRTManager comes with its own background thread to manage reopens,
replacing the thread that reopened the index every 100 ms.

The change index API now returns a ListenableFuture the caller can
wait on to learn when new queries will return the updates.

Change-Id: I1b3c5ba036241ffd54c88a16ee8b2ffb3d3bf5f2
diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java
index b9e03b3..4aced04 100644
--- a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java
+++ b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java
@@ -20,10 +20,12 @@
 import static org.apache.lucene.search.BooleanClause.Occur.MUST_NOT;
 import static org.apache.lucene.search.BooleanClause.Occur.SHOULD;
 
+import com.google.common.base.Function;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.reviewdb.client.Change;
@@ -118,7 +120,6 @@
     return writerConfig;
   }
 
-  private final RefreshThread refreshThread;
   private final FillArgs fillArgs;
   private final ExecutorService executor;
   private final boolean readOnly;
@@ -128,7 +129,6 @@
   LuceneChangeIndex(Config cfg, SitePaths sitePaths,
       ListeningScheduledExecutorService executor, FillArgs fillArgs,
       boolean readOnly) throws IOException {
-    this.refreshThread = new RefreshThread();
     this.fillArgs = fillArgs;
     this.executor = executor;
     this.readOnly = readOnly;
@@ -140,12 +140,10 @@
 
   @Override
   public void start() {
-    refreshThread.start();
   }
 
   @Override
   public void stop() {
-    refreshThread.halt();
     List<Future<?>> closeFutures = Lists.newArrayListWithCapacity(2);
     closeFutures.add(executor.submit(new Runnable() {
       @Override
@@ -164,49 +162,66 @@
     }
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  public void insert(ChangeData cd) throws IOException {
+  public ListenableFuture<Void> insert(ChangeData cd) throws IOException {
     Term id = idTerm(cd);
     Document doc = toDocument(cd);
     if (readOnly) {
-      return;
+      return Futures.immediateFuture(null);
     }
+
     if (cd.getChange().getStatus().isOpen()) {
-      closedIndex.delete(id);
-      openIndex.insert(doc);
+      return allOf(
+          closedIndex.delete(id),
+          openIndex.insert(doc));
     } else {
-      openIndex.delete(id);
-      closedIndex.insert(doc);
+      return allOf(
+          openIndex.delete(id),
+          closedIndex.insert(doc));
     }
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  public void replace(ChangeData cd) throws IOException {
+  public ListenableFuture<Void> replace(ChangeData cd) throws IOException {
     Term id = idTerm(cd);
     Document doc = toDocument(cd);
     if (readOnly) {
-      return;
+      return Futures.immediateFuture(null);
     }
     if (cd.getChange().getStatus().isOpen()) {
-      closedIndex.delete(id);
-      openIndex.replace(id, doc);
+      return allOf(
+          closedIndex.delete(id),
+          openIndex.replace(id, doc));
     } else {
-      openIndex.delete(id);
-      closedIndex.replace(id, doc);
+      return allOf(
+          openIndex.delete(id),
+          closedIndex.replace(id, doc));
     }
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  public void delete(ChangeData cd) throws IOException {
+  public ListenableFuture<Void> delete(ChangeData cd) throws IOException {
     Term id = idTerm(cd);
     if (readOnly) {
-      return;
+      return Futures.immediateFuture(null);
     }
-    if (cd.getChange().getStatus().isOpen()) {
-      openIndex.delete(id);
-    } else {
-      closedIndex.delete(id);
-    }
+    return allOf(
+        openIndex.delete(id),
+        closedIndex.delete(id));
+  }
+
+  private static <V> ListenableFuture<Void> allOf(ListenableFuture<V>... f) {
+    return Futures.transform(
+        Futures.allAsList(f),
+        new Function<List<V>, Void>() {
+          @Override
+          public Void apply(List<V> input) {
+            return null;
+          }
+        });
   }
 
   @Override
@@ -485,35 +500,4 @@
   private static IllegalArgumentException badFieldType(FieldType<?> t) {
     return new IllegalArgumentException("unknown index field type " + t);
   }
-
-  private class RefreshThread extends Thread {
-    private boolean stop;
-
-    @Override
-    public void run() {
-      while (!stop) {
-        openIndex.maybeRefresh();
-        closedIndex.maybeRefresh();
-        synchronized (this) {
-          try {
-            wait(100);
-          } catch (InterruptedException e) {
-            log.warn("error refreshing index searchers", e);
-          }
-        }
-      }
-    }
-
-    void halt() {
-      synchronized (this) {
-        stop = true;
-        notify();
-      }
-      try {
-        join();
-      } catch (InterruptedException e) {
-        log.warn("error stopping refresh thread", e);
-      }
-    }
-  }
 }
diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java
index d909f7a..2ae8d3a 100644
--- a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java
+++ b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java
@@ -14,12 +14,20 @@
 
 package com.google.gerrit.lucene;
 
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gwt.thirdparty.guava.common.collect.Maps;
+
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.SearcherManager;
+import org.apache.lucene.search.NRTManager;
+import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
+import org.apache.lucene.search.NRTManagerReopenThread;
+import org.apache.lucene.search.ReferenceManager.RefreshListener;
+import org.apache.lucene.search.SearcherFactory;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 import org.slf4j.Logger;
@@ -27,29 +35,63 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /** Piece of the change index that is implemented as a separate Lucene index. */
 class SubIndex {
   private static final Logger log = LoggerFactory.getLogger(SubIndex.class);
 
   private final Directory dir;
-  private final IndexWriter writer;
-  private final SearcherManager searcherManager;
+  private final TrackingIndexWriter writer;
+  private final NRTManager nrtManager;
+  private final NRTManagerReopenThread reopenThread;
+  private final ConcurrentMap<RefreshListener, Boolean> refreshListeners;
 
   SubIndex(File file, IndexWriterConfig writerConfig) throws IOException {
     dir = FSDirectory.open(file);
-    writer = new IndexWriter(dir, writerConfig);
-    searcherManager = new SearcherManager(writer, true, null);
+    writer = new NRTManager.TrackingIndexWriter(new IndexWriter(dir, writerConfig));
+    nrtManager = new NRTManager(writer, new SearcherFactory());
+
+    refreshListeners = Maps.newConcurrentMap();
+    nrtManager.addListener(new RefreshListener() {
+      @Override
+      public void beforeRefresh() throws IOException {
+      }
+
+      @Override
+      public void afterRefresh(boolean didRefresh) throws IOException {
+        for (RefreshListener l : refreshListeners.keySet()) {
+          l.afterRefresh(didRefresh);
+        }
+      }
+    });
+
+    reopenThread = new NRTManagerReopenThread(
+        nrtManager,
+        0.500 /* maximum stale age (seconds) */,
+        0.010 /* minimum stale age (seconds) */);
+    reopenThread.setName("NRT " + file.getName());
+    reopenThread.setPriority(Math.min(
+        Thread.currentThread().getPriority() + 2,
+        Thread.MAX_PRIORITY));
+    reopenThread.setDaemon(true);
+    reopenThread.start();
   }
 
   void close() {
+    reopenThread.close();
     try {
-      searcherManager.close();
+      nrtManager.close();
     } catch (IOException e) {
       log.warn("error closing Lucene searcher", e);
     }
     try {
-      writer.close();
+      writer.getIndexWriter().close();
     } catch (IOException e) {
       log.warn("error closing Lucene writer", e);
     }
@@ -60,31 +102,91 @@
     }
   }
 
-  void insert(Document doc) throws IOException {
-    writer.addDocument(doc);
+  ListenableFuture<Void> insert(Document doc) throws IOException {
+    return new NrtFuture(writer.addDocument(doc));
   }
 
-  void replace(Term term, Document doc) throws IOException {
-    writer.updateDocument(term, doc);
+  ListenableFuture<Void> replace(Term term, Document doc) throws IOException {
+    return new NrtFuture(writer.updateDocument(term, doc));
   }
 
-  void delete(Term term) throws IOException {
-    writer.deleteDocuments(term);
+  ListenableFuture<Void> delete(Term term) throws IOException {
+    return new NrtFuture(writer.deleteDocuments(term));
   }
 
   IndexSearcher acquire() throws IOException {
-    return searcherManager.acquire();
+    return nrtManager.acquire();
   }
 
   void release(IndexSearcher searcher) throws IOException {
-    searcherManager.release(searcher);
+    nrtManager.release(searcher);
   }
 
-  void maybeRefresh() {
-    try {
-      searcherManager.maybeRefresh();
-    } catch (IOException e) {
-      log.warn("error refreshing indexer", e);
+  private final class NrtFuture extends AbstractFuture<Void>
+      implements RefreshListener {
+    private final long gen;
+    private final AtomicBoolean hasListeners = new AtomicBoolean();
+
+    NrtFuture(long gen) {
+      this.gen = gen;
+    }
+
+    @Override
+    public Void get() throws InterruptedException, ExecutionException {
+      if (!isDone()) {
+        nrtManager.waitForGeneration(gen);
+        set(null);
+      }
+      return super.get();
+    }
+
+    @Override
+    public Void get(long timeout, TimeUnit unit) throws InterruptedException,
+        TimeoutException, ExecutionException {
+      if (!isDone()) {
+        nrtManager.waitForGeneration(gen, timeout, unit);
+        set(null);
+      }
+      return super.get(timeout, unit);
+    }
+
+    @Override
+    public boolean isDone() {
+      if (super.isDone()) {
+        return true;
+      } else if (gen <= nrtManager.getCurrentSearchingGen()) {
+        set(null);
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public void addListener(Runnable listener, Executor executor) {
+      if (hasListeners.compareAndSet(false, true) && !isDone()) {
+        nrtManager.addListener(this);
+      }
+      super.addListener(listener, executor);
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      if (hasListeners.get()) {
+        refreshListeners.put(this, true);
+      }
+      return super.cancel(mayInterruptIfRunning);
+    }
+
+    @Override
+    public void beforeRefresh() throws IOException {
+    }
+
+    @Override
+    public void afterRefresh(boolean didRefresh) throws IOException {
+      if (gen <= nrtManager.getCurrentSearchingGen()) {
+        refreshListeners.remove(this);
+        set(null);
+      }
     }
   }
 }
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndex.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndex.java
index 1b1eead..cae5d7d 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndex.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndex.java
@@ -14,6 +14,8 @@
 
 package com.google.gerrit.server.index;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gerrit.server.query.Predicate;
 import com.google.gerrit.server.query.QueryParseException;
 import com.google.gerrit.server.query.change.ChangeData;
@@ -35,18 +37,18 @@
   /** Instance indicating secondary index is disabled. */
   public static final ChangeIndex DISABLED = new ChangeIndex() {
     @Override
-    public void insert(ChangeData cd) throws IOException {
-      // Do nothing.
+    public ListenableFuture<Void> insert(ChangeData cd) throws IOException {
+      return Futures.immediateFuture(null);
     }
 
     @Override
-    public void replace(ChangeData cd) throws IOException {
-      // Do nothing.
+    public ListenableFuture<Void> replace(ChangeData cd) throws IOException {
+      return Futures.immediateFuture(null);
     }
 
     @Override
-    public void delete(ChangeData cd) throws IOException {
-      // Do nothing.
+    public ListenableFuture<Void> delete(ChangeData cd) throws IOException {
+      return Futures.immediateFuture(null);
     }
 
     @Override
@@ -67,7 +69,7 @@
    *
    * @throws IOException if the change could not be inserted.
    */
-  public void insert(ChangeData cd) throws IOException;
+  public ListenableFuture<Void> insert(ChangeData cd) throws IOException;
 
   /**
    * Update a change document in the index.
@@ -81,7 +83,7 @@
    *
    * @throws IOException
    */
-  public void replace(ChangeData cd) throws IOException;
+  public ListenableFuture<Void> replace(ChangeData cd) throws IOException;
 
   /**
    * Delete a change document from the index.
@@ -90,7 +92,7 @@
    *
    * @throws IOException
    */
-  public void delete(ChangeData cd) throws IOException;
+  public ListenableFuture<Void> delete(ChangeData cd) throws IOException;
 
   /**
    * Convert the given operator predicate into a source searching the index and
diff --git a/gerrit-server/src/test/java/com/google/gerrit/server/query/change/IndexRewriteTest.java b/gerrit-server/src/test/java/com/google/gerrit/server/query/change/IndexRewriteTest.java
index 295c1cb..5e60533 100644
--- a/gerrit-server/src/test/java/com/google/gerrit/server/query/change/IndexRewriteTest.java
+++ b/gerrit-server/src/test/java/com/google/gerrit/server/query/change/IndexRewriteTest.java
@@ -21,6 +21,7 @@
 import static com.google.gerrit.reviewdb.client.Change.Status.SUBMITTED;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gerrit.reviewdb.client.Change;
 import com.google.gerrit.server.index.ChangeIndex;
 import com.google.gerrit.server.index.PredicateWrapper;
@@ -34,7 +35,6 @@
 
 import junit.framework.TestCase;
 
-import java.io.IOException;
 import java.util.EnumSet;
 import java.util.Set;
 
@@ -42,17 +42,17 @@
 public class IndexRewriteTest extends TestCase {
   private static class DummyIndex implements ChangeIndex {
     @Override
-    public void insert(ChangeData cd) throws IOException {
+    public ListenableFuture<Void> insert(ChangeData cd) {
       throw new UnsupportedOperationException();
     }
 
     @Override
-    public void replace(ChangeData cd) throws IOException {
+    public ListenableFuture<Void> replace(ChangeData cd) {
       throw new UnsupportedOperationException();
     }
 
     @Override
-    public void delete(ChangeData cd) throws IOException {
+    public ListenableFuture<Void> delete(ChangeData cd) {
       throw new UnsupportedOperationException();
     }