Merge branch 'stable-2.16'

* stable-2.16:
  Remove redundant StandardKeyEncoder reference
  Format build files with buildifier 0.19.2
  E2E integration test using Kafka docker container
  AbstractKafkaSubcriber: Add listener for dropped events
  Removed unused PeerInfoStrategy enum

Change-Id: Icded855cba1d1a0041605b9081712ea1f188726d
diff --git a/BUILD b/BUILD
index 08d9d5c..38da77a 100644
--- a/BUILD
+++ b/BUILD
@@ -17,8 +17,8 @@
     ],
     resources = glob(["src/main/resources/**/*"]),
     deps = [
-        "@kafka_client//jar",
         "@commons-lang3//jar",
+        "@kafka_client//jar",
     ],
 )
 
@@ -45,5 +45,6 @@
         "@wiremock//jar",
         "@kafka_client//jar",
         "@testcontainers-kafka//jar",
+        "//lib/testcontainers",
     ],
 )
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 472f962..678edd5 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -54,4 +54,4 @@
         name = "commons-lang3",
         artifact = "org.apache.commons:commons-lang3:3.6",
         sha1 = "9d28a6b23650e8a7e9063c04588ace6cf7012c17",
-    )
\ No newline at end of file
+    )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index f6c617d..acfaae2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.CaseFormat;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
@@ -62,13 +63,13 @@
   private final KafkaSubscriber subscriber;
   private final Kafka kafka;
 
-  public enum PeerInfoStrategy {
-    STATIC
-  }
-
   @Inject
   Configuration(PluginConfigFactory pluginConfigFactory, @PluginName String pluginName) {
-    Config cfg = pluginConfigFactory.getGlobalPluginConfig(pluginName);
+    this(pluginConfigFactory.getGlobalPluginConfig(pluginName));
+  }
+
+  @VisibleForTesting
+  public Configuration(Config cfg) {
     kafka = new Kafka(cfg);
     publisher = new KafkaPublisher(cfg);
     subscriber = new KafkaSubscriber(cfg);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index 0b4efaa..0a0fe98 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -38,12 +38,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class Module extends AbstractModule {
+public class Module extends AbstractModule {
   private static final Logger log = LoggerFactory.getLogger(Module.class);
   private final Configuration config;
 
   @Inject
-  Module(Configuration config) {
+  public Module(Configuration config) {
     this.config = config;
   }
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
index 3481ac9..8a3b6dc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
@@ -14,10 +14,9 @@
 
 package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
 
-import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.gson.Gson;
-import com.google.inject.AbstractModule;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration.KafkaPublisher;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
@@ -30,7 +29,7 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 
-public class BrokerForwarderModule extends AbstractModule {
+public class BrokerForwarderModule extends LifecycleModule {
   private final KafkaPublisher kafkaPublisher;
 
   public BrokerForwarderModule(KafkaPublisher kafkaPublisher) {
@@ -40,7 +39,7 @@
   @Override
   protected void configure() {
     bind(Gson.class).toProvider(GsonProvider.class).in(Singleton.class);
-    DynamicSet.bind(binder(), LifecycleListener.class).to(BrokerPublisher.class);
+    listener().to(BrokerPublisher.class);
     bind(BrokerSession.class).to(KafkaSession.class);
 
     if (kafkaPublisher.enabledEvent(EventFamily.INDEX_EVENT)) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
index 3af9047..ec9ee2c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
@@ -15,6 +15,7 @@
 package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
 
 import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.permissions.PermissionBackendException;
 import com.google.gerrit.server.util.ManualRequestContext;
 import com.google.gerrit.server.util.OneOffRequestContext;
@@ -42,6 +43,7 @@
 
   private final KafkaConsumer<byte[], byte[]> consumer;
   private final ForwardedEventRouter eventRouter;
+  private final DynamicSet<DroppedEventListener> droppedEventListeners;
   private final Provider<Gson> gsonProvider;
   private final UUID instanceId;
   private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -54,11 +56,13 @@
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       ForwardedEventRouter eventRouter,
+      DynamicSet<DroppedEventListener> droppedEventListeners,
       Provider<Gson> gsonProvider,
       @InstanceId UUID instanceId,
       OneOffRequestContext oneOffCtx) {
     this.configuration = configuration;
     this.eventRouter = eventRouter;
+    this.droppedEventListeners = droppedEventListeners;
     this.gsonProvider = gsonProvider;
     this.instanceId = instanceId;
     this.oneOffCtx = oneOffCtx;
@@ -109,6 +113,7 @@
         logger.atFiner().log(
             "Dropping event %s produced by our instanceId %s",
             event.toString(), instanceId.toString());
+        droppedEventListeners.forEach(l -> l.onEventDropped(event));
       } else {
         try {
           logger.atInfo().log("Header[%s] Body[%s]", event.getHeader(), event.getBody());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
index 0c5dac9..01511be 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
 
+import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
@@ -34,6 +35,7 @@
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       StreamEventRouter eventRouter,
+      DynamicSet<DroppedEventListener> droppedEventListeners,
       Provider<Gson> gsonProvider,
       @InstanceId UUID instanceId,
       OneOffRequestContext oneOffCtx) {
@@ -42,6 +44,7 @@
         keyDeserializer,
         valueDeserializer,
         eventRouter,
+        droppedEventListeners,
         gsonProvider,
         instanceId,
         oneOffCtx);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/DroppedEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/DroppedEventListener.java
new file mode 100644
index 0000000..418c58b
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/DroppedEventListener.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+
+public interface DroppedEventListener {
+  /**
+   * Invoked when any event is dropped.
+   *
+   * @param event information about the event.
+   */
+  void onEventDropped(SourceAwareEventWrapper event);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
index 0720cf2..abef7e6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
 
+import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
@@ -34,6 +35,7 @@
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       IndexEventRouter eventRouter,
+      DynamicSet<DroppedEventListener> droppedEventListeners,
       Provider<Gson> gsonProvider,
       @InstanceId UUID instanceId,
       OneOffRequestContext oneOffCtx) {
@@ -42,6 +44,7 @@
         keyDeserializer,
         valueDeserializer,
         eventRouter,
+        droppedEventListeners,
         gsonProvider,
         instanceId,
         oneOffCtx);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
index 6eafc22..5bc7669 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
@@ -61,5 +61,7 @@
       DynamicSet.bind(binder(), AbstractKafkaSubcriber.class)
           .to(ProjectUpdateEventSubscriber.class);
     }
+
+    DynamicSet.setOf(binder(), DroppedEventListener.class);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
index 581dc2f..648746e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
 
+import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
@@ -34,6 +35,7 @@
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       ProjectListUpdateRouter eventRouter,
+      DynamicSet<DroppedEventListener> droppedEventListeners,
       Provider<Gson> gsonProvider,
       @InstanceId UUID instanceId,
       OneOffRequestContext oneOffCtx) {
@@ -42,6 +44,7 @@
         keyDeserializer,
         valueDeserializer,
         eventRouter,
+        droppedEventListeners,
         gsonProvider,
         instanceId,
         oneOffCtx);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
index 3ac08d5..756af54 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
 
+import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
@@ -34,6 +35,7 @@
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       StreamEventRouter eventRouter,
+      DynamicSet<DroppedEventListener> droppedEventListeners,
       Provider<Gson> gsonProvider,
       @InstanceId UUID instanceId,
       OneOffRequestContext oneOffCtx) {
@@ -42,6 +44,7 @@
         keyDeserializer,
         valueDeserializer,
         eventRouter,
+        droppedEventListeners,
         gsonProvider,
         instanceId,
         oneOffCtx);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java
index 68edd88..d47643a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandlerTest.java
@@ -22,8 +22,6 @@
 
 import com.google.gerrit.reviewdb.client.AccountGroup;
 import com.google.gerrit.server.index.group.GroupIndexer;
-import com.google.gwtorm.client.KeyUtil;
-import com.google.gwtorm.server.StandardKeyEncoder;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexingHandler.Operation;
 import java.io.IOException;
@@ -101,8 +99,6 @@
 
     assertThat(Context.isForwardedEvent()).isFalse();
     try {
-      // Had to put this here to avoid a NPE during the index call
-      KeyUtil.setEncoderImpl(new StandardKeyEncoder());
       handler.index(uuid, Operation.INDEX, Optional.empty());
       fail("should have thrown an IOException");
     } catch (IOException e) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
new file mode 100644
index 0000000..98953bf
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -0,0 +1,154 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.LogThreshold;
+import com.google.gerrit.acceptance.NoHttpd;
+import com.google.gerrit.acceptance.PushOneCommit;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.extensions.api.changes.ReviewInput;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Key;
+import com.google.inject.TypeLiteral;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+import com.googlesource.gerrit.plugins.multisite.Module;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.eclipse.jgit.lib.Config;
+import org.junit.Test;
+import org.testcontainers.containers.KafkaContainer;
+
+@NoHttpd
+@LogThreshold(level = "INFO")
+@TestPlugin(
+    name = "multi-site",
+    sysModule =
+        "com.googlesource.gerrit.plugins.multisite.kafka.consumer.EventConsumerIT$KafkaTestContainerModule")
+public class EventConsumerIT extends LightweightPluginDaemonTest {
+  private static final int QUEUE_POLL_TIMEOUT_MSECS = 2000;
+
+  public static class KafkaTestContainerModule extends LifecycleModule {
+
+    public class KafkaStopAtShutdown implements LifecycleListener {
+      private final KafkaContainer kafka;
+
+      public KafkaStopAtShutdown(KafkaContainer kafka) {
+        this.kafka = kafka;
+      }
+
+      @Override
+      public void stop() {
+        kafka.stop();
+      }
+
+      @Override
+      public void start() {
+        // Do nothing
+      }
+    }
+
+    @Override
+    protected void configure() {
+      final KafkaContainer kafka = new KafkaContainer();
+      kafka.start();
+
+      Config config = new Config();
+      config.setString("kafka", null, "bootstrapServers", kafka.getBootstrapServers());
+      config.setBoolean("kafka", "publisher", "enabled", true);
+      config.setBoolean("kafka", "subscriber", "enabled", true);
+      Configuration multiSiteConfig = new Configuration(config);
+      bind(Configuration.class).toInstance(multiSiteConfig);
+      install(new Module(multiSiteConfig));
+
+      listener().toInstance(new KafkaStopAtShutdown(kafka));
+    }
+  }
+
+  @Test
+  public void createChangeShouldPropagateChangeIndexAndRefUpdateStreamEvent() throws Exception {
+    LinkedBlockingQueue<SourceAwareEventWrapper> droppedEventsQueue = captureDroppedEvents();
+    drainQueue(droppedEventsQueue);
+
+    createChange();
+    List<String> createdChangeEvents = receiveFromQueue(droppedEventsQueue, 2);
+    assertThat(createdChangeEvents).hasSize(2);
+
+    assertThat(createdChangeEvents).contains("change-index");
+    assertThat(createdChangeEvents).contains("ref-updated");
+  }
+
+  @Test
+  public void reviewChangeShouldPropagateChangeIndexAndCommentAdded() throws Exception {
+    LinkedBlockingQueue<SourceAwareEventWrapper> droppedEventsQueue = captureDroppedEvents();
+    PushOneCommit.Result r = createChange();
+    drainQueue(droppedEventsQueue);
+
+    ReviewInput in = ReviewInput.recommend();
+    in.message = "LGTM";
+    gApi.changes().id(r.getChangeId()).revision("current").review(in);
+
+    List<String> createdChangeEvents = receiveFromQueue(droppedEventsQueue, 2);
+
+    assertThat(createdChangeEvents).hasSize(2);
+    assertThat(createdChangeEvents).contains("change-index");
+    assertThat(createdChangeEvents).contains("comment-added");
+  }
+
+  private LinkedBlockingQueue<SourceAwareEventWrapper> captureDroppedEvents() throws Exception {
+    LinkedBlockingQueue<SourceAwareEventWrapper> droppedEvents = new LinkedBlockingQueue<>();
+
+    TypeLiteral<DynamicSet<DroppedEventListener>> type =
+        new TypeLiteral<DynamicSet<DroppedEventListener>>() {};
+    plugin
+        .getSysInjector()
+        .getInstance(Key.get(type))
+        .add(
+            "multi-site",
+            new DroppedEventListener() {
+              @Override
+              public void onEventDropped(SourceAwareEventWrapper event) {
+                droppedEvents.offer(event);
+              }
+            });
+    return droppedEvents;
+  }
+
+  private List<String> receiveFromQueue(
+      LinkedBlockingQueue<SourceAwareEventWrapper> queue, int numEvents)
+      throws InterruptedException {
+    List<String> eventsList = new ArrayList<>();
+    for (int i = 0; i < numEvents; i++) {
+      SourceAwareEventWrapper event = queue.poll(QUEUE_POLL_TIMEOUT_MSECS, TimeUnit.MILLISECONDS);
+      if (event != null) {
+        eventsList.add(event.getHeader().getEventType());
+      }
+    }
+    return eventsList;
+  }
+
+  private void drainQueue(LinkedBlockingQueue<SourceAwareEventWrapper> queue)
+      throws InterruptedException {
+    while (queue.poll(QUEUE_POLL_TIMEOUT_MSECS, TimeUnit.MILLISECONDS) != null) {
+      // Just consume the event
+    }
+  }
+}