Break IntraLineDiff threads into their own module
This small refactoring allows me to manually bind the
IntraLineWorkerPool within the gerrit-review server build,
offering better control over how the threads are managed
by this pool.
Change-Id: I26c56ef3c57a86189e1c6a1b668b464e4f079995
diff --git a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java
index 6e825ef..9654804 100644
--- a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java
+++ b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java
@@ -51,13 +51,14 @@
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.mail.SignedTokenEmailTokenVerifier;
import com.google.gerrit.server.mail.SmtpEmailSender;
+import com.google.gerrit.server.patch.IntraLineWorkerPool;
import com.google.gerrit.server.plugins.PluginGuiceEnvironment;
import com.google.gerrit.server.plugins.PluginModule;
import com.google.gerrit.server.schema.SchemaUpdater;
import com.google.gerrit.server.schema.SchemaVersionCheck;
import com.google.gerrit.server.schema.UpdateUI;
-import com.google.gerrit.server.ssh.NoSshModule;
import com.google.gerrit.server.ssh.NoSshKeyCache;
+import com.google.gerrit.server.ssh.NoSshModule;
import com.google.gerrit.sshd.SshKeyCacheImpl;
import com.google.gerrit.sshd.SshModule;
import com.google.gerrit.sshd.commands.MasterCommandModule;
@@ -310,6 +311,7 @@
modules.add(new WorkQueue.Module());
modules.add(new ChangeHookRunner.Module());
modules.add(new ReceiveCommitsExecutorModule());
+ modules.add(new IntraLineWorkerPool.Module());
modules.add(cfgInjector.getInstance(GerritGlobalModule.class));
modules.add(new DefaultCacheFactory.Module());
modules.add(new SmtpEmailSender.Module());
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java b/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java
index 5b65920..b95994f 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java
@@ -29,10 +29,7 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
class IntraLineLoader extends CacheLoader<IntraLineDiffKey, IntraLineDiff> {
@@ -44,16 +41,12 @@
private static final Pattern CONTROL_BLOCK_START_RE = Pattern
.compile("[{:][ \\t]*$");
- private final BlockingQueue<Worker> workerPool;
+ private final IntraLineWorkerPool workerPool;
private final long timeoutMillis;
@Inject
- IntraLineLoader(final @GerritServerConfig Config cfg) {
- final int workers =
- cfg.getInt("cache", PatchListCacheImpl.INTRA_NAME, "maxIdleWorkers",
- Runtime.getRuntime().availableProcessors() * 3 / 2);
- workerPool = new ArrayBlockingQueue<Worker>(workers, true /* fair */);
-
+ IntraLineLoader(IntraLineWorkerPool pool, @GerritServerConfig Config cfg) {
+ workerPool = pool;
timeoutMillis =
ConfigUtil.getTimeUnit(cfg, "cache", PatchListCacheImpl.INTRA_NAME,
"timeout", TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS),
@@ -62,26 +55,17 @@
@Override
public IntraLineDiff load(IntraLineDiffKey key) throws Exception {
- Worker w = workerPool.poll();
- if (w == null) {
- w = new Worker();
- }
+ IntraLineWorkerPool.Worker w = workerPool.acquire();
+ IntraLineWorkerPool.Worker.Result r = w.computeWithTimeout(key, timeoutMillis);
- Worker.Result r = w.computeWithTimeout(key, timeoutMillis);
-
- if (r == Worker.Result.TIMEOUT) {
+ if (r == IntraLineWorkerPool.Worker.Result.TIMEOUT) {
// Don't keep this thread. We have to murder it unsafely, which
// means its unable to be reused in the future. Return back a
// null result, indicating the cache cannot load this key.
//
return new IntraLineDiff(IntraLineDiff.Status.TIMEOUT);
}
-
- if (!workerPool.offer(w)) {
- // If the idle worker pool is full, terminate this thread.
- //
- w.end();
- }
+ workerPool.release(w);
if (r.error != null) {
// If there was an error computing the result, carry it
@@ -93,127 +77,7 @@
return r.diff;
}
- private static class Worker {
- private static final AtomicInteger count = new AtomicInteger(1);
-
- private final ArrayBlockingQueue<Input> input;
- private final ArrayBlockingQueue<Result> result;
- private final Thread thread;
-
- Worker() {
- input = new ArrayBlockingQueue<Input>(1);
- result = new ArrayBlockingQueue<Result>(1);
-
- thread = new Thread(new Runnable() {
- public void run() {
- workerLoop();
- }
- });
- thread.setName("IntraLineDiff-" + count.getAndIncrement());
- thread.setDaemon(true);
- thread.start();
- }
-
- Result computeWithTimeout(IntraLineDiffKey key, long timeoutMillis)
- throws Exception {
- if (!input.offer(new Input(key))) {
- log.error("Cannot enqueue task to thread " + thread.getName());
- return Result.TIMEOUT;
- }
-
- Result r = result.poll(timeoutMillis, TimeUnit.MILLISECONDS);
- if (r != null) {
- return r;
- } else {
- log.warn(timeoutMillis + " ms timeout reached for IntraLineDiff"
- + " in project " + key.getProject().get() //
- + " on commit " + key.getCommit().name() //
- + " for path " + key.getPath() //
- + " comparing " + key.getBlobA().name() //
- + ".." + key.getBlobB().name() //
- + ". Killing " + thread.getName());
- forcefullyKillThreadInAnUglyWay();
- return Result.TIMEOUT;
- }
- }
-
- @SuppressWarnings("deprecation")
- private void forcefullyKillThreadInAnUglyWay() {
- try {
- thread.stop();
- } catch (Throwable error) {
- // Ignore any reason the thread won't stop.
- log.error("Cannot stop runaway thread " + thread.getName(), error);
- }
- }
-
- void end() {
- if (!input.offer(Input.END_THREAD)) {
- log.error("Cannot gracefully stop thread " + thread.getName());
- }
- }
-
- private void workerLoop() {
- try {
- for (;;) {
- Input in;
- try {
- in = input.take();
- } catch (InterruptedException e) {
- log.error("Unexpected interrupt on " + thread.getName());
- continue;
- }
-
- if (in == Input.END_THREAD) {
- return;
- }
-
- Result r;
- try {
- r = new Result(IntraLineLoader.compute(in.key));
- } catch (Exception error) {
- r = new Result(error);
- }
-
- if (!result.offer(r)) {
- log.error("Cannot return result from " + thread.getName());
- }
- }
- } catch (ThreadDeath iHaveBeenShot) {
- // Handle thread death by gracefully returning to the caller,
- // allowing the thread to be destroyed.
- }
- }
-
- private static class Input {
- static final Input END_THREAD = new Input(null);
-
- final IntraLineDiffKey key;
-
- Input(IntraLineDiffKey key) {
- this.key = key;
- }
- }
-
- static class Result {
- static final Result TIMEOUT = new Result((IntraLineDiff) null);
-
- final IntraLineDiff diff;
- final Exception error;
-
- Result(IntraLineDiff diff) {
- this.diff = diff;
- this.error = null;
- }
-
- Result(Exception error) {
- this.diff = null;
- this.error = error;
- }
- }
- }
-
- private static IntraLineDiff compute(IntraLineDiffKey key) throws Exception {
+ static IntraLineDiff compute(IntraLineDiffKey key) throws Exception {
List<Edit> edits = new ArrayList<Edit>(key.getEdits());
Text aContent = key.getTextA();
Text bContent = key.getTextB();
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineWorkerPool.java b/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineWorkerPool.java
new file mode 100644
index 0000000..5c6338fe
--- /dev/null
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineWorkerPool.java
@@ -0,0 +1,182 @@
+// Copyright (C) 2009 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package com.google.gerrit.server.patch;
+
+import static com.google.gerrit.server.patch.IntraLineLoader.log;
+
+import com.google.gerrit.server.config.GerritServerConfig;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import org.eclipse.jgit.lib.Config;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Singleton
+public class IntraLineWorkerPool {
+ public static class Module extends AbstractModule {
+ @Override
+ protected void configure() {
+ bind(IntraLineWorkerPool.class);
+ }
+ }
+
+ private final BlockingQueue<Worker> workerPool;
+
+ @Inject
+ public IntraLineWorkerPool(@GerritServerConfig Config cfg) {
+ int workers = cfg.getInt(
+ "cache", PatchListCacheImpl.INTRA_NAME, "maxIdleWorkers",
+ Runtime.getRuntime().availableProcessors() * 3 / 2);
+ workerPool = new ArrayBlockingQueue<Worker>(workers, true /* fair */);
+ }
+
+ Worker acquire() {
+ Worker w = workerPool.poll();
+ if (w == null) {
+ // If no worker is immediately available, start a new one.
+ // Maximum parallelism is controlled by the web server.
+ w = new Worker();
+ w.start();
+ }
+ return w;
+ }
+
+ void release(Worker w) {
+ if (!workerPool.offer(w)) {
+ // If the idle worker pool is full, terminate the worker.
+ w.shutdownGracefully();
+ }
+ }
+
+ static class Worker extends Thread {
+ private static final AtomicInteger count = new AtomicInteger(1);
+
+ private final ArrayBlockingQueue<Input> input;
+ private final ArrayBlockingQueue<Result> result;
+
+ Worker() {
+ input = new ArrayBlockingQueue<Input>(1);
+ result = new ArrayBlockingQueue<Result>(1);
+
+ setName("IntraLineDiff-" + count.getAndIncrement());
+ setDaemon(true);
+ }
+
+ Result computeWithTimeout(IntraLineDiffKey key, long timeoutMillis)
+ throws Exception {
+ if (!input.offer(new Input(key))) {
+ log.error("Cannot enqueue task to thread " + getName());
+ return Result.TIMEOUT;
+ }
+
+ Result r = result.poll(timeoutMillis, TimeUnit.MILLISECONDS);
+ if (r != null) {
+ return r;
+ } else {
+ log.warn(timeoutMillis + " ms timeout reached for IntraLineDiff"
+ + " in project " + key.getProject().get()
+ + " on commit " + key.getCommit().name()
+ + " for path " + key.getPath()
+ + " comparing " + key.getBlobA().name()
+ + ".." + key.getBlobB().name()
+ + ". Killing " + getName());
+ forcefullyKillThreadInAnUglyWay();
+ return Result.TIMEOUT;
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ private void forcefullyKillThreadInAnUglyWay() {
+ try {
+ stop();
+ } catch (Throwable error) {
+ // Ignore any reason the thread won't stop.
+ log.error("Cannot stop runaway thread " + getName(), error);
+ }
+ }
+
+ private void shutdownGracefully() {
+ if (!input.offer(Input.END_THREAD)) {
+ log.error("Cannot gracefully stop thread " + getName());
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (;;) {
+ Input in;
+ try {
+ in = input.take();
+ } catch (InterruptedException e) {
+ log.error("Unexpected interrupt on " + getName());
+ continue;
+ }
+
+ if (in == Input.END_THREAD) {
+ return;
+ }
+
+ Result r;
+ try {
+ r = new Result(IntraLineLoader.compute(in.key));
+ } catch (Exception error) {
+ r = new Result(error);
+ }
+
+ if (!result.offer(r)) {
+ log.error("Cannot return result from " + getName());
+ }
+ }
+ } catch (ThreadDeath iHaveBeenShot) {
+ // Handle thread death by gracefully returning to the caller,
+ // allowing the thread to be destroyed.
+ }
+ }
+
+ private static class Input {
+ static final Input END_THREAD = new Input(null);
+
+ final IntraLineDiffKey key;
+
+ Input(IntraLineDiffKey key) {
+ this.key = key;
+ }
+ }
+
+ static class Result {
+ static final Result TIMEOUT = new Result((IntraLineDiff) null);
+
+ final IntraLineDiff diff;
+ final Exception error;
+
+ Result(IntraLineDiff diff) {
+ this.diff = diff;
+ this.error = null;
+ }
+
+ Result(Exception error) {
+ this.diff = null;
+ this.error = error;
+ }
+ }
+ }
+}
diff --git a/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java b/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java
index 3059dfa..d0b8c38 100644
--- a/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java
+++ b/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java
@@ -39,6 +39,7 @@
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.mail.SignedTokenEmailTokenVerifier;
import com.google.gerrit.server.mail.SmtpEmailSender;
+import com.google.gerrit.server.patch.IntraLineWorkerPool;
import com.google.gerrit.server.plugins.PluginGuiceEnvironment;
import com.google.gerrit.server.plugins.PluginModule;
import com.google.gerrit.server.schema.DataSourceModule;
@@ -229,6 +230,7 @@
modules.add(new WorkQueue.Module());
modules.add(new ChangeHookRunner.Module());
modules.add(new ReceiveCommitsExecutorModule());
+ modules.add(new IntraLineWorkerPool.Module());
modules.add(cfgInjector.getInstance(GerritGlobalModule.class));
modules.add(new DefaultCacheFactory.Module());
modules.add(new SmtpEmailSender.Module());