// Copyright (C) 2019 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.googlesource.gerrit.plugins.multisite.kafka.consumer;

import static com.google.common.truth.Truth.assertThat;
import static java.util.stream.Collectors.toSet;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.gerrit.acceptance.AbstractDaemonTest;
import com.google.gerrit.acceptance.GerritConfig;
import com.google.gerrit.acceptance.LogThreshold;
import com.google.gerrit.acceptance.NoHttpd;
import com.google.gerrit.acceptance.UseLocalDisk;
import com.google.gerrit.extensions.api.changes.ReviewInput;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gerrit.server.config.PluginConfigFactory;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.data.PatchSetAttribute;
import com.google.gerrit.server.events.CommentAddedEvent;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.PatchSetCreatedEvent;
import com.google.gerrit.server.events.RefUpdatedEvent;
import com.google.gerrit.server.query.change.ChangeData;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.GitModule;
import com.googlesource.gerrit.plugins.multisite.Module;
import com.googlesource.gerrit.plugins.multisite.PluginModule;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.storage.file.FileBasedConfig;
import org.eclipse.jgit.util.FS;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;

@NoHttpd
@LogThreshold(level = "INFO")
@UseLocalDisk
public class EventConsumerIT extends AbstractDaemonTest {
  public static final String GERRIT_CONFIG_KEY = "gerrit.installModule";
  public static final String GERRIT_CONFIG_VALUE =
      "com.googlesource.gerrit.plugins.multisite.kafka.consumer.EventConsumerIT$KafkaTestContainerModule";
  private static final int QUEUE_POLL_TIMEOUT_MSECS = 10000;

  public static class KafkaTestContainerModule extends LifecycleModule {

    public static class KafkaStopAtShutdown implements LifecycleListener {
      private final KafkaContainer kafka;

      public KafkaStopAtShutdown(KafkaContainer kafka) {
        this.kafka = kafka;
      }

      @Override
      public void stop() {
        kafka.stop();
      }

      @Override
      public void start() {
        // Do nothing
      }
    }

    public static class TestBrokerModule extends BrokerModule {
      @Override
      protected void configure() {
        DynamicItem.itemOf(binder(), BrokerApi.class);
        bind(BrokerApiWrapper.class).in(Scopes.SINGLETON);

        install(new SubscriberModule());
      }
    }

    private final FileBasedConfig config;
    private final Module multiSiteModule;
    private final PluginModule pluginModule;
    private final GitModule gitModule;

    @Inject
    public KafkaTestContainerModule(SitePaths sitePaths) throws IOException {
      this.config =
          new FileBasedConfig(
              sitePaths.etc_dir.resolve(Configuration.MULTI_SITE_CONFIG).toFile(), FS.DETECTED);
      config.setBoolean("ref-database", null, "enabled", false);
      config.save();

      Configuration multiSiteConfig = new Configuration(config, new Config());

      PluginConfigFactory cfgFactory = mock(PluginConfigFactory.class);
      when(cfgFactory.getGlobalPluginConfig("multi-site")).thenReturn(config);

      this.multiSiteModule = new Module(multiSiteConfig, new TestBrokerModule());
      this.pluginModule = new PluginModule(multiSiteConfig, new KafkaBrokerModule());
      this.gitModule = new GitModule(multiSiteConfig);
    }

    @Override
    protected void configure() {
      try {
        final KafkaContainer kafka = startAndConfigureKafkaConnection();

        listener().toInstance(new KafkaStopAtShutdown(kafka));

        install(multiSiteModule);
        install(pluginModule);
        install(gitModule);

      } catch (IOException e) {
        throw new IllegalStateException(e);
      }
    }

    private KafkaContainer startAndConfigureKafkaConnection() throws IOException {
      KafkaContainer kafkaContainer = new KafkaContainer();
      kafkaContainer.start();

      Config kafkaConfig = new Config();
      kafkaConfig.setString(
          "kafka", null, "bootstrapServers", kafkaContainer.getBootstrapServers());

      PluginConfigFactory configFactory = mock(PluginConfigFactory.class);
      when(configFactory.getGlobalPluginConfig("multi-site")).thenReturn(kafkaConfig);
      KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(configFactory, "multi-site");
      bind(KafkaConfiguration.class).toInstance(kafkaConfiguration);

      listener().toInstance(new KafkaStopAtShutdown(kafkaContainer));

      return kafkaContainer;
    }
  }

  @Test
  @GerritConfig(name = GERRIT_CONFIG_KEY, value = GERRIT_CONFIG_VALUE)
  public void createChangeShouldPropagateChangeIndexAndRefUpdateStreamEvent() throws Exception {
    LinkedBlockingQueue<SourceAwareEventWrapper> droppedEventsQueue = captureDroppedEvents();
    drainQueue(droppedEventsQueue);

    ChangeData change = createChange().getChange();
    String project = change.project().get();
    int changeNum = change.getId().get();
    String changeNotesRef = change.notes().getRefName();
    int patchsetNum = change.currentPatchSet().getPatchSetId();
    String patchsetRevision = change.currentPatchSet().getRevision().get();
    String patchsetRef = change.currentPatchSet().getRefName();

    Map<String, List<Event>> eventsByType = receiveEventsByType(droppedEventsQueue);
    assertThat(eventsByType).isNotEmpty();

    assertThat(eventsByType.get("change-index"))
        .containsExactly(createChangeIndexEvent(project, changeNum, getParentCommit(change)));

    assertThat(
            eventsByType.get("ref-updated").stream()
                .map(e -> ((RefUpdatedEvent) e).getRefName())
                .collect(toSet()))
        .containsAllOf(changeNotesRef, patchsetRef); // 'refs/sequences/changes'
    // not always updated thus
    // not checked

    List<Event> patchSetCreatedEvents = eventsByType.get("patchset-created");
    assertThat(patchSetCreatedEvents).hasSize(1);
    assertPatchSetAttributes(
        (PatchSetCreatedEvent) patchSetCreatedEvents.get(0),
        patchsetNum,
        patchsetRevision,
        patchsetRef);
  }

  private void assertPatchSetAttributes(
      PatchSetCreatedEvent patchSetCreated,
      int patchsetNum,
      String patchsetRevision,
      String patchsetRef) {
    PatchSetAttribute patchSetAttribute = patchSetCreated.patchSet.get();
    assertThat(patchSetAttribute.number).isEqualTo(patchsetNum);
    assertThat(patchSetAttribute.revision).isEqualTo(patchsetRevision);
    assertThat(patchSetAttribute.ref).isEqualTo(patchsetRef);
  }

  @Test
  @GerritConfig(name = GERRIT_CONFIG_KEY, value = GERRIT_CONFIG_VALUE)
  public void reviewChangeShouldPropagateChangeIndexAndCommentAdded() throws Exception {
    LinkedBlockingQueue<SourceAwareEventWrapper> droppedEventsQueue = captureDroppedEvents();
    ChangeData change = createChange().getChange();
    String project = change.project().get();
    int changeNum = change.getId().get();
    drainQueue(droppedEventsQueue);

    ReviewInput in = ReviewInput.recommend();
    in.message = "LGTM";
    gApi.changes().id(changeNum).revision("current").review(in);

    Map<String, List<Event>> eventsByType = receiveEventsByType(droppedEventsQueue);

    assertThat(eventsByType).isNotEmpty();

    assertThat(eventsByType.get("change-index"))
        .containsExactly(createChangeIndexEvent(project, changeNum, getParentCommit(change)));

    List<Event> commentAddedEvents = eventsByType.get("comment-added");
    assertThat(commentAddedEvents).hasSize(1);
    assertThat(((CommentAddedEvent) commentAddedEvents.get(0)).comment)
        .isEqualTo("Patch Set 1: Code-Review+1\n\n" + in.message);
  }

  private String getParentCommit(ChangeData change) throws Exception {
    RevCommit parent;
    try (Repository repo = repoManager.openRepository(change.project());
        RevWalk walk = new RevWalk(repo)) {
      RevCommit commit =
          walk.parseCommit(ObjectId.fromString(change.currentPatchSet().getRevision().get()));
      parent = commit.getParent(0);
    }
    return parent.getId().name();
  }

  private ChangeIndexEvent createChangeIndexEvent(
      String projectName, int changeId, String targetSha1) {
    ChangeIndexEvent event = new ChangeIndexEvent(projectName, changeId, false);
    event.targetSha = targetSha1;
    return event;
  }

  private LinkedBlockingQueue<SourceAwareEventWrapper> captureDroppedEvents() throws Exception {
    LinkedBlockingQueue<SourceAwareEventWrapper> droppedEvents = new LinkedBlockingQueue<>();

    TypeLiteral<DynamicSet<DroppedEventListener>> type =
        new TypeLiteral<DynamicSet<DroppedEventListener>>() {};
    server
        .getTestInjector()
        .getInstance(Key.get(type))
        .add(
            "multi-site",
            new DroppedEventListener() {
              @Override
              public void onEventDropped(SourceAwareEventWrapper event) {
                droppedEvents.offer(event);
              }
            });
    return droppedEvents;
  }

  private Map<String, List<Event>> receiveEventsByType(
      LinkedBlockingQueue<SourceAwareEventWrapper> queue) throws InterruptedException {
    return drainQueue(queue).stream()
        .sorted(Comparator.comparing(e -> e.type))
        .collect(Collectors.groupingBy(e -> e.type));
  }

  private List<Event> drainQueue(LinkedBlockingQueue<SourceAwareEventWrapper> queue)
      throws InterruptedException {
    Gson gson = server.getTestInjector().getInstance(Key.get(Gson.class, BrokerGson.class));
    SourceAwareEventWrapper event;
    List<Event> eventsList = new ArrayList<>();
    while ((event = queue.poll(QUEUE_POLL_TIMEOUT_MSECS, TimeUnit.MILLISECONDS)) != null) {
      eventsList.add(event.getEventBody(gson));
    }
    return eventsList;
  }
}
