Split event consumer logic into separate steps

First step towards separating the Kafka consumer logic from message
processing and routing. This will make it easier to extract the
Kafka-related code into a separate class and then, eventually, move it
into the BrokerApi.

Feature: Issue 10829
Change-Id: Iced0349d6e8cd1173fcf503e53c2d2ae5d91360d
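
For context, a minimal sketch of the direction this refactoring points
at: once the Kafka-specific code is extracted, subscribers would depend
on a small consumer-side interface rather than on Kafka types. The
interface name and signatures below are assumptions illustrating the
goal, not code from this change or from the existing BrokerApi.

  import java.util.function.Consumer;

  // Sketch only: a possible shape for the extracted consumer API.
  // SourceAwareEventWrapper is the plugin's existing event envelope.
  public interface EventSubscriber {
    // Subscribe to a topic; each deserialized event is handed to the
    // given processor, keeping Kafka polling out of routing code.
    void subscribe(String topic, Consumer<SourceAwareEventWrapper> processor);

    // Stop the polling loop and release the underlying consumer.
    void shutdown();
  }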
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
index 906b987..457555b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
@@ -28,6 +28,7 @@
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
@@ -37,7 +38,6 @@
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -91,15 +91,33 @@
 
   @Override
   public void run() {
+    final String topic = configuration.getKafka().getTopicAlias(getTopic());
+    subscribe(topic);
+  }
+
+  protected abstract EventTopic getTopic();
+
+  public void subscribe(String topic) {
     try {
-      final String topic = configuration.getKafka().getTopicAlias(getTopic());
-      logger.atInfo().log(
-          "Kafka consumer subscribing to topic alias [%s] for event topic [%s]", topic, getTopic());
+
+      logger.atInfo().log("Kafka consumer subscribing to topic alias [%s]", topic);
       consumer.subscribe(Collections.singleton(topic));
       while (!closed.get()) {
         ConsumerRecords<byte[], byte[]> consumerRecords =
             consumer.poll(Duration.ofMillis(configuration.kafkaSubscriber().getPollingInterval()));
-        consumerRecords.forEach(this::processRecord);
+        consumerRecords.forEach(
+            consumerRecord -> {
+              try (ManualRequestContext ctx = oneOffCtx.open()) {
+                SourceAwareEventWrapper event =
+                    valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
+                processRecord(event);
+              } catch (Exception e) {
+                logger.atSevere().withCause(e).log(
+                    "Malformed event '%s': [Exception: %s]",
+                    new String(consumerRecord.value(), UTF_8));
+                subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
+              }
+            });
       }
     } catch (WakeupException e) {
       // Ignore exception if closing
@@ -112,38 +130,27 @@
     }
   }
 
-  protected abstract EventTopic getTopic();
+  private void processRecord(SourceAwareEventWrapper event) {
 
-  private void processRecord(ConsumerRecord<byte[], byte[]> consumerRecord) {
-    try (ManualRequestContext ctx = oneOffCtx.open()) {
-
-      SourceAwareEventWrapper event =
-          valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
-
-      if (event.getHeader().getSourceInstanceId().equals(instanceId)) {
-        logger.atFiner().log(
-            "Dropping event %s produced by our instanceId %s",
-            event.toString(), instanceId.toString());
-        droppedEventListeners.forEach(l -> l.onEventDropped(event));
-      } else {
-        try {
-          msgLog.log(Direction.CONSUME, event);
-          eventRouter.route(event.getEventBody(gson));
-          subscriberMetrics.incrementSubscriberConsumedMessage();
-        } catch (IOException e) {
-          logger.atSevere().withCause(e).log(
-              "Malformed event '%s': [Exception: %s]", event.getHeader().getEventType());
-          subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
-        } catch (PermissionBackendException | OrmException e) {
-          logger.atSevere().withCause(e).log(
-              "Cannot handle message %s: [Exception: %s]", event.getHeader().getEventType());
-          subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
-        }
+    if (event.getHeader().getSourceInstanceId().equals(instanceId)) {
+      logger.atFiner().log(
+          "Dropping event %s produced by our instanceId %s",
+          event.toString(), instanceId.toString());
+      droppedEventListeners.forEach(l -> l.onEventDropped(event));
+    } else {
+      try {
+        msgLog.log(Direction.CONSUME, event);
+        eventRouter.route(event.getEventBody(gson));
+        subscriberMetrics.incrementSubscriberConsumedMessage();
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log(
+            "Malformed event '%s': [Exception: %s]", event.getHeader().getEventType());
+        subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
+      } catch (PermissionBackendException | OrmException | CacheNotFoundException e) {
+        logger.atSevere().withCause(e).log(
+            "Cannot handle message %s: [Exception: %s]", event.getHeader().getEventType());
+        subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
       }
-    } catch (Exception e) {
-      logger.atSevere().withCause(e).log(
-          "Malformed event '%s': [Exception: %s]", new String(consumerRecord.value(), UTF_8));
-      subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
     }
   }
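
After this change, getTopic() is the only abstract hook and
subscribe(String) is public, so a concrete subscriber reduces to a
topic binding. A minimal sketch (the class name and topic choice are
illustrative; dependency injection and constructor wiring are elided):

  import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;

  // Illustrative subclass: binds the generic Kafka consumer loop to a
  // single logical event topic.
  public class IndexEventSubscriber extends AbstractKafkaSubcriber {
    @Override
    protected EventTopic getTopic() {
      // The only subscriber-specific decision left after the split.
      return EventTopic.INDEX_TOPIC;
    }
  }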