// Copyright (C) 2019 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.googlesource.gerrit.plugins.replication;

import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerrit;
import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerritHttp;
import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH;
import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName;
import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
import static java.util.stream.Collectors.toList;

import com.google.common.base.Strings;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.entities.Project;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.replication.Destination.Factory;
import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.transport.URIish;

@Singleton
public class DestinationsCollection implements ReplicationDestinations {
  private static final FluentLogger logger = FluentLogger.forEnclosingClass();

  private final Factory destinationFactory;
  private final Provider<ReplicationQueue> replicationQueue;
  private volatile List<Destination> destinations;
  private boolean shuttingDown;

  public static class EventQueueNotEmptyException extends Exception {
    private static final long serialVersionUID = 1L;

    public EventQueueNotEmptyException(String errorMessage) {
      super(errorMessage);
    }
  }

  @Inject
  public DestinationsCollection(
      Destination.Factory destinationFactory,
      Provider<ReplicationQueue> replicationQueue,
      ReplicationConfig replicationConfig,
      ConfigParser configParser,
      EventBus eventBus)
      throws ConfigInvalidException {
    this.destinationFactory = destinationFactory;
    this.replicationQueue = replicationQueue;
    this.destinations =
        allDestinations(
            destinationFactory, configParser.parseRemotes(replicationConfig.getConfig()));
    eventBus.register(this);
  }

  @Override
  public Multimap<Destination, URIish> getURIs(
      Optional<String> remoteName, Project.NameKey projectName, FilterType filterType) {
    if (getAll(filterType).isEmpty()) {
      return ImmutableMultimap.of();
    }

    SetMultimap<Destination, URIish> uris = HashMultimap.create();
    for (Destination config : getAll(filterType)) {
      if (filterType != FilterType.PROJECT_DELETION && !config.wouldPushProject(projectName)) {
        continue;
      }

      if (remoteName.isPresent() && !config.getRemoteConfigName().equals(remoteName.get())) {
        continue;
      }

      boolean adminURLUsed = false;

      for (String url : config.getAdminUrls()) {
        if (Strings.isNullOrEmpty(url)) {
          continue;
        }

        URIish uri;
        try {
          uri = new URIish(url);
        } catch (URISyntaxException e) {
          repLog.warn("adminURL '{}' is invalid: {}", url, e.getMessage());
          continue;
        }

        if (!isGerrit(uri) && !isGerritHttp(uri)) {
          String path =
              replaceName(uri.getPath(), projectName.get(), config.isSingleProjectMatch());
          if (path == null) {
            repLog.warn("adminURL {} does not contain ${name}", uri);
            continue;
          }

          uri = uri.setPath(path);
          if (!isSSH(uri)) {
            repLog.warn("adminURL '{}' is invalid: only SSH and HTTP are supported", uri);
            continue;
          }
        }
        uris.put(config, uri);
        adminURLUsed = true;
      }

      if (!adminURLUsed) {
        for (URIish uri : config.getURIs(projectName, "*")) {
          uris.put(config, uri);
        }
      }
    }
    return uris;
  }

  @Override
  public List<Destination> getAll(FilterType filterType) {
    Predicate<? super Destination> filter;
    switch (filterType) {
      case PROJECT_CREATION:
        filter = dest -> dest.isCreateMissingRepos();
        break;
      case PROJECT_DELETION:
        filter = dest -> dest.isReplicateProjectDeletions();
        break;
      case ALL:
      default:
        filter = dest -> true;
        break;
    }
    return destinations.stream().filter(Objects::nonNull).filter(filter).collect(toList());
  }

  @Override
  public List<Destination> getDestinations(URIish uri, Project.NameKey project, String ref) {
    List<Destination> dests = new ArrayList<>();
    for (Destination dest : getAll(FilterType.ALL)) {
      if (dest.wouldPush(uri, project, ref)) {
        dests.add(dest);
      }
    }
    return dests;
  }

  @Override
  public boolean isEmpty() {
    return destinations.isEmpty();
  }

  @Override
  public synchronized void startup(WorkQueue workQueue) {
    shuttingDown = false;
    for (Destination cfg : destinations) {
      cfg.start(workQueue);
    }
  }

  /* shutdown() cannot be set as a synchronized method because
   * it may need to wait for pending events to complete;
   * e.g. when enabling the drain of replication events before
   * shutdown.
   *
   * As a rule of thumb for synchronized methods, because they
   * implicitly define a critical section and associated lock,
   * they should never hold waiting for another resource, otherwise
   * the risk of deadlock is very high.
   *
   * See more background about deadlocks, what they are and how to
   * prevent them at: https://en.wikipedia.org/wiki/Deadlock
   */
  @Override
  public int shutdown() {
    synchronized (this) {
      shuttingDown = true;
    }

    int discarded = 0;
    for (Destination cfg : destinations) {
      try {
        drainReplicationEvents(cfg);
      } catch (EventQueueNotEmptyException e) {
        logger.atWarning().log("Event queue not empty: %s", e.getMessage());
      } finally {
        discarded += cfg.shutdown();
      }
    }
    return discarded;
  }

  void drainReplicationEvents(Destination destination) throws EventQueueNotEmptyException {
    int drainQueueAttempts = destination.getDrainQueueAttempts();
    if (drainQueueAttempts == 0) {
      return;
    }
    int pending = destination.getQueueInfo().pending.size();
    int inFlight = destination.getQueueInfo().inFlight.size();
    while ((inFlight > 0 || pending > 0) && drainQueueAttempts > 0) {
      try {
        logger.atInfo().log(
            "Draining replication events, postpone shutdown. Events left: inFlight %d, pending %d",
            inFlight, pending);
        Thread.sleep(destination.getReplicationDelayMilliseconds());
      } catch (InterruptedException ie) {
        logger.atWarning().withCause(ie).log(
            "Wait for replication events to drain has been interrupted");
      }
      pending = destination.getQueueInfo().pending.size();
      inFlight = destination.getQueueInfo().inFlight.size();
      drainQueueAttempts--;
    }
    if (pending > 0 || inFlight > 0) {
      throw new EventQueueNotEmptyException(
          String.format("Pending: %d - InFlight: %d", pending, inFlight));
    }
  }

  @Subscribe
  public synchronized void onReload(List<RemoteConfiguration> remoteConfigurations) {
    if (shuttingDown) {
      logger.atWarning().log("Shutting down: configuration reload ignored");
      return;
    }

    try {
      replicationQueue.get().stop();
      destinations = allDestinations(destinationFactory, remoteConfigurations);
      logger.atInfo().log("Configuration reloaded: %d destinations", getAll(FilterType.ALL).size());
    } finally {
      replicationQueue.get().start();
    }
  }

  private List<Destination> allDestinations(
      Destination.Factory destinationFactory, List<RemoteConfiguration> remoteConfigurations) {

    ImmutableList.Builder<Destination> dest = ImmutableList.builder();
    for (RemoteConfiguration c : remoteConfigurations) {
      if (c instanceof DestinationConfiguration) {
        dest.add(destinationFactory.create((DestinationConfiguration) c));
      }
    }
    return dest.build();
  }
}
