| // Copyright (C) 2023 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package com.googlesource.gerrit.plugins.spannerrefdb; |
| |
| import com.gerritforge.gerrit.globalrefdb.GlobalRefDbLockException; |
| import com.google.cloud.Timestamp; |
| import com.google.cloud.spanner.DatabaseClient; |
| import com.google.cloud.spanner.ErrorCode; |
| import com.google.cloud.spanner.Key; |
| import com.google.cloud.spanner.KeySet; |
| import com.google.cloud.spanner.Mutation; |
| import com.google.cloud.spanner.ResultSet; |
| import com.google.cloud.spanner.SpannerException; |
| import com.google.cloud.spanner.Statement; |
| import com.google.cloud.spanner.Struct; |
| import com.google.cloud.spanner.TransactionRunner; |
| import com.google.cloud.spanner.Value; |
| import com.google.common.flogger.FluentLogger; |
| import com.google.gerrit.server.config.GerritInstanceId; |
| import com.google.inject.Inject; |
| import com.google.inject.assistedinject.Assisted; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| |
| /** |
| * * Locks are a mechanism to ensure exclusive usage of the locked resource. Here we provide a |
| * Spanner-specific implementation of locks which are used from the global-refdb module, which is |
| * used to ensure refs are updated consistently across multiple Gerrit primary sites (see |
| * README.md). |
| * |
| * <p>Persistent distributed Locks are used to guard distributed transactions spanning RefUpdates in |
| * a git repo of the involved Gerrit primary sites and corresponding updates of those refs in the |
| * global-refdb. |
| */ |
| public class Lock implements AutoCloseable { |
| private static final FluentLogger logger = FluentLogger.forEnclosingClass(); |
| |
| public interface Factory { |
| Lock create(@Assisted("projectName") String projectName, @Assisted("refName") String refName); |
| } |
| |
| private static final Long SECONDS_FOR_STALE_HEARTBEAT = 30L; |
| private static final int HEARTBEAT_INTERVAL = 2; |
| private static final String RECLAIM_LOCK_PREFIX = "RECLAIM"; |
| private final DatabaseClient dbClient; |
| private final String gerritInstanceId; |
| private ScheduledExecutorService heartbeatExecutor; |
| private final String projectName; |
| private final String refName; |
| |
| private Timestamp token; |
| private ScheduledFuture<?> heartbeatTask; |
| |
| @Inject |
| Lock( |
| DatabaseClient dbClient, |
| @GerritInstanceId String gerritInstanceId, |
| @HeartbeatExecutor ScheduledExecutorService heartbeatExecutor, |
| @Assisted("projectName") String projectName, |
| @Assisted("refName") String refName) { |
| this.dbClient = dbClient; |
| this.gerritInstanceId = gerritInstanceId; |
| this.heartbeatExecutor = heartbeatExecutor; |
| this.projectName = projectName; |
| this.refName = refName; |
| } |
| |
| /** |
| * Attempt to lock a project/ref by creating a lock in the locks table. If the lock already |
| * exists, attempt to reclaim it. |
| * |
| * <p>Once the lock is successfully claimed, saves the token information and schedules a |
| * heartbeat. |
| * |
| * @throws GlobalRefDbLockException if there is a failure to claim the lock |
| */ |
| @SuppressWarnings("FutureReturnValueIgnored") |
| public void tryLock() throws GlobalRefDbLockException { |
| try { |
| TransactionRunner transactionRunner = dbClient.readWriteTransaction(); |
| transactionRunner.run( |
| transaction -> { |
| transaction.buffer(createLock()); |
| return true; |
| }); |
| token = transactionRunner.getCommitTimestamp(); |
| heartbeatTask = |
| heartbeatExecutor.scheduleAtFixedRate( |
| this::heartbeat, 0, HEARTBEAT_INTERVAL, TimeUnit.SECONDS); |
| logger.atFine().log("Locked %s %s.", projectName, refName); |
| } catch (Exception e) { |
| Throwable cause = e.getCause(); |
| if (cause instanceof SpannerException) { |
| ErrorCode code = ((SpannerException) cause).getErrorCode(); |
| if (code == ErrorCode.ALREADY_EXISTS) { |
| logger.atFine().log( |
| "Lock for %s %s already exists, attempting to reclaim.", projectName, refName); |
| if (reclaim()) { |
| return; |
| } |
| } |
| } |
| throw new GlobalRefDbLockException(projectName, refName, e); |
| } |
| } |
| |
| /** |
| * Attempt to reclaim a lock that is already claimed by another process. |
| * |
| * <p>Checks for heartbeat staleness; if the heartbeat is not stale, does not proceed further. If |
| * the heartbeat is stale, ensure the lock still exists and has the expected token. Then insert a |
| * reclaim-lock to assert the reclamation process is in progress and update the old lock. This is |
| * all done in one transaction so that if any one step fails, the transaction is aborted. Clean up |
| * the reclaim-lock on success. |
| */ |
| @SuppressWarnings("FutureReturnValueIgnored") |
| private boolean reclaim() { |
| TransactionRunner transactionRunner = dbClient.readWriteTransaction(); |
| |
| boolean success = |
| transactionRunner.run( |
| transaction -> { |
| Struct row = |
| transaction.readRow( |
| "locks", Key.of(projectName, refName), Arrays.asList("token", "heartbeat")); |
| if (row == null) { |
| // If the old lock doesn't exist any more, we create a new one |
| transaction.buffer(createLock()); |
| return true; |
| } |
| String timestampDifferenceQuery = |
| "SELECT TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), token, SECOND) AS seconds FROM locks" |
| + " WHERE project = @project AND ref = @ref;"; |
| ResultSet resultSet = |
| transaction.executeQuery( |
| Statement.newBuilder(timestampDifferenceQuery) |
| .bind("project") |
| .to(projectName) |
| .bind("ref") |
| .to(refName) |
| .build()); |
| if (resultSet.next() && resultSet.getLong("seconds") <= SECONDS_FOR_STALE_HEARTBEAT) { |
| // Check if the old lock is stale; if not, do nothing |
| return false; |
| } |
| Timestamp oldToken = row.getTimestamp("token"); |
| transaction.buffer(createReclaimLock(oldToken)); |
| transaction.buffer(updateReclaimedLock()); |
| transaction.buffer(deleteReclaimLockForCleanup(oldToken)); |
| return true; |
| }); |
| |
| if (success) { |
| token = transactionRunner.getCommitTimestamp(); |
| heartbeatExecutor.scheduleAtFixedRate( |
| this::heartbeat, 0, HEARTBEAT_INTERVAL, TimeUnit.SECONDS); |
| logger.atFine().log("Reclaimed lock for %s %s.", projectName, refName); |
| } |
| return success; |
| } |
| |
| private Mutation deleteReclaimLockForCleanup(Timestamp reclaimToken) { |
| return Mutation.delete( |
| "locks", KeySet.prefixRange(Key.of(reclaimLockName(reclaimToken), refName))); |
| } |
| |
| /** |
| * Update the heartbeat timestamp of the current lock if it matches the expected token. If the |
| * tokens do not match, the lock has been reclaimed and a new heartbeat started, so this heartbeat |
| * ends. |
| * |
| * @throws GlobalRefDbLockException if the lock unexpectedly no longer exists but the heartbeat is |
| * still ongoing |
| */ |
| private void heartbeat() { |
| try { |
| boolean success = |
| dbClient |
| .readWriteTransaction() |
| .run( |
| transaction -> { |
| Struct row = |
| transaction.readRow( |
| "locks", Key.of(projectName, refName), Collections.singleton("token")); |
| if (row == null) { |
| logger.atWarning().log( |
| "Lock %s %s (%s) not found", projectName, refName, token); |
| return false; |
| } |
| if (!token.equals(row.getTimestamp("token"))) { |
| logger.atWarning().log( |
| "Lock for %s %s has unexpected token (%s), expecting (%s)", |
| projectName, refName, row.getTimestamp("token"), token); |
| return false; |
| } |
| transaction.buffer(updateHeartbeat()); |
| return true; |
| }); |
| if (!success) { |
| logger.atFine().log("Heartbeat stopping for lock %s %s (%s)", projectName, refName, token); |
| cancelHeartbeatTask(); |
| } |
| } catch (Exception e) { |
| logger.atWarning().withCause(e).log( |
| "Heartbeat failed to update for lock %s %s (%s)", projectName, refName, token); |
| } |
| } |
| |
| /** Close the lock, shutting down the heartbeat and deleting the lock from the locks table. */ |
| @Override |
| public void close() { |
| cancelHeartbeatTask(); |
| |
| dbClient |
| .readWriteTransaction() |
| .run( |
| transaction -> { |
| transaction.executeUpdate(deleteLockStatement()); |
| return true; |
| }); |
| } |
| |
| private void cancelHeartbeatTask() { |
| if (heartbeatTask != null) { |
| heartbeatTask.cancel(false); |
| } |
| } |
| |
| private String reclaimLockName(Timestamp reclaimToken) { |
| return String.format( |
| "%s/%s/%012d%09d", |
| RECLAIM_LOCK_PREFIX, projectName, reclaimToken.getSeconds(), reclaimToken.getNanos()); |
| } |
| |
| private Mutation createLock() { |
| return Mutation.newInsertBuilder("locks") |
| .set("project") |
| .to(projectName) |
| .set("ref") |
| .to(refName) |
| .set("heartbeat") |
| .to(Value.COMMIT_TIMESTAMP) |
| .set("token") |
| .to(Value.COMMIT_TIMESTAMP) |
| .set("owner") |
| .to(gerritInstanceId) |
| .build(); |
| } |
| |
| private Mutation createReclaimLock(Timestamp reclaimToken) { |
| return Mutation.newInsertBuilder("locks") |
| .set("project") |
| .to(reclaimLockName(reclaimToken)) |
| .set("ref") |
| .to(refName) |
| .set("heartbeat") |
| .to(Value.COMMIT_TIMESTAMP) |
| .set("token") |
| .to(Value.COMMIT_TIMESTAMP) |
| .set("owner") |
| .to(gerritInstanceId) |
| .build(); |
| } |
| |
| private Mutation updateReclaimedLock() { |
| return Mutation.newUpdateBuilder("locks") |
| .set("project") |
| .to(projectName) |
| .set("ref") |
| .to(refName) |
| .set("heartbeat") |
| .to(Value.COMMIT_TIMESTAMP) |
| .set("token") |
| .to(Value.COMMIT_TIMESTAMP) |
| .set("owner") |
| .to(gerritInstanceId) |
| .build(); |
| } |
| |
| private Mutation updateHeartbeat() { |
| return Mutation.newUpdateBuilder("locks") |
| .set("project") |
| .to(projectName) |
| .set("ref") |
| .to(refName) |
| .set("heartbeat") |
| .to(Value.COMMIT_TIMESTAMP) |
| .build(); |
| } |
| |
| private Statement deleteLockStatement() { |
| return Statement.newBuilder( |
| "DELETE FROM locks WHERE project = @project AND ref = @ref AND token = @token") |
| .bind("project") |
| .to(projectName) |
| .bind("ref") |
| .to(refName) |
| .bind("token") |
| .to(token) |
| .build(); |
| } |
| } |