Use a shared heartbeat thread pool, don't create one for each lock Before this change, each instance of the Lock class created a thread pool of size one to run the heartbeat task. This didn't scale well as the number of threads needed to maintain the heartbeat of N locks was N. There are Git operations which lock a large number of refs and this would have resulted in the large number of threads maintaining heartbeat for each lock. Now we use a dedicated thread pool of hardcoded size 10. We may make this size configurable in the future if we see a need for that. Change-Id: Ia657a7a7ab80c5124bba40981e79c281e7381f53
diff --git a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/HeartbeatExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/HeartbeatExecutor.java new file mode 100644 index 0000000..733ce8a --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/HeartbeatExecutor.java
@@ -0,0 +1,24 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.spannerrefdb; + +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import com.google.inject.BindingAnnotation; +import java.lang.annotation.Retention; + +@Retention(RUNTIME) +@BindingAnnotation +public @interface HeartbeatExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/HeartbeatExecutorProvider.java b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/HeartbeatExecutorProvider.java new file mode 100644 index 0000000..c1ff592 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/HeartbeatExecutorProvider.java
@@ -0,0 +1,56 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.spannerrefdb; + +import com.google.gerrit.extensions.annotations.PluginName; +import com.google.gerrit.extensions.events.LifecycleListener; +import com.google.gerrit.server.git.WorkQueue; +import com.google.inject.Inject; +import com.google.inject.Provider; +import com.google.inject.Singleton; +import java.util.concurrent.ScheduledExecutorService; + +@Singleton +public class HeartbeatExecutorProvider + implements Provider<ScheduledExecutorService>, LifecycleListener { + private static final int DEFAULT_THREADS = 10; + + private final WorkQueue workQueue; + private final String pluginName; + + private ScheduledExecutorService executor; + + @Inject + HeartbeatExecutorProvider(WorkQueue workQueue, @PluginName String pluginName) { + this.workQueue = workQueue; + this.pluginName = pluginName; + } + + @Override + public void start() {} + + @Override + public void stop() { + if (executor != null) { + executor.shutdown(); + } + } + + @Override + public ScheduledExecutorService get() { + executor = workQueue.createQueue(DEFAULT_THREADS, pluginName + "-heartbeat"); + return executor; + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Lock.java b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Lock.java index 72fef8d..20a921b 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Lock.java +++ b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Lock.java
@@ -33,8 +33,8 @@ import com.google.inject.assistedinject.Assisted; import java.util.Arrays; import java.util.Collections; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** @@ -59,20 +59,23 @@ private static final String RECLAIM_LOCK_PREFIX = "RECLAIM"; private final DatabaseClient dbClient; private final String gerritInstanceId; + private ScheduledExecutorService heartbeatExecutor; private final String projectName; private final String refName; - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private Timestamp token; + private ScheduledFuture<?> heartbeatTask; @Inject Lock( DatabaseClient dbClient, @GerritInstanceId String gerritInstanceId, + @HeartbeatExecutor ScheduledExecutorService heartbeatExecutor, @Assisted("projectName") String projectName, @Assisted("refName") String refName) { this.dbClient = dbClient; this.gerritInstanceId = gerritInstanceId; + this.heartbeatExecutor = heartbeatExecutor; this.projectName = projectName; this.refName = refName; } @@ -86,6 +89,7 @@ * * @throws GlobalRefDbLockException if there is a failure to claim the lock */ + @SuppressWarnings("FutureReturnValueIgnored") public void tryLock() throws GlobalRefDbLockException { try { TransactionRunner transactionRunner = dbClient.readWriteTransaction(); @@ -95,7 +99,9 @@ return true; }); token = transactionRunner.getCommitTimestamp(); - scheduler.scheduleAtFixedRate(this::heartbeat, 0, HEARTBEAT_INTERVAL, TimeUnit.SECONDS); + heartbeatTask = + heartbeatExecutor.scheduleAtFixedRate( + this::heartbeat, 0, HEARTBEAT_INTERVAL, TimeUnit.SECONDS); logger.atFine().log("Locked %s %s.", projectName, refName); } catch (Exception e) { Throwable cause = e.getCause(); @@ -122,6 +128,7 @@ * all done in one transaction so that if any one step fails, the transaction is aborted. Clean up * the reclaim-lock on success. */ + @SuppressWarnings("FutureReturnValueIgnored") private boolean reclaim() { TransactionRunner transactionRunner = dbClient.readWriteTransaction(); @@ -161,7 +168,8 @@ if (success) { token = transactionRunner.getCommitTimestamp(); - scheduler.scheduleAtFixedRate(this::heartbeat, 0, HEARTBEAT_INTERVAL, TimeUnit.SECONDS); + heartbeatExecutor.scheduleAtFixedRate( + this::heartbeat, 0, HEARTBEAT_INTERVAL, TimeUnit.SECONDS); logger.atFine().log("Reclaimed lock for %s %s.", projectName, refName); } return success; @@ -206,7 +214,7 @@ }); if (!success) { logger.atFine().log("Heartbeat stopping for lock %s %s (%s)", projectName, refName, token); - scheduler.shutdown(); + cancelHeartbeatTask(); } } catch (Exception e) { logger.atWarning().withCause(e).log( @@ -217,7 +225,7 @@ /** Close the lock, shutting down the heartbeat and deleting the lock from the locks table. */ @Override public void close() { - scheduler.shutdownNow(); + cancelHeartbeatTask(); dbClient .readWriteTransaction() @@ -228,6 +236,12 @@ }); } + private void cancelHeartbeatTask() { + if (heartbeatTask != null) { + heartbeatTask.cancel(false); + } + } + private String reclaimLockName(Timestamp reclaimToken) { return String.format( "%s/%s/%012d%09d",
diff --git a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Module.java b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Module.java index 7216d5c..bffb112 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Module.java +++ b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Module.java
@@ -32,6 +32,7 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.concurrent.ScheduledExecutorService; import org.eclipse.jgit.lib.Config; class Module extends LifecycleModule { @@ -52,6 +53,11 @@ .in(Scopes.SINGLETON); listener().to(SpannerLifeCycleManager.class); install(new FactoryModuleBuilder().build(Lock.Factory.class)); + bind(ScheduledExecutorService.class) + .annotatedWith(HeartbeatExecutor.class) + .toProvider(HeartbeatExecutorProvider.class) + .in(Scopes.SINGLETON); + listener().to(HeartbeatExecutorProvider.class); } @Provides
diff --git a/src/test/java/com/googlesource/gerrit/plugins/spannerrefdb/EmulatedSpannerRefDb.java b/src/test/java/com/googlesource/gerrit/plugins/spannerrefdb/EmulatedSpannerRefDb.java index 4288641..70adc77 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/spannerrefdb/EmulatedSpannerRefDb.java +++ b/src/test/java/com/googlesource/gerrit/plugins/spannerrefdb/EmulatedSpannerRefDb.java
@@ -27,6 +27,9 @@ import com.google.cloud.spanner.SpannerOptions; import java.util.Collections; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.junit.Ignore; import org.testcontainers.containers.SpannerEmulatorContainer; import org.testcontainers.utility.DockerImageName; @@ -46,6 +49,7 @@ private final Database spannerDatabase; private final DatabaseId dbId; private final DatabaseClient databaseClient; + private final ScheduledExecutorService heartbeatExecutor; private final SpannerRefDatabase spannerRefDb; public EmulatedSpannerRefDb() throws Exception { @@ -67,11 +71,12 @@ spannerInstance.createDatabase(SPANNER_DATABASE_ID, Collections.emptyList()).get(); createSchema(); databaseClient = createDatabaseClient(); + heartbeatExecutor = Executors.newScheduledThreadPool(2); Lock.Factory lockFactory = new Lock.Factory() { @Override public Lock create(String projectName, String refName) { - return new Lock(databaseClient, "", projectName, refName); + return new Lock(databaseClient, "", heartbeatExecutor, projectName, refName); } }; spannerRefDb = new SpannerRefDatabase(databaseClient, lockFactory); @@ -83,6 +88,12 @@ } public void cleanup() { + heartbeatExecutor.shutdownNow(); + try { + heartbeatExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } spannerDatabase.drop(); instanceAdminClient.deleteInstance(SPANNER_INSTANCE_ID); spanner.close();