Add PARKED WorkQueue.Task state
With the addition of a TaskParker interface, it is possible that the
TaskParkers could delay the starting of Tasks. Add a new state to expose
the task parking.
Release-Notes: Tasks blocked from running by TaskParker implementations show their state as 'parked'
Change-Id: I5b828ca37d171d34f3f774f6011ccdd40a264836
diff --git a/java/com/google/gerrit/server/git/WorkQueue.java b/java/com/google/gerrit/server/git/WorkQueue.java
index afa831f..6f904ea 100644
--- a/java/com/google/gerrit/server/git/WorkQueue.java
+++ b/java/com/google/gerrit/server/git/WorkQueue.java
@@ -634,11 +634,12 @@
return all.values();
}
- public void onStart(Task<?> task) {
+ public void waitUntilReadyToStart(Task<?> task) {
if (!listeners.isEmpty() && !isReadyToStart(task)) {
incrementCorePoolSizeBy(1);
ParkedTask parkedTask = new ParkedTask(task);
parked.offer(parkedTask);
+ task.runningState.set(Task.State.PARKED);
try {
parkedTask.latch.await();
} catch (InterruptedException e) {
@@ -647,6 +648,9 @@
incrementCorePoolSizeBy(-1);
}
}
+ }
+
+ public void onStart(Task<?> task) {
listeners.runEach(extension -> extension.get().onStart(task));
}
@@ -757,13 +761,14 @@
// Ordered like this so ordinal matches the order we would
// prefer to see tasks sorted in: done before running,
// stopping before running, running before starting,
- // starting before ready, ready before sleeping.
+ // starting before parked, parked before ready, ready before sleeping.
//
DONE,
CANCELLED,
STOPPING,
RUNNING,
STARTING,
+ PARKED,
READY,
SLEEPING,
OTHER
@@ -895,10 +900,12 @@
@Override
public void run() {
- if (runningState.compareAndSet(null, State.STARTING)) {
+ if (runningState.compareAndSet(null, State.READY)) {
String oldThreadName = Thread.currentThread().getName();
try {
Thread.currentThread().setName(oldThreadName + "[" + this + "]");
+ executor.waitUntilReadyToStart(this); // Transitions to PARKED while not ready to start
+ runningState.set(State.STARTING);
executor.onStart(this);
runningState.set(State.RUNNING);
task.run();
diff --git a/java/com/google/gerrit/server/restapi/config/GetSummary.java b/java/com/google/gerrit/server/restapi/config/GetSummary.java
index 77af0f3..c76f0a4 100644
--- a/java/com/google/gerrit/server/restapi/config/GetSummary.java
+++ b/java/com/google/gerrit/server/restapi/config/GetSummary.java
@@ -77,6 +77,7 @@
int tasksTotal = pending.size();
int tasksStopping = 0;
int tasksRunning = 0;
+ int tasksParked = 0;
int tasksStarting = 0;
int tasksReady = 0;
int tasksSleeping = 0;
@@ -88,6 +89,9 @@
case RUNNING:
tasksRunning++;
break;
+ case PARKED:
+ tasksParked++;
+ break;
case STARTING:
tasksStarting++;
break;
@@ -108,6 +112,7 @@
taskSummary.total = toInteger(tasksTotal);
taskSummary.stopping = toInteger(tasksStopping);
taskSummary.running = toInteger(tasksRunning);
+ taskSummary.parked = toInteger(tasksParked);
taskSummary.starting = toInteger(tasksStarting);
taskSummary.ready = toInteger(tasksReady);
taskSummary.sleeping = toInteger(tasksSleeping);
@@ -245,6 +250,7 @@
public Integer total;
public Integer stopping;
public Integer running;
+ public Integer parked;
public Integer starting;
public Integer ready;
public Integer sleeping;
diff --git a/java/com/google/gerrit/sshd/commands/ShowQueue.java b/java/com/google/gerrit/sshd/commands/ShowQueue.java
index 00361ad..14915bf 100644
--- a/java/com/google/gerrit/sshd/commands/ShowQueue.java
+++ b/java/com/google/gerrit/sshd/commands/ShowQueue.java
@@ -134,6 +134,7 @@
switch (task.state) {
case DONE:
case CANCELLED:
+ case PARKED:
case STARTING:
case RUNNING:
case STOPPING:
@@ -212,6 +213,8 @@
return "";
case STARTING:
return "starting ...";
+ case PARKED:
+ return "parked .....";
case READY:
return "waiting ....";
case SLEEPING:
diff --git a/javatests/com/google/gerrit/acceptance/server/util/TaskParkerIT.java b/javatests/com/google/gerrit/acceptance/server/util/TaskParkerIT.java
index 11c49ed..3b82ebe 100644
--- a/javatests/com/google/gerrit/acceptance/server/util/TaskParkerIT.java
+++ b/javatests/com/google/gerrit/acceptance/server/util/TaskParkerIT.java
@@ -181,7 +181,7 @@
parker.isReadyToStart.assertCalledEventually();
assertTaskCountIs(1);
- assertStateIs(State.STARTING);
+ assertStateIs(State.READY);
parker.onNotReadyToStart.assertUncalled();
parker.onStart.assertUncalled();
runnable.run.assertUncalled();
@@ -238,6 +238,7 @@
// park runnable1
parker.isReadyToStart.complete(false);
assertCorePoolSizeIsEventually(2);
+ assertStateIs(task1, State.PARKED);
runnable2.run.assertCalledEventually();
assertTaskCountIs(2);
@@ -245,6 +246,7 @@
parker.onStart.assertUncalled();
runnable1.run.assertUncalled();
parker.onStop.assertUncalled();
+ assertStateIs(task1, State.PARKED);
assertCounterIsEventually(forwarder.isReadyToStartCounter, 2);
assertCounter(forwarder.onNotReadyToStartCounter, 0);
@@ -297,6 +299,7 @@
executor.execute(runnable1);
parker.isReadyToStart.assertCalledEventuallyThenComplete(false);
Task<?> task1 = forwarder.task; // task for runnable1
+ assertStateIsEventually(task1, State.PARKED);
assertCounterIsEventually(forwarder.isReadyToStartCounter, 1);
assertCounter(forwarder.onNotReadyToStartCounter, 0);
assertTaskCountIsEventually(1);
@@ -307,6 +310,7 @@
executor.execute(runnable2);
parker.isReadyToStart.assertCalledEventuallyThenComplete(false);
Task<?> task2 = forwarder.task; // task for runnable2
+ assertStateIsEventually(task2, State.PARKED);
assertCounterIsEventually(forwarder.isReadyToStartCounter, 2);
assertCounter(forwarder.onNotReadyToStartCounter, 0);
@@ -322,7 +326,7 @@
assertCounter(forwarder.onNotReadyToStartCounter, 0);
parker.isReadyToStart.assertCalledEventually();
Task<?> task3 = forwarder.task; // task for runnable3
- assertStateIs(task3, State.STARTING);
+ assertStateIs(task3, State.READY);
parker.isReadyToStart.complete(true);
parker.onStart.assertCalledEventually();
assertStateIs(task3, State.STARTING);
@@ -344,6 +348,7 @@
assertCounterIsEventually(forwarder.isReadyToStartCounter, 4);
assertCounter(forwarder.onNotReadyToStartCounter, 0);
runnable2.run.assertUncalled();
+ assertStateIs(task2, State.PARKED);
runnable1.run.complete();
assertCorePoolSizeIsEventually(2);
assertTaskCountIsEventually(1);
@@ -382,6 +387,7 @@
assertCounterIsEventually(forwarder2.isReadyToStartCounter, 1);
assertCounter(forwarder2.onNotReadyToStartCounter, 0);
Task<?> task1 = forwarder.task; // task for runnable1
+ assertStateIsEventually(task1, State.PARKED);
// set parker2 to ready and execute runnable-2
parker2.isReadyToStart.set(true);
@@ -433,7 +439,9 @@
forwarder.resetDelegate(parker);
executor.execute(runnable2);
parker.isReadyToStart.assertCalledEventually();
- assertCorePoolSizeIsEventually(3); // asserts runnable2 is parked
+ assertCorePoolSizeIsEventually(3);
+ Task<?> task2 = forwarder.task; // task for runnable2
+ assertStateIs(task2, State.PARKED);
forwarder.resetDelegate(parker);
runnable1.run.complete(); // unblock runnable1
@@ -452,6 +460,7 @@
executor.execute(runnable1);
parker.isReadyToStart.assertCalledEventuallyThenComplete(false);
assertCorePoolSizeIsEventually(2);
+ assertStateIsEventually(forwarder.task, State.PARKED);
// interrupt the thread with parked task
for (Thread t : Thread.getAllStackTraces().keySet()) {
@@ -474,6 +483,7 @@
parker.isReadyToStart.assertCalledEventuallyThenComplete(false);
assertCorePoolSizeIsEventually(2);
Task<?> task = forwarder.task;
+ assertStateIsEventually(task, State.PARKED);
// cancel parked task
task.cancel(true);
@@ -526,4 +536,13 @@
private void assertStateIs(Task<?> task, Task.State state) {
TaskListenerIT.assertStateIs(task, state);
}
+
+ private void assertStateIsEventually(Task<?> task, Task.State state) throws InterruptedException {
+ long ms = 0;
+ assertThat(task).isNotNull();
+ while (!task.getState().equals(state)) {
+ assertThat(ms++).isLessThan(TIMEOUT);
+ TimeUnit.MILLISECONDS.sleep(1);
+ }
+ }
}