Fix the topic events replay in the Kafka REST-API client

The topic replay functionality was not tested properly, which
resulted in unnoticed breakage of the events replay when using
Confluent's REST Proxy.

Fix tests and the Kafka REST-API client implementation of
the events replay.

Bug: Issue 15630
Change-Id: I57639a044e0512e30eb52b5a488622bf1c1a3075
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java
index 0f7d0c0..d06b427 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java
@@ -198,20 +198,21 @@
   }
 
   public HttpPost createPostSeekTopicFromBeginning(
-      URI consumerInstanceURI, String topic, Set<Integer> partitions) {
-    HttpPost post = new HttpPost(consumerInstanceURI.resolve("/positions/beginning"));
+      URI consumerUri, String topic, Set<Integer> partitions) {
+    HttpPost post =
+        new HttpPost(consumerUri.resolve(consumerUri.getPath() + "/positions/beginning"));
     post.addHeader(HttpHeaders.ACCEPT, "*/*");
     post.setConfig(createRequestConfig());
     post.setEntity(
         new StringEntity(
             String.format(
-                "{\"partitions\",[%s]}",
+                "{\"partitions\":[%s]}",
                 partitions.stream()
                     .map(
                         partition ->
                             String.format("{\"topic\":\"%s\",\"partition\":%d}", topic, partition))
                     .collect(Collectors.joining(","))),
-            UTF_8));
+            ContentType.create(KAFKA_V2, UTF_8)));
     return post;
   }
 
@@ -249,11 +250,14 @@
 
   protected String getStringEntity(HttpResponse response) throws IOException {
     HttpEntity entity = response.getEntity();
-    try (ByteArrayOutputStream outStream = new ByteArrayOutputStream()) {
-      entity.writeTo(outStream);
-      outStream.close();
-      return outStream.toString(UTF_8);
+    if (entity != null) {
+      try (ByteArrayOutputStream outStream = new ByteArrayOutputStream()) {
+        entity.writeTo(outStream);
+        outStream.close();
+        return outStream.toString(UTF_8);
+      }
     }
+    return "";
   }
 
   private <V> ListenableFuture<V> listenableFutureOf(Future<V> future) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java
index 0f4e04c..5ec5f95 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java
@@ -196,7 +196,7 @@
         while (!closed.get()) {
           if (resetOffset.getAndSet(false)) {
             restClient
-                .map(getTopicPartitions(), this::seekToBeginning)
+                .mapAsync(getTopicPartitions(), this::seekToBeginning)
                 .get(restClientTimeoutMs, TimeUnit.MILLISECONDS);
           }
 
@@ -230,13 +230,12 @@
       }
     }
 
-    private Void seekToBeginning(Set<Integer> partitions) {
+    private ListenableFuture<HttpResponse> seekToBeginning(Set<Integer> partitions) {
       ListenableFuture<HttpPost> post =
           restClient.map(
               kafkaRestConsumerUri,
               uri -> restClient.createPostSeekTopicFromBeginning(uri, topic, partitions));
-      restClient.map(post, restClient::execute);
-      return null;
+      return restClient.mapAsync(post, p -> restClient.execute(p, HttpStatus.SC_NO_CONTENT));
     }
 
     private ListenableFuture<Set<Integer>> getTopicPartitions() {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index 6868794..2cbc65a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -123,29 +123,38 @@
 
   public static class TestConsumer implements Consumer<EventMessage> {
     public final List<EventMessage> messages = new ArrayList<>();
-    private CountDownLatch lock;
+    private CountDownLatch[] locks;
 
     public TestConsumer(int numMessagesExpected) {
       resetExpectedMessages(numMessagesExpected);
     }
 
     public void resetExpectedMessages(int numMessagesExpected) {
-      lock = new CountDownLatch(numMessagesExpected);
+      locks = new CountDownLatch[numMessagesExpected];
+      for (int i = 0; i < numMessagesExpected; i++) {
+        locks[i] = new CountDownLatch(i + 1);
+      }
     }
 
     @Override
     public void accept(EventMessage message) {
       messages.add(message);
-      lock.countDown();
+      for (CountDownLatch countDownLatch : locks) {
+        countDownLatch.countDown();
+      }
     }
 
     public boolean await() {
-      return await(TEST_TIMEOUT, TEST_TIMEOUT_UNIT);
+      return await(locks.length);
     }
 
-    public boolean await(long timeout, TimeUnit unit) {
+    public boolean await(int numItems) {
+      return await(numItems, TEST_TIMEOUT, TEST_TIMEOUT_UNIT);
+    }
+
+    public boolean await(int numItems, long timeout, TimeUnit unit) {
       try {
-        return lock.await(timeout, unit);
+        return locks[numItems - 1].await(timeout, unit);
       } catch (InterruptedException e) {
         return false;
       }
@@ -256,21 +265,20 @@
     connectToKafka(new KafkaProperties(false, clientType, getKafkaRestApiUriString()));
     KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
     String testTopic = "test_topic_reset";
-    TestConsumer testConsumer = new TestConsumer(1);
     EventMessage testEventMessage = new EventMessage(new TestHeader(), new ProjectCreatedEvent());
 
+    TestConsumer testConsumer = new TestConsumer(2);
     kafkaBrokerApi.receiveAsync(testTopic, testConsumer);
-    kafkaBrokerApi.send(testTopic, testEventMessage);
 
-    assertThat(testConsumer.await()).isTrue();
+    kafkaBrokerApi.send(testTopic, testEventMessage);
+    assertThat(testConsumer.await(1)).isTrue();
     assertThat(testConsumer.messages).hasSize(1);
     assertThat(gson.toJson(testConsumer.messages.get(0))).isEqualTo(gson.toJson(testEventMessage));
 
     kafkaBrokerApi.replayAllEvents(testTopic);
-
-    assertThat(testConsumer.await()).isTrue();
-    assertThat(testConsumer.messages).hasSize(1);
-    assertThat(gson.toJson(testConsumer.messages.get(0))).isEqualTo(gson.toJson(testEventMessage));
+    assertThat(testConsumer.await(2)).isTrue();
+    assertThat(testConsumer.messages).hasSize(2);
+    assertThat(gson.toJson(testConsumer.messages.get(1))).isEqualTo(gson.toJson(testEventMessage));
   }
 
   protected String getKafkaRestApiUriString() {
@@ -279,7 +287,7 @@
 
   private void assertNoMoreExpectedMessages(TestConsumer testConsumer) {
     testConsumer.resetExpectedMessages(1);
-    assertThat(testConsumer.await(TEST_WAIT_FOR_MORE_MESSAGES_TIMEOUT, TEST_TIMEOUT_UNIT))
+    assertThat(testConsumer.await(1, TEST_WAIT_FOR_MORE_MESSAGES_TIMEOUT, TEST_TIMEOUT_UNIT))
         .isFalse();
   }
 }