Allow to send message to Kafka asynchronously

Introduce a new configuration setting plugin.kafka-events.sendAsync
that allows to control the synchrony of the send() operation to a
Kafka producer.

The existing behaviour on stable-2.16 do not wait for the Kafka
broker to confirm that the message is sent to all the in-sync replicas.
By disabling the asynchronous send of message, the send() would wait
for the message to be sent  and return the boolean result
of the operation.

NOTE: The drawback of the enabling the sendAsync parameter is that
      the broker-api would only return the status of the successful
      invocation of the Produder.send() operation and not the actual
      ack received by the Broker at the successful replication to
      all the replicas.

Bug: Issue 12604
Change-Id: Iec5d1efb033d978bf12c47317895c68604dffecb
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index 2adf28f..72d7f91 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -34,6 +34,7 @@
   public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
 
   private final String topic;
+  private final boolean sendAsync;
 
   @Inject
   public KafkaProperties(PluginConfigFactory configFactory, @PluginName String pluginName) {
@@ -41,15 +42,17 @@
     setDefaults();
     PluginConfig fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
     topic = fromGerritConfig.getString("topic", "gerrit");
+    sendAsync = fromGerritConfig.getBoolean("sendAsync", true);
     applyConfig(fromGerritConfig);
     initDockerizedKafkaServer();
   }
 
   @VisibleForTesting
-  public KafkaProperties() {
+  public KafkaProperties(boolean sendAsync) {
     super();
     setDefaults();
     topic = "gerrit";
+    this.sendAsync = sendAsync;
     initDockerizedKafkaServer();
   }
 
@@ -89,6 +92,10 @@
     return topic;
   }
 
+  public boolean isSendAsync() {
+    return sendAsync;
+  }
+
   public String getBootstrapServers() {
     return getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
index c93db20..52d4726 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
@@ -44,6 +44,7 @@
 
   @VisibleForTesting
   public KafkaSubscriberProperties(int pollingInterval, String groupId, int numberOfSubscribers) {
+    super(true);
     this.pollingInterval = pollingInterval;
     this.groupId = groupId;
     this.numberOfSubscribers = numberOfSubscribers;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index 52b242a..a50ae0a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -77,6 +77,13 @@
   }
 
   public boolean publish(String topic, String messageBody) {
+    if (properties.isSendAsync()) {
+      return publishAsync(topic, messageBody);
+    }
+    return publishSync(topic, messageBody);
+  }
+
+  private boolean publishSync(String topic, String messageBody) {
     Future<RecordMetadata> future =
         producer.send(new ProducerRecord<>(topic, "" + System.nanoTime(), messageBody));
     try {
@@ -88,4 +95,18 @@
       return false;
     }
   }
+
+  private boolean publishAsync(String topic, String messageBody) {
+    Future<RecordMetadata> future =
+        producer.send(
+            new ProducerRecord<>(topic, Long.toString(System.nanoTime()), messageBody),
+            (metadata, e) -> {
+              if (metadata != null && e == null) {
+                LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
+              } else {
+                LOGGER.error("Cannot send the message", e);
+              }
+            });
+    return future != null;
+  }
 }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index faa5bf1..bfd907f 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -42,4 +42,9 @@
 
 `plugin.kafka-events.pollingIntervalMs`
 :	Polling interval in msec for receiving messages from Kafka topic subscription.
-	Default: 1000
\ No newline at end of file
+	Default: 1000
+
+`plugin.kafka-events.sendAsync`
+:	Send messages to Kafka asynchronously, detaching the calling process from the
+	acknowledge of the message being sent.
+	Default: true
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index eafa191..64a2973 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -44,7 +44,6 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -55,9 +54,6 @@
 @RunWith(MockitoJUnitRunner.class)
 public class KafkaBrokerApiTest {
   private static KafkaContainer kafka;
-  private static Injector injector;
-  private static KafkaSession session;
-  private static Gson gson;
 
   private static final int TEST_NUM_SUBSCRIBERS = 1;
   private static final String TEST_GROUP_ID = KafkaBrokerApiTest.class.getName();
@@ -67,6 +63,10 @@
   private static final TimeUnit TEST_TIMOUT_UNIT = TimeUnit.SECONDS;
   private static final int TEST_TIMEOUT = 30;
 
+  private Injector injector;
+  private KafkaSession session;
+  private Gson gson;
+
   public static class TestKafkaContainer extends KafkaContainer {
     public TestKafkaContainer() {
       addFixedExposedPort(KAFKA_PORT, KAFKA_PORT);
@@ -88,6 +88,11 @@
   }
 
   public static class TestModule extends AbstractModule {
+    private KafkaProperties kafkaProperties;
+
+    public TestModule(KafkaProperties kafkaProperties) {
+      this.kafkaProperties = kafkaProperties;
+    }
 
     @Override
     protected void configure() {
@@ -96,7 +101,6 @@
       bind(OneOffRequestContext.class)
           .toInstance(mock(OneOffRequestContext.class, Answers.RETURNS_DEEP_STUBS));
 
-      KafkaProperties kafkaProperties = new KafkaProperties();
       bind(KafkaProperties.class).toInstance(kafkaProperties);
       bind(KafkaSession.class).in(Scopes.SINGLETON);
       KafkaSubscriberProperties kafkaSubscriberProperties =
@@ -142,16 +146,6 @@
     kafka = new TestKafkaContainer();
     kafka.start();
     System.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
-
-    Injector baseInjector = Guice.createInjector(new TestModule());
-    WorkQueue testWorkQueue = baseInjector.getInstance(WorkQueue.class);
-    KafkaSubscriberProperties kafkaSubscriberProperties =
-        baseInjector.getInstance(KafkaSubscriberProperties.class);
-    injector =
-        baseInjector.createChildInjector(
-            new KafkaApiModule(testWorkQueue, kafkaSubscriberProperties));
-    session = injector.getInstance(KafkaSession.class);
-    gson = injector.getInstance(Gson.class);
   }
 
   @AfterClass
@@ -161,20 +155,32 @@
     }
   }
 
-  @Before
-  public void setup() {
+  public void connectToKafka(KafkaProperties kafkaProperties) {
+    Injector baseInjector = Guice.createInjector(new TestModule(kafkaProperties));
+    WorkQueue testWorkQueue = baseInjector.getInstance(WorkQueue.class);
+    KafkaSubscriberProperties kafkaSubscriberProperties =
+        baseInjector.getInstance(KafkaSubscriberProperties.class);
+    injector =
+        baseInjector.createChildInjector(
+            new KafkaApiModule(testWorkQueue, kafkaSubscriberProperties));
+    session = injector.getInstance(KafkaSession.class);
+    gson = injector.getInstance(Gson.class);
+
     session.connect();
   }
 
   @After
   public void teardown() {
-    session.disconnect();
+    if (session != null) {
+      session.disconnect();
+    }
   }
 
   @Test
-  public void shouldSendAndReceiveToTopic() {
+  public void shouldSendSyncAndReceiveToTopic() {
+    connectToKafka(new KafkaProperties(false));
     KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
-    String testTopic = "test_topic";
+    String testTopic = "test_topic_sync";
     TestConsumer testConsumer = new TestConsumer(1);
     EventMessage testEventMessage = new EventMessage(new TestHeader(), new ProjectCreatedEvent());
 
@@ -185,4 +191,20 @@
     assertThat(testConsumer.messages).hasSize(1);
     assertThat(gson.toJson(testConsumer.messages.get(0))).isEqualTo(gson.toJson(testEventMessage));
   }
+
+  @Test
+  public void shouldSendAsyncAndReceiveToTopic() {
+    connectToKafka(new KafkaProperties(true));
+    KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
+    String testTopic = "test_topic_async";
+    TestConsumer testConsumer = new TestConsumer(1);
+    EventMessage testEventMessage = new EventMessage(new TestHeader(), new ProjectCreatedEvent());
+
+    kafkaBrokerApi.send(testTopic, testEventMessage);
+    kafkaBrokerApi.receiveAsync(testTopic, testConsumer);
+
+    assertThat(testConsumer.await()).isTrue();
+    assertThat(testConsumer.messages).hasSize(1);
+    assertThat(gson.toJson(testConsumer.messages.get(0))).isEqualTo(gson.toJson(testEventMessage));
+  }
 }