Share the threads/permits for soft maxes
```
[quota "project-a"]
softMaxStartPerUserForQueue = 5 SSH-Interactive-Worker
[quota "project-b"]
softMaxStartPerUserForQueue = 10 SSH-Interactive-Worker
```
Previously, for the above configuration, The computation of free threads
was buggy since each instance is not aware of the permits acquired by
the other quota. This results in config not being enforced correctly.
Fix this to only have single shared QueueStats object.
Change-Id: I7e19bc4370c8fd30e158092f7b0d1f12e59fa5e2
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/QueueStats.java b/src/main/java/com/googlesource/gerrit/plugins/quota/QueueStats.java
new file mode 100644
index 0000000..ebf2f8a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/QueueStats.java
@@ -0,0 +1,93 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.quota;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class QueueStats {
+ public enum Queue {
+ INTERACTIVE("SSH-Interactive-Worker"),
+ BATCH("SSH-Batch-Worker"),
+ UNKNOWN("UNKNOWN");
+
+ private final String name;
+
+ Queue(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public static String[] keys() {
+ return Arrays.stream(Queue.values()).map(Queue::getName).toArray(String[]::new);
+ }
+
+ public static Queue fromKey(String key) {
+ for (Queue q : Queue.values()) {
+ if (q.getName().equals(key)) {
+ return q;
+ }
+ }
+ return UNKNOWN;
+ }
+ }
+
+ public static Map<Queue, AtomicInteger> availableThreadsPerQueue = new HashMap<>();
+
+ public static void initQueueWithCapacity(Queue q, int c) {
+ availableThreadsPerQueue.put(q, new AtomicInteger(c));
+ }
+
+ public static boolean acquire(Queue q, int c) {
+ AtomicInteger available = availableThreadsPerQueue.get(q);
+ if (available == null) {
+ return true;
+ }
+
+ AtomicBoolean success = new AtomicBoolean(false);
+ available.updateAndGet(
+ current -> {
+ if (current < c) {
+ success.setPlain(false);
+ return current;
+ }
+ success.setPlain(true);
+ return current - c;
+ });
+ return success.getPlain();
+ }
+
+ public static void release(Queue q, int c) {
+ AtomicInteger available = availableThreadsPerQueue.get(q);
+ if (available != null) {
+ available.addAndGet(c);
+ }
+ }
+
+ public static boolean ensureIdle(Queue q, int c) {
+ AtomicInteger available = availableThreadsPerQueue.get(q);
+ if (available == null) {
+ return true;
+ }
+
+ return available.get() > c;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/QuotaSection.java b/src/main/java/com/googlesource/gerrit/plugins/quota/QuotaSection.java
index f70f24a..a1be14b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/quota/QuotaSection.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/QuotaSection.java
@@ -62,12 +62,11 @@
return cfg.getLong(QUOTA, namespace, KEY_MAX_TOTAL_SIZE, Long.MAX_VALUE);
}
- public List<TaskQuota> getAllQuotas(TaskQuota.BuildInfo buildInfo) {
+ public List<TaskQuota> getAllQuotas() {
return Arrays.stream(TaskQuotaKeys.values())
.flatMap(
type ->
Arrays.stream(cfg.getStringList(QUOTA, namespace, type.key))
- .map(buildInfo::generateWithCfg)
.map(type.processor)
.flatMap(Optional::stream))
.toList();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/SoftMaxPerUserForQueue.java b/src/main/java/com/googlesource/gerrit/plugins/quota/SoftMaxPerUserForQueue.java
index 60d2d84..c2d623e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/quota/SoftMaxPerUserForQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/SoftMaxPerUserForQueue.java
@@ -17,37 +17,28 @@
import static com.googlesource.gerrit.plugins.quota.TaskParser.user;
import com.google.gerrit.server.git.WorkQueue;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-public class SoftMaxPerUserForQueue extends TaskQuota {
- public static final Map<String, Function<BuildInfo, Integer>> QUEUES =
- Map.of(
- "SSH-Interactive-Worker",
- BuildInfo::interactiveThreads,
- "SSH-Batch-Worker",
- BuildInfo::batchThreads);
+public class SoftMaxPerUserForQueue implements TaskQuota {
public static final Pattern CONFIG_PATTERN =
- Pattern.compile("(\\d+)\\s+(" + String.join("|", QUEUES.keySet()) + ")");
+ Pattern.compile("(\\d+)\\s+(" + String.join("|", QueueStats.Queue.keys()) + ")");
private final int softMax;
- private final String queueName;
+ private final QueueStats.Queue queue;
private final ConcurrentHashMap<String, Integer> taskStartedCountByUser =
new ConcurrentHashMap<>();
- public SoftMaxPerUserForQueue(int maxPermits, int softMax, String queueName) {
- super(maxPermits);
+ public SoftMaxPerUserForQueue(int softMax, String queueName) {
this.softMax = softMax;
- this.queueName = queueName;
+ this.queue = QueueStats.Queue.fromKey(queueName);
}
@Override
public boolean isApplicable(WorkQueue.Task<?> task) {
- return task.getQueueName().equals(queueName);
+ return task.getQueueName().equals(queue.getName());
}
@Override
@@ -60,14 +51,9 @@
user,
(key, val) -> {
int runningTasks = (val != null) ? val : 0;
- boolean overSoftLimit = runningTasks >= softMax;
- int permitCost = overSoftLimit ? 2 : 1;
- if (permits.tryAcquire(permitCost)) {
+ if (runningTasks < softMax || QueueStats.ensureIdle(queue, 1)) {
acquired.setPlain(true);
- if (overSoftLimit) {
- permits.release(1);
- }
++runningTasks;
}
return runningTasks;
@@ -85,19 +71,15 @@
taskStartedCountByUser.computeIfPresent(
user,
(u, tasks) -> {
- permits.release(1);
return tasks == 1 ? null : --tasks;
}));
}
- public static Optional<TaskQuota> build(BuildInfo buildInfo) {
- Matcher matcher = CONFIG_PATTERN.matcher(buildInfo.config());
+ public static Optional<TaskQuota> build(String cfg) {
+ Matcher matcher = CONFIG_PATTERN.matcher(cfg);
return matcher.find()
? Optional.of(
- new SoftMaxPerUserForQueue(
- QUEUES.get(matcher.group(2)).apply(buildInfo),
- Integer.parseInt(matcher.group(1)),
- matcher.group(2)))
+ new SoftMaxPerUserForQueue(Integer.parseInt(matcher.group(1)), matcher.group(2)))
: Optional.empty();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuota.java b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuota.java
index ab11138..1e689a1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuota.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuota.java
@@ -15,35 +15,11 @@
package com.googlesource.gerrit.plugins.quota;
import com.google.gerrit.server.git.WorkQueue;
-import java.util.concurrent.Semaphore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public abstract class TaskQuota {
- protected static final Logger log = LoggerFactory.getLogger(TaskQuota.class);
- protected final Semaphore permits;
+public interface TaskQuota {
+ boolean isApplicable(WorkQueue.Task<?> task);
- public TaskQuota(int maxPermits) {
- this.permits = new Semaphore(maxPermits);
- }
+ boolean tryAcquire(WorkQueue.Task<?> task);
- public abstract boolean isApplicable(WorkQueue.Task<?> task);
-
- public boolean tryAcquire(WorkQueue.Task<?> task) {
- return permits.tryAcquire();
- }
-
- public void release(WorkQueue.Task<?> task) {
- permits.release();
- }
-
- public record BuildInfo(String config, int interactiveThreads, int batchThreads) {
- public BuildInfo(int interactiveThreads, int batchThreads) {
- this("", interactiveThreads, batchThreads);
- }
-
- public BuildInfo generateWithCfg(String cfg) {
- return new BuildInfo(cfg, interactiveThreads, batchThreads);
- }
- }
+ void release(WorkQueue.Task<?> task);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTask.java b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTask.java
index 14f74ac..2d15abe 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTask.java
@@ -18,7 +18,7 @@
import java.util.Map;
import java.util.Set;
-public abstract class TaskQuotaForTask extends TaskQuota {
+public abstract class TaskQuotaForTask extends TaskQuotaWithPermits {
protected static final Map<String, Set<String>> SUPPORTED_TASKS_BY_GROUP =
Map.of("uploadpack", Set.of("git-upload-pack"), "receivepack", Set.of("git-receive-pack"));
private final String taskGroup;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTaskForQueue.java b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTaskForQueue.java
index 69ef8bf..f8ae3cc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTaskForQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTaskForQueue.java
@@ -18,8 +18,11 @@
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TaskQuotaForTaskForQueue extends TaskQuotaForTask {
+ public static final Logger log = LoggerFactory.getLogger(TaskQuotaForTaskForQueue.class);
public static final Pattern CONFIG_PATTERN =
Pattern.compile(
"(\\d+)\\s+(" + String.join("|", SUPPORTED_TASKS_BY_GROUP.keySet()) + ")\\s+(.+)");
@@ -35,14 +38,14 @@
return super.isApplicable(task) && task.getQueueName().equals(queueName);
}
- public static Optional<TaskQuota> build(BuildInfo buildInfo) {
- Matcher matcher = CONFIG_PATTERN.matcher(buildInfo.config());
+ public static Optional<TaskQuota> build(String cfg) {
+ Matcher matcher = CONFIG_PATTERN.matcher(cfg);
if (matcher.matches()) {
return Optional.of(
new TaskQuotaForTaskForQueue(
matcher.group(3), matcher.group(2), Integer.parseInt(matcher.group(1))));
} else {
- log.error("Invalid configuration entry [{}]", buildInfo.config());
+ log.error("Invalid configuration entry [{}]", cfg);
return Optional.empty();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTaskForQueueForUser.java b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTaskForQueueForUser.java
index eb64ae5..7582619 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTaskForQueueForUser.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTaskForQueueForUser.java
@@ -18,8 +18,11 @@
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TaskQuotaForTaskForQueueForUser extends TaskQuotaForTaskForQueue {
+ public static final Logger log = LoggerFactory.getLogger(TaskQuotaForTaskForQueueForUser.class);
public static final Pattern CONFIG_PATTERN =
Pattern.compile(
"(\\d+)\\s+("
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaKeys.java b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaKeys.java
index 469441d..1d840e3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaKeys.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaKeys.java
@@ -26,9 +26,9 @@
SOFT_MAX_START_FOR_QUEUE_PER_USER("softMaxStartPerUserForQueue", SoftMaxPerUserForQueue::build);
public final String key;
- public final Function<TaskQuota.BuildInfo, Optional<TaskQuota>> processor;
+ public final Function<String, Optional<TaskQuota>> processor;
- TaskQuotaKeys(String key, Function<TaskQuota.BuildInfo, Optional<TaskQuota>> processor) {
+ TaskQuotaKeys(String key, Function<String, Optional<TaskQuota>> processor) {
this.key = key;
this.processor = processor;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaPerUserForTaskForQueue.java b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaPerUserForTaskForQueue.java
index 311b365..783045f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaPerUserForTaskForQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaPerUserForTaskForQueue.java
@@ -17,8 +17,11 @@
import com.google.gerrit.server.git.WorkQueue;
import java.util.Optional;
import java.util.regex.Matcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TaskQuotaPerUserForTaskForQueue extends TaskQuotaForTaskForQueue {
+ public static final Logger log = LoggerFactory.getLogger(TaskQuotaPerUserForTaskForQueue.class);
private final PerUserTaskQuota perUserTaskQuota;
public TaskQuotaPerUserForTaskForQueue(String queue, String taskGroup, int maxStart) {
@@ -36,14 +39,14 @@
perUserTaskQuota.release(task);
}
- public static Optional<TaskQuota> build(BuildInfo buildInfo) {
- Matcher matcher = CONFIG_PATTERN.matcher(buildInfo.config());
+ public static Optional<TaskQuota> build(String cfg) {
+ Matcher matcher = CONFIG_PATTERN.matcher(cfg);
if (matcher.matches()) {
return Optional.of(
new TaskQuotaPerUserForTaskForQueue(
matcher.group(3), matcher.group(2), Integer.parseInt(matcher.group(1))));
} else {
- log.error("Invalid configuration entry [{}]", buildInfo.config());
+ log.error("Invalid configuration entry [{}]", cfg);
return Optional.empty();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaWithPermits.java b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaWithPermits.java
new file mode 100644
index 0000000..8a8bc94
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaWithPermits.java
@@ -0,0 +1,20 @@
+package com.googlesource.gerrit.plugins.quota;
+
+import com.google.gerrit.server.git.WorkQueue;
+import java.util.concurrent.Semaphore;
+
+public abstract class TaskQuotaWithPermits implements TaskQuota {
+ protected final Semaphore permits;
+
+ public TaskQuotaWithPermits(int maxPermits) {
+ this.permits = new Semaphore(maxPermits);
+ }
+
+ public boolean tryAcquire(WorkQueue.Task<?> task) {
+ return permits.tryAcquire();
+ }
+
+ public void release(WorkQueue.Task<?> task) {
+ permits.release();
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotas.java b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotas.java
index 1b776d6..a4af200 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotas.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotas.java
@@ -38,7 +38,6 @@
private final Map<QuotaSection, List<TaskQuota>> quotasByNamespace = new HashMap<>();
private final Pattern PROJECT_PATTERN = Pattern.compile("\\s+/?(.*)\\s+(\\(\\S+\\))$");
private final Config quotaConfig;
- private final TaskQuota.BuildInfo baseBuildInfo;
@Inject
public TaskQuotas(
@@ -56,7 +55,8 @@
poolSize += batchThreads;
}
int interactiveThreads = Math.max(1, poolSize - batchThreads);
- baseBuildInfo = new TaskQuota.BuildInfo(interactiveThreads, batchThreads);
+ QueueStats.initQueueWithCapacity(QueueStats.Queue.INTERACTIVE, interactiveThreads);
+ QueueStats.initQueueWithCapacity(QueueStats.Queue.BATCH, batchThreads);
initQuotas();
}
@@ -64,7 +64,7 @@
private void initQuotas() {
quotasByNamespace.putAll(
quotaFinder.getQuotaNamespaces(quotaConfig).stream()
- .collect(Collectors.toMap(Function.identity(), qs -> qs.getAllQuotas(baseBuildInfo))));
+ .collect(Collectors.toMap(Function.identity(), QuotaSection::getAllQuotas)));
}
@Override
@@ -81,12 +81,19 @@
})
.orElse(List.of());
+ QueueStats.Queue queue = QueueStats.Queue.fromKey(task.getQueueName());
+
+ if (!QueueStats.acquire(queue, 1)) {
+ return false;
+ }
+
List<TaskQuota> acquiredQuotas = new ArrayList<>();
for (TaskQuota quota : quotas) {
if (quota.isApplicable(task)) {
if (!quota.tryAcquire(task)) {
log.debug("Task [{}] will be parked due task quota rules", task);
acquiredQuotas.forEach(q -> q.release(task));
+ QueueStats.release(queue, 1);
return false;
}
acquiredQuotas.add(quota);
@@ -113,6 +120,7 @@
}
private void release(WorkQueue.Task<?> task) {
+ QueueStats.release(QueueStats.Queue.fromKey(task.getQueueName()), 1);
Optional.ofNullable(quotasByTask.remove(task.getTaskId()))
.ifPresent(quotas -> quotas.forEach(q -> q.release(task)));
}