Cancel outstanding tasks with Future.cancel()

This avoids using API surface of ScheduledThreadPoolExecutor, making
it possible for createQueue to return ScheduledExecutorService instead.

Change-Id: I4f44e45d663d89b1c45ee2fc1d0d29831fd5bebd
diff --git a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/http/jetty/ProjectQoSFilter.java b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/http/jetty/ProjectQoSFilter.java
index 256164a..01c5ea4 100644
--- a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/http/jetty/ProjectQoSFilter.java
+++ b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/http/jetty/ProjectQoSFilter.java
@@ -30,6 +30,8 @@
 import com.google.inject.Singleton;
 import com.google.inject.servlet.ServletModule;
 import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -104,18 +106,16 @@
     final HttpServletResponse rsp = (HttpServletResponse) response;
     final Continuation cont = ContinuationSupport.getContinuation(req);
 
-    ScheduledThreadPoolExecutor executor = getExecutor();
-
     if (cont.isInitial()) {
-      TaskThunk task = new TaskThunk(executor, cont, req);
+      TaskThunk task = new TaskThunk(cont, req);
       if (maxWait > 0) {
         cont.setTimeout(maxWait);
       }
       cont.suspend(rsp);
-      cont.addContinuationListener(task);
       cont.setAttribute(TASK, task);
-      executor.submit(task);
 
+      Future f = getExecutor().submit(task);
+      cont.addContinuationListener(new Listener(f));
     } else if (cont.isExpired()) {
       rsp.sendError(SC_SERVICE_UNAVAILABLE);
 
@@ -149,17 +149,31 @@
   @Override
   public void destroy() {}
 
-  private final class TaskThunk implements CancelableRunnable, ContinuationListener {
+  private final class Listener implements ContinuationListener {
+    final Future future;
 
-    private final ScheduledThreadPoolExecutor executor;
+    Listener(Future future) {
+      this.future = future;
+    }
+
+    @Override
+    public void onComplete(Continuation self) {}
+
+    @Override
+    public void onTimeout(Continuation self) {
+      future.cancel(true);
+    }
+
+  }
+
+  private final class TaskThunk implements CancelableRunnable {
     private final Continuation cont;
     private final String name;
     private final Object lock = new Object();
     private boolean done;
     private Thread worker;
 
-    TaskThunk(ScheduledThreadPoolExecutor executor, Continuation cont, HttpServletRequest req) {
-      this.executor = executor;
+    TaskThunk(Continuation cont, HttpServletRequest req) {
       this.cont = cont;
       this.name = generateName(req);
     }
@@ -204,14 +218,6 @@
     }
 
     @Override
-    public void onComplete(Continuation self) {}
-
-    @Override
-    public void onTimeout(Continuation self) {
-      executor.remove(this);
-    }
-
-    @Override
     public String toString() {
       return name;
     }