Allow events from other primaries to be dispatched
This results in core stream events command to also return
the events from other primaries.
Change-Id: Idb8334f8876b217fd32ca8b10937c4eaaa925a03
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/CoreListener.java b/src/main/java/com/googlesource/gerrit/plugins/events/CoreListener.java
deleted file mode 100644
index 3336309..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/events/CoreListener.java
+++ /dev/null
@@ -1,101 +0,0 @@
-// Copyright (C) 2016 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.events;
-
-import com.google.common.base.Supplier;
-import com.google.gerrit.entities.EntitiesAdapterFactory;
-import com.google.gerrit.entities.Project;
-import com.google.gerrit.extensions.annotations.PluginName;
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.config.GerritServerConfigProvider;
-import com.google.gerrit.server.config.PluginConfig;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.EventListener;
-import com.google.gerrit.server.events.ProjectNameKeyAdapter;
-import com.google.gerrit.server.events.SupplierSerializer;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Singleton
-public class CoreListener implements EventListener {
- private static Logger log = LoggerFactory.getLogger(CoreListener.class);
-
- protected static final String KEY_FILTER = "filter";
- protected static final String FILTER_TYPE_DROP = "DROP";
- protected static final String FILTER_ELEMENT_CLASSNAME = "classname";
-
- protected static final Gson gson =
- new GsonBuilder()
- .registerTypeAdapter(Supplier.class, new SupplierSerializer())
- .registerTypeAdapter(Project.NameKey.class, new ProjectNameKeyAdapter())
- .registerTypeAdapterFactory(EntitiesAdapterFactory.create())
- .create();
-
- protected final String pluginName;
- protected final GerritServerConfigProvider gerritServerConfigProvider;
- protected final DynamicSet<StreamEventListener> listeners;
- protected final EventStore store;
- protected Set<String> dropEventNames = new HashSet<>();
-
- @Inject
- protected CoreListener(
- @PluginName String pluginName,
- GerritServerConfigProvider gerritServerConfigProvider,
- EventStore store,
- DynamicSet<StreamEventListener> listeners) {
- this.pluginName = pluginName;
- this.gerritServerConfigProvider = gerritServerConfigProvider;
- this.store = store;
- this.listeners = listeners;
- readAndParseCfg();
- }
-
- @Override
- public void onEvent(Event event) {
- if (dropEventNames.contains(event.getClass().getName())) {
- return;
- }
- try {
- store.add(gson.toJson(event));
- } catch (IOException e) {
- log.error("Cannot add event to event store", e);
- }
- for (StreamEventListener l : listeners) {
- l.onStreamEventUpdate();
- }
- }
-
- protected void readAndParseCfg() {
- PluginConfig cfg =
- PluginConfig.createFromGerritConfig(pluginName, gerritServerConfigProvider.loadConfig());
- for (String filter : cfg.getStringList(KEY_FILTER)) {
- String pieces[] = filter.split(" ");
- if (pieces.length == 3
- && FILTER_TYPE_DROP.equals(pieces[0])
- && FILTER_ELEMENT_CLASSNAME.equals(pieces[1])) {
- dropEventNames.add(pieces[2]);
- } else {
- log.error("Ignoring invalid filter: " + filter);
- }
- }
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/FileSystemEventBroker.java b/src/main/java/com/googlesource/gerrit/plugins/events/FileSystemEventBroker.java
new file mode 100644
index 0000000..ef73a36
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/FileSystemEventBroker.java
@@ -0,0 +1,157 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events;
+
+import com.google.gerrit.entities.BranchNameKey;
+import com.google.gerrit.entities.Change;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.config.GerritServerConfigProvider;
+import com.google.gerrit.server.config.PluginConfig;
+import com.google.gerrit.server.events.ChangeEvent;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventBroker;
+import com.google.gerrit.server.events.EventGson;
+import com.google.gerrit.server.events.EventListener;
+import com.google.gerrit.server.events.ProjectEvent;
+import com.google.gerrit.server.events.RefEvent;
+import com.google.gerrit.server.events.UserScopedEventListener;
+import com.google.gerrit.server.notedb.ChangeNotes;
+import com.google.gerrit.server.permissions.PermissionBackend;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.google.gerrit.server.plugincontext.PluginSetContext;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Singleton
+public class FileSystemEventBroker extends EventBroker {
+ private static final Logger log = LoggerFactory.getLogger(FileSystemEventBroker.class);
+ protected final EventStore store;
+ protected final Gson gson;
+ protected final DynamicSet<StreamEventListener> streamEventListeners;
+ protected volatile long lastSent;
+ protected static final String KEY_FILTER = "filter";
+ protected static final String FILTER_TYPE_DROP = "DROP";
+ protected static final String FILTER_ELEMENT_CLASSNAME = "classname";
+ protected Set<String> dropEventNames = new HashSet<>();
+
+ @Inject
+ public FileSystemEventBroker(
+ PluginSetContext<UserScopedEventListener> listeners,
+ PluginSetContext<EventListener> unrestrictedListeners,
+ PermissionBackend permissionBackend,
+ ProjectCache projectCache,
+ ChangeNotes.Factory notesFactory,
+ @Nullable @GerritInstanceId String gerritInstanceId,
+ EventStore store,
+ @EventGson Gson gson,
+ DynamicSet<StreamEventListener> streamEventListeners,
+ GerritServerConfigProvider gerritServerConfigProvider,
+ @PluginName String pluginName)
+ throws IOException {
+ super(
+ listeners,
+ unrestrictedListeners,
+ permissionBackend,
+ projectCache,
+ notesFactory,
+ gerritInstanceId);
+ this.store = store;
+ this.gson = gson;
+ this.streamEventListeners = streamEventListeners;
+ lastSent = store.getHead();
+ readAndParseCfg(pluginName, gerritServerConfigProvider);
+ }
+
+ @Override
+ public void postEvent(Change change, ChangeEvent event) throws PermissionBackendException {
+ storeEvent(event);
+ sendAllPendingEvents();
+ }
+
+ @Override
+ public void postEvent(Project.NameKey projectName, ProjectEvent event) {
+ storeEvent(event);
+ try {
+ sendAllPendingEvents();
+ } catch (PermissionBackendException e) {
+ log.error("Permission Exception while dispatching the event. Will be tried again.", e);
+ }
+ }
+
+ @Override
+ protected void fireEvent(BranchNameKey branchName, RefEvent event)
+ throws PermissionBackendException {
+ storeEvent(event);
+ sendAllPendingEvents();
+ }
+
+ @Override
+ public void postEvent(Event event) throws PermissionBackendException {
+ storeEvent(event);
+ sendAllPendingEvents();
+ }
+
+ protected void storeEvent(Event event) {
+ if (dropEventNames.contains(event.getClass().getName())) {
+ return;
+ }
+ try {
+ store.add(gson.toJson(event));
+ } catch (IOException ex) {
+ log.error("Cannot add event to event store", ex);
+ }
+ }
+
+ protected synchronized void sendAllPendingEvents() throws PermissionBackendException {
+ try {
+ long current = store.getHead();
+ while (lastSent < current) {
+ long next = lastSent + 1;
+ fireEvent(gson.fromJson(store.get(next), Event.class));
+ lastSent = next;
+ }
+ } catch (IOException e) {
+ // Next Event would re-try the events.
+ }
+ for (StreamEventListener l : streamEventListeners) {
+ l.onStreamEventUpdate();
+ }
+ }
+
+ protected void readAndParseCfg(String pluginName, GerritServerConfigProvider configProvider) {
+ PluginConfig cfg = PluginConfig.createFromGerritConfig(pluginName, configProvider.loadConfig());
+ for (String filter : cfg.getStringList(KEY_FILTER)) {
+ String pieces[] = filter.split(" ");
+ if (pieces.length == 3
+ && FILTER_TYPE_DROP.equals(pieces[0])
+ && FILTER_ELEMENT_CLASSNAME.equals(pieces[1])) {
+ dropEventNames.add(pieces[2]);
+ } else {
+ log.error("Ignoring invalid filter: " + filter);
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/Module.java b/src/main/java/com/googlesource/gerrit/plugins/events/Module.java
index 27830fb..c0e1271 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/events/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/Module.java
@@ -14,8 +14,9 @@
package com.googlesource.gerrit.plugins.events;
+import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.events.EventListener;
+import com.google.gerrit.server.events.EventDispatcher;
import com.google.inject.AbstractModule;
import com.googlesource.gerrit.plugins.events.fsstore.FsStore;
@@ -24,6 +25,6 @@
protected void configure() {
DynamicSet.setOf(binder(), StreamEventListener.class);
bind(EventStore.class).to(FsStore.class);
- DynamicSet.bind(binder(), EventListener.class).to(CoreListener.class);
+ DynamicItem.bind(binder(), EventDispatcher.class).to(FileSystemEventBroker.class);
}
}