// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.googlesource.gerrit.plugins.spannerrefdb;

import com.gerritforge.gerrit.globalrefdb.GlobalRefDatabase;
import com.gerritforge.gerrit.globalrefdb.GlobalRefDbLockException;
import com.gerritforge.gerrit.globalrefdb.GlobalRefDbSystemError;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.entities.Project;
import com.google.inject.Inject;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import javax.inject.Singleton;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Ref;

@Singleton
public class SpannerRefDatabase implements GlobalRefDatabase {
  private static final FluentLogger logger = FluentLogger.forEnclosingClass();

  private final DatabaseClient dbClient;

  @Inject
  SpannerRefDatabase(DatabaseClient dbClient) {
    this.dbClient = dbClient;
  }

  @Override
  public boolean isUpToDate(Project.NameKey project, Ref ref) throws GlobalRefDbLockException {
    String valueInSpanner = get(project, ref.getName());
    if (valueInSpanner == null) {
      // If it doesn't exist, assume up to date (will populate on next compareAndPut)
      return true;
    }
    ObjectId objectIdInSharedRefDb = ObjectId.fromString(valueInSpanner);
    boolean isUpToDate = objectIdInSharedRefDb.equals(ref.getObjectId());
    if (!isUpToDate) {
      logger.atFine().log(
          "%s:%s is out of sync: local=%s spanner=%s",
          project, ref.getName(), ref.getObjectId(), objectIdInSharedRefDb);
    }
    return isUpToDate;
  }

  @Override
  public boolean compareAndPut(Project.NameKey project, Ref ref, ObjectId newValue)
      throws GlobalRefDbSystemError {
    newValue = Optional.ofNullable(newValue).orElse(ObjectId.zeroId());
    ObjectId currValue = Optional.ofNullable(ref.getObjectId()).orElse(ObjectId.zeroId());
    return doCompareAndPut(project, ref.getName(), currValue.getName(), newValue.name());
  }

  @Override
  public <T> boolean compareAndPut(
      Project.NameKey project, String refName, T expectedValue, T newValue)
      throws GlobalRefDbSystemError {
    String newRefValue =
        Optional.ofNullable(newValue).map(Object::toString).orElse(ObjectId.zeroId().getName());
    String expectedRefValue =
        Optional.ofNullable(expectedValue)
            .map(Object::toString)
            .orElse(ObjectId.zeroId().getName());

    return doCompareAndPut(project, refName, expectedRefValue, newRefValue);
  }

  /**
   * Run the compare and put to set the ref value to the newValue if and only if the current ref
   * value is expectedValue. Its primary key is the refPath.
   *
   * <p>If the ref doesn't exist at all, insert it and return true. If the ref exists and its
   * current value is the same as expected current value, update and return true. Otherwise, return
   * false.
   *
   * @param project - the project
   * @param refName - the ref
   * @param expectedValue - the expected value to be found in the global-refdb
   * @param newValue - the new value to put in the global-refdb if the expectedValue is present
   * @return success
   * @throws GlobalRefDbSystemError
   */
  private boolean doCompareAndPut(
      Project.NameKey project, String refName, String expectedValue, String newValue)
      throws GlobalRefDbSystemError {
    logger.atInfo().log(
        "Do compare and put for %s / %s. Expected value: %s. New value: %s",
        project.get(), refName, expectedValue, newValue);

    return dbClient
        .readWriteTransaction()
        .run(
            transaction -> {
              Struct row =
                  transaction.readRow(
                      "refs", Key.of(project.get(), refName), Arrays.asList("value"));
              // If the newValue is zeroId, delete the row if the expected value is correct
              if (newValue.equals(ObjectId.zeroId().name())) {
                if (row == null) {
                  return true;
                }
                if (row.getString(0).equals(expectedValue)) {
                  transaction.buffer(Mutation.delete("refs", Key.of(project.get(), refName)));
                  return true;
                }
                return false;
              }
              // If the row is null, the row doesn't exist and will be created.
              // If the value is the expected value, the row should be updated.
              if (row == null || row.getString(0).equals(expectedValue)) {
                transaction.buffer(
                    Mutation.newInsertOrUpdateBuilder("refs")
                        .set("project")
                        .to(project.get())
                        .set("ref")
                        .to(refName)
                        .set("value")
                        .to(newValue)
                        .build());
                return true;
              }
              return false;
            });
  }

  public class Lock implements AutoCloseable {

    private final String projectName;
    private final String refName;

    public Lock(String projectName, String refName) throws GlobalRefDbLockException {
      // Attempt to create the lock here
      this.projectName = projectName;
      this.refName = refName;
      try {
        dbClient
            .readWriteTransaction()
            .run(
                transaction -> {
                  transaction.buffer(
                      Mutation.newInsertBuilder("locks")
                          .set("project")
                          .to(projectName)
                          .set("ref")
                          .to(refName)
                          .build());
                  return true;
                });
      } catch (Exception e) {
        logger.atSevere().withCause(e).log(
            "Failed to acquire lock for %s %s", projectName, refName);
        throw new GlobalRefDbLockException(projectName, refName, e);
      }
    }

    @Override
    public void close() throws Exception {
      dbClient.write(
          Collections.singletonList(Mutation.delete("locks", Key.of(projectName, refName))));
    }
  }

  @Override
  public AutoCloseable lockRef(Project.NameKey project, String refName)
      throws GlobalRefDbLockException {
    logger.atInfo().log("Attempting to lock %s %s.", project.get(), refName);
    // TODO: Some sort of heartbeat that removes stale locks if they haven't been
    return new Lock(project.get(), refName);
  }

  @Override
  public boolean exists(Project.NameKey project, String refName) {
    logger.atInfo().log("Checking if ref %s %s exists.", project.get(), refName);
    try (ResultSet resultSet =
        dbClient
            .singleUse()
            .executeQuery(
                Statement.newBuilder("SELECT * FROM refs WHERE project = @project and ref = @ref")
                    .bind("project")
                    .to(project.get())
                    .bind("ref")
                    .to(refName)
                    .build())) {
      return resultSet.next();
    }
  }

  @Override
  public void remove(Project.NameKey project) throws GlobalRefDbSystemError {
    // Delete all rows with matching project
    logger.atInfo().log("Removing project %s from global-refdb", project.get());
    dbClient.write(
        Collections.singletonList(
            Mutation.delete("refs", KeySet.prefixRange(Key.of(project.get())))));
  }

  private String get(Project.NameKey project, String refName) throws GlobalRefDbSystemError {
    try (ResultSet resultSet =
        dbClient
            .singleUse()
            .executeQuery(
                Statement.newBuilder(
                        "SELECT value FROM refs WHERE project = @project and ref = @ref")
                    .bind("project")
                    .to(project.get())
                    .bind("ref")
                    .to(refName)
                    .build())) {
      if (resultSet.next()) {
        return resultSet.getString("value");
      }
      return null;
    } catch (Exception e) {
      throw new GlobalRefDbSystemError(String.format("Cannot get value for %s", project.get()), e);
    }
  }

  @Override
  public <T> Optional<T> get(Project.NameKey project, String refName, Class<T> clazz)
      throws GlobalRefDbSystemError {
    // The only usage of this is as a String but the API requires Optional<T>?
    // It looks like zookeeper has/uses a StringDeserializerFactory/StringDeserializer to deal
    // with this, while the aws implementation opts for this.
    return Optional.ofNullable((T) get(project, refName));
  }
}
