// Copyright (C) 2022 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.googlesource.gerrit.plugins.events;

import com.google.gerrit.common.Nullable;
import com.google.gerrit.entities.BranchNameKey;
import com.google.gerrit.entities.Change;
import com.google.gerrit.entities.Project;
import com.google.gerrit.entities.RefNames;
import com.google.gerrit.extensions.annotations.PluginName;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.config.GerritInstanceId;
import com.google.gerrit.server.config.GerritServerConfigProvider;
import com.google.gerrit.server.config.PluginConfig;
import com.google.gerrit.server.events.ChangeEvent;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventBroker;
import com.google.gerrit.server.events.EventGson;
import com.google.gerrit.server.events.EventListener;
import com.google.gerrit.server.events.ProjectEvent;
import com.google.gerrit.server.events.RefEvent;
import com.google.gerrit.server.events.UserScopedEventListener;
import com.google.gerrit.server.notedb.ChangeNotes;
import com.google.gerrit.server.permissions.PermissionBackend;
import com.google.gerrit.server.permissions.PermissionBackendException;
import com.google.gerrit.server.plugincontext.PluginSetContext;
import com.google.gerrit.server.plugincontext.PluginSetEntryContext;
import com.google.gerrit.server.project.ProjectCache;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link EventBroker} that records events in an {@link EventStore} before dispatching them.
 *
 * <p>Every posted event that is not filtered out is serialized to JSON and appended to the store.
 * Stream listeners (see {@link #isStreamListener(UserScopedEventListener)}) are then fed from the
 * store, in store order, while all other listeners receive the in-memory event directly.
 *
 * <p>Events can be excluded from the store (and therefore from the stream) through the plugin's
 * {@code filter} configuration values, see {@link #readAndParseCfg(String,
 * GerritServerConfigProvider)}.
 */
@Singleton
public class FileSystemEventBroker extends EventBroker {
  private static final Logger log = LoggerFactory.getLogger(FileSystemEventBroker.class);

  /** Matches any {@link RefEvent} whose ref name is a NoteDb meta ref. */
  protected static final Predicate<Event> IS_NOTEDB_METAREF =
      event -> {
        if (event instanceof RefEvent) {
          return RefNames.isNoteDbMetaRef(((RefEvent) event).getRefName());
        }
        return false;
      };

  protected static final String KEY_FILTER = "filter";
  protected static final String FILTER_TYPE_DROP = "DROP";
  protected static final String FILTER_ELEMENT_CLASSNAME = "classname";
  protected static final String FILTER_ELEMENT_EVENT_REFUPDATED = "RefUpdatedEvent";
  protected static final String FILTER_TEST_IS_NOTEDB_METAREF = "isNoteDbMetaRef";

  protected final EventStore store;
  protected final Gson gson;
  protected final DynamicSet<StreamEventListener> streamEventListeners;

  /** Store index of the last event that was dispatched to the stream listeners. */
  protected long lastSent;

  /** Predicate based drop filter; drops nothing unless enabled through the configuration. */
  protected Predicate<Event> drop = e -> false;

  /** Fully qualified class names of the events that must not be stored. */
  protected Set<String> dropEventNames = new HashSet<>();

  /**
   * Creates the broker, starts streaming from the current head of the store and loads the drop
   * filters from the plugin configuration.
   *
   * @throws IOException if the current head of the store cannot be read
   */
  @Inject
  public FileSystemEventBroker(
      PluginSetContext<UserScopedEventListener> listeners,
      PluginSetContext<EventListener> unrestrictedListeners,
      PermissionBackend permissionBackend,
      ProjectCache projectCache,
      ChangeNotes.Factory notesFactory,
      @Nullable @GerritInstanceId String gerritInstanceId,
      EventStore store,
      @EventGson Gson gson,
      DynamicSet<StreamEventListener> streamEventListeners,
      GerritServerConfigProvider gerritServerConfigProvider,
      @PluginName String pluginName)
      throws IOException {
    super(
        listeners,
        unrestrictedListeners,
        permissionBackend,
        projectCache,
        notesFactory,
        gerritInstanceId);
    this.store = store;
    this.gson = gson;
    this.streamEventListeners = streamEventListeners;
    // Only events stored after this point are streamed by this instance.
    lastSent = store.getHead();
    readAndParseCfg(pluginName, gerritServerConfigProvider);
  }

  /** Stores the event, posts it through the parent broker, then feeds the stream listeners. */
  @Override
  public void postEvent(Change change, ChangeEvent event) throws PermissionBackendException {
    Drop dropped = storeEvent(event);
    super.postEvent(change, event);
    fireEventForStreamListeners(dropped);
  }

  /**
   * Stores the event, posts it through the parent broker, then feeds the stream listeners.
   *
   * <p>This overload does not declare {@link PermissionBackendException}, so a failure while
   * feeding the stream listeners is logged instead of propagated.
   */
  @Override
  public void postEvent(Project.NameKey projectName, ProjectEvent event) {
    Drop dropped = storeEvent(event);
    super.postEvent(projectName, event);
    try {
      fireEventForStreamListeners(dropped);
    } catch (PermissionBackendException e) {
      log.error("Permission Exception while dispatching the event. Will be tried again.", e);
    }
  }

  /** Stores the event, posts it through the parent broker, then feeds the stream listeners. */
  @Override
  public void postEvent(BranchNameKey branchName, RefEvent event)
      throws PermissionBackendException {
    Drop dropped = storeEvent(event);
    super.postEvent(branchName, event);
    fireEventForStreamListeners(dropped);
  }

  /** Stores the event, posts it through the parent broker, then feeds the stream listeners. */
  @Override
  public void postEvent(Event event) throws PermissionBackendException {
    Drop dropped = storeEvent(event);
    super.postEvent(event);
    fireEventForStreamListeners(dropped);
  }

  /**
   * Serializes the event to JSON and appends it to the store unless it is filtered out.
   *
   * @param event the event to store
   * @return {@link Drop#FALSE} if the event was stored; {@link Drop#TRUE} if it was filtered out
   *     or could not be written to the store
   */
  protected Drop storeEvent(Event event) {
    if (!isDropEvent(event)) {
      try {
        store.add(gson.toJson(event));
        return Drop.FALSE;
      } catch (IOException ex) {
        log.error("Cannot add event to event store", ex);
      }
    }
    return Drop.TRUE;
  }

  /**
   * Tells whether the event matches one of the configured drop filters.
   *
   * @param event the event to check
   * @return {@code true} if the predicate filter matches or the event's class name is listed in
   *     {@link #dropEventNames}
   */
  protected boolean isDropEvent(Event event) {
    return drop.test(event) || dropEventNames.contains(event.getClass().getName());
  }

  /** Dispatches every stored event that has not been streamed yet. */
  public void fireEventForStreamListeners() throws PermissionBackendException {
    fireEventForStreamListeners(Drop.FALSE);
  }

  /**
   * Dispatches the stored events following {@link #lastSent} up to the current head of the store
   * to the stream listeners, then notifies the registered {@link StreamEventListener}s.
   *
   * <p>Nothing happens when {@code drop} is {@link Drop#TRUE}. {@link #lastSent} only advances
   * after an event has been dispatched, so an event whose read or dispatch fails is attempted
   * again on the next invocation.
   *
   * @param drop whether the event that triggered this call was dropped
   * @throws PermissionBackendException if a visibility check fails while dispatching
   */
  protected synchronized void fireEventForStreamListeners(Drop drop)
      throws PermissionBackendException {
    if (Drop.TRUE.equals(drop)) {
      return;
    }
    try {
      long current = store.getHead();
      while (lastSent < current) {
        long next = lastSent + 1;
        fireEventForUserScopedEventListener(
            Type.STREAM, gson.fromJson(store.get(next), Event.class));
        lastSent = next;
      }
    } catch (IOException e) {
      // Not fatal: lastSent was not advanced past the failing event, so the next
      // invocation resumes from it. Log it so store problems do not go unnoticed.
      log.warn("Cannot read events from event store; will retry on the next event", e);
    }
    for (StreamEventListener l : streamEventListeners) {
      l.onStreamEventUpdate();
    }
  }

  /**
   * Dispatches the event to the non-stream user scoped listeners that may see the change, then
   * to the unrestricted listeners. Stream listeners are served from the store instead.
   */
  @Override
  protected void fireEvent(Change change, ChangeEvent event) throws PermissionBackendException {
    setInstanceIdWhenEmpty(event);
    for (PluginSetEntryContext<UserScopedEventListener> c : getListeners(Type.NON_STREAM)) {
      CurrentUser user = c.call(UserScopedEventListener::getUser);
      if (isVisibleTo(change, user)) {
        c.run(l -> l.onEvent(event));
      }
    }
    fireEventForUnrestrictedListeners(event);
  }

  /**
   * Dispatches the event to the non-stream user scoped listeners that may see the project, then
   * to the unrestricted listeners. Stream listeners are served from the store instead.
   */
  @Override
  protected void fireEvent(Project.NameKey project, ProjectEvent event) {
    setInstanceIdWhenEmpty(event);
    for (PluginSetEntryContext<UserScopedEventListener> c : getListeners(Type.NON_STREAM)) {
      CurrentUser user = c.call(UserScopedEventListener::getUser);
      if (isVisibleTo(project, user)) {
        c.run(l -> l.onEvent(event));
      }
    }
    fireEventForUnrestrictedListeners(event);
  }

  /**
   * Dispatches the event to the non-stream user scoped listeners that may see the branch, then
   * to the unrestricted listeners. Stream listeners are served from the store instead.
   */
  @Override
  protected void fireEvent(BranchNameKey branchName, RefEvent event)
      throws PermissionBackendException {
    setInstanceIdWhenEmpty(event);
    for (PluginSetEntryContext<UserScopedEventListener> c : getListeners(Type.NON_STREAM)) {
      CurrentUser user = c.call(UserScopedEventListener::getUser);
      if (isVisibleTo(branchName, user)) {
        c.run(l -> l.onEvent(event));
      }
    }
    fireEventForUnrestrictedListeners(event);
  }

  /**
   * Dispatches the event to the non-stream user scoped listeners that may see it, then to the
   * unrestricted listeners. Stream listeners are served from the store instead.
   */
  @Override
  protected void fireEvent(Event event) throws PermissionBackendException {
    setInstanceIdWhenEmpty(event);
    fireEventForUserScopedEventListener(Type.NON_STREAM, event);
    fireEventForUnrestrictedListeners(event);
  }

  /**
   * Dispatches the event to every user scoped listener of the given type whose user may see it.
   *
   * @param type which kind of listeners to dispatch to
   * @param event the event to dispatch
   * @throws PermissionBackendException if a visibility check fails
   */
  protected void fireEventForUserScopedEventListener(Type type, Event event)
      throws PermissionBackendException {
    for (PluginSetEntryContext<UserScopedEventListener> c : getListeners(type)) {
      CurrentUser user = c.call(UserScopedEventListener::getUser);
      if (isVisibleTo(event, user)) {
        c.run(l -> l.onEvent(event));
      }
    }
  }

  /** Kind of user scoped listener, as classified by {@link #isStreamListener}. */
  protected enum Type {
    STREAM,
    NON_STREAM
  }

  /** Whether an event was kept out of the store ({@code TRUE}) or stored ({@code FALSE}). */
  protected enum Drop {
    TRUE,
    FALSE
  }

  /**
   * Returns the registered user scoped listeners of the requested type.
   *
   * @param type {@link Type#STREAM} for stream listeners, {@link Type#NON_STREAM} for the others
   * @return a new list holding the matching listener contexts
   */
  protected List<PluginSetEntryContext<UserScopedEventListener>> getListeners(Type type) {
    List<PluginSetEntryContext<UserScopedEventListener>> filteredListeners = new ArrayList<>();
    for (PluginSetEntryContext<UserScopedEventListener> c : listeners) {
      if ((type == Type.STREAM) == isStreamListener(c.get())) {
        filteredListeners.add(c);
      }
    }
    return filteredListeners;
  }

  /**
   * Tells whether the listener is a stream listener, judged by its class name starting with
   * {@code com.google.gerrit.sshd.commands.StreamEvents}.
   */
  protected boolean isStreamListener(UserScopedEventListener l) {
    return l.getClass().getName().startsWith("com.google.gerrit.sshd.commands.StreamEvents");
  }

  /**
   * Loads the drop filters from the plugin's {@code filter} configuration values.
   *
   * <p>Each value must consist of exactly three tokens separated by single spaces, in one of
   * these forms; anything else is logged and ignored:
   *
   * <ul>
   *   <li>{@code DROP classname <event class name>}: adds the name to {@link #dropEventNames}
   *   <li>{@code DROP RefUpdatedEvent isNoteDbMetaRef}: installs {@link #IS_NOTEDB_METAREF} as
   *       the predicate filter (which, as written, matches any {@link RefEvent})
   * </ul>
   *
   * <p>NOTE(review): this overridable method is invoked from the constructor, so an override
   * runs before the subclass' own fields are initialized.
   *
   * @param pluginName name of the plugin section to read
   * @param configProvider provider of the Gerrit server configuration
   */
  protected void readAndParseCfg(String pluginName, GerritServerConfigProvider configProvider) {
    PluginConfig cfg = PluginConfig.createFromGerritConfig(pluginName, configProvider.loadConfig());
    for (String filter : cfg.getStringList(KEY_FILTER)) {
      String[] pieces = filter.split(" ");
      if (pieces.length == 3 && FILTER_TYPE_DROP.equals(pieces[0])) {
        if (FILTER_ELEMENT_CLASSNAME.equals(pieces[1])) {
          dropEventNames.add(pieces[2]);
          continue;
        }
        if (FILTER_ELEMENT_EVENT_REFUPDATED.equals(pieces[1])
            && FILTER_TEST_IS_NOTEDB_METAREF.equals(pieces[2])) {
          drop = IS_NOTEDB_METAREF;
          continue;
        }
      }
      log.error("Ignoring invalid filter: {}", filter);
    }
  }
}
