Break IntraLineDiff threads into their own module

This small refactoring allows me to manually bind the
IntraLineWorkerPool within the gerrit-review server build,
offering better control over how the threads are managed
by this pool.

Change-Id: I26c56ef3c57a86189e1c6a1b668b464e4f079995
diff --git a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java
index 6e825ef..9654804 100644
--- a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java
+++ b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java
@@ -51,13 +51,14 @@
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.gerrit.server.mail.SignedTokenEmailTokenVerifier;
 import com.google.gerrit.server.mail.SmtpEmailSender;
+import com.google.gerrit.server.patch.IntraLineWorkerPool;
 import com.google.gerrit.server.plugins.PluginGuiceEnvironment;
 import com.google.gerrit.server.plugins.PluginModule;
 import com.google.gerrit.server.schema.SchemaUpdater;
 import com.google.gerrit.server.schema.SchemaVersionCheck;
 import com.google.gerrit.server.schema.UpdateUI;
-import com.google.gerrit.server.ssh.NoSshModule;
 import com.google.gerrit.server.ssh.NoSshKeyCache;
+import com.google.gerrit.server.ssh.NoSshModule;
 import com.google.gerrit.sshd.SshKeyCacheImpl;
 import com.google.gerrit.sshd.SshModule;
 import com.google.gerrit.sshd.commands.MasterCommandModule;
@@ -310,6 +311,7 @@
     modules.add(new WorkQueue.Module());
     modules.add(new ChangeHookRunner.Module());
     modules.add(new ReceiveCommitsExecutorModule());
+    modules.add(new IntraLineWorkerPool.Module());
     modules.add(cfgInjector.getInstance(GerritGlobalModule.class));
     modules.add(new DefaultCacheFactory.Module());
     modules.add(new SmtpEmailSender.Module());
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java b/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java
index 5b65920..b95994f 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java
@@ -29,10 +29,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 class IntraLineLoader extends CacheLoader<IntraLineDiffKey, IntraLineDiff> {
@@ -44,16 +41,12 @@
   private static final Pattern CONTROL_BLOCK_START_RE = Pattern
       .compile("[{:][ \\t]*$");
 
-  private final BlockingQueue<Worker> workerPool;
+  private final IntraLineWorkerPool workerPool;
   private final long timeoutMillis;
 
   @Inject
-  IntraLineLoader(final @GerritServerConfig Config cfg) {
-    final int workers =
-        cfg.getInt("cache", PatchListCacheImpl.INTRA_NAME, "maxIdleWorkers",
-            Runtime.getRuntime().availableProcessors() * 3 / 2);
-    workerPool = new ArrayBlockingQueue<Worker>(workers, true /* fair */);
-
+  IntraLineLoader(IntraLineWorkerPool pool, @GerritServerConfig Config cfg) {
+    workerPool = pool;
     timeoutMillis =
         ConfigUtil.getTimeUnit(cfg, "cache", PatchListCacheImpl.INTRA_NAME,
             "timeout", TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS),
@@ -62,26 +55,17 @@
 
   @Override
   public IntraLineDiff load(IntraLineDiffKey key) throws Exception {
-    Worker w = workerPool.poll();
-    if (w == null) {
-      w = new Worker();
-    }
+    IntraLineWorkerPool.Worker w = workerPool.acquire();
+    IntraLineWorkerPool.Worker.Result r = w.computeWithTimeout(key, timeoutMillis);
 
-    Worker.Result r = w.computeWithTimeout(key, timeoutMillis);
-
-    if (r == Worker.Result.TIMEOUT) {
+    if (r == IntraLineWorkerPool.Worker.Result.TIMEOUT) {
       // Don't keep this thread. We have to murder it unsafely, which
       // means its unable to be reused in the future. Return back a
       // null result, indicating the cache cannot load this key.
       //
       return new IntraLineDiff(IntraLineDiff.Status.TIMEOUT);
     }
-
-    if (!workerPool.offer(w)) {
-      // If the idle worker pool is full, terminate this thread.
-      //
-      w.end();
-    }
+    workerPool.release(w);
 
     if (r.error != null) {
       // If there was an error computing the result, carry it
@@ -93,127 +77,7 @@
     return r.diff;
   }
 
-  private static class Worker {
-    private static final AtomicInteger count = new AtomicInteger(1);
-
-    private final ArrayBlockingQueue<Input> input;
-    private final ArrayBlockingQueue<Result> result;
-    private final Thread thread;
-
-    Worker() {
-      input = new ArrayBlockingQueue<Input>(1);
-      result = new ArrayBlockingQueue<Result>(1);
-
-      thread = new Thread(new Runnable() {
-        public void run() {
-          workerLoop();
-        }
-      });
-      thread.setName("IntraLineDiff-" + count.getAndIncrement());
-      thread.setDaemon(true);
-      thread.start();
-    }
-
-    Result computeWithTimeout(IntraLineDiffKey key, long timeoutMillis)
-        throws Exception {
-      if (!input.offer(new Input(key))) {
-        log.error("Cannot enqueue task to thread " + thread.getName());
-        return Result.TIMEOUT;
-      }
-
-      Result r = result.poll(timeoutMillis, TimeUnit.MILLISECONDS);
-      if (r != null) {
-        return r;
-      } else {
-        log.warn(timeoutMillis + " ms timeout reached for IntraLineDiff"
-            + " in project " + key.getProject().get() //
-            + " on commit " + key.getCommit().name() //
-            + " for path " + key.getPath() //
-            + " comparing " + key.getBlobA().name() //
-            + ".." + key.getBlobB().name() //
-            + ".  Killing " + thread.getName());
-        forcefullyKillThreadInAnUglyWay();
-        return Result.TIMEOUT;
-      }
-    }
-
-    @SuppressWarnings("deprecation")
-    private void forcefullyKillThreadInAnUglyWay() {
-      try {
-        thread.stop();
-      } catch (Throwable error) {
-        // Ignore any reason the thread won't stop.
-        log.error("Cannot stop runaway thread " + thread.getName(), error);
-      }
-    }
-
-    void end() {
-      if (!input.offer(Input.END_THREAD)) {
-        log.error("Cannot gracefully stop thread " + thread.getName());
-      }
-    }
-
-    private void workerLoop() {
-      try {
-        for (;;) {
-          Input in;
-          try {
-            in = input.take();
-          } catch (InterruptedException e) {
-            log.error("Unexpected interrupt on " + thread.getName());
-            continue;
-          }
-
-          if (in == Input.END_THREAD) {
-            return;
-          }
-
-          Result r;
-          try {
-            r = new Result(IntraLineLoader.compute(in.key));
-          } catch (Exception error) {
-            r = new Result(error);
-          }
-
-          if (!result.offer(r)) {
-            log.error("Cannot return result from " + thread.getName());
-          }
-        }
-      } catch (ThreadDeath iHaveBeenShot) {
-        // Handle thread death by gracefully returning to the caller,
-        // allowing the thread to be destroyed.
-      }
-    }
-
-    private static class Input {
-      static final Input END_THREAD = new Input(null);
-
-      final IntraLineDiffKey key;
-
-      Input(IntraLineDiffKey key) {
-        this.key = key;
-      }
-    }
-
-    static class Result {
-      static final Result TIMEOUT = new Result((IntraLineDiff) null);
-
-      final IntraLineDiff diff;
-      final Exception error;
-
-      Result(IntraLineDiff diff) {
-        this.diff = diff;
-        this.error = null;
-      }
-
-      Result(Exception error) {
-        this.diff = null;
-        this.error = error;
-      }
-    }
-  }
-
-  private static IntraLineDiff compute(IntraLineDiffKey key) throws Exception {
+  static IntraLineDiff compute(IntraLineDiffKey key) throws Exception {
     List<Edit> edits = new ArrayList<Edit>(key.getEdits());
     Text aContent = key.getTextA();
     Text bContent = key.getTextB();
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineWorkerPool.java b/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineWorkerPool.java
new file mode 100644
index 0000000..5c6338fe
--- /dev/null
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineWorkerPool.java
@@ -0,0 +1,182 @@
+// Copyright (C) 2009 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package com.google.gerrit.server.patch;
+
+import static com.google.gerrit.server.patch.IntraLineLoader.log;
+
+import com.google.gerrit.server.config.GerritServerConfig;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import org.eclipse.jgit.lib.Config;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Singleton
+public class IntraLineWorkerPool {
+  public static class Module extends AbstractModule {
+    @Override
+    protected void configure() {
+      bind(IntraLineWorkerPool.class);
+    }
+  }
+
+  private final BlockingQueue<Worker> workerPool;
+
+  @Inject
+  public IntraLineWorkerPool(@GerritServerConfig Config cfg) {
+    int workers = cfg.getInt(
+        "cache", PatchListCacheImpl.INTRA_NAME, "maxIdleWorkers",
+        Runtime.getRuntime().availableProcessors() * 3 / 2);
+    workerPool = new ArrayBlockingQueue<Worker>(workers, true /* fair */);
+  }
+
+  Worker acquire() {
+    Worker w = workerPool.poll();
+    if (w == null) {
+      // If no worker is immediately available, start a new one.
+      // Maximum parallelism is controlled by the web server.
+      w = new Worker();
+      w.start();
+    }
+    return w;
+  }
+
+  void release(Worker w) {
+    if (!workerPool.offer(w)) {
+      // If the idle worker pool is full, terminate the worker.
+      w.shutdownGracefully();
+    }
+  }
+
+  static class Worker extends Thread {
+    private static final AtomicInteger count = new AtomicInteger(1);
+
+    private final ArrayBlockingQueue<Input> input;
+    private final ArrayBlockingQueue<Result> result;
+
+    Worker() {
+      input = new ArrayBlockingQueue<Input>(1);
+      result = new ArrayBlockingQueue<Result>(1);
+
+      setName("IntraLineDiff-" + count.getAndIncrement());
+      setDaemon(true);
+    }
+
+    Result computeWithTimeout(IntraLineDiffKey key, long timeoutMillis)
+        throws Exception {
+      if (!input.offer(new Input(key))) {
+        log.error("Cannot enqueue task to thread " + getName());
+        return Result.TIMEOUT;
+      }
+
+      Result r = result.poll(timeoutMillis, TimeUnit.MILLISECONDS);
+      if (r != null) {
+        return r;
+      } else {
+        log.warn(timeoutMillis + " ms timeout reached for IntraLineDiff"
+            + " in project " + key.getProject().get()
+            + " on commit " + key.getCommit().name()
+            + " for path " + key.getPath()
+            + " comparing " + key.getBlobA().name()
+            + ".." + key.getBlobB().name()
+            + ".  Killing " + getName());
+        forcefullyKillThreadInAnUglyWay();
+        return Result.TIMEOUT;
+      }
+    }
+
+    @SuppressWarnings("deprecation")
+    private void forcefullyKillThreadInAnUglyWay() {
+      try {
+        stop();
+      } catch (Throwable error) {
+        // Ignore any reason the thread won't stop.
+        log.error("Cannot stop runaway thread " + getName(), error);
+      }
+    }
+
+    private void shutdownGracefully() {
+      if (!input.offer(Input.END_THREAD)) {
+        log.error("Cannot gracefully stop thread " + getName());
+      }
+    }
+
+    @Override
+    public void run() {
+      try {
+        for (;;) {
+          Input in;
+          try {
+            in = input.take();
+          } catch (InterruptedException e) {
+            log.error("Unexpected interrupt on " + getName());
+            continue;
+          }
+
+          if (in == Input.END_THREAD) {
+            return;
+          }
+
+          Result r;
+          try {
+            r = new Result(IntraLineLoader.compute(in.key));
+          } catch (Exception error) {
+            r = new Result(error);
+          }
+
+          if (!result.offer(r)) {
+            log.error("Cannot return result from " + getName());
+          }
+        }
+      } catch (ThreadDeath iHaveBeenShot) {
+        // Handle thread death by gracefully returning to the caller,
+        // allowing the thread to be destroyed.
+      }
+    }
+
+    private static class Input {
+      static final Input END_THREAD = new Input(null);
+
+      final IntraLineDiffKey key;
+
+      Input(IntraLineDiffKey key) {
+        this.key = key;
+      }
+    }
+
+    static class Result {
+      static final Result TIMEOUT = new Result((IntraLineDiff) null);
+
+      final IntraLineDiff diff;
+      final Exception error;
+
+      Result(IntraLineDiff diff) {
+        this.diff = diff;
+        this.error = null;
+      }
+
+      Result(Exception error) {
+        this.diff = null;
+        this.error = error;
+      }
+    }
+  }
+}
diff --git a/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java b/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java
index 3059dfa..d0b8c38 100644
--- a/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java
+++ b/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java
@@ -39,6 +39,7 @@
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.gerrit.server.mail.SignedTokenEmailTokenVerifier;
 import com.google.gerrit.server.mail.SmtpEmailSender;
+import com.google.gerrit.server.patch.IntraLineWorkerPool;
 import com.google.gerrit.server.plugins.PluginGuiceEnvironment;
 import com.google.gerrit.server.plugins.PluginModule;
 import com.google.gerrit.server.schema.DataSourceModule;
@@ -229,6 +230,7 @@
     modules.add(new WorkQueue.Module());
     modules.add(new ChangeHookRunner.Module());
     modules.add(new ReceiveCommitsExecutorModule());
+    modules.add(new IntraLineWorkerPool.Module());
     modules.add(cfgInjector.getInstance(GerritGlobalModule.class));
     modules.add(new DefaultCacheFactory.Module());
     modules.add(new SmtpEmailSender.Module());