Merge branch 'stable-3.1'

* stable-3.1:
  Allow sending messages to Kafka asynchronously
  Fix Kafka container-test when running on CI
  Increase default number of subscribers
  Re-run the consumer when it exits because of an error
  Trace errors that cause a consumer thread to exit
  Add singleton scope to KafkaSubscriberProperties

Change-Id: I0a65e161439307407d0ac3946cdcebdc0b87ead6
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index 7bb1f4f..72d7f91 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.kafka.config;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.CaseFormat;
 import com.google.common.base.Strings;
 import com.google.gerrit.extensions.annotations.PluginName;
@@ -33,6 +34,7 @@
   public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
 
   private final String topic;
+  private final boolean sendAsync;
 
   @Inject
   public KafkaProperties(PluginConfigFactory configFactory, @PluginName String pluginName) {
@@ -40,10 +42,20 @@
     setDefaults();
     PluginConfig fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
     topic = fromGerritConfig.getString("topic", "gerrit");
+    sendAsync = fromGerritConfig.getBoolean("sendAsync", true);
     applyConfig(fromGerritConfig);
     initDockerizedKafkaServer();
   }
 
+  @VisibleForTesting
+  public KafkaProperties(boolean sendAsync) {
+    super();
+    setDefaults();
+    topic = "gerrit";
+    this.sendAsync = sendAsync;
+    initDockerizedKafkaServer();
+  }
+
   private void setDefaults() {
     put("acks", "all");
     put("retries", 0);
@@ -80,6 +92,10 @@
     return topic;
   }
 
+  public boolean isSendAsync() {
+    return sendAsync;
+  }
+
   public String getBootstrapServers() {
     return getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
index c3fd917..52d4726 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
@@ -14,14 +14,17 @@
 
 package com.googlesource.gerrit.plugins.kafka.config;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.gerrit.extensions.annotations.PluginName;
 import com.google.gerrit.server.config.PluginConfigFactory;
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 
+@Singleton
 public class KafkaSubscriberProperties extends KafkaProperties {
   private static final long serialVersionUID = 1L;
   private static final String DEFAULT_POLLING_INTERVAL_MS = "1000";
-  private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "4";
+  private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
 
   private final Integer pollingInterval;
   private final String groupId;
@@ -39,6 +42,14 @@
         Integer.parseInt(getProperty("number.of.subscribers", DEFAULT_NUMBER_OF_SUBSCRIBERS));
   }
 
+  @VisibleForTesting
+  public KafkaSubscriberProperties(int pollingInterval, String groupId, int numberOfSubscribers) {
+    super(true);
+    this.pollingInterval = pollingInterval;
+    this.groupId = groupId;
+    this.numberOfSubscribers = numberOfSubscribers;
+  }
+
   public Integer getPollingInterval() {
     return pollingInterval;
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index 52b242a..a50ae0a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -77,6 +77,13 @@
   }
 
   public boolean publish(String topic, String messageBody) {
+    if (properties.isSendAsync()) {
+      return publishAsync(topic, messageBody);
+    }
+    return publishSync(topic, messageBody);
+  }
+
+  private boolean publishSync(String topic, String messageBody) {
     Future<RecordMetadata> future =
         producer.send(new ProducerRecord<>(topic, "" + System.nanoTime(), messageBody));
     try {
@@ -88,4 +95,18 @@
       return false;
     }
   }
+
+  private boolean publishAsync(String topic, String messageBody) {
+    Future<RecordMetadata> future =
+        producer.send(
+            new ProducerRecord<>(topic, Long.toString(System.nanoTime()), messageBody),
+            (metadata, e) -> {
+              if (metadata != null && e == null) {
+                LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
+              } else {
+                LOGGER.error("Cannot send the message", e);
+              }
+            });
+    return future != null;
+  }
 }
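
A note on the publish() split above: when sendAsync is enabled, delivery is handed
to a producer callback instead of blocking on future.get(), and publishAsync()
returns as soon as the producer accepts the record (future != null), with failures
surfacing only through LOGGER.error in the callback. A minimal, self-contained
sketch of the same send-with-callback pattern; the broker address, topic name, and
payload are illustrative assumptions, not values from the plugin:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    class AsyncSendSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
          // send() returns immediately; the lambda runs once the broker acks or the send fails.
          producer.send(
              new ProducerRecord<>("gerrit", Long.toString(System.nanoTime()), "{}"),
              (metadata, exception) -> {
                if (exception == null) {
                  System.out.println("acked at offset " + metadata.offset());
                } else {
                  exception.printStackTrace();
                }
              });
        } // close() flushes in-flight sends before returning
      }
    }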
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
index d925348..40415a4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
@@ -24,6 +24,7 @@
 import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -33,8 +34,8 @@
 
 public class KafkaEventSubscriber {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final int DELAY_RECONNECT_AFTER_FAILURE_MSEC = 1000;
 
-  private final Consumer<byte[], byte[]> consumer;
   private final OneOffRequestContext oneOffCtx;
   private final AtomicBoolean closed = new AtomicBoolean(false);
 
@@ -42,12 +43,15 @@
   private final KafkaSubscriberProperties configuration;
   private final ExecutorService executor;
   private final KafkaEventSubscriberMetrics subscriberMetrics;
+  private final KafkaConsumerFactory consumerFactory;
+  private final Deserializer<byte[]> keyDeserializer;
 
   private java.util.function.Consumer<EventMessage> messageProcessor;
-
   private String topic;
   private AtomicBoolean resetOffset = new AtomicBoolean(false);
 
+  private volatile ReceiverJob receiver;
+
   @Inject
   public KafkaEventSubscriber(
       KafkaSubscriberProperties configuration,
@@ -62,14 +66,8 @@
     this.oneOffCtx = oneOffCtx;
     this.executor = executor;
     this.subscriberMetrics = subscriberMetrics;
-
-    final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
-    try {
-      Thread.currentThread().setContextClassLoader(KafkaEventSubscriber.class.getClassLoader());
-      this.consumer = consumerFactory.create(keyDeserializer);
-    } finally {
-      Thread.currentThread().setContextClassLoader(previousClassLoader);
-    }
+    this.consumerFactory = consumerFactory;
+    this.keyDeserializer = keyDeserializer;
     this.valueDeserializer = valueDeserializer;
   }
 
@@ -78,13 +76,25 @@
     this.messageProcessor = messageProcessor;
     logger.atInfo().log(
         "Kafka consumer subscribing to topic alias [%s] for event topic [%s]", topic, topic);
-    consumer.subscribe(Collections.singleton(topic));
-    executor.execute(new ReceiverJob());
+    runReceiver();
+  }
+
+  private void runReceiver() {
+    final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(KafkaEventSubscriber.class.getClassLoader());
+      Consumer<byte[], byte[]> consumer = consumerFactory.create(keyDeserializer);
+      consumer.subscribe(Collections.singleton(topic));
+      receiver = new ReceiverJob(consumer);
+      executor.execute(receiver);
+    } finally {
+      Thread.currentThread().setContextClassLoader(previousClassLoader);
+    }
   }
 
   public void shutdown() {
     closed.set(true);
-    consumer.wakeup();
+    receiver.wakeup();
   }
 
   public java.util.function.Consumer<EventMessage> getMessageProcessor() {
@@ -100,13 +110,26 @@
   }
 
   private class ReceiverJob implements Runnable {
+    private final Consumer<byte[], byte[]> consumer;
+
+    public ReceiverJob(Consumer<byte[], byte[]> consumer) {
+      this.consumer = consumer;
+    }
+
+    public void wakeup() {
+      consumer.wakeup();
+    }
 
     @Override
     public void run() {
-      consume();
+      try {
+        consume();
+      } catch (Exception e) {
+        logger.atSevere().withCause(e).log("Consumer loop of topic %s ended", topic);
+      }
     }
 
-    private void consume() {
+    private void consume() throws InterruptedException {
       try {
         while (!closed.get()) {
           if (resetOffset.getAndSet(false)) {
@@ -130,13 +153,28 @@
         }
       } catch (WakeupException e) {
         // Ignore exception if closing
-        if (!closed.get()) throw e;
+        if (!closed.get()) {
+          logger.atSevere().withCause(e).log("Consumer loop of topic %s interrupted", topic);
+          reconnectAfterFailure();
+        }
       } catch (Exception e) {
         subscriberMetrics.incrementSubscriberFailedToPollMessages();
-        throw e;
+        logger.atSevere().withCause(e).log(
+            "Existing consumer loop of topic %s because of a non-recoverable exception", topic);
+        reconnectAfterFailure();
       } finally {
         consumer.close();
       }
     }
+
+    private void reconnectAfterFailure() throws InterruptedException {
+      // Random delay averaging DELAY_RECONNECT_AFTER_FAILURE_MSEC, so that consumers
+      // failing at the same time do not all retry at exactly the same interval
+      long reconnectDelay =
+          DELAY_RECONNECT_AFTER_FAILURE_MSEC / 2
+              + new Random().nextInt(DELAY_RECONNECT_AFTER_FAILURE_MSEC);
+      Thread.sleep(reconnectDelay);
+      runReceiver();
+    }
   }
 }
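
A note on reconnectAfterFailure() above: DELAY/2 + nextInt(DELAY) sleeps for a
duration drawn uniformly from [DELAY/2, 3*DELAY/2), i.e. 500-1499 ms for the
1000 ms constant, averaging about one second, so consumers that fail at the same
moment do not all reconnect in lockstep. A small standalone sketch of that jitter
computation, assuming the same 1000 ms base delay:

    import java.util.Random;

    class ReconnectJitterSketch {
      private static final int DELAY_RECONNECT_AFTER_FAILURE_MSEC = 1000;

      public static void main(String[] args) throws InterruptedException {
        // Uniform over [500, 1500) ms; mean ~1000 ms.
        long reconnectDelay =
            DELAY_RECONNECT_AFTER_FAILURE_MSEC / 2
                + new Random().nextInt(DELAY_RECONNECT_AFTER_FAILURE_MSEC);
        System.out.println("sleeping " + reconnectDelay + " ms before reconnecting");
        Thread.sleep(reconnectDelay);
      }
    }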
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index faa5bf1..bfd907f 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -42,4 +42,9 @@
 
 `plugin.kafka-events.pollingIntervalMs`
 :	Polling interval in msec for receiving messages from Kafka topic subscription.
-	Default: 1000
\ No newline at end of file
+	Default: 1000
+
+`plugin.kafka-events.sendAsync`
+:	Send messages to Kafka asynchronously, detaching the calling process from the
+	acknowledgement of the message being sent.
+	Default: true
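
For reference, the plugin.kafka-events.* keys documented above would sit in the
plugin's section of gerrit.config; a sketch with the default values:

    [plugin "kafka-events"]
      pollingIntervalMs = 1000
      sendAsync = true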
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
new file mode 100644
index 0000000..64a2973
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -0,0 +1,210 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.api;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.mock;
+
+import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.util.IdGenerator;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.gson.Gson;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
+import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.testcontainers.containers.KafkaContainer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaBrokerApiTest {
+  private static KafkaContainer kafka;
+
+  private static final int TEST_NUM_SUBSCRIBERS = 1;
+  private static final String TEST_GROUP_ID = KafkaBrokerApiTest.class.getName();
+  private static final int TEST_POLLING_INTERVAL_MSEC = 100;
+  private static final int TEST_THREAD_POOL_SIZE = 10;
+  private static final UUID TEST_INSTANCE_ID = UUID.randomUUID();
+  private static final TimeUnit TEST_TIMEOUT_UNIT = TimeUnit.SECONDS;
+  private static final int TEST_TIMEOUT = 30;
+
+  private Injector injector;
+  private KafkaSession session;
+  private Gson gson;
+
+  public static class TestKafkaContainer extends KafkaContainer {
+    public TestKafkaContainer() {
+      addFixedExposedPort(KAFKA_PORT, KAFKA_PORT);
+      addFixedExposedPort(ZOOKEEPER_PORT, ZOOKEEPER_PORT);
+    }
+
+    @Override
+    public String getBootstrapServers() {
+      return String.format("PLAINTEXT://%s:%s", getContainerIpAddress(), KAFKA_PORT);
+    }
+  }
+
+  public static class TestWorkQueue extends WorkQueue {
+
+    @Inject
+    public TestWorkQueue(IdGenerator idGenerator, MetricMaker metrics) {
+      super(idGenerator, TEST_THREAD_POOL_SIZE, metrics);
+    }
+  }
+
+  public static class TestModule extends AbstractModule {
+    private KafkaProperties kafkaProperties;
+
+    public TestModule(KafkaProperties kafkaProperties) {
+      this.kafkaProperties = kafkaProperties;
+    }
+
+    @Override
+    protected void configure() {
+      bind(Gson.class).toProvider(EventGsonProvider.class).in(Singleton.class);
+      bind(MetricMaker.class).toInstance(mock(MetricMaker.class, Answers.RETURNS_DEEP_STUBS));
+      bind(OneOffRequestContext.class)
+          .toInstance(mock(OneOffRequestContext.class, Answers.RETURNS_DEEP_STUBS));
+
+      bind(KafkaProperties.class).toInstance(kafkaProperties);
+      bind(KafkaSession.class).in(Scopes.SINGLETON);
+      KafkaSubscriberProperties kafkaSubscriberProperties =
+          new KafkaSubscriberProperties(
+              TEST_POLLING_INTERVAL_MSEC, TEST_GROUP_ID, TEST_NUM_SUBSCRIBERS);
+      bind(KafkaSubscriberProperties.class).toInstance(kafkaSubscriberProperties);
+      bind(WorkQueue.class).to(TestWorkQueue.class);
+    }
+  }
+
+  public static class TestConsumer implements Consumer<EventMessage> {
+    public final List<EventMessage> messages = new ArrayList<>();
+    private final CountDownLatch lock;
+
+    public TestConsumer(int numMessagesExpected) {
+      lock = new CountDownLatch(numMessagesExpected);
+    }
+
+    @Override
+    public void accept(EventMessage message) {
+      messages.add(message);
+      lock.countDown();
+    }
+
+    public boolean await() {
+      try {
+        return lock.await(TEST_TIMEOUT, TEST_TIMEOUT_UNIT);
+      } catch (InterruptedException e) {
+        return false;
+      }
+    }
+  }
+
+  public static class TestHeader extends Header {
+
+    public TestHeader() {
+      super(UUID.randomUUID(), TEST_INSTANCE_ID);
+    }
+  }
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    kafka = new TestKafkaContainer();
+    kafka.start();
+    System.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    if (kafka != null) {
+      kafka.stop();
+    }
+  }
+
+  public void connectToKafka(KafkaProperties kafkaProperties) {
+    Injector baseInjector = Guice.createInjector(new TestModule(kafkaProperties));
+    WorkQueue testWorkQueue = baseInjector.getInstance(WorkQueue.class);
+    KafkaSubscriberProperties kafkaSubscriberProperties =
+        baseInjector.getInstance(KafkaSubscriberProperties.class);
+    injector =
+        baseInjector.createChildInjector(
+            new KafkaApiModule(testWorkQueue, kafkaSubscriberProperties));
+    session = injector.getInstance(KafkaSession.class);
+    gson = injector.getInstance(Gson.class);
+
+    session.connect();
+  }
+
+  @After
+  public void teardown() {
+    if (session != null) {
+      session.disconnect();
+    }
+  }
+
+  @Test
+  public void shouldSendSyncAndReceiveToTopic() {
+    connectToKafka(new KafkaProperties(false));
+    KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
+    String testTopic = "test_topic_sync";
+    TestConsumer testConsumer = new TestConsumer(1);
+    EventMessage testEventMessage = new EventMessage(new TestHeader(), new ProjectCreatedEvent());
+
+    kafkaBrokerApi.receiveAsync(testTopic, testConsumer);
+    kafkaBrokerApi.send(testTopic, testEventMessage);
+
+    assertThat(testConsumer.await()).isTrue();
+    assertThat(testConsumer.messages).hasSize(1);
+    assertThat(gson.toJson(testConsumer.messages.get(0))).isEqualTo(gson.toJson(testEventMessage));
+  }
+
+  @Test
+  public void shouldSendAsyncAndReceiveToTopic() {
+    connectToKafka(new KafkaProperties(true));
+    KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
+    String testTopic = "test_topic_async";
+    TestConsumer testConsumer = new TestConsumer(1);
+    EventMessage testEventMessage = new EventMessage(new TestHeader(), new ProjectCreatedEvent());
+
+    kafkaBrokerApi.send(testTopic, testEventMessage);
+    kafkaBrokerApi.receiveAsync(testTopic, testConsumer);
+
+    assertThat(testConsumer.await()).isTrue();
+    assertThat(testConsumer.messages).hasSize(1);
+    assertThat(gson.toJson(testConsumer.messages.get(0))).isEqualTo(gson.toJson(testEventMessage));
+  }
+}