// blob: 20a921b01ab08837c5fbe2794c415b269a30832a [file] [log] [blame]
// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.googlesource.gerrit.plugins.spannerrefdb;
import com.gerritforge.gerrit.globalrefdb.GlobalRefDbLockException;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TransactionRunner;
import com.google.cloud.spanner.Value;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.server.config.GerritInstanceId;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* * Locks are a mechanism to ensure exclusive usage of the locked resource. Here we provide a
* Spanner-specific implementation of locks which are used from the global-refdb module, which is
* used to ensure refs are updated consistently across multiple Gerrit primary sites (see
* README.md).
*
* <p>Persistent distributed Locks are used to guard distributed transactions spanning RefUpdates in
* a git repo of the involved Gerrit primary sites and corresponding updates of those refs in the
* global-refdb.
*/
public class Lock implements AutoCloseable {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
public interface Factory {
Lock create(@Assisted("projectName") String projectName, @Assisted("refName") String refName);
}
private static final Long SECONDS_FOR_STALE_HEARTBEAT = 30L;
private static final int HEARTBEAT_INTERVAL = 2;
private static final String RECLAIM_LOCK_PREFIX = "RECLAIM";
private final DatabaseClient dbClient;
private final String gerritInstanceId;
private ScheduledExecutorService heartbeatExecutor;
private final String projectName;
private final String refName;
private Timestamp token;
private ScheduledFuture<?> heartbeatTask;
@Inject
Lock(
DatabaseClient dbClient,
@GerritInstanceId String gerritInstanceId,
@HeartbeatExecutor ScheduledExecutorService heartbeatExecutor,
@Assisted("projectName") String projectName,
@Assisted("refName") String refName) {
this.dbClient = dbClient;
this.gerritInstanceId = gerritInstanceId;
this.heartbeatExecutor = heartbeatExecutor;
this.projectName = projectName;
this.refName = refName;
}
/**
* Attempt to lock a project/ref by creating a lock in the locks table. If the lock already
* exists, attempt to reclaim it.
*
* <p>Once the lock is successfully claimed, saves the token information and schedules a
* heartbeat.
*
* @throws GlobalRefDbLockException if there is a failure to claim the lock
*/
@SuppressWarnings("FutureReturnValueIgnored")
public void tryLock() throws GlobalRefDbLockException {
try {
TransactionRunner transactionRunner = dbClient.readWriteTransaction();
transactionRunner.run(
transaction -> {
transaction.buffer(createLock());
return true;
});
token = transactionRunner.getCommitTimestamp();
heartbeatTask =
heartbeatExecutor.scheduleAtFixedRate(
this::heartbeat, 0, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
logger.atFine().log("Locked %s %s.", projectName, refName);
} catch (Exception e) {
Throwable cause = e.getCause();
if (cause instanceof SpannerException) {
ErrorCode code = ((SpannerException) cause).getErrorCode();
if (code == ErrorCode.ALREADY_EXISTS) {
logger.atFine().log(
"Lock for %s %s already exists, attempting to reclaim.", projectName, refName);
if (reclaim()) {
return;
}
}
}
throw new GlobalRefDbLockException(projectName, refName, e);
}
}
/**
* Attempt to reclaim a lock that is already claimed by another process.
*
* <p>Checks for heartbeat staleness; if the heartbeat is not stale, does not proceed further. If
* the heartbeat is stale, ensure the lock still exists and has the expected token. Then insert a
* reclaim-lock to assert the reclamation process is in progress and update the old lock. This is
* all done in one transaction so that if any one step fails, the transaction is aborted. Clean up
* the reclaim-lock on success.
*/
@SuppressWarnings("FutureReturnValueIgnored")
private boolean reclaim() {
TransactionRunner transactionRunner = dbClient.readWriteTransaction();
boolean success =
transactionRunner.run(
transaction -> {
Struct row =
transaction.readRow(
"locks", Key.of(projectName, refName), Arrays.asList("token", "heartbeat"));
if (row == null) {
// If the old lock doesn't exist any more, we create a new one
transaction.buffer(createLock());
return true;
}
String timestampDifferenceQuery =
"SELECT TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), token, SECOND) AS seconds FROM locks"
+ " WHERE project = @project AND ref = @ref;";
ResultSet resultSet =
transaction.executeQuery(
Statement.newBuilder(timestampDifferenceQuery)
.bind("project")
.to(projectName)
.bind("ref")
.to(refName)
.build());
if (resultSet.getLong("seconds") <= SECONDS_FOR_STALE_HEARTBEAT) {
// Check if the old lock is stale; if not, do nothing
return false;
}
Timestamp oldToken = row.getTimestamp("token");
transaction.buffer(createReclaimLock(oldToken));
transaction.buffer(updateReclaimedLock());
transaction.buffer(deleteReclaimLockForCleanup(oldToken));
return true;
});
if (success) {
token = transactionRunner.getCommitTimestamp();
heartbeatExecutor.scheduleAtFixedRate(
this::heartbeat, 0, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
logger.atFine().log("Reclaimed lock for %s %s.", projectName, refName);
}
return success;
}
private Mutation deleteReclaimLockForCleanup(Timestamp reclaimToken) {
return Mutation.delete(
"locks", KeySet.prefixRange(Key.of(reclaimLockName(reclaimToken), refName)));
}
/**
* Update the heartbeat timestamp of the current lock if it matches the expected token. If the
* tokens do not match, the lock has been reclaimed and a new heartbeat started, so this heartbeat
* ends.
*
* @throws GlobalRefDbLockException if the lock unexpectedly no longer exists but the heartbeat is
* still ongoing
*/
private void heartbeat() {
try {
boolean success =
dbClient
.readWriteTransaction()
.run(
transaction -> {
Struct row =
transaction.readRow(
"locks", Key.of(projectName, refName), Collections.singleton("token"));
if (row == null) {
logger.atWarning().log(
"Lock %s %s (%s) not found", projectName, refName, token);
return false;
}
if (!token.equals(row.getTimestamp("token"))) {
logger.atWarning().log(
"Lock for %s %s has unexpected token (%s), expecting (%s)",
projectName, refName, row.getTimestamp("token"), token);
return false;
}
transaction.buffer(updateHeartbeat());
return true;
});
if (!success) {
logger.atFine().log("Heartbeat stopping for lock %s %s (%s)", projectName, refName, token);
cancelHeartbeatTask();
}
} catch (Exception e) {
logger.atWarning().withCause(e).log(
"Heartbeat failed to update for lock %s %s (%s)", projectName, refName, token);
}
}
/** Close the lock, shutting down the heartbeat and deleting the lock from the locks table. */
@Override
public void close() {
cancelHeartbeatTask();
dbClient
.readWriteTransaction()
.run(
transaction -> {
transaction.executeUpdate(deleteLockStatement());
return true;
});
}
private void cancelHeartbeatTask() {
if (heartbeatTask != null) {
heartbeatTask.cancel(false);
}
}
private String reclaimLockName(Timestamp reclaimToken) {
return String.format(
"%s/%s/%012d%09d",
RECLAIM_LOCK_PREFIX, projectName, reclaimToken.getSeconds(), reclaimToken.getNanos());
}
private Mutation createLock() {
return Mutation.newInsertBuilder("locks")
.set("project")
.to(projectName)
.set("ref")
.to(refName)
.set("heartbeat")
.to(Value.COMMIT_TIMESTAMP)
.set("token")
.to(Value.COMMIT_TIMESTAMP)
.set("owner")
.to(gerritInstanceId)
.build();
}
private Mutation createReclaimLock(Timestamp reclaimToken) {
return Mutation.newInsertBuilder("locks")
.set("project")
.to(reclaimLockName(reclaimToken))
.set("ref")
.to(refName)
.set("heartbeat")
.to(Value.COMMIT_TIMESTAMP)
.set("token")
.to(Value.COMMIT_TIMESTAMP)
.set("owner")
.to(gerritInstanceId)
.build();
}
private Mutation updateReclaimedLock() {
return Mutation.newUpdateBuilder("locks")
.set("project")
.to(projectName)
.set("ref")
.to(refName)
.set("heartbeat")
.to(Value.COMMIT_TIMESTAMP)
.set("token")
.to(Value.COMMIT_TIMESTAMP)
.set("owner")
.to(gerritInstanceId)
.build();
}
private Mutation updateHeartbeat() {
return Mutation.newUpdateBuilder("locks")
.set("project")
.to(projectName)
.set("ref")
.to(refName)
.set("heartbeat")
.to(Value.COMMIT_TIMESTAMP)
.build();
}
private Statement deleteLockStatement() {
return Statement.newBuilder(
"DELETE FROM locks WHERE project = @project AND ref = @ref AND token = @token")
.bind("project")
.to(projectName)
.bind("ref")
.to(refName)
.bind("token")
.to(token)
.build();
}
}