Avoid direct KafkaConsumer creation
KafkaConsumer is a dependency for an AbractKafkaSubscriber so it should
be created by the factory injected by guice. This change will help us to
write AbstractKafkaSubscriver unit tests.
Change-Id: Id32458f14151ee427bcfd1557d069762648d2fb3
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
index e3ca413..4fa4b43 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
@@ -35,17 +35,16 @@
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
public abstract class AbstractKafkaSubcriber implements Runnable {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
- private final KafkaConsumer<byte[], byte[]> consumer;
+ private final Consumer<byte[], byte[]> consumer;
private final ForwardedEventRouter eventRouter;
private final DynamicSet<DroppedEventListener> droppedEventListeners;
private final Gson gson;
@@ -58,6 +57,7 @@
public AbstractKafkaSubcriber(
KafkaConfiguration configuration,
+ KafkaConsumerFactory consumerFactory,
Deserializer<byte[]> keyDeserializer,
Deserializer<SourceAwareEventWrapper> valueDeserializer,
ForwardedEventRouter eventRouter,
@@ -76,11 +76,7 @@
final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(AbstractKafkaSubcriber.class.getClassLoader());
- this.consumer =
- new KafkaConsumer<>(
- configuration.kafkaSubscriber().initPropsWith(instanceId),
- keyDeserializer,
- new ByteArrayDeserializer());
+ this.consumer = consumerFactory.create(keyDeserializer, instanceId);
} finally {
Thread.currentThread().setContextClassLoader(previousClassLoader);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
index 09938db..8c0f124 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
@@ -33,6 +33,7 @@
@Inject
public IndexEventSubscriber(
KafkaConfiguration configuration,
+ KafkaConsumerFactory consumerFactory,
Deserializer<byte[]> keyDeserializer,
Deserializer<SourceAwareEventWrapper> valueDeserializer,
IndexEventRouter eventRouter,
@@ -43,6 +44,7 @@
MessageLogger msgLog) {
super(
configuration,
+ consumerFactory,
keyDeserializer,
valueDeserializer,
eventRouter,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
index ca07e78..8adc836 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
@@ -33,6 +33,7 @@
@Inject
public KafkaCacheEvictionEventSubscriber(
KafkaConfiguration configuration,
+ KafkaConsumerFactory consumerFactory,
Deserializer<byte[]> keyDeserializer,
Deserializer<SourceAwareEventWrapper> valueDeserializer,
StreamEventRouter eventRouter,
@@ -43,6 +44,7 @@
MessageLogger msgLog) {
super(
configuration,
+ consumerFactory,
keyDeserializer,
valueDeserializer,
eventRouter,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerFactory.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerFactory.java
new file mode 100644
index 0000000..9a5e19f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerFactory.java
@@ -0,0 +1,41 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
+import java.util.UUID;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+
+@Singleton
+public class KafkaConsumerFactory {
+ private KafkaConfiguration config;
+
+ @Inject
+ public KafkaConsumerFactory(KafkaConfiguration configuration) {
+ this.config = configuration;
+ }
+
+ public Consumer<byte[], byte[]> create(Deserializer<byte[]> keyDeserializer, UUID instanceId) {
+ return new KafkaConsumer<>(
+ config.kafkaSubscriber().initPropsWith(instanceId),
+ keyDeserializer,
+ new ByteArrayDeserializer());
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
index 2ba33e4..2c00441 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
@@ -33,6 +33,7 @@
@Inject
public ProjectUpdateEventSubscriber(
KafkaConfiguration configuration,
+ KafkaConsumerFactory consumerFactory,
Deserializer<byte[]> keyDeserializer,
Deserializer<SourceAwareEventWrapper> valueDeserializer,
ProjectListUpdateRouter eventRouter,
@@ -43,6 +44,7 @@
MessageLogger msgLog) {
super(
configuration,
+ consumerFactory,
keyDeserializer,
valueDeserializer,
eventRouter,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
index 77f3c85..b5f2e43 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
@@ -33,6 +33,7 @@
@Inject
public StreamEventSubscriber(
KafkaConfiguration configuration,
+ KafkaConsumerFactory consumerFactory,
Deserializer<byte[]> keyDeserializer,
Deserializer<SourceAwareEventWrapper> valueDeserializer,
StreamEventRouter eventRouter,
@@ -43,6 +44,7 @@
MessageLogger msgLog) {
super(
configuration,
+ consumerFactory,
keyDeserializer,
valueDeserializer,
eventRouter,