blob: 180e5cbe3658326536752a857c26fc58d2b665f7 [file] [log] [blame]
// Copyright (C) 2022 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.googlesource.gerrit.plugins.events;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.entities.BranchNameKey;
import com.google.gerrit.entities.Change;
import com.google.gerrit.entities.Project;
import com.google.gerrit.entities.RefNames;
import com.google.gerrit.extensions.annotations.PluginName;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.config.GerritInstanceId;
import com.google.gerrit.server.config.GerritServerConfigProvider;
import com.google.gerrit.server.config.PluginConfig;
import com.google.gerrit.server.events.ChangeEvent;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventBroker;
import com.google.gerrit.server.events.EventGson;
import com.google.gerrit.server.events.EventListener;
import com.google.gerrit.server.events.ProjectEvent;
import com.google.gerrit.server.events.RefEvent;
import com.google.gerrit.server.events.UserScopedEventListener;
import com.google.gerrit.server.notedb.ChangeNotes;
import com.google.gerrit.server.permissions.PermissionBackend;
import com.google.gerrit.server.permissions.PermissionBackendException;
import com.google.gerrit.server.plugincontext.PluginSetContext;
import com.google.gerrit.server.plugincontext.PluginSetEntryContext;
import com.google.gerrit.server.project.ProjectCache;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Singleton
public class FileSystemEventBroker extends EventBroker {
private static final Logger log = LoggerFactory.getLogger(FileSystemEventBroker.class);
protected static final Predicate<Event> IS_NOTEDB_METAREF =
event -> {
if (event instanceof RefEvent) {
return RefNames.isNoteDbMetaRef(((RefEvent) event).getRefName());
}
return false;
};
protected static final String KEY_FILTER = "filter";
protected static final String FILTER_TYPE_DROP = "DROP";
protected static final String FILTER_ELEMENT_CLASSNAME = "classname";
protected static final String FILTER_ELEMENT_EVENT_REFUPDATED = "RefUpdatedEvent";
protected static final String FILTER_TEST_IS_NOTEDB_METAREF = "isNoteDbMetaRef";
protected final EventStore store;
protected final Gson gson;
protected final DynamicSet<StreamEventListener> streamEventListeners;
protected long lastSent;
protected Predicate<Event> drop = e -> false;
protected Set<String> dropEventNames = new HashSet<>();
@Inject
public FileSystemEventBroker(
PluginSetContext<UserScopedEventListener> listeners,
PluginSetContext<EventListener> unrestrictedListeners,
PermissionBackend permissionBackend,
ProjectCache projectCache,
ChangeNotes.Factory notesFactory,
@Nullable @GerritInstanceId String gerritInstanceId,
EventStore store,
@EventGson Gson gson,
DynamicSet<StreamEventListener> streamEventListeners,
GerritServerConfigProvider gerritServerConfigProvider,
@PluginName String pluginName)
throws IOException {
super(
listeners,
unrestrictedListeners,
permissionBackend,
projectCache,
notesFactory,
gerritInstanceId);
this.store = store;
this.gson = gson;
this.streamEventListeners = streamEventListeners;
lastSent = store.getHead();
readAndParseCfg(pluginName, gerritServerConfigProvider);
}
@Override
public void postEvent(Change change, ChangeEvent event) throws PermissionBackendException {
Drop drop = storeEvent(event);
super.postEvent(change, event);
fireEventForStreamListeners(drop);
}
@Override
public void postEvent(Project.NameKey projectName, ProjectEvent event) {
Drop drop = storeEvent(event);
super.postEvent(projectName, event);
try {
fireEventForStreamListeners(drop);
} catch (PermissionBackendException e) {
log.error("Permission Exception while dispatching the event. Will be tried again.", e);
}
}
@Override
public void postEvent(BranchNameKey branchName, RefEvent event)
throws PermissionBackendException {
Drop drop = storeEvent(event);
super.postEvent(branchName, event);
fireEventForStreamListeners(drop);
}
@Override
public void postEvent(Event event) throws PermissionBackendException {
Drop drop = storeEvent(event);
super.postEvent(event);
fireEventForStreamListeners(drop);
}
protected Drop storeEvent(Event event) {
if (!isDropEvent(event)) {
try {
store.add(gson.toJson(event));
return Drop.FALSE;
} catch (IOException ex) {
log.error("Cannot add event to event store", ex);
}
}
return Drop.TRUE;
}
protected boolean isDropEvent(Event event) {
if (drop.test(event) || dropEventNames.contains(event.getClass().getName())) {
return true;
}
return false;
}
public void fireEventForStreamListeners() throws PermissionBackendException {
fireEventForStreamListeners(Drop.FALSE);
}
protected synchronized void fireEventForStreamListeners(Drop drop)
throws PermissionBackendException {
if (!Drop.TRUE.equals(drop)) {
try {
long current = store.getHead();
while (lastSent < current) {
long next = lastSent + 1;
fireEventForUserScopedEventListener(
Type.STREAM, gson.fromJson(store.get(next), Event.class));
lastSent = next;
}
} catch (IOException e) {
// Next Event would re-try the events.
}
for (StreamEventListener l : streamEventListeners) {
l.onStreamEventUpdate();
}
}
}
@Override
protected void fireEvent(Change change, ChangeEvent event) throws PermissionBackendException {
setInstanceIdWhenEmpty(event);
for (PluginSetEntryContext<UserScopedEventListener> c : getListeners(Type.NON_STREAM)) {
CurrentUser user = c.call(UserScopedEventListener::getUser);
if (isVisibleTo(change, user)) {
c.run(l -> l.onEvent(event));
}
}
fireEventForUnrestrictedListeners(event);
}
@Override
protected void fireEvent(Project.NameKey project, ProjectEvent event) {
setInstanceIdWhenEmpty(event);
for (PluginSetEntryContext<UserScopedEventListener> c : getListeners(Type.NON_STREAM)) {
CurrentUser user = c.call(UserScopedEventListener::getUser);
if (isVisibleTo(project, user)) {
c.run(l -> l.onEvent(event));
}
}
fireEventForUnrestrictedListeners(event);
}
@Override
protected void fireEvent(BranchNameKey branchName, RefEvent event)
throws PermissionBackendException {
setInstanceIdWhenEmpty(event);
for (PluginSetEntryContext<UserScopedEventListener> c : getListeners(Type.NON_STREAM)) {
CurrentUser user = c.call(UserScopedEventListener::getUser);
if (isVisibleTo(branchName, user)) {
c.run(l -> l.onEvent(event));
}
}
fireEventForUnrestrictedListeners(event);
}
@Override
protected void fireEvent(Event event) throws PermissionBackendException {
setInstanceIdWhenEmpty(event);
fireEventForUserScopedEventListener(Type.NON_STREAM, event);
fireEventForUnrestrictedListeners(event);
}
protected void fireEventForUserScopedEventListener(Type type, Event event)
throws PermissionBackendException {
for (PluginSetEntryContext<UserScopedEventListener> c : getListeners(type)) {
CurrentUser user = c.call(UserScopedEventListener::getUser);
if (isVisibleTo(event, user)) {
c.run(l -> l.onEvent(event));
}
}
}
protected enum Type {
STREAM,
NON_STREAM
}
protected enum Drop {
TRUE,
FALSE
}
protected List<PluginSetEntryContext<UserScopedEventListener>> getListeners(Type type) {
List<PluginSetEntryContext<UserScopedEventListener>> filteredListeners = new ArrayList<>();
for (PluginSetEntryContext<UserScopedEventListener> c : listeners) {
if ((type == Type.STREAM) == isStreamListener(c.get())) {
filteredListeners.add(c);
}
}
return filteredListeners;
}
protected boolean isStreamListener(UserScopedEventListener l) {
return l.getClass().getName().startsWith("com.google.gerrit.sshd.commands.StreamEvents");
}
protected void readAndParseCfg(String pluginName, GerritServerConfigProvider configProvider) {
PluginConfig cfg = PluginConfig.createFromGerritConfig(pluginName, configProvider.loadConfig());
for (String filter : cfg.getStringList(KEY_FILTER)) {
String pieces[] = filter.split(" ");
if (pieces.length == 3) {
if (FILTER_TYPE_DROP.equals(pieces[0])) {
if (FILTER_ELEMENT_CLASSNAME.equals(pieces[1])) {
dropEventNames.add(pieces[2]);
continue;
}
if (FILTER_ELEMENT_EVENT_REFUPDATED.equals(pieces[1])
&& FILTER_TEST_IS_NOTEDB_METAREF.equals(pieces[2])) {
drop = IS_NOTEDB_METAREF;
continue;
}
}
}
log.error("Ignoring invalid filter: " + filter);
}
}
}