Improve locking to handle stale locks

Locks are a mechanism to ensure exclusive usage of the locked
resource. Here we provide a Spanner-specific implementation of
locks which are used from the global-refdb module.

To ensure lock freshness we use heartbeats. Stale locks can be
reclaimed.

See Documentation/locks.md for more details.

Change-Id: Idbe3b457327d05fda2c2352bab2be27411346965
diff --git a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Lock.java b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Lock.java
new file mode 100644
index 0000000..cb10160
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Lock.java
@@ -0,0 +1,304 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.spannerrefdb;
+
+import com.gerritforge.gerrit.globalrefdb.GlobalRefDbLockException;
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.ErrorCode;
+import com.google.cloud.spanner.Key;
+import com.google.cloud.spanner.KeySet;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.SpannerException;
+import com.google.cloud.spanner.Statement;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.TransactionRunner;
+import com.google.cloud.spanner.Value;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Locks are a mechanism to ensure exclusive usage of the locked resource. Here we
+ * provide a Spanner-specific implementation of locks which are used from the
+ * global-refdb module, which is used to ensure refs are updated consistently
+ * across multiple Gerrit primary sites (see README.md).
+ *
+ * <p>Persistent distributed locks are used to guard distributed transactions
+ * spanning RefUpdates in a git repo of the involved Gerrit primary sites and
+ * corresponding updates of those refs in the global-refdb.
+ */
+public class Lock implements AutoCloseable {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  public interface LockFactory {
+    Lock create(@Assisted("projectName") String projectName, @Assisted("refName") String refName);
+  }
+
+  private static final Long SECONDS_FOR_STALE_HEARTBEAT = 30L;
+  private static final int HEARTBEAT_INTERVAL = 2;
+  private static final String RECLAIM_LOCK_PREFIX = "RECLAIM";
+  private final DatabaseClient dbClient;
+  private final String projectName;
+  private final String gerritInstanceId;
+  private final String refName;
+  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+
+  private Timestamp token;
+
+  @Inject
+  Lock(
+      DatabaseClient dbClient,
+      @GerritInstanceId String gerritInstanceId,
+      @Assisted("projectName") String projectName,
+      @Assisted("refName") String refName) {
+    this.projectName = projectName;
+    this.refName = refName;
+    this.dbClient = dbClient;
+    this.gerritInstanceId = gerritInstanceId;
+  }
+
+  /**
+   * Attempt to lock a project/ref by creating a lock in the locks table, trying to reclaim the
+   * existing lock if one is already present. On success, saves the token and starts a heartbeat.
+   *
+   * @throws GlobalRefDbLockException if there is a failure to claim the lock
+   */
+  public void tryLock() throws GlobalRefDbLockException {
+    try {
+      TransactionRunner transactionRunner = dbClient.readWriteTransaction();
+      transactionRunner.run(
+          transaction -> {
+            transaction.buffer(createLock());
+            return true;
+          });
+      token = transactionRunner.getCommitTimestamp();
+      scheduler.scheduleAtFixedRate(this::heartbeat, 0, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
+      logger.atFine().log("Locked %s %s.", projectName, refName);
+    } catch (Exception e) {
+      // The Spanner failure may be the exception itself or wrapped as its cause; accept either.
+      Throwable failure = e.getCause() instanceof SpannerException ? e.getCause() : e;
+      if (failure instanceof SpannerException
+          && ((SpannerException) failure).getErrorCode() == ErrorCode.ALREADY_EXISTS) {
+        logger.atFine().log(
+            "Lock for %s %s already exists, attempting to reclaim.", projectName, refName);
+        try {
+          if (reclaim()) {
+            return;
+          }
+        } catch (RuntimeException reclaimFailure) {
+          e.addSuppressed(reclaimFailure);
+        }
+      }
+      throw new GlobalRefDbLockException(projectName, refName, e);
+    }
+  }
+
+  /**
+   * Attempt to reclaim a lock that is already claimed by another process.
+   *
+   * <p>If the lock row no longer exists, a new lock is created. Otherwise the heartbeat is checked
+   * for staleness using the database clock; a fresh lock is left alone. For a stale lock, insert a
+   * reclaim-lock, update the old lock with a new token, heartbeat and owner, and delete the
+   * reclaim-lock again, all in one transaction so that a failure of any step aborts everything.
+   *
+   * @return true if the lock was (re)claimed, false if the existing lock is still fresh
+   */
+  private boolean reclaim() {
+    TransactionRunner transactionRunner = dbClient.readWriteTransaction();
+    boolean success =
+        transactionRunner.run(
+            transaction -> {
+              Struct row =
+                  transaction.readRow(
+                      "locks", Key.of(projectName, refName), Arrays.asList("token", "heartbeat"));
+              if (row == null) {
+                // If the old lock doesn't exist any more, we create a new one
+                transaction.buffer(createLock());
+                return true;
+              }
+              String timestampDifferenceQuery =
+                  "SELECT TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), heartbeat, SECOND) AS seconds"
+                      + " FROM locks WHERE project = @project AND ref = @ref;";
+              Statement stalenessStatement =
+                  Statement.newBuilder(timestampDifferenceQuery)
+                      .bind("project")
+                      .to(projectName)
+                      .bind("ref")
+                      .to(refName)
+                      .build();
+              try (ResultSet resultSet = transaction.executeQuery(stalenessStatement)) {
+                // The cursor must be advanced before reading; a fresh lock is not reclaimed.
+                if (!resultSet.next()
+                    || resultSet.getLong("seconds") <= SECONDS_FOR_STALE_HEARTBEAT) {
+                  return false;
+                }
+              }
+              Timestamp oldToken = row.getTimestamp("token");
+              transaction.buffer(createReclaimLock(oldToken));
+              transaction.buffer(updateReclaimedLock());
+              transaction.buffer(deleteReclaimLockForCleanup(oldToken));
+              return true;
+            });
+    if (success) {
+      token = transactionRunner.getCommitTimestamp();
+      scheduler.scheduleAtFixedRate(this::heartbeat, 0, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
+      logger.atFine().log("Reclaimed lock for %s %s.", projectName, refName);
+    }
+    return success;
+  }
+
+  private Mutation deleteReclaimLockForCleanup(Timestamp reclaimToken) {
+    Key reclaimLockKey = Key.of(reclaimLockName(reclaimToken), refName);
+    return Mutation.delete("locks", KeySet.prefixRange(reclaimLockKey));
+  }
+
+  /**
+   * Update the heartbeat timestamp of the current lock if it matches the expected token. If the
+   * lock row is missing or the tokens do not match, the lock is no longer ours (e.g. it has been
+   * reclaimed), so the heartbeat scheduler is shut down.
+   *
+   * <p>Exceptions are logged rather than propagated; an exception escaping a task scheduled with
+   * scheduleAtFixedRate would suppress all of its subsequent executions.
+   */
+  private void heartbeat() {
+    try {
+      boolean success =
+          dbClient
+              .readWriteTransaction()
+              .run(
+                  transaction -> {
+                    Struct row =
+                        transaction.readRow(
+                            "locks", Key.of(projectName, refName), Collections.singleton("token"));
+                    if (row == null) {
+                      logger.atWarning().log(
+                          "Lock %s %s (%s) not found", projectName, refName, token);
+                      return false;
+                    }
+                    if (!token.equals(row.getTimestamp("token"))) {
+                      logger.atWarning().log(
+                          "Lock for %s %s has unexpected token (%s), expecting (%s)",
+                          projectName, refName, row.getTimestamp("token"), token);
+                      return false;
+                    }
+                    transaction.buffer(updateHeartbeat());
+                    return true;
+                  });
+      if (!success) {
+        logger.atFine().log("Heartbeat stopping for lock %s %s (%s)", projectName, refName, token);
+        scheduler.shutdown();
+      }
+    } catch (Exception e) {
+      logger.atWarning().withCause(e).log(
+          "Heartbeat failed to update for lock %s %s (%s)", projectName, refName, token);
+    }
+  }
+
+  /** Shut down the heartbeat and delete the lock row if it still carries this lock's token. */
+  @Override
+  public void close() {
+    scheduler.shutdownNow();
+
+    dbClient
+        .readWriteTransaction()
+        .run(
+            transaction -> {
+              transaction.executeUpdate(deleteLockStatement());
+              return true;
+            });
+  }
+
+  private String reclaimLockName(Timestamp reclaimToken) {
+    long seconds = reclaimToken.getSeconds();
+    int nanos = reclaimToken.getNanos();
+    return String.format("%s/%s/%012d%09d", RECLAIM_LOCK_PREFIX, projectName, seconds, nanos);
+  }
+
+  private Mutation createLock() { // insert mutation for this project/ref's lock row
+    return Mutation.newInsertBuilder("locks")
+        .set("project")
+        .to(projectName)
+        .set("ref")
+        .to(refName)
+        .set("heartbeat")
+        .to(Value.COMMIT_TIMESTAMP) // placeholder: Spanner writes the commit timestamp
+        .set("token")
+        .to(Value.COMMIT_TIMESTAMP) // doubles as the lock's version token
+        .set("owner")
+        .to(gerritInstanceId)
+        .build();
+  }
+
+  private Mutation createReclaimLock(Timestamp reclaimToken) { // reclaimToken: the old lock's token
+    return Mutation.newInsertBuilder("locks")
+        .set("project")
+        .to(reclaimLockName(reclaimToken)) // synthetic key marking a reclaim in progress
+        .set("ref")
+        .to(refName)
+        .set("heartbeat")
+        .to(Value.COMMIT_TIMESTAMP)
+        .set("token")
+        .to(Value.COMMIT_TIMESTAMP)
+        .set("owner")
+        .to(gerritInstanceId)
+        .build();
+  }
+
+  private Mutation updateReclaimedLock() { // takes over the existing lock row for this instance
+    return Mutation.newUpdateBuilder("locks")
+        .set("project")
+        .to(projectName)
+        .set("ref")
+        .to(refName)
+        .set("heartbeat")
+        .to(Value.COMMIT_TIMESTAMP)
+        .set("token")
+        .to(Value.COMMIT_TIMESTAMP) // new token: the old owner's heartbeat and close() stop matching
+        .set("owner")
+        .to(gerritInstanceId)
+        .build();
+  }
+
+  private Mutation updateHeartbeat() { // refreshes only the heartbeat; token and owner stay as-is
+    return Mutation.newUpdateBuilder("locks")
+        .set("project")
+        .to(projectName)
+        .set("ref")
+        .to(refName)
+        .set("heartbeat")
+        .to(Value.COMMIT_TIMESTAMP)
+        .build();
+  }
+
+  private Statement deleteLockStatement() {
+    return Statement.newBuilder(
+            "DELETE FROM locks WHERE project = @project AND ref = @ref AND token = @token")
+        .bind("project")
+        .to(projectName)
+        .bind("ref")
+        .to(refName)
+        .bind("token") // only delete the row if it still carries our token
+        .to(token)
+        .build();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Module.java b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Module.java
index fc66666..df30ab7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/Module.java
@@ -23,6 +23,7 @@
 import com.google.inject.Provides;
 import com.google.inject.Scopes;
 import com.google.inject.Singleton;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
 
 class Module extends LifecycleModule {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@@ -34,6 +35,10 @@
         .to(SpannerRefDatabase.class)
         .in(Scopes.SINGLETON);
     listener().to(SpannerLifeCycleManager.class);
+    install(
+        new FactoryModuleBuilder()
+            .implement(AutoCloseable.class, Lock.class)
+            .build(Lock.LockFactory.class));
   }
 
   @Provides
diff --git a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/SpannerLifeCycleManager.java b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/SpannerLifeCycleManager.java
index 5a95276..9c80b63 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/SpannerLifeCycleManager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/SpannerLifeCycleManager.java
@@ -50,7 +50,14 @@
     createTable(
         "CREATE TABLE locks ("
             + "project  STRING(MAX) NOT NULL,"
-            + "ref  STRING(MAX) NOT NULL"
+            + "ref  STRING(MAX) NOT NULL,"
+            + "heartbeat TIMESTAMP OPTIONS ("
+            + "    allow_commit_timestamp = true"
+            + "),"
+            + "token TIMESTAMP OPTIONS ("
+            + "    allow_commit_timestamp = true"
+            + "),"
+            + "owner STRING(MAX) NOT NULL"
             + ") PRIMARY KEY (project, ref)");
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/SpannerRefDatabase.java b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/SpannerRefDatabase.java
index 021bb0f..ad6655b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/SpannerRefDatabase.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/spannerrefdb/SpannerRefDatabase.java
@@ -21,6 +21,7 @@
 import com.google.cloud.spanner.Key;
 import com.google.cloud.spanner.KeySet;
 import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.ReadOnlyTransaction;
 import com.google.cloud.spanner.ResultSet;
 import com.google.cloud.spanner.Statement;
 import com.google.cloud.spanner.Struct;
@@ -40,10 +41,12 @@
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
   private final DatabaseClient dbClient;
+  private final Lock.LockFactory lockFactory;
 
   @Inject
-  SpannerRefDatabase(DatabaseClient dbClient) {
+  SpannerRefDatabase(DatabaseClient dbClient, Lock.LockFactory lockFactory) {
     this.dbClient = dbClient;
+    this.lockFactory = lockFactory;
   }
 
   @Override
@@ -125,6 +128,7 @@
                 }
                 return false;
               }
+
               // If the row is null, the row doesn't exist and will be created.
               // If the value is the expected value, the row should be updated.
               if (row == null || row.getString(0).equals(expectedValue)) {
@@ -143,49 +147,12 @@
             });
   }
 
-  public class Lock implements AutoCloseable {
-
-    private final String projectName;
-    private final String refName;
-
-    public Lock(String projectName, String refName) throws GlobalRefDbLockException {
-      // Attempt to create the lock here
-      this.projectName = projectName;
-      this.refName = refName;
-      try {
-        dbClient
-            .readWriteTransaction()
-            .run(
-                transaction -> {
-                  transaction.buffer(
-                      Mutation.newInsertBuilder("locks")
-                          .set("project")
-                          .to(projectName)
-                          .set("ref")
-                          .to(refName)
-                          .build());
-                  return true;
-                });
-      } catch (Exception e) {
-        logger.atSevere().withCause(e).log(
-            "Failed to acquire lock for %s %s", projectName, refName);
-        throw new GlobalRefDbLockException(projectName, refName, e);
-      }
-    }
-
-    @Override
-    public void close() throws Exception {
-      dbClient.write(
-          Collections.singletonList(Mutation.delete("locks", Key.of(projectName, refName))));
-    }
-  }
-
   @Override
   public AutoCloseable lockRef(Project.NameKey project, String refName)
       throws GlobalRefDbLockException {
-    logger.atInfo().log("Attempting to lock %s %s.", project.get(), refName);
-    // TODO: Some sort of heartbeat that removes stale locks if they haven't been
-    return new Lock(project.get(), refName);
+    Lock lock = lockFactory.create(project.get(), refName);
+    lock.tryLock();
+    return lock;
   }
 
   @Override
@@ -205,24 +172,21 @@
 
   @VisibleForTesting
   String get(Project.NameKey project, String refName) throws GlobalRefDbSystemError {
-    try (ResultSet resultSet =
-        dbClient
-            .singleUse()
-            .executeQuery(
-                Statement.newBuilder(
-                        "SELECT value FROM refs WHERE project = @project AND ref = @ref")
-                    .bind("project")
-                    .to(project.get())
-                    .bind("ref")
-                    .to(refName)
-                    .build())) {
+    try (ReadOnlyTransaction transaction = dbClient.readOnlyTransaction()) {
+      ResultSet resultSet =
+          transaction.executeQuery(
+              Statement.newBuilder("SELECT value FROM refs WHERE project = @project and ref = @ref")
+                  .bind("project")
+                  .to(project.get())
+                  .bind("ref")
+                  .to(refName)
+                  .build());
       if (resultSet.next()) {
         return resultSet.getString("value");
       }
       return null;
     } catch (Exception e) {
-      throw new GlobalRefDbSystemError(
-          String.format("Cannot get value for %s %s", project.get(), refName), e);
+      throw new GlobalRefDbSystemError("Cannot get value for " + project.get() + " " + refName, e);
     }
   }
 
diff --git a/src/main/resources/Documentation/locks.md b/src/main/resources/Documentation/locks.md
new file mode 100644
index 0000000..47d1a95
--- /dev/null
+++ b/src/main/resources/Documentation/locks.md
@@ -0,0 +1,82 @@
+Locks
+=====
+
+Each entry of the locks table consists of the following columns:
+
+| Name | Description | Type |
+| ------------- |:-------------|:-------------|
+`project` | Project name | `String`
+`ref` | Ref path | `String`
+`heartbeat` | Timestamp updated every few seconds to identify staleness | `Timestamp`
+`token` | Timestamp used to identify the lock version | `Timestamp`
+`owner` | Gerrit instance identifier | `String`
+
+The primary key is a composite of (project, ref).
+
+
+Design
+------
+
+When the global-refdb attempts to acquire a lock on a given entry, we first try
+to insert the lock into the locks table.
+
+If the insertion succeeds:
+* Set the Lock object's token value to match the token in the database
+    * The token is a unique identifier for the Lock which establishes versioning.
+* Start a new process to update the heartbeat value every few seconds
+    * The heartbeat ensures freshness of the lock and is used when deciding
+      whether or not a Lock is stale.
+
+If the insertion fails due to a lock on that project/ref already existing:
+* Check the timestamp of the existing lock.
+    * If it is sufficiently fresh, do not attempt to reclaim the lock and
+      end here.
+    * If the timestamp is stale, attempt to reclaim the lock as described
+      below.
+
+Reclaiming an existing stale lock:
+* Check the existing token (now known as oldToken) of the lock
+* In one read/write transaction:
+  * Check that the token of the lock is still oldToken.
+  * Attempt to insert a meta reclaim-lock into the table to mark that the old
+    lock is being reclaimed.
+  * Refresh the old lock, issuing an update with our new heartbeat, token, and
+    owner.
+  * If any of those steps fail, our information is outdated and the attempt
+    will fail. As the read/write transaction is atomic, either every buffered
+    mutation succeeds or all fail.
+* If all succeed, the lock has been reclaimed and we set up the heartbeat and
+  clean up (deleting the meta reclaim-lock).
+
+Updating a heartbeat:
+* Check if the tokens of the in-database lock and the Lock object match.
+  * If so, update the heartbeat to the current timestamp.
+  * If not, shut down the heartbeat process; the lock has been reclaimed.
+
+When the lock is closed:
+* Shut down the heartbeat process
+* Check if the tokens of the in-database lock and the Lock object match.
+  * If so, delete the in-database lock.
+  * If not, stop - the lock has been reclaimed.
+
+Example
+-------
+Reclaim process:
+| Project | Ref | Heartbeat | Token | Description
+| --------|-----|:----------|:------|:-----------
+`project1` | `ref1` | `Stale timestamp` | `T1` | When a new process attempts to lock `project1-ref1`, it finds a stale heartbeat and can then insert a reclaim lock.
+`RECLAIM/project1/T1` | `ref1` | `Tx` | `Tx` | While this reclaim lock exists, no other process can succeed in the reclaim process.
+
+Result:
+| Project | Ref | Heartbeat | Token | Description
+| --------|-----|:----------|:------|:-----------
+`project1` | `ref1` | `Fresh timestamp` | `T2` | Once the reclaim lock has successfully been inserted, the old lock is then updated and now belongs to the new process. If another process now attempts to reclaim the original lock, it will find a fresh timestamp and new token, indicating the lock is not stale and cannot be reclaimed.
+
+Note:
+If a lock belonging to a frozen process is successfully reclaimed but the
+original owner resumes (e.g. after a long GC pause), then its calling
+process may be unaware that it no longer holds the lock.
+
+Sources:
+* [Distributed locking](https://hazelcast.com/blog/long-live-distributed-locks/)
+* [Spindle, a distributed locking system built for Cloud Spanner in Go](https://github.com/flowerinthenight/spindle)
\ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/spannerrefdb/EmulatedSpannerRefDb.java b/src/test/java/com/googlesource/gerrit/plugins/spannerrefdb/EmulatedSpannerRefDb.java
index 8b7fcfe..4eaaf84 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/spannerrefdb/EmulatedSpannerRefDb.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/spannerrefdb/EmulatedSpannerRefDb.java
@@ -78,7 +78,14 @@
     pluginConfig = createEmulatorConfiguration();
     createSchema();
     databaseClient = createDatabaseClient();
-    spannerRefDb = new SpannerRefDatabase(databaseClient);
+    Lock.LockFactory lockFactory =
+        new Lock.LockFactory() {
+          @Override
+          public Lock create(String projectName, String refName) {
+            return new Lock(databaseClient, "", projectName, refName);
+          }
+        };
+    spannerRefDb = new SpannerRefDatabase(databaseClient, lockFactory);
   }
 
   private void createSchema() {