Merge "Share the threads/permits for soft maxes"
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/QueueStats.java b/src/main/java/com/googlesource/gerrit/plugins/quota/QueueStats.java
new file mode 100644
index 0000000..ebf2f8a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/QueueStats.java
@@ -0,0 +1,93 @@
+// Copyright (C) 2025 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.quota;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class QueueStats {
+ public enum Queue {
+ INTERACTIVE("SSH-Interactive-Worker"),
+ BATCH("SSH-Batch-Worker"),
+ UNKNOWN("UNKNOWN");
+
+ private final String name;
+
+ Queue(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public static String[] keys() {
+ return Arrays.stream(Queue.values()).map(Queue::getName).toArray(String[]::new);
+ }
+
+ public static Queue fromKey(String key) {
+ for (Queue q : Queue.values()) {
+ if (q.getName().equals(key)) {
+ return q;
+ }
+ }
+ return UNKNOWN;
+ }
+ }
+
+ public static Map<Queue, AtomicInteger> availableThreadsPerQueue = new HashMap<>();
+
+ public static void initQueueWithCapacity(Queue q, int c) {
+ availableThreadsPerQueue.put(q, new AtomicInteger(c));
+ }
+
+ public static boolean acquire(Queue q, int c) {
+ AtomicInteger available = availableThreadsPerQueue.get(q);
+ if (available == null) {
+ return true;
+ }
+
+ AtomicBoolean success = new AtomicBoolean(false);
+ available.updateAndGet(
+ current -> {
+ if (current < c) {
+ success.setPlain(false);
+ return current;
+ }
+ success.setPlain(true);
+ return current - c;
+ });
+ return success.getPlain();
+ }
+
+ public static void release(Queue q, int c) {
+ AtomicInteger available = availableThreadsPerQueue.get(q);
+ if (available != null) {
+ available.addAndGet(c);
+ }
+ }
+
+ public static boolean ensureIdle(Queue q, int c) {
+ AtomicInteger available = availableThreadsPerQueue.get(q);
+ if (available == null) {
+ return true;
+ }
+
+ return available.get() > c;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/QuotaSection.java b/src/main/java/com/googlesource/gerrit/plugins/quota/QuotaSection.java
index f70f24a..a1be14b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/quota/QuotaSection.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/QuotaSection.java
@@ -62,12 +62,11 @@
return cfg.getLong(QUOTA, namespace, KEY_MAX_TOTAL_SIZE, Long.MAX_VALUE);
}
- public List<TaskQuota> getAllQuotas(TaskQuota.BuildInfo buildInfo) {
+ public List<TaskQuota> getAllQuotas() {
return Arrays.stream(TaskQuotaKeys.values())
.flatMap(
type ->
Arrays.stream(cfg.getStringList(QUOTA, namespace, type.key))
- .map(buildInfo::generateWithCfg)
.map(type.processor)
.flatMap(Optional::stream))
.toList();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/SoftMaxPerUserForQueue.java b/src/main/java/com/googlesource/gerrit/plugins/quota/SoftMaxPerUserForQueue.java
index 60d2d84..c2d623e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/quota/SoftMaxPerUserForQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/SoftMaxPerUserForQueue.java
@@ -17,37 +17,28 @@
import static com.googlesource.gerrit.plugins.quota.TaskParser.user;
import com.google.gerrit.server.git.WorkQueue;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-public class SoftMaxPerUserForQueue extends TaskQuota {
- public static final Map<String, Function<BuildInfo, Integer>> QUEUES =
- Map.of(
- "SSH-Interactive-Worker",
- BuildInfo::interactiveThreads,
- "SSH-Batch-Worker",
- BuildInfo::batchThreads);
+public class SoftMaxPerUserForQueue implements TaskQuota {
public static final Pattern CONFIG_PATTERN =
- Pattern.compile("(\\d+)\\s+(" + String.join("|", QUEUES.keySet()) + ")");
+ Pattern.compile("(\\d+)\\s+(" + String.join("|", QueueStats.Queue.keys()) + ")");
private final int softMax;
- private final String queueName;
+ private final QueueStats.Queue queue;
private final ConcurrentHashMap<String, Integer> taskStartedCountByUser =
new ConcurrentHashMap<>();
- public SoftMaxPerUserForQueue(int maxPermits, int softMax, String queueName) {
- super(maxPermits);
+ public SoftMaxPerUserForQueue(int softMax, String queueName) {
this.softMax = softMax;
- this.queueName = queueName;
+ this.queue = QueueStats.Queue.fromKey(queueName);
}
@Override
public boolean isApplicable(WorkQueue.Task<?> task) {
- return task.getQueueName().equals(queueName);
+ return task.getQueueName().equals(queue.getName());
}
@Override
@@ -60,14 +51,9 @@
user,
(key, val) -> {
int runningTasks = (val != null) ? val : 0;
- boolean overSoftLimit = runningTasks >= softMax;
- int permitCost = overSoftLimit ? 2 : 1;
- if (permits.tryAcquire(permitCost)) {
+ if (runningTasks < softMax || QueueStats.ensureIdle(queue, 1)) {
acquired.setPlain(true);
- if (overSoftLimit) {
- permits.release(1);
- }
++runningTasks;
}
return runningTasks;
@@ -85,19 +71,15 @@
taskStartedCountByUser.computeIfPresent(
user,
(u, tasks) -> {
- permits.release(1);
return tasks == 1 ? null : --tasks;
}));
}
- public static Optional<TaskQuota> build(BuildInfo buildInfo) {
- Matcher matcher = CONFIG_PATTERN.matcher(buildInfo.config());
+ public static Optional<TaskQuota> build(String cfg) {
+ Matcher matcher = CONFIG_PATTERN.matcher(cfg);
return matcher.find()
? Optional.of(
- new SoftMaxPerUserForQueue(
- QUEUES.get(matcher.group(2)).apply(buildInfo),
- Integer.parseInt(matcher.group(1)),
- matcher.group(2)))
+ new SoftMaxPerUserForQueue(Integer.parseInt(matcher.group(1)), matcher.group(2)))
: Optional.empty();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuota.java b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuota.java
index ab11138..1e689a1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuota.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuota.java
@@ -15,35 +15,11 @@
package com.googlesource.gerrit.plugins.quota;
import com.google.gerrit.server.git.WorkQueue;
-import java.util.concurrent.Semaphore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public abstract class TaskQuota {
- protected static final Logger log = LoggerFactory.getLogger(TaskQuota.class);
- protected final Semaphore permits;
+public interface TaskQuota {
+ boolean isApplicable(WorkQueue.Task<?> task);
- public TaskQuota(int maxPermits) {
- this.permits = new Semaphore(maxPermits);
- }
+ boolean tryAcquire(WorkQueue.Task<?> task);
- public abstract boolean isApplicable(WorkQueue.Task<?> task);
-
- public boolean tryAcquire(WorkQueue.Task<?> task) {
- return permits.tryAcquire();
- }
-
- public void release(WorkQueue.Task<?> task) {
- permits.release();
- }
-
- public record BuildInfo(String config, int interactiveThreads, int batchThreads) {
- public BuildInfo(int interactiveThreads, int batchThreads) {
- this("", interactiveThreads, batchThreads);
- }
-
- public BuildInfo generateWithCfg(String cfg) {
- return new BuildInfo(cfg, interactiveThreads, batchThreads);
- }
- }
+ void release(WorkQueue.Task<?> task);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTask.java b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTask.java
index 14f74ac..2d15abe 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTask.java
@@ -18,7 +18,7 @@
import java.util.Map;
import java.util.Set;
-public abstract class TaskQuotaForTask extends TaskQuota {
+public abstract class TaskQuotaForTask extends TaskQuotaWithPermits {
protected static final Map<String, Set<String>> SUPPORTED_TASKS_BY_GROUP =
Map.of("uploadpack", Set.of("git-upload-pack"), "receivepack", Set.of("git-receive-pack"));
private final String taskGroup;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTaskForQueue.java b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTaskForQueue.java
index 69ef8bf..f8ae3cc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTaskForQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTaskForQueue.java
@@ -18,8 +18,11 @@
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TaskQuotaForTaskForQueue extends TaskQuotaForTask {
+ public static final Logger log = LoggerFactory.getLogger(TaskQuotaForTaskForQueue.class);
public static final Pattern CONFIG_PATTERN =
Pattern.compile(
"(\\d+)\\s+(" + String.join("|", SUPPORTED_TASKS_BY_GROUP.keySet()) + ")\\s+(.+)");
@@ -35,14 +38,14 @@
return super.isApplicable(task) && task.getQueueName().equals(queueName);
}
- public static Optional<TaskQuota> build(BuildInfo buildInfo) {
- Matcher matcher = CONFIG_PATTERN.matcher(buildInfo.config());
+ public static Optional<TaskQuota> build(String cfg) {
+ Matcher matcher = CONFIG_PATTERN.matcher(cfg);
if (matcher.matches()) {
return Optional.of(
new TaskQuotaForTaskForQueue(
matcher.group(3), matcher.group(2), Integer.parseInt(matcher.group(1))));
} else {
- log.error("Invalid configuration entry [{}]", buildInfo.config());
+ log.error("Invalid configuration entry [{}]", cfg);
return Optional.empty();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTaskForQueueForUser.java b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTaskForQueueForUser.java
index eb64ae5..7582619 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTaskForQueueForUser.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaForTaskForQueueForUser.java
@@ -18,8 +18,11 @@
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TaskQuotaForTaskForQueueForUser extends TaskQuotaForTaskForQueue {
+ public static final Logger log = LoggerFactory.getLogger(TaskQuotaForTaskForQueueForUser.class);
public static final Pattern CONFIG_PATTERN =
Pattern.compile(
"(\\d+)\\s+("
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaKeys.java b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaKeys.java
index 469441d..1d840e3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaKeys.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaKeys.java
@@ -26,9 +26,9 @@
SOFT_MAX_START_FOR_QUEUE_PER_USER("softMaxStartPerUserForQueue", SoftMaxPerUserForQueue::build);
public final String key;
- public final Function<TaskQuota.BuildInfo, Optional<TaskQuota>> processor;
+ public final Function<String, Optional<TaskQuota>> processor;
- TaskQuotaKeys(String key, Function<TaskQuota.BuildInfo, Optional<TaskQuota>> processor) {
+ TaskQuotaKeys(String key, Function<String, Optional<TaskQuota>> processor) {
this.key = key;
this.processor = processor;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaPerUserForTaskForQueue.java b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaPerUserForTaskForQueue.java
index 311b365..783045f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaPerUserForTaskForQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaPerUserForTaskForQueue.java
@@ -17,8 +17,11 @@
import com.google.gerrit.server.git.WorkQueue;
import java.util.Optional;
import java.util.regex.Matcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TaskQuotaPerUserForTaskForQueue extends TaskQuotaForTaskForQueue {
+ public static final Logger log = LoggerFactory.getLogger(TaskQuotaPerUserForTaskForQueue.class);
private final PerUserTaskQuota perUserTaskQuota;
public TaskQuotaPerUserForTaskForQueue(String queue, String taskGroup, int maxStart) {
@@ -36,14 +39,14 @@
perUserTaskQuota.release(task);
}
- public static Optional<TaskQuota> build(BuildInfo buildInfo) {
- Matcher matcher = CONFIG_PATTERN.matcher(buildInfo.config());
+ public static Optional<TaskQuota> build(String cfg) {
+ Matcher matcher = CONFIG_PATTERN.matcher(cfg);
if (matcher.matches()) {
return Optional.of(
new TaskQuotaPerUserForTaskForQueue(
matcher.group(3), matcher.group(2), Integer.parseInt(matcher.group(1))));
} else {
- log.error("Invalid configuration entry [{}]", buildInfo.config());
+ log.error("Invalid configuration entry [{}]", cfg);
return Optional.empty();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaWithPermits.java b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaWithPermits.java
new file mode 100644
index 0000000..8a8bc94
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotaWithPermits.java
@@ -0,0 +1,20 @@
+package com.googlesource.gerrit.plugins.quota;
+
+import com.google.gerrit.server.git.WorkQueue;
+import java.util.concurrent.Semaphore;
+
+public abstract class TaskQuotaWithPermits implements TaskQuota {
+ protected final Semaphore permits;
+
+ public TaskQuotaWithPermits(int maxPermits) {
+ this.permits = new Semaphore(maxPermits);
+ }
+
+ public boolean tryAcquire(WorkQueue.Task<?> task) {
+ return permits.tryAcquire();
+ }
+
+ public void release(WorkQueue.Task<?> task) {
+ permits.release();
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotas.java b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotas.java
index 1b776d6..a4af200 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotas.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/quota/TaskQuotas.java
@@ -38,7 +38,6 @@
private final Map<QuotaSection, List<TaskQuota>> quotasByNamespace = new HashMap<>();
private final Pattern PROJECT_PATTERN = Pattern.compile("\\s+/?(.*)\\s+(\\(\\S+\\))$");
private final Config quotaConfig;
- private final TaskQuota.BuildInfo baseBuildInfo;
@Inject
public TaskQuotas(
@@ -56,7 +55,8 @@
poolSize += batchThreads;
}
int interactiveThreads = Math.max(1, poolSize - batchThreads);
- baseBuildInfo = new TaskQuota.BuildInfo(interactiveThreads, batchThreads);
+ QueueStats.initQueueWithCapacity(QueueStats.Queue.INTERACTIVE, interactiveThreads);
+ QueueStats.initQueueWithCapacity(QueueStats.Queue.BATCH, batchThreads);
initQuotas();
}
@@ -64,7 +64,7 @@
private void initQuotas() {
quotasByNamespace.putAll(
quotaFinder.getQuotaNamespaces(quotaConfig).stream()
- .collect(Collectors.toMap(Function.identity(), qs -> qs.getAllQuotas(baseBuildInfo))));
+ .collect(Collectors.toMap(Function.identity(), QuotaSection::getAllQuotas)));
}
@Override
@@ -81,12 +81,19 @@
})
.orElse(List.of());
+ QueueStats.Queue queue = QueueStats.Queue.fromKey(task.getQueueName());
+
+ if (!QueueStats.acquire(queue, 1)) {
+ return false;
+ }
+
List<TaskQuota> acquiredQuotas = new ArrayList<>();
for (TaskQuota quota : quotas) {
if (quota.isApplicable(task)) {
if (!quota.tryAcquire(task)) {
log.debug("Task [{}] will be parked due task quota rules", task);
acquiredQuotas.forEach(q -> q.release(task));
+ QueueStats.release(queue, 1);
return false;
}
acquiredQuotas.add(quota);
@@ -113,6 +120,7 @@
}
private void release(WorkQueue.Task<?> task) {
+ QueueStats.release(QueueStats.Queue.fromKey(task.getQueueName()), 1);
Optional.ofNullable(quotasByTask.remove(task.getTaskId()))
.ifPresent(quotas -> quotas.forEach(q -> q.release(task)));
}