Allow events from other primaries to be dispatched

This results in core stream events command to also return
the events from other primaries.

Change-Id: Idb8334f8876b217fd32ca8b10937c4eaaa925a03
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/CoreListener.java b/src/main/java/com/googlesource/gerrit/plugins/events/CoreListener.java
deleted file mode 100644
index 3336309..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/events/CoreListener.java
+++ /dev/null
@@ -1,101 +0,0 @@
-// Copyright (C) 2016 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.events;
-
-import com.google.common.base.Supplier;
-import com.google.gerrit.entities.EntitiesAdapterFactory;
-import com.google.gerrit.entities.Project;
-import com.google.gerrit.extensions.annotations.PluginName;
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.config.GerritServerConfigProvider;
-import com.google.gerrit.server.config.PluginConfig;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.EventListener;
-import com.google.gerrit.server.events.ProjectNameKeyAdapter;
-import com.google.gerrit.server.events.SupplierSerializer;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Singleton
-public class CoreListener implements EventListener {
-  private static Logger log = LoggerFactory.getLogger(CoreListener.class);
-
-  protected static final String KEY_FILTER = "filter";
-  protected static final String FILTER_TYPE_DROP = "DROP";
-  protected static final String FILTER_ELEMENT_CLASSNAME = "classname";
-
-  protected static final Gson gson =
-      new GsonBuilder()
-          .registerTypeAdapter(Supplier.class, new SupplierSerializer())
-          .registerTypeAdapter(Project.NameKey.class, new ProjectNameKeyAdapter())
-          .registerTypeAdapterFactory(EntitiesAdapterFactory.create())
-          .create();
-
-  protected final String pluginName;
-  protected final GerritServerConfigProvider gerritServerConfigProvider;
-  protected final DynamicSet<StreamEventListener> listeners;
-  protected final EventStore store;
-  protected Set<String> dropEventNames = new HashSet<>();
-
-  @Inject
-  protected CoreListener(
-      @PluginName String pluginName,
-      GerritServerConfigProvider gerritServerConfigProvider,
-      EventStore store,
-      DynamicSet<StreamEventListener> listeners) {
-    this.pluginName = pluginName;
-    this.gerritServerConfigProvider = gerritServerConfigProvider;
-    this.store = store;
-    this.listeners = listeners;
-    readAndParseCfg();
-  }
-
-  @Override
-  public void onEvent(Event event) {
-    if (dropEventNames.contains(event.getClass().getName())) {
-      return;
-    }
-    try {
-      store.add(gson.toJson(event));
-    } catch (IOException e) {
-      log.error("Cannot add event to event store", e);
-    }
-    for (StreamEventListener l : listeners) {
-      l.onStreamEventUpdate();
-    }
-  }
-
-  protected void readAndParseCfg() {
-    PluginConfig cfg =
-        PluginConfig.createFromGerritConfig(pluginName, gerritServerConfigProvider.loadConfig());
-    for (String filter : cfg.getStringList(KEY_FILTER)) {
-      String pieces[] = filter.split(" ");
-      if (pieces.length == 3
-          && FILTER_TYPE_DROP.equals(pieces[0])
-          && FILTER_ELEMENT_CLASSNAME.equals(pieces[1])) {
-        dropEventNames.add(pieces[2]);
-      } else {
-        log.error("Ignoring invalid filter: " + filter);
-      }
-    }
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/FileSystemEventBroker.java b/src/main/java/com/googlesource/gerrit/plugins/events/FileSystemEventBroker.java
new file mode 100644
index 0000000..ef73a36
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/FileSystemEventBroker.java
@@ -0,0 +1,157 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events;
+
+import com.google.gerrit.entities.BranchNameKey;
+import com.google.gerrit.entities.Change;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.config.GerritServerConfigProvider;
+import com.google.gerrit.server.config.PluginConfig;
+import com.google.gerrit.server.events.ChangeEvent;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventBroker;
+import com.google.gerrit.server.events.EventGson;
+import com.google.gerrit.server.events.EventListener;
+import com.google.gerrit.server.events.ProjectEvent;
+import com.google.gerrit.server.events.RefEvent;
+import com.google.gerrit.server.events.UserScopedEventListener;
+import com.google.gerrit.server.notedb.ChangeNotes;
+import com.google.gerrit.server.permissions.PermissionBackend;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.google.gerrit.server.plugincontext.PluginSetContext;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Singleton
+public class FileSystemEventBroker extends EventBroker {
+  private static final Logger log = LoggerFactory.getLogger(FileSystemEventBroker.class);
+  protected final EventStore store;
+  protected final Gson gson;
+  protected final DynamicSet<StreamEventListener> streamEventListeners;
+  protected volatile long lastSent;
+  protected static final String KEY_FILTER = "filter";
+  protected static final String FILTER_TYPE_DROP = "DROP";
+  protected static final String FILTER_ELEMENT_CLASSNAME = "classname";
+  protected Set<String> dropEventNames = new HashSet<>();
+
+  @Inject
+  public FileSystemEventBroker(
+      PluginSetContext<UserScopedEventListener> listeners,
+      PluginSetContext<EventListener> unrestrictedListeners,
+      PermissionBackend permissionBackend,
+      ProjectCache projectCache,
+      ChangeNotes.Factory notesFactory,
+      @Nullable @GerritInstanceId String gerritInstanceId,
+      EventStore store,
+      @EventGson Gson gson,
+      DynamicSet<StreamEventListener> streamEventListeners,
+      GerritServerConfigProvider gerritServerConfigProvider,
+      @PluginName String pluginName)
+      throws IOException {
+    super(
+        listeners,
+        unrestrictedListeners,
+        permissionBackend,
+        projectCache,
+        notesFactory,
+        gerritInstanceId);
+    this.store = store;
+    this.gson = gson;
+    this.streamEventListeners = streamEventListeners;
+    lastSent = store.getHead();
+    readAndParseCfg(pluginName, gerritServerConfigProvider);
+  }
+
+  @Override
+  public void postEvent(Change change, ChangeEvent event) throws PermissionBackendException {
+    storeEvent(event);
+    sendAllPendingEvents();
+  }
+
+  @Override
+  public void postEvent(Project.NameKey projectName, ProjectEvent event) {
+    storeEvent(event);
+    try {
+      sendAllPendingEvents();
+    } catch (PermissionBackendException e) {
+      log.error("Permission Exception while dispatching the event. Will be tried again.", e);
+    }
+  }
+
+  @Override
+  protected void fireEvent(BranchNameKey branchName, RefEvent event)
+      throws PermissionBackendException {
+    storeEvent(event);
+    sendAllPendingEvents();
+  }
+
+  @Override
+  public void postEvent(Event event) throws PermissionBackendException {
+    storeEvent(event);
+    sendAllPendingEvents();
+  }
+
+  protected void storeEvent(Event event) {
+    if (dropEventNames.contains(event.getClass().getName())) {
+      return;
+    }
+    try {
+      store.add(gson.toJson(event));
+    } catch (IOException ex) {
+      log.error("Cannot add event to event store", ex);
+    }
+  }
+
+  protected synchronized void sendAllPendingEvents() throws PermissionBackendException {
+    try {
+      long current = store.getHead();
+      while (lastSent < current) {
+        long next = lastSent + 1;
+        fireEvent(gson.fromJson(store.get(next), Event.class));
+        lastSent = next;
+      }
+    } catch (IOException e) {
+      // Next Event would re-try the events.
+    }
+    for (StreamEventListener l : streamEventListeners) {
+      l.onStreamEventUpdate();
+    }
+  }
+
+  protected void readAndParseCfg(String pluginName, GerritServerConfigProvider configProvider) {
+    PluginConfig cfg = PluginConfig.createFromGerritConfig(pluginName, configProvider.loadConfig());
+    for (String filter : cfg.getStringList(KEY_FILTER)) {
+      String pieces[] = filter.split(" ");
+      if (pieces.length == 3
+          && FILTER_TYPE_DROP.equals(pieces[0])
+          && FILTER_ELEMENT_CLASSNAME.equals(pieces[1])) {
+        dropEventNames.add(pieces[2]);
+      } else {
+        log.error("Ignoring invalid filter: " + filter);
+      }
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/Module.java b/src/main/java/com/googlesource/gerrit/plugins/events/Module.java
index 27830fb..c0e1271 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/events/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/Module.java
@@ -14,8 +14,9 @@
 
 package com.googlesource.gerrit.plugins.events;
 
+import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.events.EventListener;
+import com.google.gerrit.server.events.EventDispatcher;
 import com.google.inject.AbstractModule;
 import com.googlesource.gerrit.plugins.events.fsstore.FsStore;
 
@@ -24,6 +25,6 @@
   protected void configure() {
     DynamicSet.setOf(binder(), StreamEventListener.class);
     bind(EventStore.class).to(FsStore.class);
-    DynamicSet.bind(binder(), EventListener.class).to(CoreListener.class);
+    DynamicItem.bind(binder(), EventDispatcher.class).to(FileSystemEventBroker.class);
   }
 }