Merge "Use a shared heartbeat thread pool, don't create one for each lock"
diff --git a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/HeartbeatExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/HeartbeatExecutor.java
new file mode 100644
index 0000000..733ce8a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/HeartbeatExecutor.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.spannerrefdb;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+
+@Retention(RUNTIME)
+@BindingAnnotation
+public @interface HeartbeatExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/HeartbeatExecutorProvider.java b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/HeartbeatExecutorProvider.java
new file mode 100644
index 0000000..c1ff592
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/HeartbeatExecutorProvider.java
@@ -0,0 +1,56 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.spannerrefdb;
+
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import java.util.concurrent.ScheduledExecutorService;
+
+@Singleton
+public class HeartbeatExecutorProvider
+    implements Provider<ScheduledExecutorService>, LifecycleListener {
+  private static final int DEFAULT_THREADS = 10;
+
+  private final WorkQueue workQueue;
+  private final String pluginName;
+
+  private ScheduledExecutorService executor;
+
+  @Inject
+  HeartbeatExecutorProvider(WorkQueue workQueue, @PluginName String pluginName) {
+    this.workQueue = workQueue;
+    this.pluginName = pluginName;
+  }
+
+  @Override
+  public void start() {}
+
+  @Override
+  public void stop() {
+    if (executor != null) {
+      executor.shutdown();
+    }
+  }
+
+  @Override
+  public ScheduledExecutorService get() {
+    executor = workQueue.createQueue(DEFAULT_THREADS, pluginName + "-heartbeat");
+    return executor;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Lock.java b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Lock.java
index 72fef8d..20a921b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Lock.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Lock.java
@@ -33,8 +33,8 @@
 import com.google.inject.assistedinject.Assisted;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -59,20 +59,23 @@
   private static final String RECLAIM_LOCK_PREFIX = "RECLAIM";
   private final DatabaseClient dbClient;
   private final String gerritInstanceId;
+  private ScheduledExecutorService heartbeatExecutor;
   private final String projectName;
   private final String refName;
-  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
 
   private Timestamp token;
+  private ScheduledFuture<?> heartbeatTask;
 
   @Inject
   Lock(
       DatabaseClient dbClient,
       @GerritInstanceId String gerritInstanceId,
+      @HeartbeatExecutor ScheduledExecutorService heartbeatExecutor,
       @Assisted("projectName") String projectName,
       @Assisted("refName") String refName) {
     this.dbClient = dbClient;
     this.gerritInstanceId = gerritInstanceId;
+    this.heartbeatExecutor = heartbeatExecutor;
     this.projectName = projectName;
     this.refName = refName;
   }
@@ -86,6 +89,7 @@
    *
    * @throws GlobalRefDbLockException if there is a failure to claim the lock
    */
+  @SuppressWarnings("FutureReturnValueIgnored")
   public void tryLock() throws GlobalRefDbLockException {
     try {
       TransactionRunner transactionRunner = dbClient.readWriteTransaction();
@@ -95,7 +99,9 @@
             return true;
           });
       token = transactionRunner.getCommitTimestamp();
-      scheduler.scheduleAtFixedRate(this::heartbeat, 0, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
+      heartbeatTask =
+          heartbeatExecutor.scheduleAtFixedRate(
+              this::heartbeat, 0, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
       logger.atFine().log("Locked %s %s.", projectName, refName);
     } catch (Exception e) {
       Throwable cause = e.getCause();
@@ -122,6 +128,7 @@
    * all done in one transaction so that if any one step fails, the transaction is aborted. Clean up
    * the reclaim-lock on success.
    */
+  @SuppressWarnings("FutureReturnValueIgnored")
   private boolean reclaim() {
     TransactionRunner transactionRunner = dbClient.readWriteTransaction();
 
@@ -161,7 +168,8 @@
 
     if (success) {
       token = transactionRunner.getCommitTimestamp();
-      scheduler.scheduleAtFixedRate(this::heartbeat, 0, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
+      heartbeatExecutor.scheduleAtFixedRate(
+          this::heartbeat, 0, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
       logger.atFine().log("Reclaimed lock for %s %s.", projectName, refName);
     }
     return success;
@@ -206,7 +214,7 @@
                   });
       if (!success) {
         logger.atFine().log("Heartbeat stopping for lock %s %s (%s)", projectName, refName, token);
-        scheduler.shutdown();
+        cancelHeartbeatTask();
       }
     } catch (Exception e) {
       logger.atWarning().withCause(e).log(
@@ -217,7 +225,7 @@
   /** Close the lock, shutting down the heartbeat and deleting the lock from the locks table. */
   @Override
   public void close() {
-    scheduler.shutdownNow();
+    cancelHeartbeatTask();
 
     dbClient
         .readWriteTransaction()
@@ -228,6 +236,12 @@
             });
   }
 
+  private void cancelHeartbeatTask() {
+    if (heartbeatTask != null) {
+      heartbeatTask.cancel(false);
+    }
+  }
+
   private String reclaimLockName(Timestamp reclaimToken) {
     return String.format(
         "%s/%s/%012d%09d",
diff --git a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Module.java b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Module.java
index 7216d5c..bffb112 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Module.java
@@ -32,6 +32,7 @@
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
 import org.eclipse.jgit.lib.Config;
 
 class Module extends LifecycleModule {
@@ -52,6 +53,11 @@
         .in(Scopes.SINGLETON);
     listener().to(SpannerLifeCycleManager.class);
     install(new FactoryModuleBuilder().build(Lock.Factory.class));
+    bind(ScheduledExecutorService.class)
+        .annotatedWith(HeartbeatExecutor.class)
+        .toProvider(HeartbeatExecutorProvider.class)
+        .in(Scopes.SINGLETON);
+    listener().to(HeartbeatExecutorProvider.class);
   }
 
   @Provides
diff --git a/src/test/java/com/googlesource/gerrit/plugins/spannerrefdb/EmulatedSpannerRefDb.java b/src/test/java/com/googlesource/gerrit/plugins/spannerrefdb/EmulatedSpannerRefDb.java
index 4288641..70adc77 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/spannerrefdb/EmulatedSpannerRefDb.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/spannerrefdb/EmulatedSpannerRefDb.java
@@ -27,6 +27,9 @@
 import com.google.cloud.spanner.SpannerOptions;
 import java.util.Collections;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.junit.Ignore;
 import org.testcontainers.containers.SpannerEmulatorContainer;
 import org.testcontainers.utility.DockerImageName;
@@ -46,6 +49,7 @@
   private final Database spannerDatabase;
   private final DatabaseId dbId;
   private final DatabaseClient databaseClient;
+  private final ScheduledExecutorService heartbeatExecutor;
   private final SpannerRefDatabase spannerRefDb;
 
   public EmulatedSpannerRefDb() throws Exception {
@@ -67,11 +71,12 @@
         spannerInstance.createDatabase(SPANNER_DATABASE_ID, Collections.emptyList()).get();
     createSchema();
     databaseClient = createDatabaseClient();
+    heartbeatExecutor = Executors.newScheduledThreadPool(2);
     Lock.Factory lockFactory =
         new Lock.Factory() {
           @Override
           public Lock create(String projectName, String refName) {
-            return new Lock(databaseClient, "", projectName, refName);
+            return new Lock(databaseClient, "", heartbeatExecutor, projectName, refName);
           }
         };
     spannerRefDb = new SpannerRefDatabase(databaseClient, lockFactory);
@@ -83,6 +88,12 @@
   }
 
   public void cleanup() {
+    heartbeatExecutor.shutdownNow();
+    try {
+      heartbeatExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
     spannerDatabase.drop();
     instanceAdminClient.deleteInstance(SPANNER_INSTANCE_ID);
     spanner.close();