Avoid streaming check on dropped events Since dropped events don't need to be sent out to streamListeners, avoid the extra overhead of checking to see if they need to be fired to the streamListeners. This helps reduce the load on the event system as each update check at a minimum requires reading the on disk "head" value. A side effect is also that these events will also not trigger pending events from other primaries to fire, which likely results in significant extra and not very fruitful polling (see below). Assuming that events are distributed approximately evenly over all primaries, then for every event on one primary, there should be approximately one other event on every other primary. So for each event which gets stored, there is a chance that 50% of the other primaries have also stored an event still pending on the current primary. In these cases, flushing pending events is very likely to improve event firing latency over polling, and the more primaries there are, the more valuable this is as there will be more pending events. This means that there should on average always be one pending when there are 3 primaries. However, when it comes to dropped events, this logic doesn't hold. If one primary has a dropped event, so likely will the others, but those dropped events on the other primaries do no affect the current primary since they do not require firing from the current primary! Thus there is a low chance that this extra polling will be useful. This wasted polling effort was amplified by the fact that events configured to be dropped likely have a higher frequency then most of the events which are not dropped. Release-Notes: Dropped events now result in less IO, and won't fire pending events from other primaries Change-Id: Ic87a53924dbdf4ce1b26450a5fb5fb46308c9a20
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/FileSystemEventBroker.java b/src/main/java/com/googlesource/gerrit/plugins/events/FileSystemEventBroker.java index 2791a19..e8bf937 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/events/FileSystemEventBroker.java +++ b/src/main/java/com/googlesource/gerrit/plugins/events/FileSystemEventBroker.java
@@ -106,17 +106,17 @@ @Override public void postEvent(Change change, ChangeEvent event) throws PermissionBackendException { - storeEvent(event); + Drop drop = storeEvent(event); super.postEvent(change, event); - fireEventForStreamListeners(); + fireEventForStreamListeners(drop); } @Override public void postEvent(Project.NameKey projectName, ProjectEvent event) { - storeEvent(event); + Drop drop = storeEvent(event); super.postEvent(projectName, event); try { - fireEventForStreamListeners(); + fireEventForStreamListeners(drop); } catch (PermissionBackendException e) { log.error("Permission Exception while dispatching the event. Will be tried again.", e); } @@ -125,26 +125,28 @@ @Override public void postEvent(BranchNameKey branchName, RefEvent event) throws PermissionBackendException { - storeEvent(event); + Drop drop = storeEvent(event); super.postEvent(branchName, event); - fireEventForStreamListeners(); + fireEventForStreamListeners(drop); } @Override public void postEvent(Event event) throws PermissionBackendException { - storeEvent(event); + Drop drop = storeEvent(event); super.postEvent(event); - fireEventForStreamListeners(); + fireEventForStreamListeners(drop); } - protected void storeEvent(Event event) { + protected Drop storeEvent(Event event) { if (!isDropEvent(event)) { try { store.add(gson.toJson(event)); + return Drop.FALSE; } catch (IOException ex) { log.error("Cannot add event to event store", ex); } } + return Drop.TRUE; } protected boolean isDropEvent(Event event) { @@ -155,20 +157,26 @@ return false; } - public synchronized void fireEventForStreamListeners() throws PermissionBackendException { - try { - long current = store.getHead(); - while (lastSent < current) { - long next = lastSent + 1; - fireEventForUserScopedEventListener( - Type.STREAM, gson.fromJson(store.get(next), Event.class)); - lastSent = next; + public void fireEventForStreamListeners() throws PermissionBackendException { + fireEventForStreamListeners(Drop.FALSE); + } + + protected synchronized void fireEventForStreamListeners(Drop drop) throws PermissionBackendException { + if (!Drop.TRUE.equals(drop)) { + try { + long current = store.getHead(); + while (lastSent < current) { + long next = lastSent + 1; + fireEventForUserScopedEventListener( + Type.STREAM, gson.fromJson(store.get(next), Event.class)); + lastSent = next; + } + } catch (IOException e) { + // Next Event would re-try the events. } - } catch (IOException e) { - // Next Event would re-try the events. - } - for (StreamEventListener l : streamEventListeners) { - l.onStreamEventUpdate(); + for (StreamEventListener l : streamEventListeners) { + l.onStreamEventUpdate(); + } } } @@ -226,11 +234,16 @@ } } - public enum Type { + protected enum Type { STREAM, NON_STREAM } + protected enum Drop { + TRUE, + FALSE + } + protected List<PluginSetEntryContext<UserScopedEventListener>> getListeners(Type type) { List<PluginSetEntryContext<UserScopedEventListener>> filteredListeners = new ArrayList<>(); for (PluginSetEntryContext<UserScopedEventListener> c : listeners) {