Avoid direct KafkaConsumer creation

KafkaConsumer is a dependency for an AbractKafkaSubscriber so it should
be created by the factory injected by guice. This change will help us to
write AbstractKafkaSubscriver unit tests.

Change-Id: Id32458f14151ee427bcfd1557d069762648d2fb3
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
index e3ca413..4fa4b43 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
@@ -35,17 +35,16 @@
 import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 
 public abstract class AbstractKafkaSubcriber implements Runnable {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
-  private final KafkaConsumer<byte[], byte[]> consumer;
+  private final Consumer<byte[], byte[]> consumer;
   private final ForwardedEventRouter eventRouter;
   private final DynamicSet<DroppedEventListener> droppedEventListeners;
   private final Gson gson;
@@ -58,6 +57,7 @@
 
   public AbstractKafkaSubcriber(
       KafkaConfiguration configuration,
+      KafkaConsumerFactory consumerFactory,
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       ForwardedEventRouter eventRouter,
@@ -76,11 +76,7 @@
     final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
     try {
       Thread.currentThread().setContextClassLoader(AbstractKafkaSubcriber.class.getClassLoader());
-      this.consumer =
-          new KafkaConsumer<>(
-              configuration.kafkaSubscriber().initPropsWith(instanceId),
-              keyDeserializer,
-              new ByteArrayDeserializer());
+      this.consumer = consumerFactory.create(keyDeserializer, instanceId);
     } finally {
       Thread.currentThread().setContextClassLoader(previousClassLoader);
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
index 09938db..8c0f124 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
@@ -33,6 +33,7 @@
   @Inject
   public IndexEventSubscriber(
       KafkaConfiguration configuration,
+      KafkaConsumerFactory consumerFactory,
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       IndexEventRouter eventRouter,
@@ -43,6 +44,7 @@
       MessageLogger msgLog) {
     super(
         configuration,
+        consumerFactory,
         keyDeserializer,
         valueDeserializer,
         eventRouter,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
index ca07e78..8adc836 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
@@ -33,6 +33,7 @@
   @Inject
   public KafkaCacheEvictionEventSubscriber(
       KafkaConfiguration configuration,
+      KafkaConsumerFactory consumerFactory,
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       StreamEventRouter eventRouter,
@@ -43,6 +44,7 @@
       MessageLogger msgLog) {
     super(
         configuration,
+        consumerFactory,
         keyDeserializer,
         valueDeserializer,
         eventRouter,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerFactory.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerFactory.java
new file mode 100644
index 0000000..9a5e19f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerFactory.java
@@ -0,0 +1,41 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
+import java.util.UUID;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+
+@Singleton
+public class KafkaConsumerFactory {
+  private KafkaConfiguration config;
+
+  @Inject
+  public KafkaConsumerFactory(KafkaConfiguration configuration) {
+    this.config = configuration;
+  }
+
+  public Consumer<byte[], byte[]> create(Deserializer<byte[]> keyDeserializer, UUID instanceId) {
+    return new KafkaConsumer<>(
+        config.kafkaSubscriber().initPropsWith(instanceId),
+        keyDeserializer,
+        new ByteArrayDeserializer());
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
index 2ba33e4..2c00441 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
@@ -33,6 +33,7 @@
   @Inject
   public ProjectUpdateEventSubscriber(
       KafkaConfiguration configuration,
+      KafkaConsumerFactory consumerFactory,
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       ProjectListUpdateRouter eventRouter,
@@ -43,6 +44,7 @@
       MessageLogger msgLog) {
     super(
         configuration,
+        consumerFactory,
         keyDeserializer,
         valueDeserializer,
         eventRouter,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
index 77f3c85..b5f2e43 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
@@ -33,6 +33,7 @@
   @Inject
   public StreamEventSubscriber(
       KafkaConfiguration configuration,
+      KafkaConsumerFactory consumerFactory,
       Deserializer<byte[]> keyDeserializer,
       Deserializer<SourceAwareEventWrapper> valueDeserializer,
       StreamEventRouter eventRouter,
@@ -43,6 +44,7 @@
       MessageLogger msgLog) {
     super(
         configuration,
+        consumerFactory,
         keyDeserializer,
         valueDeserializer,
         eventRouter,