Move Kafka consumer logic to a separate class

Moving Kafka consumer logic to a separate class will allow us
to use BrokerApi for message consuming.

Feature: Issue 10829
Change-Id: Ib070e132891c53ad33e9b7538213a3e1a3b5b2e0
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
index 457555b..64af68c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
@@ -14,13 +14,9 @@
 
 package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.permissions.PermissionBackendException;
-import com.google.gerrit.server.util.ManualRequestContext;
-import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.google.gwtorm.server.OrmException;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
@@ -31,105 +27,44 @@
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.io.IOException;
-import java.time.Duration;
-import java.util.Collections;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.Deserializer;
 
 public abstract class AbstractKafkaSubcriber implements Runnable {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
-  private final Consumer<byte[], byte[]> consumer;
+  private final KafkaEventSubscriber subscriber;
   private final ForwardedEventRouter eventRouter;
   private final DynamicSet<DroppedEventListener> droppedEventListeners;
   private final Gson gson;
   private final UUID instanceId;
-  private final AtomicBoolean closed = new AtomicBoolean(false);
-  private final Deserializer<SourceAwareEventWrapper> valueDeserializer;
-  private final KafkaConfiguration configuration;
-  private final KafkaConsumerFactory consumerFactory;
-  private final OneOffRequestContext oneOffCtx;
   private final MessageLogger msgLog;
   private SubscriberMetrics subscriberMetrics;
 
   public AbstractKafkaSubcriber(
-      KafkaConfiguration configuration,
-      KafkaConsumerFactory consumerFactory,
-      Deserializer<byte[]> keyDeserializer,
-      Deserializer<SourceAwareEventWrapper> valueDeserializer,
+      KafkaEventSubscriber subscriber,
       ForwardedEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @BrokerGson Gson gson,
       @InstanceId UUID instanceId,
-      OneOffRequestContext oneOffCtx,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics) {
-    this.configuration = configuration;
-    this.consumerFactory = consumerFactory;
     this.eventRouter = eventRouter;
     this.droppedEventListeners = droppedEventListeners;
     this.gson = gson;
     this.instanceId = instanceId;
-    this.oneOffCtx = oneOffCtx;
     this.msgLog = msgLog;
     this.subscriberMetrics = subscriberMetrics;
-    final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
-    try {
-      Thread.currentThread().setContextClassLoader(AbstractKafkaSubcriber.class.getClassLoader());
-      this.consumer = consumerFactory.create(keyDeserializer, instanceId);
-    } finally {
-      Thread.currentThread().setContextClassLoader(previousClassLoader);
-    }
-    this.valueDeserializer = valueDeserializer;
+    this.subscriber = subscriber;
   }
 
   @Override
   public void run() {
-    final String topic = configuration.getKafka().getTopicAlias(getTopic());
-    subscribe(topic);
+    subscriber.subscribe(getTopic(), this::processRecord);
   }
 
   protected abstract EventTopic getTopic();
 
-  public void subscribe(String topic) {
-    try {
-
-      logger.atInfo().log("Kafka consumer subscribing to topic alias [%s]", topic);
-      consumer.subscribe(Collections.singleton(topic));
-      while (!closed.get()) {
-        ConsumerRecords<byte[], byte[]> consumerRecords =
-            consumer.poll(Duration.ofMillis(configuration.kafkaSubscriber().getPollingInterval()));
-        consumerRecords.forEach(
-            consumerRecord -> {
-              try (ManualRequestContext ctx = oneOffCtx.open()) {
-                SourceAwareEventWrapper event =
-                    valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
-                processRecord(event);
-              } catch (Exception e) {
-                logger.atSevere().withCause(e).log(
-                    "Malformed event '%s': [Exception: %s]",
-                    new String(consumerRecord.value(), UTF_8));
-                subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
-              }
-            });
-      }
-    } catch (WakeupException e) {
-      // Ignore exception if closing
-      if (!closed.get()) throw e;
-    } catch (Exception e) {
-      subscriberMetrics.incrementSubscriberFailedToPollMessages();
-      throw e;
-    } finally {
-      consumer.close();
-    }
-  }
-
   private void processRecord(SourceAwareEventWrapper event) {
 
     if (event.getHeader().getSourceInstanceId().equals(instanceId)) {
@@ -156,7 +91,6 @@
 
   // Shutdown hook which can be called from a separate thread
   public void shutdown() {
-    closed.set(true);
-    consumer.wakeup();
+    subscriber.shutdown();
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
index 9e55430..28d068e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
@@ -15,7 +15,6 @@
 package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -25,35 +24,25 @@
 import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
-import org.apache.kafka.common.serialization.Deserializer;
 
 @Singleton
 public class IndexEventSubscriber extends AbstractKafkaSubcriber {
   @Inject
   public IndexEventSubscriber(
-      KafkaConfiguration configuration,
-      KafkaConsumerFactory consumerFactory,
-      Deserializer<byte[]> keyDeserializer,
-      Deserializer<SourceAwareEventWrapper> valueDeserializer,
+      KafkaEventSubscriber subscriber,
       IndexEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @BrokerGson Gson gsonProvider,
       @InstanceId UUID instanceId,
-      OneOffRequestContext oneOffCtx,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics) {
     super(
-        configuration,
-        consumerFactory,
-        keyDeserializer,
-        valueDeserializer,
+        subscriber,
         eventRouter,
         droppedEventListeners,
         gsonProvider,
         instanceId,
-        oneOffCtx,
         msgLog,
         subscriberMetrics);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
index 6663a64..26725d8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
@@ -15,7 +15,6 @@
 package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -25,35 +24,25 @@
 import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
-import org.apache.kafka.common.serialization.Deserializer;
 
 @Singleton
 public class KafkaCacheEvictionEventSubscriber extends AbstractKafkaSubcriber {
   @Inject
   public KafkaCacheEvictionEventSubscriber(
-      KafkaConfiguration configuration,
-      KafkaConsumerFactory consumerFactory,
-      Deserializer<byte[]> keyDeserializer,
-      Deserializer<SourceAwareEventWrapper> valueDeserializer,
+      KafkaEventSubscriber subscriber,
       StreamEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @BrokerGson Gson gsonProvider,
       @InstanceId UUID instanceId,
-      OneOffRequestContext oneOffCtx,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics) {
     super(
-        configuration,
-        consumerFactory,
-        keyDeserializer,
-        valueDeserializer,
+        subscriber,
         eventRouter,
         droppedEventListeners,
         gsonProvider,
         instanceId,
-        oneOffCtx,
         msgLog,
         subscriberMetrics);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java
new file mode 100644
index 0000000..222cf94
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java
@@ -0,0 +1,109 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+public class KafkaEventSubscriber {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final Consumer<byte[], byte[]> consumer;
+  private final OneOffRequestContext oneOffCtx;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  private final Deserializer<SourceAwareEventWrapper> valueDeserializer;
+  private final KafkaConfiguration configuration;
+  private final SubscriberMetrics subscriberMetrics;
+
+  @Inject
+  public KafkaEventSubscriber(
+      KafkaConfiguration configuration,
+      KafkaConsumerFactory consumerFactory,
+      Deserializer<byte[]> keyDeserializer,
+      Deserializer<SourceAwareEventWrapper> valueDeserializer,
+      @InstanceId UUID instanceId,
+      OneOffRequestContext oneOffCtx,
+      SubscriberMetrics subscriberMetrics) {
+
+    this.configuration = configuration;
+    this.oneOffCtx = oneOffCtx;
+    this.subscriberMetrics = subscriberMetrics;
+
+    final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(KafkaEventSubscriber.class.getClassLoader());
+      this.consumer = consumerFactory.create(keyDeserializer, instanceId);
+    } finally {
+      Thread.currentThread().setContextClassLoader(previousClassLoader);
+    }
+    this.valueDeserializer = valueDeserializer;
+  }
+
+  public void subscribe(
+      EventTopic evenTopic, java.util.function.Consumer<SourceAwareEventWrapper> messageProcessor) {
+    try {
+      final String topic = configuration.getKafka().getTopicAlias(evenTopic);
+      logger.atInfo().log(
+          "Kafka consumer subscribing to topic alias [%s] for event topic [%s]", topic, evenTopic);
+      consumer.subscribe(Collections.singleton(topic));
+      while (!closed.get()) {
+        ConsumerRecords<byte[], byte[]> consumerRecords =
+            consumer.poll(Duration.ofMillis(configuration.kafkaSubscriber().getPollingInterval()));
+        consumerRecords.forEach(
+            consumerRecord -> {
+              try (ManualRequestContext ctx = oneOffCtx.open()) {
+                SourceAwareEventWrapper event =
+                    valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
+                messageProcessor.accept(event);
+              } catch (Exception e) {
+                logger.atSevere().withCause(e).log(
+                    "Malformed event '%s': [Exception: %s]",
+                    new String(consumerRecord.value(), UTF_8));
+                subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
+              }
+            });
+      }
+    } catch (WakeupException e) {
+      // Ignore exception if closing
+      if (!closed.get()) throw e;
+    } catch (Exception e) {
+      subscriberMetrics.incrementSubscriberFailedToPollMessages();
+      throw e;
+    } finally {
+      consumer.close();
+    }
+  }
+
+  public void shutdown() {
+    closed.set(true);
+    consumer.wakeup();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
index e581b0e..e47a515 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
@@ -15,7 +15,6 @@
 package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -25,35 +24,25 @@
 import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
-import org.apache.kafka.common.serialization.Deserializer;
 
 @Singleton
 public class ProjectUpdateEventSubscriber extends AbstractKafkaSubcriber {
   @Inject
   public ProjectUpdateEventSubscriber(
-      KafkaConfiguration configuration,
-      KafkaConsumerFactory consumerFactory,
-      Deserializer<byte[]> keyDeserializer,
-      Deserializer<SourceAwareEventWrapper> valueDeserializer,
+      KafkaEventSubscriber subscriber,
       ProjectListUpdateRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @BrokerGson Gson gson,
       @InstanceId UUID instanceId,
-      OneOffRequestContext oneOffCtx,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics) {
     super(
-        configuration,
-        consumerFactory,
-        keyDeserializer,
-        valueDeserializer,
+        subscriber,
         eventRouter,
         droppedEventListeners,
         gson,
         instanceId,
-        oneOffCtx,
         msgLog,
         subscriberMetrics);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
index dd882cd..f07d969 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
@@ -15,7 +15,6 @@
 package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
 
 import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -25,35 +24,25 @@
 import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
 import java.util.UUID;
-import org.apache.kafka.common.serialization.Deserializer;
 
 @Singleton
 public class StreamEventSubscriber extends AbstractKafkaSubcriber {
   @Inject
   public StreamEventSubscriber(
-      KafkaConfiguration configuration,
-      KafkaConsumerFactory consumerFactory,
-      Deserializer<byte[]> keyDeserializer,
-      Deserializer<SourceAwareEventWrapper> valueDeserializer,
+      KafkaEventSubscriber subscriber,
       StreamEventRouter eventRouter,
       DynamicSet<DroppedEventListener> droppedEventListeners,
       @BrokerGson Gson gson,
       @InstanceId UUID instanceId,
-      OneOffRequestContext oneOffCtx,
       MessageLogger msgLog,
       SubscriberMetrics subscriberMetrics) {
     super(
-        configuration,
-        consumerFactory,
-        keyDeserializer,
-        valueDeserializer,
+        subscriber,
         eventRouter,
         droppedEventListeners,
         gson,
         instanceId,
-        oneOffCtx,
         msgLog,
         subscriberMetrics);
   }