Cancel outstanding tasks with Future.cancel()
This avoids using API surface of ScheduledThreadPoolExecutor, making
it possible for createQueue to return ScheduledExecutorService instead.
Change-Id: I4f44e45d663d89b1c45ee2fc1d0d29831fd5bebd
diff --git a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/http/jetty/ProjectQoSFilter.java b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/http/jetty/ProjectQoSFilter.java
index 256164a..01c5ea4 100644
--- a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/http/jetty/ProjectQoSFilter.java
+++ b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/http/jetty/ProjectQoSFilter.java
@@ -30,6 +30,8 @@
import com.google.inject.Singleton;
import com.google.inject.servlet.ServletModule;
import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -104,18 +106,16 @@
final HttpServletResponse rsp = (HttpServletResponse) response;
final Continuation cont = ContinuationSupport.getContinuation(req);
- ScheduledThreadPoolExecutor executor = getExecutor();
-
if (cont.isInitial()) {
- TaskThunk task = new TaskThunk(executor, cont, req);
+ TaskThunk task = new TaskThunk(cont, req);
if (maxWait > 0) {
cont.setTimeout(maxWait);
}
cont.suspend(rsp);
- cont.addContinuationListener(task);
cont.setAttribute(TASK, task);
- executor.submit(task);
+ Future f = getExecutor().submit(task);
+ cont.addContinuationListener(new Listener(f));
} else if (cont.isExpired()) {
rsp.sendError(SC_SERVICE_UNAVAILABLE);
@@ -149,17 +149,31 @@
@Override
public void destroy() {}
- private final class TaskThunk implements CancelableRunnable, ContinuationListener {
+ private final class Listener implements ContinuationListener {
+ final Future future;
- private final ScheduledThreadPoolExecutor executor;
+ Listener(Future future) {
+ this.future = future;
+ }
+
+ @Override
+ public void onComplete(Continuation self) {}
+
+ @Override
+ public void onTimeout(Continuation self) {
+ future.cancel(true);
+ }
+
+ }
+
+ private final class TaskThunk implements CancelableRunnable {
private final Continuation cont;
private final String name;
private final Object lock = new Object();
private boolean done;
private Thread worker;
- TaskThunk(ScheduledThreadPoolExecutor executor, Continuation cont, HttpServletRequest req) {
- this.executor = executor;
+ TaskThunk(Continuation cont, HttpServletRequest req) {
this.cont = cont;
this.name = generateName(req);
}
@@ -204,14 +218,6 @@
}
@Override
- public void onComplete(Continuation self) {}
-
- @Override
- public void onTimeout(Continuation self) {
- executor.remove(this);
- }
-
- @Override
public String toString() {
return name;
}