// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.googlesource.gerrit.plugins.kinesis;

import static com.google.common.truth.Truth.assertThat;
import static com.google.gerrit.testing.GerritJUnit.assertThrows;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.CLOUDWATCH;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.DYNAMODB;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.KINESIS;

import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
import com.google.gerrit.acceptance.TestPlugin;
import com.google.gerrit.acceptance.WaitUtil;
import com.google.gerrit.acceptance.config.GerritConfig;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.ProjectCreatedEvent;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Test;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;

@TestPlugin(
    name = "events-aws-kinesis",
    sysModule = "com.googlesource.gerrit.plugins.kinesis.Module")
public class KinesisEventsIT extends LightweightPluginDaemonTest {
  // This timeout is quite high to allow the kinesis coordinator to acquire a
  // lease on the newly created stream
  private static final Duration WAIT_FOR_CONSUMPTION = Duration.ofSeconds(120);
  private static final Duration STREAM_CREATION_TIMEOUT = Duration.ofSeconds(10);
  private static final long SEND_TIMEOUT_MILLIS = 200;

  private static final int LOCALSTACK_PORT = 4566;

  private LocalStackContainer localstack =
      new LocalStackContainer(DockerImageName.parse("localstack/localstack:0.12.8"))
          .withServices(DYNAMODB, KINESIS, CLOUDWATCH)
          .withEnv("USE_SSL", "true")
          .withExposedPorts(LOCALSTACK_PORT);

  private KinesisClient kinesisClient;

  @Before
  public void setUpTestPlugin() throws Exception {
    localstack.start();

    kinesisClient =
        KinesisClient.builder()
            .endpointOverride(localstack.getEndpointOverride(KINESIS))
            .credentialsProvider(
                StaticCredentialsProvider.create(
                    AwsBasicCredentials.create(
                        localstack.getAccessKey(), localstack.getSecretKey())))
            .region(Region.of(localstack.getRegion()))
            .build();

    System.setProperty("endpoint", localstack.getEndpointOverride(KINESIS).toASCIIString());
    System.setProperty("region", localstack.getRegion());
    System.setProperty("aws.accessKeyId", localstack.getAccessKey());

    // The secret key property name has changed from aws-sdk 1.11.x and 2.x [1]
    // Export both names so that default credential provider chains work for both
    // Kinesis Consumer Library (uses V2) and Kinesis Producer Library (uses v1)
    // [1]  https://docs.aws.amazon.com/sdk-for-java/latest/migration-guide/client-credential.html
    System.setProperty("aws.secretKey", localstack.getSecretKey());
    System.setProperty("aws.secretAccessKey", localstack.getSecretKey());

    super.setUpTestPlugin();
  }

  @Override
  public void tearDownTestPlugin() {
    localstack.close();

    super.tearDownTestPlugin();
  }

  @Test
  @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
  @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
  public void shouldConsumeAnEventPublishedToATopic() throws Exception {
    String streamName = UUID.randomUUID().toString();
    createStreamAndWait(streamName, STREAM_CREATION_TIMEOUT);

    EventConsumerCounter eventConsumerCounter = new EventConsumerCounter();
    kinesisBroker().receiveAsync(streamName, eventConsumerCounter);

    Event event = eventMessage();
    kinesisBroker().send(streamName, event);
    WaitUtil.waitUntil(
        () -> eventConsumerCounter.getConsumedMessages().size() == 1, WAIT_FOR_CONSUMPTION);
    compareEvents(eventConsumerCounter.getConsumedMessages().get(0), event);
  }

  @Test
  @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
  @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
  public void shouldReplayMessages() throws Exception {
    String streamName = UUID.randomUUID().toString();
    createStreamAndWait(streamName, STREAM_CREATION_TIMEOUT);

    EventConsumerCounter eventConsumerCounter = new EventConsumerCounter();
    kinesisBroker().receiveAsync(streamName, eventConsumerCounter);

    Event event = eventMessage();
    kinesisBroker().send(streamName, event);

    WaitUtil.waitUntil(
        () -> eventConsumerCounter.getConsumedMessages().size() == 1, WAIT_FOR_CONSUMPTION);
    compareEvents(eventConsumerCounter.getConsumedMessages().get(0), event);

    eventConsumerCounter.clear();
    kinesisBroker().disconnect();
    kinesisBroker().receiveAsync(streamName, eventConsumerCounter);
    kinesisBroker().replayAllEvents(streamName);

    WaitUtil.waitUntil(
        () -> eventConsumerCounter.getConsumedMessages().size() == 1, WAIT_FOR_CONSUMPTION);
    compareEvents(eventConsumerCounter.getConsumedMessages().get(0), event);
  }

  @Test
  @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
  @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
  @GerritConfig(name = "plugin.events-aws-kinesis.publishTimeoutMs", value = "10000")
  @GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "false")
  public void sendingSynchronouslyShouldBeSuccessful()
      throws InterruptedException, ExecutionException {
    String streamName = UUID.randomUUID().toString();
    createStreamAsync(streamName);

    ListenableFuture<Boolean> result = kinesisBroker().send(streamName, eventMessage());
    assertThat(result.get()).isTrue();
  }

  @Test
  @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
  @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
  @GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "false")
  public void sendingSynchronouslyShouldBeUnsuccessfulWhenTimingOut()
      throws InterruptedException, ExecutionException, TimeoutException {
    String streamName = "not-existing-stream";

    ListenableFuture<Boolean> result = kinesisBroker().send(streamName, eventMessage());
    assertThat(result.get(SEND_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)).isFalse();
  }

  @Test
  @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
  @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
  @GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "true")
  public void sendingAsynchronouslyShouldFailWhenStreamDoesNotExist() {
    String streamName = "not-existing-stream";

    ListenableFuture<Boolean> result = kinesisBroker().send(streamName, eventMessage());
    ExecutionException executionException = assertThrows(ExecutionException.class, result::get);
    assertThat(executionException)
        .hasMessageThat()
        .contains("com.amazonaws.services.kinesis.producer.UserRecordFailedException");
  }

  @Test
  @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
  @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
  @GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "true")
  public void sendingAsynchronouslyShouldBeSuccessful()
      throws InterruptedException, ExecutionException {
    String streamName = UUID.randomUUID().toString();
    createStreamAsync(streamName);

    ListenableFuture<Boolean> result = kinesisBroker().send(streamName, eventMessage());
    assertThat(result.get()).isTrue();
  }

  public KinesisBrokerApi kinesisBroker() {
    return (KinesisBrokerApi) plugin.getSysInjector().getInstance(BrokerApi.class);
  }

  private void createStreamAndWait(String streamName, Duration timeout)
      throws InterruptedException {
    createStreamAsync(streamName);

    WaitUtil.waitUntil(
        () ->
            kinesisClient
                .describeStream(DescribeStreamRequest.builder().streamName(streamName).build())
                .streamDescription()
                .streamStatus()
                .equals(StreamStatus.ACTIVE),
        timeout);
  }

  private void createStreamAsync(String streamName) {
    kinesisClient.createStream(
        CreateStreamRequest.builder().streamName(streamName).shardCount(1).build());
  }

  private Event eventMessage() {
    Event event = new ProjectCreatedEvent();
    event.instanceId = "instance-id";
    return event;
  }

  private void compareEvents(Event event, Event expectedEvent) {
    assertThat(event.type).isEqualTo(expectedEvent.type);
    assertThat(event.eventCreatedOn).isEqualTo(expectedEvent.eventCreatedOn);
    assertThat(event.instanceId).isEqualTo(expectedEvent.instanceId);
  }

  private static class EventConsumerCounter implements Consumer<Event> {
    List<Event> consumedMessages = new ArrayList<>();

    @Override
    public void accept(Event eventMessage) {
      consumedMessages.add(eventMessage);
    }

    public List<Event> getConsumedMessages() {
      return consumedMessages;
    }

    public void clear() {
      consumedMessages.clear();
    }
  }
}
