AbstractKafkaSubcriber: Add listener for dropped events

For now there is no implementation for this listener, but later we may
want to add listener for metrics or use it for end 2 end integration
tests.

Change-Id: I8a51a41fe5c4b08c1c39fe63ffc5dd284e761a60
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
index 3af9047..ec9ee2c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
@@ -15,6 +15,7 @@
 package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
 
 import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.gerrit.server.util.ManualRequestContext;
 import com.google.gerrit.server.util.OneOffRequestContext;
@@ -42,6 +43,7 @@
 
   private final KafkaConsumer<byte[], byte[]> consumer;
   private final ForwardedEventRouter eventRouter;
+  private final DynamicSet<DroppedEventListener> droppedEventListeners;
   private final Provider<Gson> gsonProvider;
   private final UUID instanceId;
   private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -54,11 +56,13 @@
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       ForwardedEventRouter eventRouter,
+      DynamicSet<DroppedEventListener> droppedEventListeners,
       Provider<Gson> gsonProvider,
       @InstanceId UUID instanceId,
       OneOffRequestContext oneOffCtx) {
     this.configuration = configuration;
     this.eventRouter = eventRouter;
+    this.droppedEventListeners = droppedEventListeners;
     this.gsonProvider = gsonProvider;
     this.instanceId = instanceId;
     this.oneOffCtx = oneOffCtx;
@@ -109,6 +113,7 @@
         logger.atFiner().log(
             "Dropping event %s produced by our instanceId %s",
             event.toString(), instanceId.toString());
+        droppedEventListeners.forEach(l -> l.onEventDropped(event));
       } else {
         try {
           logger.atInfo().log("Header[%s] Body[%s]", event.getHeader(), event.getBody());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
index 0c5dac9..01511be 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
 
+import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
@@ -34,6 +35,7 @@
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       StreamEventRouter eventRouter,
+      DynamicSet<DroppedEventListener> droppedEventListeners,
       Provider<Gson> gsonProvider,
       @InstanceId UUID instanceId,
       OneOffRequestContext oneOffCtx) {
@@ -42,6 +44,7 @@
         keyDeserializer,
         valueDeserializer,
         eventRouter,
+        droppedEventListeners,
         gsonProvider,
         instanceId,
         oneOffCtx);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/DroppedEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/DroppedEventListener.java
new file mode 100644
index 0000000..418c58b
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/DroppedEventListener.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+
+public interface DroppedEventListener {
+  /**
+   * Invoked when any event is dropped.
+   *
+   * @param event information about the event.
+   */
+  void onEventDropped(SourceAwareEventWrapper event);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
index 0720cf2..abef7e6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
 
+import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
@@ -34,6 +35,7 @@
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       IndexEventRouter eventRouter,
+      DynamicSet<DroppedEventListener> droppedEventListeners,
       Provider<Gson> gsonProvider,
       @InstanceId UUID instanceId,
       OneOffRequestContext oneOffCtx) {
@@ -42,6 +44,7 @@
         keyDeserializer,
         valueDeserializer,
         eventRouter,
+        droppedEventListeners,
         gsonProvider,
         instanceId,
         oneOffCtx);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
index 6eafc22..5bc7669 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
@@ -61,5 +61,7 @@
       DynamicSet.bind(binder(), AbstractKafkaSubcriber.class)
           .to(ProjectUpdateEventSubscriber.class);
     }
+
+    DynamicSet.setOf(binder(), DroppedEventListener.class);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
index 581dc2f..648746e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
 
+import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
@@ -34,6 +35,7 @@
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       ProjectListUpdateRouter eventRouter,
+      DynamicSet<DroppedEventListener> droppedEventListeners,
       Provider<Gson> gsonProvider,
       @InstanceId UUID instanceId,
       OneOffRequestContext oneOffCtx) {
@@ -42,6 +44,7 @@
         keyDeserializer,
         valueDeserializer,
         eventRouter,
+        droppedEventListeners,
         gsonProvider,
         instanceId,
         oneOffCtx);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
index 3ac08d5..756af54 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
 
+import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
@@ -34,6 +35,7 @@
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       StreamEventRouter eventRouter,
+      DynamicSet<DroppedEventListener> droppedEventListeners,
       Provider<Gson> gsonProvider,
       @InstanceId UUID instanceId,
       OneOffRequestContext oneOffCtx) {
@@ -42,6 +44,7 @@
         keyDeserializer,
         valueDeserializer,
         eventRouter,
+        droppedEventListeners,
         gsonProvider,
         instanceId,
         oneOffCtx);