// Copyright (C) 2010 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.googlesource.gerrit.plugins.events;

import com.google.gerrit.common.data.GlobalCapability;
import com.google.gerrit.extensions.annotations.RequiresCapability;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.extensions.registration.RegistrationHandle;
import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.git.WorkQueue.CancelableRunnable;
import com.google.gerrit.sshd.BaseCommand;
import com.google.gerrit.sshd.CommandMetaData;
import com.google.gerrit.sshd.StreamCommandExecutor;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.google.inject.Inject;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.Future;
import org.apache.sshd.server.Environment;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RequiresCapability(GlobalCapability.STREAM_EVENTS)
@CommandMetaData(name = "stream", description = "Monitor events occurring in real time")
public final class StreamEvents extends BaseCommand {
  private static final Logger log = LoggerFactory.getLogger(StreamEvents.class);

  protected static final int BATCH_SIZE = 32; // yield thread after
  protected static final Gson gson = new Gson();
  protected static final JsonParser parser = new JsonParser();

  @Option(
    name = "--resume-after",
    metaVar = "RESUME_AFTER",
    usage = "event id after which to resume playing events on connection"
  )
  protected void parseId(String arg) throws IOException {
    resume = 0;
    if (arg.equals("0")) {
      return;
    }

    String[] ids = arg.split(":");
    if (ids.length == 2) {
      if (!ids[0].equals(events.getUuid().toString())) { // store has changed
        return;
      }

      try {
        resume = new Long(ids[1]);
        return;
      } catch (NumberFormatException e) { // fall through
      }
    }
    throw new IllegalArgumentException("Invalid event Id: " + arg);
  }

  protected long resume = -1;

  @Option(name = "--ids", usage = "add ids to events (useful for resuming after a disconnect)")
  protected boolean includeIds = false;

  @Inject @StreamCommandExecutor protected WorkQueue.Executor threadPool;

  @Inject protected EventStore events;

  @Inject protected DynamicSet<StreamEventListener> subscriptionListeners;

  @Inject protected BranchHelper perms;

  @Inject protected IdentifiedUser currentUser;

  protected CancelableRunnable flusherRunnable;
  protected RegistrationHandle subscription;

  protected final Object crossThreadlock = new Object();
  protected Future<?> flusherTask;
  protected PrintWriter stdout;

  protected long sent;
  protected volatile boolean shuttingDown = false;

  @Override
  public void start(final Environment env) throws IOException {
    try {
      parseCommandLine();
    } catch (UnloggedFailure e) {
      String msg = e.getMessage();
      if (!msg.endsWith("\n")) {
        msg += "\n";
      }
      err.write(msg.getBytes("UTF-8"));
      err.flush();
      onExit(1);
      return;
    }
    stdout = toPrintWriter(out);

    initSent();
    flusherRunnable = createFlusherRunnable();
    subscribe();
    startFlush();
  }

  protected CancelableRunnable createFlusherRunnable() {
    return new CancelableRunnable() {
      @Override
      public void run() {
        try {
          flushBatch();
        } catch (IOException e) {
          log.error("Error Flushing Stream Events", e);
        }
      }

      @Override
      public void cancel() {
        onExit(0);
      }
    };
  }

  protected void initSent() throws IOException {
    long head = events.getHead();
    long tail = events.getTail();
    if (resume == -1 || resume > head) {
      sent = head;
    } else {
      sent = resume;
    }
    if (sent < tail) {
      sent = tail - 1;
    }
  }

  protected void startFlush() throws IOException {
    synchronized (crossThreadlock) {
      if (flusherTask == null && !shuttingDown) {
        if (sent < events.getHead()) {
          flusherTask = threadPool.submit(flusherRunnable);
        }
      }
    }
  }

  @Override
  protected void onExit(final int rc) {
    unsubscribe();
    synchronized (crossThreadlock) {
      shuttingDown = true;
    }
    super.onExit(rc);
  }

  @Override
  public void destroy() {
    unsubscribe();
    synchronized (crossThreadlock) {
      boolean alreadyShuttingDown = shuttingDown;
      shuttingDown = true;
      if (flusherTask != null) {
        flusherTask.cancel(true);
      } else if (!alreadyShuttingDown) {
        onExit(0);
      }
    }
  }

  protected void subscribe() {
    subscription =
        subscriptionListeners.add(
            new StreamEventListener() {
              @Override
              public void onStreamEventUpdate() {
                try {
                  startFlush();
                } catch (IOException e) {
                  log.error("Error starting to flushing Stream Events", e);
                }
              }
            });
  }

  protected void unsubscribe() {
    if (subscription != null) {
      subscription.remove();
      subscription = null;
    }
  }

  protected void flushBatch() throws IOException {
    String uuid = events.getUuid().toString();
    int processed = 0;
    long head = events.getHead();
    while (sent < head && processed < BATCH_SIZE) {
      long sending = sent + 1;
      String event = events.get(sending);
      if (Thread.interrupted() || stdout.checkError()) {
        onExit(0);
        return;
      }
      flush(uuid, sending, event);
      sent = sending;
      processed++;
    }
    synchronized (crossThreadlock) {
      flusherTask = null;
    }
    startFlush();
  }

  protected void flush(String uuid, long number, String json) {
    if (json != null) {
      JsonElement el = parser.parse(json);
      if (perms.isVisibleTo(el, currentUser)) {
        if (includeIds) {
          el.getAsJsonObject().addProperty("id", uuid + ":" + number);
          json = gson.toJson(el);
        }
        flush(json + "\n");
      }
    }
  }

  protected void flush(String msg) {
    synchronized (stdout) {
      stdout.print(msg);
      stdout.flush();
    }
  }
}
