// Copyright (C) 2009 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.gerrit.server.git;

import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.ReplicationUser;
import com.google.gerrit.server.account.GroupMembership;
import com.google.gerrit.server.account.ListGroupMembership;
import com.google.gerrit.server.config.ConfigUtil;
import com.google.gerrit.server.config.FactoryModule;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.project.NoSuchProjectException;
import com.google.gerrit.server.project.PerRequestProjectControlCache;
import com.google.gerrit.server.project.ProjectControl;
import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Singleton;
import com.google.inject.servlet.RequestScoped;

import com.jcraft.jsch.Session;

import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.errors.RepositoryNotFoundException;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.RefUpdate;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.storage.file.FileBasedConfig;
import org.eclipse.jgit.storage.file.FileRepository;
import org.eclipse.jgit.transport.JschConfigSessionFactory;
import org.eclipse.jgit.transport.OpenSshConfig;
import org.eclipse.jgit.transport.RefSpec;
import org.eclipse.jgit.transport.RemoteConfig;
import org.eclipse.jgit.transport.RemoteSession;
import org.eclipse.jgit.transport.SshSessionFactory;
import org.eclipse.jgit.transport.URIish;
import org.eclipse.jgit.util.FS;
import org.eclipse.jgit.util.QuotedString;
import org.eclipse.jgit.util.io.StreamCopyThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/** Manages automatic replication to remote repositories. */
@Singleton
public class PushReplication implements ReplicationQueue {
  static final Logger log = LoggerFactory.getLogger(PushReplication.class);

  /**
   * Guice module that binds the {@link ReplicationQueue} interface to the
   * {@link PushReplication} implementation.
   */
  public static class Module extends AbstractModule {
    @Override
    protected void configure() {
      bind(ReplicationQueue.class).to(PushReplication.class);
    }
  }

  static {
    // Install our own factory which always runs in batch mode, as we
    // have no UI available for interactive prompting.
    //
    // NOTE(review): setInstance looks like it replaces a process-wide factory,
    // so this presumably affects SSH transports outside replication as well --
    // confirm against the JGit documentation.
    SshSessionFactory.setInstance(new JschConfigSessionFactory() {
      @Override
      protected void configure(OpenSshConfig.Host hc, Session session) {
        // Default configuration is batch mode.
        // Intentionally empty: no per-host session settings are applied here.
      }
    });
  }

  private final Injector injector;
  private final WorkQueue workQueue;
  private final List<ReplicationConfig> configs;
  private final SchemaFactory<ReviewDb> database;
  private final ReplicationUser.Factory replicationUserFactory;
  private final GitRepositoryManager gitRepositoryManager;

  /**
   * Wires up the collaborators and loads the replication configuration.
   *
   * @param i injector used to build per-remote child injectors.
   * @param wq work queue from which per-remote executors are created.
   * @param site site paths; supplies the location of the replication config.
   * @param ruf factory for the user that replication runs as.
   * @param db schema factory handed to each remote configuration.
   * @param grm repository manager handed to each remote configuration.
   * @throws ConfigInvalidException if the replication config cannot be parsed
   *         or a remote is configured incorrectly.
   * @throws IOException if the replication config cannot be read.
   */
  @Inject
  PushReplication(final Injector i, final WorkQueue wq, final SitePaths site,
      final ReplicationUser.Factory ruf, final SchemaFactory<ReviewDb> db,
      final GitRepositoryManager grm)
      throws ConfigInvalidException, IOException {
    this.injector = i;
    this.workQueue = wq;
    this.replicationUserFactory = ruf;
    this.database = db;
    this.gitRepositoryManager = grm;

    // Must stay last: allConfigs() reads every field assigned above.
    this.configs = allConfigs(site);
  }

  /** @return true when at least one remote is configured for replication. */
  @Override
  public boolean isEnabled() {
    return !configs.isEmpty();
  }

  /**
   * Schedules a push of all refs of a project to every matching remote URL.
   *
   * @param project the project to replicate.
   * @param urlMatch substring a remote URL must contain to be selected; see
   *        {@code ReplicationConfig.getURIs} for how null, empty and
   *        {@code "*"} are treated.
   */
  @Override
  public void scheduleFullSync(final Project.NameKey project,
      final String urlMatch) {
    for (final ReplicationConfig remoteCfg : configs) {
      final List<URIish> targets = remoteCfg.getURIs(project, urlMatch);
      for (final URIish target : targets) {
        remoteCfg.schedule(project, PushOp.ALL_REFS, target);
      }
    }
  }

  /**
   * Schedules a push of a single ref to every remote whose configuration
   * would push that ref.
   *
   * @param project the project owning the ref.
   * @param ref name of the ref that changed.
   */
  @Override
  public void scheduleUpdate(final Project.NameKey project, final String ref) {
    for (final ReplicationConfig remoteCfg : configs) {
      if (!remoteCfg.wouldPushRef(ref)) {
        // This remote does not replicate the ref; nothing to schedule.
        continue;
      }
      for (final URIish target : remoteCfg.getURIs(project, null)) {
        remoteCfg.schedule(project, ref, target);
      }
    }
  }

  /**
   * Substitutes the first <code>${key}</code> placeholder in a pattern.
   *
   * @param pat pattern that may contain the placeholder; may be null (for
   *        example when a URL has no path component).
   * @param key placeholder name, without the surrounding <code>${}</code>.
   * @param val text inserted in place of the placeholder.
   * @return the pattern with the first placeholder replaced, or null if the
   *         pattern is null or does not contain the placeholder.
   */
  private static String replace(final String pat, final String key,
      final String val) {
    if (pat == null) {
      // A URL without a path cannot carry the placeholder; report "no match"
      // the same way as a pattern lacking the placeholder instead of failing.
      return null;
    }

    final String placeholder = "${" + key + "}";
    final int n = pat.indexOf(placeholder);
    if (n < 0) {
      return null;
    }
    return pat.substring(0, n) + val
        + pat.substring(n + placeholder.length());
  }

  /**
   * Loads the replication configuration file and builds one
   * {@link ReplicationConfig} per usable remote.
   * <p>
   * A missing or empty file disables replication (an empty list is returned).
   * Remotes without any URL are skipped. Every URL must contain the
   * <code>${name}</code> placeholder in its path. Push refspecs lacking a
   * destination get their source as destination, and a remote without any push
   * refspec gets a forced {@code refs/*:refs/*} refspec.
   *
   * @param site site paths; supplies the location of the replication config.
   * @return unmodifiable list of remote configurations, possibly empty.
   * @throws ConfigInvalidException if the file cannot be parsed, a remote has
   *         an invalid URL, or a URL lacks the <code>${name}</code> placeholder.
   * @throws IOException if the file cannot be read.
   */
  private List<ReplicationConfig> allConfigs(final SitePaths site)
      throws ConfigInvalidException, IOException {
    final FileBasedConfig cfg =
        new FileBasedConfig(site.replication_config, FS.DETECTED);

    if (!cfg.getFile().exists()) {
      log.warn("No " + cfg.getFile() + "; not replicating");
      return Collections.emptyList();
    }
    if (cfg.getFile().length() == 0) {
      log.info("Empty " + cfg.getFile() + "; not replicating");
      return Collections.emptyList();
    }

    try {
      cfg.load();
    } catch (ConfigInvalidException e) {
      throw new ConfigInvalidException("Config file " + cfg.getFile()
          + " is invalid: " + e.getMessage(), e);
    } catch (IOException e) {
      throw new IOException("Cannot read " + cfg.getFile() + ": "
          + e.getMessage(), e);
    }

    final List<ReplicationConfig> r = new ArrayList<ReplicationConfig>();
    for (final RemoteConfig c : allRemotes(cfg)) {
      if (c.getURIs().isEmpty()) {
        continue;
      }

      for (final URIish u : c.getURIs()) {
        if (u.getPath() == null || !u.getPath().contains("${name}")) {
          throw new ConfigInvalidException("remote." + c.getName() + ".url"
              + " \"" + u + "\" lacks ${name} placeholder in " + cfg.getFile());
        }
      }

      // If the refspec destination for push is not set, assume it is equal to
      // the source. RefSpec setters return a modified copy rather than
      // changing the instance, so the adjusted specs are collected into a
      // fresh list and installed on the remote afterwards.
      final List<RefSpec> pushSpecs =
          new ArrayList<RefSpec>(c.getPushRefSpecs().size());
      for (final RefSpec spec : c.getPushRefSpecs()) {
        if (spec.getDestination() == null && spec.getSource() != null) {
          pushSpecs.add(spec.setDestination(spec.getSource()));
        } else {
          pushSpecs.add(spec);
        }
      }

      // No push refspec configured at all: replicate everything, forced.
      if (pushSpecs.isEmpty()) {
        RefSpec spec = new RefSpec();
        spec = spec.setSourceDestination("refs/*", "refs/*");
        spec = spec.setForceUpdate(true);
        pushSpecs.add(spec);
      }
      c.setPushRefSpecs(pushSpecs);

      r.add(new ReplicationConfig(injector, workQueue, c, cfg, database,
          replicationUserFactory, gitRepositoryManager));
    }
    return Collections.unmodifiableList(r);
  }

  /**
   * Parses every {@code remote} subsection of the configuration, in sorted
   * name order.
   *
   * @param cfg the loaded replication configuration.
   * @return one {@link RemoteConfig} per {@code remote} subsection.
   * @throws ConfigInvalidException if any remote has a syntactically invalid
   *         URL; the underlying {@link URISyntaxException} is kept as cause.
   */
  private List<RemoteConfig> allRemotes(final FileBasedConfig cfg)
      throws ConfigInvalidException {
    List<String> names = new ArrayList<String>(cfg.getSubsections("remote"));
    Collections.sort(names);

    final List<RemoteConfig> result = new ArrayList<RemoteConfig>(names.size());
    for (final String name : names) {
      try {
        result.add(new RemoteConfig(cfg, name));
      } catch (URISyntaxException e) {
        // Keep the parser's exception as cause so the offending URL and
        // position are not lost.
        throw new ConfigInvalidException("remote " + name
            + " has invalid URL in " + cfg.getFile(), e);
      }
    }
    return result;
  }

  /**
   * Creates a newly created project on every configured remote.
   * <p>
   * For each remote, the configured admin URLs are tried first: each valid
   * admin URL containing the <code>${name}</code> placeholder and using SSH
   * is used to create the project. Only if no admin URL was used are the
   * remote's regular replication URLs used instead.
   *
   * @param projectName name of the project to create remotely.
   * @param head ref name the remote HEAD should point at.
   */
  @Override
  public void replicateNewProject(Project.NameKey projectName, String head) {
    if (!isEnabled()) {
      return;
    }

    for (ReplicationConfig config : configs) {
      List<URIish> uriList = config.getURIs(projectName, "*");
      String[] adminUrls = config.getAdminUrls();
      boolean adminURLUsed = false;

      for (String url : adminUrls) {
        URIish adminURI = null;
        try {
          if (url != null && !url.isEmpty()) {
            adminURI = new URIish(url);
          }
        } catch (URISyntaxException e) {
          // Pass the exception along so the log shows why parsing failed.
          log.error("The URL '" + url + "' is invalid", e);
        }

        if (adminURI != null) {
          final String replacedPath =
              replace(adminURI.getPath(), "name", projectName.get());
          if (replacedPath != null) {
            adminURI = adminURI.setPath(replacedPath);
            if (usingSSH(adminURI)) {
              replicateProject(adminURI, head);
              adminURLUsed = true;
            } else {
              log.error("The adminURL '" + url
                  + "' is non-SSH which is not allowed");
            }
          }
        }
      }

      // Fall back to the regular replication URLs when no admin URL applied.
      if (!adminURLUsed) {
        for (URIish uri : uriList) {
          replicateProject(uri, head);
        }
      }
    }
  }

  /**
   * Creates the project at the given target, choosing the mechanism from the
   * URI: local file system for non-remote URIs, SSH for SSH URIs. Any other
   * remote URI is only reported with a warning.
   *
   * @param replicateURI target location of the new project.
   * @param head ref name the new repository's HEAD should point at.
   */
  private void replicateProject(final URIish replicateURI, final String head) {
    if (replicateURI.isRemote()) {
      if (usingSSH(replicateURI)) {
        replicateProjectOverSsh(replicateURI, head);
      } else {
        log.warn("Cannot create new project on remote site since neither the "
            + "connection method is SSH nor the replication target is local: "
            + replicateURI.toString());
      }
    } else {
      replicateProjectLocally(replicateURI, head);
    }
  }

  /**
   * Creates a bare repository at the URI's path on the local file system and
   * links its HEAD to the given ref, without writing a reflog entry.
   * <p>
   * I/O failures are logged (with their cause) and not propagated.
   *
   * @param replicateURI local target; only its path is used.
   * @param head ref name HEAD is linked to.
   */
  private void replicateProjectLocally(final URIish replicateURI,
      final String head) {
    try {
      final Repository repo = new FileRepository(replicateURI.getPath());
      try {
        repo.create(true /* bare */);

        final RefUpdate u = repo.updateRef(Constants.HEAD);
        u.disableRefLog();
        u.link(head);
      } finally {
        repo.close();
      }
    } catch (IOException e) {
      // Include the exception so the reason for the failure is visible.
      log.error("Failed to replicate project locally: "
          + replicateURI.getPath(), e);
    }
  }

  /**
   * Creates a bare repository on a remote host by running shell commands over
   * SSH, and points its HEAD at the given ref.
   * <p>
   * The path and head are Bourne-quoted before being placed in the command
   * line. Output of the remote command is collected and included in the log
   * message if a communication error occurs. The SSH session is always
   * disconnected, including on failure.
   *
   * @param replicateURI SSH target; its path is the repository directory.
   * @param head ref name the remote HEAD should point at.
   */
  private void replicateProjectOverSsh(final URIish replicateURI,
      final String head) {
    SshSessionFactory sshFactory = SshSessionFactory.getInstance();
    String projectPath = QuotedString.BOURNE.quote(replicateURI.getPath());

    OutputStream errStream = createErrStream();
    String cmd =
        "mkdir -p " + projectPath + "&& cd " + projectPath
            + "&& git init --bare" + "&& git symbolic-ref HEAD "
            + QuotedString.BOURNE.quote(head);

    RemoteSession sshSession = null;
    try {
      sshSession = sshFactory.getSession(replicateURI, null, FS.DETECTED, 0);
      Process proc = sshSession.exec(cmd, 0);
      // Nothing is sent to the remote command's stdin.
      proc.getOutputStream().close();
      StreamCopyThread out = new StreamCopyThread(proc.getInputStream(), errStream);
      StreamCopyThread err = new StreamCopyThread(proc.getErrorStream(), errStream);
      out.start();
      err.start();
      try {
        proc.waitFor();
        out.halt();
        err.halt();
      } catch (InterruptedException interrupted) {
        // Don't wait, drop out immediately; restore the interrupt status so
        // the caller can still observe the interruption.
        Thread.currentThread().interrupt();
      }
    } catch (IOException e) {
      log.error("Communication error when trying to replicate to: "
          + replicateURI.toString() + "\n" + "Error reported: "
          + e.getMessage() + "\n" + "Error in communication: "
          + errStream.toString(), e);
    } finally {
      // Release the session on every path, not only on success.
      if (sshSession != null) {
        sshSession.disconnect();
      }
    }
  }

  /**
   * Creates an in-memory stream that collects the text written to it.
   * <p>
   * Carriage returns are dropped. Each written byte is treated as one
   * character (its low eight bits). {@code toString()} returns everything
   * written so far, including an unterminated last line, with trailing
   * newlines removed. Both {@code write} and {@code toString} synchronize on
   * the stream, so it may be shared by several writer threads.
   *
   * @return a new collecting stream.
   */
  private OutputStream createErrStream() {
    return new OutputStream() {
      private final StringBuilder text = new StringBuilder();

      @Override
      public synchronized String toString() {
        int end = text.length();
        while (end > 0 && text.charAt(end - 1) == '\n') {
          end--;
        }
        return text.substring(0, end);
      }

      @Override
      public synchronized void write(final int b) {
        // Per the OutputStream contract only the low eight bits are the byte.
        final int c = b & 0xff;
        if (c == '\r') {
          return;
        }
        text.append((char) c);
      }
    };
  }

  /**
   * Decides whether a URI is reached over SSH.
   * <p>
   * A URI qualifies when it is remote and either its scheme contains
   * {@code ssh} (case-insensitively), or it has no scheme but has both a host
   * and a path.
   *
   * @param uri the URI to inspect.
   * @return true if the URI is considered an SSH URI.
   */
  private boolean usingSSH(final URIish uri) {
    if (!uri.isRemote()) {
      return false;
    }

    final String scheme = uri.getScheme();
    if (scheme == null) {
      // No explicit scheme: host plus path is treated as SSH.
      return uri.getHost() != null && uri.getPath() != null;
    }
    return scheme.toLowerCase().contains("ssh");
  }

  static class ReplicationConfig {
    private final RemoteConfig remote;
    private final String[] adminUrls;
    private final int delay;
    private final int retryDelay;
    private final WorkQueue.Executor pool;
    private final Map<URIish, PushOp> pending = new HashMap<URIish, PushOp>();
    private final PushOp.Factory opFactory;
    private final ProjectControl.Factory projectControlFactory;
    private final GitRepositoryManager mgr;
    private final boolean replicatePermissions;

    /**
     * Builds the replication settings and scheduling machinery for one remote.
     * <p>
     * Reads the remote's {@code replicationdelay} (default 15),
     * {@code replicationretry} (default 1), {@code threads} (default 1),
     * {@code authGroup}, {@code adminUrl} and {@code replicatePermissions}
     * (default true) settings; negative numeric values are clamped to 0.
     *
     * @param injector parent injector for the two child injectors created here.
     * @param workQueue queue from which this remote's executor is created.
     * @param rc the remote being configured.
     * @param cfg configuration holding the remote's settings.
     * @param db schema factory used to look up the configured auth groups.
     * @param replicationUserFactory creates the user replication runs as.
     * @param gitRepositoryManager used later to open project repositories.
     */
    ReplicationConfig(final Injector injector, final WorkQueue workQueue,
        final RemoteConfig rc, final Config cfg, SchemaFactory<ReviewDb> db,
        final ReplicationUser.Factory replicationUserFactory,
        final GitRepositoryManager gitRepositoryManager) {

      remote = rc;
      // delay is applied in seconds and retryDelay in minutes where the
      // operations are scheduled (see schedule and reschedule).
      delay = Math.max(0, getInt(rc, cfg, "replicationdelay", 15));
      retryDelay = Math.max(0, getInt(rc, cfg, "replicationretry", 1));

      final int poolSize = Math.max(0, getInt(rc, cfg, "threads", 1));
      final String poolName = "ReplicateTo-" + rc.getName();
      pool = workQueue.createQueue(poolSize, poolName);

      // Without any configured authGroup the replication user is given
      // ReplicationUser.EVERYTHING_VISIBLE as its group membership.
      String[] authGroupNames =
          cfg.getStringList("remote", rc.getName(), "authGroup");
      final GroupMembership authGroups;
      if (authGroupNames.length > 0) {
        authGroups = new ListGroupMembership(ConfigUtil.groupsFor(db, authGroupNames, //
            log, "Group \"{0}\" not in database, removing from authGroup"));
      } else {
        authGroups = ReplicationUser.EVERYTHING_VISIBLE;
      }

      adminUrls = cfg.getStringList("remote", rc.getName(), "adminUrl");
      replicatePermissions = cfg.getBoolean("remote", rc.getName(),
              "replicatePermissions", true);
      mgr = gitRepositoryManager;

      final ReplicationUser remoteUser =
          replicationUserFactory.create(authGroups);

      // Child injector in which the current user is the replication user and
      // the request scope is the per-thread scope set up in schedule().
      projectControlFactory =
          injector.createChildInjector(new AbstractModule() {
            @Override
            protected void configure() {
              bindScope(RequestScoped.class, PerThreadRequestScope.REQUEST);
              bind(PerRequestProjectControlCache.class).in(RequestScoped.class);
              bind(CurrentUser.class).toInstance(remoteUser);
            }
          }).getInstance(ProjectControl.Factory.class);

      // Child injector that lets PushOp instances receive this configuration
      // and its remote. Note this binds "this" before the constructor returns.
      opFactory = injector.createChildInjector(new FactoryModule() {
        @Override
        protected void configure() {
          bind(PushReplication.ReplicationConfig.class).toInstance(ReplicationConfig.this);
          bind(RemoteConfig.class).toInstance(remote);
          factory(PushOp.Factory.class);
        }
      }).getInstance(PushOp.Factory.class);
    }

    /**
     * Reads an integer setting from the remote's {@code remote.<name>}
     * subsection.
     *
     * @param rc remote whose name selects the subsection.
     * @param cfg configuration to read from.
     * @param name setting name.
     * @param defValue value passed to the configuration as the default.
     * @return the value reported by the configuration.
     */
    private int getInt(final RemoteConfig rc, final Config cfg,
        final String name, final int defValue) {
      final String subsection = rc.getName();
      return cfg.getInt("remote", subsection, name, defValue);
    }

    /**
     * Schedules a push of one ref of a project to one remote URI.
     * <p>
     * Nothing is scheduled when the project is not visible through
     * {@link #controlFor}, when the project cannot be found, or when
     * permissions are not replicated and the project's HEAD symbolically
     * resolves to {@code GitRepositoryManager.REF_CONFIG}. If an operation for
     * the URI is already pending, the ref is added to it; otherwise a new
     * operation is created and scheduled after {@code delay} seconds.
     *
     * @param project the project to replicate.
     * @param ref the ref to push.
     * @param uri the target URI; also the key of the pending operation.
     */
    void schedule(final Project.NameKey project, final String ref,
        final URIish uri) {
      // Install a fresh per-thread request scope for the visibility check and
      // restore the previous one afterwards, whatever the outcome.
      PerThreadRequestScope ctx = new PerThreadRequestScope();
      PerThreadRequestScope old = PerThreadRequestScope.set(ctx);
      try {
        try {
          if (!controlFor(project).isVisible()) {
            return;
          }
        } catch (NoSuchProjectException e1) {
          log.error("Internal error: project " + project
              + " not found during replication");
          return;
        }
      } finally {
        PerThreadRequestScope.set(old);
      }

      if (!replicatePermissions) {
        PushOp e;
        synchronized (pending) {
          e = pending.get(uri);
        }
        // The repository is only inspected when no operation is pending for
        // this URI.
        if (e == null) {
          Repository git;
          try {
            git = mgr.openRepository(project);
          } catch (RepositoryNotFoundException err) {
            log.error("Internal error: project " + project
                + " not found during replication", err);
            return;
          }
          try {
            // Skip projects whose HEAD points at the configuration ref.
            Ref head = git.getRef(Constants.HEAD);
            if (head != null
                && head.isSymbolic()
                && GitRepositoryManager.REF_CONFIG.equals(head.getLeaf().getName())) {
              return;
            }
          } catch (IOException err) {
            log.error("Internal error: cannot check type of project " + project
                + " during replication", err);
            return;
          } finally {
            git.close();
          }
        }
      }

      // The pending map is re-read under the lock; the earlier lookup above
      // was only used to decide whether to inspect the repository.
      synchronized (pending) {
        PushOp e = pending.get(uri);
        if (e == null) {
          e = opFactory.create(project, uri);
          pool.schedule(e, delay, TimeUnit.SECONDS);
          pending.put(uri, e);
        }
        e.addRef(ref);
      }
    }

    /**
     * Schedules a PushOp instance again, for a retry.
     * <p>
     * The operation is assumed to have been scheduled before and to have hit
     * a transport exception. It is scheduled to run again after the number of
     * minutes given by {@code retryDelay}.
     * <p>
     * If another operation for the same URI is already pending for retry, the
     * refs of the given operation are added to that pending operation and the
     * given operation is not scheduled.
     * <p>
     * If another operation for the same URI is pending but not for retry, the
     * pending one is marked as canceled and removed from the pending map, its
     * refs are added to the given operation, and the given operation is
     * scheduled for retry.
     * <p>
     * All steps that mark a PushOp as canceled or retrying, or that add it to
     * or remove it from the pending map, are performed while holding the lock
     * on the pending map.
     *
     * @param pushOp The PushOp instance to be scheduled.
     */
    void reschedule(final PushOp pushOp) {
      // Guard every access to the pending map.
      synchronized (pending) {
        URIish uri = pushOp.getURI();
        PushOp pendingPushOp = pending.get(uri);

        if (pendingPushOp != null) {
          // There is already a PushOp instance pending for the same URI.

          if (pendingPushOp.isRetrying()) {
            // The pending operation is itself a retry, so keep it and add
            // the refs of the operation passed in to it.

            // This happens when a PushOp started running and, before it
            // failed with a transport exception, another one for the same
            // URI started. The first one fails and is rescheduled, which
            // puts it in the pending map. When the second one fails it is
            // rescheduled as well and finds, here, that replication to its
            // URI is already pending for retry.
            pendingPushOp.addRefs(pushOp.getRefs());

          } else {
            // The pending operation is NOT a retry; it was scheduled
            // normally. Cancel it by setting its canceled flag, remove it
            // from the pending map, and move its refs to the operation
            // passed in, which is scheduled for retry further below.

            // When the canceled PushOp later reports that it is starting
            // (see notifyStarting, which holds the pending lock), it is
            // skipped there and leaves the pending map untouched.
            // NOTE(review): that it also skips its run implementation is
            // PushOp behavior not visible in this file -- confirm.

            pendingPushOp.cancel();
            pending.remove(uri);

            pushOp.addRefs(pendingPushOp.getRefs());
          }
        }

        if (pendingPushOp == null || !pendingPushOp.isRetrying()) {
          // The operation passed in is the one to schedule for retry.
          // Retries use retryDelay (minutes) rather than the normal delay.

          pushOp.setToRetry();

          pending.put(uri, pushOp);
          pool.schedule(pushOp, retryDelay, TimeUnit.MINUTES);
        }
      }
    }

    /**
     * Looks up the access control for a project using this remote's
     * project control factory.
     *
     * @param project the project to look up.
     * @return the project's control object.
     * @throws NoSuchProjectException if the factory cannot find the project.
     */
    ProjectControl controlFor(final Project.NameKey project)
        throws NoSuchProjectException {
      final ProjectControl control = projectControlFactory.controlFor(project);
      return control;
    }

    /**
     * Called when an operation starts; removes the entry for the operation's
     * URI from the pending map unless the operation was canceled.
     *
     * @param op the operation that is starting.
     */
    void notifyStarting(final PushOp op) {
      synchronized (pending) {
        if (op.wasCanceled()) {
          // A canceled operation was already removed by reschedule().
          return;
        }
        pending.remove(op.getURI());
      }
    }

    /**
     * Tells whether this remote replicates the given ref.
     *
     * @param ref the ref name to test.
     * @return false for {@code GitRepositoryManager.REF_CONFIG} when
     *         permissions are not replicated; otherwise true if any push
     *         refspec of the remote matches the ref as source.
     */
    boolean wouldPushRef(final String ref) {
      final boolean isConfigRef = GitRepositoryManager.REF_CONFIG.equals(ref);
      if (isConfigRef && !replicatePermissions) {
        return false;
      }

      boolean matched = false;
      for (final RefSpec spec : remote.getPushRefSpecs()) {
        if (spec.matchSource(ref)) {
          matched = true;
          break;
        }
      }
      return matched;
    }

    /** @return the value of this remote's {@code replicatePermissions} setting. */
    boolean isReplicatePermissions() {
      return this.replicatePermissions;
    }

    /**
     * Expands this remote's URLs for a project.
     * <p>
     * Each URL that passes the {@code urlMatch} filter gets the first
     * <code>${name}</code> placeholder in its path replaced by the project
     * name, URL-encoded where the scheme requires it. URLs without the
     * placeholder are left out.
     *
     * @param project the project whose name is substituted.
     * @param urlMatch filter applied to each URL; see {@code matches}.
     * @return the expanded URIs, possibly empty.
     */
    List<URIish> getURIs(final Project.NameKey project, final String urlMatch) {
      final List<URIish> expanded =
          new ArrayList<URIish>(remote.getURIs().size());
      for (final URIish template : remote.getURIs()) {
        if (!matches(template, urlMatch)) {
          continue;
        }

        final String projectName = needsUrlEncoding(template)
            ? encode(project.get())
            : project.get();
        final String path = replace(template.getPath(), "name", projectName);
        if (path == null) {
          continue;
        }
        expanded.add(template.setPath(path));
      }
      return expanded;
    }

    /**
     * Tells whether project names must be URL-encoded for this URI.
     *
     * @param uri the URI to inspect.
     * @return true if the scheme is {@code http}, {@code https} or
     *         {@code amazon-s3}, compared case-insensitively.
     */
    static boolean needsUrlEncoding(URIish uri) {
      final String scheme = uri.getScheme();
      if ("http".equalsIgnoreCase(scheme)) {
        return true;
      }
      if ("https".equalsIgnoreCase(scheme)) {
        return true;
      }
      return "amazon-s3".equalsIgnoreCase(scheme);
    }

    /**
     * URL-encodes a project name for use in the path part of a URL.
     * <p>
     * The form encoder's output is adjusted in two ways: an encoded slash
     * ({@code %2F}) is turned back into {@code /}, because it separates path
     * segments of the repository path, and {@code +} (the form encoding of a
     * space) becomes {@code %20}, which is the correct form inside a path.
     *
     * @param str the text to encode.
     * @return the encoded text.
     * @throws RuntimeException if UTF-8 is not supported by the runtime.
     */
    static String encode(String str) {
      final String formEncoded;
      try {
        formEncoded = URLEncoder.encode(str, "UTF-8");
      } catch (UnsupportedEncodingException e) {
        throw new RuntimeException(e);
      }

      final String slashesRestored = formEncoded.replaceAll("%2[fF]", "/");
      return slashesRestored.replace("+", "%20");
    }

    /**
     * @return the {@code adminUrl} values configured for this remote; this is
     *         the internal array, not a copy.
     */
    String[] getAdminUrls() {
      return adminUrls;
    }

    /**
     * Applies the URL filter to a URI.
     *
     * @param uri the URI to test.
     * @param urlMatch filter text; null, empty and {@code "*"} accept every
     *        URI.
     * @return true if the filter accepts everything or the URI's string form
     *         contains the filter text.
     */
    private boolean matches(URIish uri, final String urlMatch) {
      final boolean acceptAll =
          urlMatch == null || urlMatch.isEmpty() || "*".equals(urlMatch);
      if (acceptAll) {
        return true;
      }
      return uri.toString().contains(urlMatch);
    }
  }
}
