// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.googlesource.gerrit.plugins.spannerrefdb;

import static com.google.common.truth.Truth.assertThat;
import static com.google.gerrit.testing.GerritJUnit.assertThrows;

import com.gerritforge.gerrit.globalrefdb.GlobalRefDbLockException;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Struct;
import com.google.gerrit.entities.Project;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for ref locking in {@link SpannerRefDatabase}.
 *
 * <p>Each test runs against a fresh {@link SpannerTestSystem} and inspects the {@code locks} table
 * directly through a {@link DatabaseClient} to verify what {@code lockRef} wrote. The project and
 * ref under test ({@code PROJECT_NAME_KEY}, {@code REF_NAME}) are not declared in this class;
 * presumably they are inherited from {@link RefFixture}.
 */
public class LockTest implements RefFixture {
  /** Name of the lock table read and written by these tests. */
  private static final String LOCKS_TABLE = "locks";

  /** Column holding the lock token timestamp. */
  private static final String TOKEN = "token";

  /** Column holding the lock heartbeat timestamp. */
  private static final String HEARTBEAT = "heartbeat";

  /** How long to hold the lock before checking that the heartbeat column has advanced. */
  private static final long HEARTBEAT_WAIT_MILLIS = 4_000;

  /** Number of threads that compete for the lock in the concurrency tests. */
  private static final int CONTENDERS = 2;

  /** Number of times each concurrency scenario is repeated. */
  private static final int CONCURRENCY_ROUNDS = 10;

  /** A timestamp at the epoch, used to simulate a lock whose heartbeat is long out of date. */
  private static final Timestamp EPOCH = Timestamp.ofTimeSecondsAndNanos(0, 0);

  private SpannerTestSystem testSystem;
  private SpannerRefDatabase refDb;
  private DatabaseClient dbClient;

  @Before
  public void setUp() throws Exception {
    testSystem = SpannerTestSystem.create();
    refDb = testSystem.database();
    dbClient = testSystem.dbClient();
  }

  @After
  public void tearDown() {
    // JUnit runs @After even when @Before threw; avoid masking that failure with an NPE.
    if (testSystem != null) {
      testSystem.cleanup();
    }
  }

  /**
   * Locking an unlocked ref writes a lock row, the heartbeat advances while the lock is held, and
   * the row is gone once the lock is closed.
   */
  @Test
  public void lockUnlockedRef_Success() throws Exception {
    try (AutoCloseable refLock = refDb.lockRef(PROJECT_NAME_KEY, REF_NAME)) {
      assertThat(getLockTimestamp(PROJECT_NAME_KEY, REF_NAME, TOKEN)).isNotNull();

      Timestamp initialHeartbeat = getLockTimestamp(PROJECT_NAME_KEY, REF_NAME, HEARTBEAT);
      // Let an interrupt fail the test directly instead of continuing with a misleading assertion.
      Thread.sleep(HEARTBEAT_WAIT_MILLIS);
      assertThat(getLockTimestamp(PROJECT_NAME_KEY, REF_NAME, HEARTBEAT))
          .isGreaterThan(initialHeartbeat);
    }

    assertThat(getLockRow(PROJECT_NAME_KEY, REF_NAME)).isNull();
  }

  /** A second lock attempt on a ref that is currently locked is rejected. */
  @Test
  public void lockAlreadyLockedRef_Rejected() throws Exception {
    try (AutoCloseable refLock = refDb.lockRef(PROJECT_NAME_KEY, REF_NAME)) {
      assertLockRejected();
    }
  }

  /** A ref can be locked again after its lock was released, and receives a different token. */
  @Test
  public void lockReleasedRef_Success() throws Exception {
    Timestamp firstToken;
    try (AutoCloseable refLock = refDb.lockRef(PROJECT_NAME_KEY, REF_NAME)) {
      firstToken = getLockTimestamp(PROJECT_NAME_KEY, REF_NAME, TOKEN);
    }

    try (AutoCloseable refLock = refDb.lockRef(PROJECT_NAME_KEY, REF_NAME)) {
      Timestamp secondToken = getLockTimestamp(PROJECT_NAME_KEY, REF_NAME, TOKEN);
      assertThat(secondToken).isNotEqualTo(firstToken);
    }
  }

  /** A lock row whose timestamps are at the epoch can be taken over by a new locker. */
  @Test
  public void reclaimStaleLockedRef_Success() throws Exception {
    insertLockRow(PROJECT_NAME_KEY, REF_NAME, EPOCH);
    try (AutoCloseable refLock = refDb.lockRef(PROJECT_NAME_KEY, REF_NAME)) {
      Timestamp newLockToken = getLockTimestamp(PROJECT_NAME_KEY, REF_NAME, TOKEN);
      assertThat(newLockToken).isNotNull();
      assertThat(newLockToken).isGreaterThan(EPOCH);
    }
  }

  /** A lock row whose timestamps are the current time cannot be taken over. */
  @Test
  public void reclaimFreshLockedRef_Rejected() throws Exception {
    insertLockRow(PROJECT_NAME_KEY, REF_NAME, Timestamp.now());
    assertLockRejected();
  }

  /** When two threads race for an unlocked ref, exactly one of them obtains the lock. */
  @Test
  public void concurrentLocks_OnlyOneSuccess() throws Exception {
    assertExactlyOneContenderWins(() -> {});
  }

  /** When two threads race to reclaim a stale lock, exactly one of them obtains the lock. */
  @Test
  public void concurrentReclamations_OnlyOneSuccess() throws Exception {
    assertExactlyOneContenderWins(() -> insertLockRow(PROJECT_NAME_KEY, REF_NAME, EPOCH));
  }

  /** Asserts that locking the ref under test fails with the expected "unable to lock" message. */
  private void assertLockRejected() {
    GlobalRefDbLockException e =
        assertThrows(
            GlobalRefDbLockException.class, () -> refDb.lockRef(PROJECT_NAME_KEY, REF_NAME));
    assertThat(e)
        .hasMessageThat()
        .contains(String.format("Unable to lock ref %s on project %s", REF_NAME, PROJECT_NAME_KEY));
  }

  /**
   * Repeatedly races two lock attempts against each other and asserts that exactly one succeeds in
   * every round.
   *
   * @param beforeEachRound preparation to run on the calling thread before each round's lock
   *     attempts are submitted
   * @throws Exception if a lock attempt fails with anything other than a lock rejection
   */
  private void assertExactlyOneContenderWins(Runnable beforeEachRound) throws Exception {
    CyclicBarrier barrier = new CyclicBarrier(CONTENDERS);
    ExecutorService pool = Executors.newFixedThreadPool(CONTENDERS);
    try {
      for (int round = 0; round < CONCURRENCY_ROUNDS; round++) {
        beforeEachRound.run();
        Future<Boolean> first = pool.submit(() -> tryLockAwaitBarrier(barrier));
        Future<Boolean> second = pool.submit(() -> tryLockAwaitBarrier(barrier));
        assertThat(first.get()).isNotEqualTo(second.get());
      }
    } finally {
      // Always stop the workers, even when an assertion or a task failed; shutdownNow also
      // interrupts a worker that is still parked on the barrier so its thread does not leak.
      pool.shutdownNow();
    }
  }

  /**
   * Attempts to lock the ref under test and then waits on the barrier.
   *
   * <p>Both outcomes wait on the barrier: the winner does so while still holding the lock, and the
   * loser does so after its attempt was rejected. The winner therefore cannot release the lock
   * before the other thread has finished its own attempt.
   *
   * @param barrier barrier shared by all contenders of the current round
   * @return {@code true} if this call obtained the lock, {@code false} if it was rejected
   * @throws Exception if locking, unlocking or waiting on the barrier fails unexpectedly
   */
  private boolean tryLockAwaitBarrier(CyclicBarrier barrier) throws Exception {
    try (AutoCloseable refLock = refDb.lockRef(PROJECT_NAME_KEY, REF_NAME)) {
      barrier.await();
      return true;
    } catch (GlobalRefDbLockException e) {
      barrier.await();
    }
    return false;
  }

  /**
   * Reads one timestamp column of the lock row for the given ref.
   *
   * @param project project the ref belongs to
   * @param refName name of the ref
   * @param column column to read; must be one of the columns fetched by {@link #getLockRow}
   * @return the column value, or {@code null} if there is no lock row for the ref
   */
  private Timestamp getLockTimestamp(Project.NameKey project, String refName, String column) {
    Struct row = getLockRow(project, refName);
    return row != null ? row.getTimestamp(column) : null;
  }

  /**
   * Reads the token and heartbeat columns of the lock row for the given ref.
   *
   * @param project project the ref belongs to
   * @param refName name of the ref
   * @return the row, or {@code null} if the ref has no lock row
   */
  private Struct getLockRow(Project.NameKey project, String refName) {
    return dbClient
        .singleUse()
        .readRow(LOCKS_TABLE, Key.of(project.get(), refName), Arrays.asList(TOKEN, HEARTBEAT));
  }

  /**
   * Inserts a lock row directly into the lock table, bypassing {@code lockRef}.
   *
   * @param project project the ref belongs to
   * @param refName name of the ref
   * @param timestamp value written to both the heartbeat and the token column
   */
  private void insertLockRow(Project.NameKey project, String refName, Timestamp timestamp) {
    Mutation insertLock =
        Mutation.newInsertBuilder(LOCKS_TABLE)
            .set("project")
            .to(project.get())
            .set("ref")
            .to(refName)
            .set(HEARTBEAT)
            .to(timestamp)
            .set(TOKEN)
            .to(timestamp)
            .set("owner")
            .to("")
            .build();
    dbClient.write(Collections.singletonList(insertLock));
  }
}
