Merge "Use a shared heartbeat thread pool, don't create one for each lock"
diff --git a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/HeartbeatExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/HeartbeatExecutor.java
new file mode 100644
index 0000000..733ce8a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/HeartbeatExecutor.java
@@ -0,0 +1,29 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.spannerrefdb;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+
+/**
+ * Guice binding annotation marking the {@code ScheduledExecutorService} that runs lock heartbeats.
+ *
+ * <p>Retained at runtime so Guice can read it reflectively at injection points.
+ */
+@BindingAnnotation
+@Retention(RUNTIME)
+public @interface HeartbeatExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/HeartbeatExecutorProvider.java b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/HeartbeatExecutorProvider.java
new file mode 100644
index 0000000..c1ff592
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/HeartbeatExecutorProvider.java
@@ -0,0 +1,77 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.spannerrefdb;
+
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * Provides the single scheduled executor on which all lock heartbeats run, and shuts it down when
+ * the plugin stops.
+ *
+ * <p>The executor is created lazily on the first {@link #get()} call and reused afterwards, so
+ * repeated calls never allocate additional work queues that {@link #stop()} would not know about.
+ */
+@Singleton
+public class HeartbeatExecutorProvider
+    implements Provider<ScheduledExecutorService>, LifecycleListener {
+  /** Number of threads in the shared heartbeat pool. */
+  private static final int DEFAULT_THREADS = 10;
+
+  private final WorkQueue workQueue;
+  private final String pluginName;
+
+  /** Lazily created by {@link #get()}; {@code null} until then. Guarded by {@code this}. */
+  private ScheduledExecutorService executor;
+
+  @Inject
+  HeartbeatExecutorProvider(WorkQueue workQueue, @PluginName String pluginName) {
+    this.workQueue = workQueue;
+    this.pluginName = pluginName;
+  }
+
+  /** Nothing to do on start: the executor is created on demand by {@link #get()}. */
+  @Override
+  public void start() {}
+
+  /** Shuts down the heartbeat executor if one was ever created. */
+  @Override
+  public synchronized void stop() {
+    if (executor != null) {
+      executor.shutdown();
+    }
+  }
+
+  /**
+   * Returns the shared heartbeat executor, creating it on first use.
+   *
+   * <p>Synchronized with {@link #stop()} because injection and plugin shutdown may happen on
+   * different threads.
+   *
+   * @return the executor whose queue name is the plugin name followed by {@code -heartbeat}
+   */
+  @Override
+  public synchronized ScheduledExecutorService get() {
+    if (executor == null) {
+      executor = workQueue.createQueue(DEFAULT_THREADS, pluginName + "-heartbeat");
+    }
+    return executor;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Lock.java b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Lock.java
index 72fef8d..20a921b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Lock.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Lock.java
@@ -33,8 +33,8 @@
import com.google.inject.assistedinject.Assisted;
import java.util.Arrays;
import java.util.Collections;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
@@ -59,20 +59,23 @@
private static final String RECLAIM_LOCK_PREFIX = "RECLAIM";
private final DatabaseClient dbClient;
private final String gerritInstanceId;
+ private final ScheduledExecutorService heartbeatExecutor; // Injected via @HeartbeatExecutor.
private final String projectName;
private final String refName;
- private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private Timestamp token;
+ private ScheduledFuture<?> heartbeatTask;
@Inject
Lock(
DatabaseClient dbClient,
@GerritInstanceId String gerritInstanceId,
+ @HeartbeatExecutor ScheduledExecutorService heartbeatExecutor,
@Assisted("projectName") String projectName,
@Assisted("refName") String refName) {
this.dbClient = dbClient;
this.gerritInstanceId = gerritInstanceId;
+ this.heartbeatExecutor = heartbeatExecutor;
this.projectName = projectName;
this.refName = refName;
}
@@ -86,6 +89,7 @@
*
* @throws GlobalRefDbLockException if there is a failure to claim the lock
*/
+ @SuppressWarnings("FutureReturnValueIgnored")
public void tryLock() throws GlobalRefDbLockException {
try {
TransactionRunner transactionRunner = dbClient.readWriteTransaction();
@@ -95,7 +99,9 @@
return true;
});
token = transactionRunner.getCommitTimestamp();
- scheduler.scheduleAtFixedRate(this::heartbeat, 0, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
+ heartbeatTask =
+ heartbeatExecutor.scheduleAtFixedRate(
+ this::heartbeat, 0, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
logger.atFine().log("Locked %s %s.", projectName, refName);
} catch (Exception e) {
Throwable cause = e.getCause();
@@ -122,6 +128,7 @@
* all done in one transaction so that if any one step fails, the transaction is aborted. Clean up
* the reclaim-lock on success.
*/
+ @SuppressWarnings("FutureReturnValueIgnored")
private boolean reclaim() {
TransactionRunner transactionRunner = dbClient.readWriteTransaction();
@@ -161,7 +168,8 @@
if (success) {
token = transactionRunner.getCommitTimestamp();
- scheduler.scheduleAtFixedRate(this::heartbeat, 0, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
+ // Keep the handle: cancelHeartbeatTask() can only stop a heartbeat recorded in heartbeatTask,
+ // so discarding it would leave this task running on the shared executor after close().
+ heartbeatTask =
+     heartbeatExecutor.scheduleAtFixedRate(
+         this::heartbeat, 0, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
logger.atFine().log("Reclaimed lock for %s %s.", projectName, refName);
}
return success;
@@ -206,7 +214,7 @@
});
if (!success) {
logger.atFine().log("Heartbeat stopping for lock %s %s (%s)", projectName, refName, token);
- scheduler.shutdown();
+ cancelHeartbeatTask();
}
} catch (Exception e) {
logger.atWarning().withCause(e).log(
@@ -217,7 +225,7 @@
/** Close the lock, shutting down the heartbeat and deleting the lock from the locks table. */
@Override
public void close() {
- scheduler.shutdownNow();
+ cancelHeartbeatTask();
dbClient
.readWriteTransaction()
@@ -228,6 +236,12 @@
});
}
+ /**
+  * Cancels the scheduled heartbeat for this lock, if one has been recorded.
+  *
+  * <p>Uses {@code cancel(false)}, so a heartbeat run that is already executing is not interrupted.
+  */
+ private void cancelHeartbeatTask() {
+   if (heartbeatTask == null) {
+     return;
+   }
+   heartbeatTask.cancel(false);
+ }
+
private String reclaimLockName(Timestamp reclaimToken) {
return String.format(
"%s/%s/%012d%09d",
diff --git a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Module.java b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Module.java
index 7216d5c..bffb112 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Module.java
@@ -32,6 +32,7 @@
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jgit.lib.Config;
class Module extends LifecycleModule {
@@ -52,6 +53,11 @@
.in(Scopes.SINGLETON);
listener().to(SpannerLifeCycleManager.class);
install(new FactoryModuleBuilder().build(Lock.Factory.class));
+ bind(ScheduledExecutorService.class)
+ .annotatedWith(HeartbeatExecutor.class)
+ .toProvider(HeartbeatExecutorProvider.class)
+ .in(Scopes.SINGLETON);
+ listener().to(HeartbeatExecutorProvider.class);
}
@Provides
diff --git a/src/test/java/com/googlesource/gerrit/plugins/spannerrefdb/EmulatedSpannerRefDb.java b/src/test/java/com/googlesource/gerrit/plugins/spannerrefdb/EmulatedSpannerRefDb.java
index 4288641..70adc77 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/spannerrefdb/EmulatedSpannerRefDb.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/spannerrefdb/EmulatedSpannerRefDb.java
@@ -27,6 +27,9 @@
import com.google.cloud.spanner.SpannerOptions;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.junit.Ignore;
import org.testcontainers.containers.SpannerEmulatorContainer;
import org.testcontainers.utility.DockerImageName;
@@ -46,6 +49,7 @@
private final Database spannerDatabase;
private final DatabaseId dbId;
private final DatabaseClient databaseClient;
+ private final ScheduledExecutorService heartbeatExecutor;
private final SpannerRefDatabase spannerRefDb;
public EmulatedSpannerRefDb() throws Exception {
@@ -67,11 +71,12 @@
spannerInstance.createDatabase(SPANNER_DATABASE_ID, Collections.emptyList()).get();
createSchema();
databaseClient = createDatabaseClient();
+ heartbeatExecutor = Executors.newScheduledThreadPool(2);
Lock.Factory lockFactory =
new Lock.Factory() {
@Override
public Lock create(String projectName, String refName) {
- return new Lock(databaseClient, "", projectName, refName);
+ return new Lock(databaseClient, "", heartbeatExecutor, projectName, refName);
}
};
spannerRefDb = new SpannerRefDatabase(databaseClient, lockFactory);
@@ -83,6 +88,12 @@
}
public void cleanup() {
+ // Stop heartbeat tasks before the database is dropped below.
+ heartbeatExecutor.shutdownNow();
+ try {
+   heartbeatExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+   // Restore the interrupt flag so code further up the stack can still observe the interrupt.
+   Thread.currentThread().interrupt();
+   throw new RuntimeException(e);
+ }
spannerDatabase.drop();
instanceAdminClient.deleteInstance(SPANNER_INSTANCE_ID);
spanner.close();