Fix the topic events replay with the Kafka REST-API
The topic replay functionality was not tested properly, which
resulted in an unnoticed breakage of the events replay with
Confluent's REST-Proxy.
Fix tests and the Kafka REST-API client implementation of
the events replay.
Bug: Issue 15630
Change-Id: I57639a044e0512e30eb52b5a488622bf1c1a3075
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java
index 0f7d0c0..d06b427 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java
@@ -198,20 +198,21 @@
}
public HttpPost createPostSeekTopicFromBeginning(
- URI consumerInstanceURI, String topic, Set<Integer> partitions) {
- HttpPost post = new HttpPost(consumerInstanceURI.resolve("/positions/beginning"));
+ URI consumerUri, String topic, Set<Integer> partitions) {
+ HttpPost post =
+ new HttpPost(consumerUri.resolve(consumerUri.getPath() + "/positions/beginning"));
post.addHeader(HttpHeaders.ACCEPT, "*/*");
post.setConfig(createRequestConfig());
post.setEntity(
new StringEntity(
String.format(
- "{\"partitions\",[%s]}",
+ "{\"partitions\":[%s]}",
partitions.stream()
.map(
partition ->
String.format("{\"topic\":\"%s\",\"partition\":%d}", topic, partition))
.collect(Collectors.joining(","))),
- UTF_8));
+ ContentType.create(KAFKA_V2, UTF_8)));
return post;
}
@@ -249,11 +250,14 @@
protected String getStringEntity(HttpResponse response) throws IOException {
HttpEntity entity = response.getEntity();
- try (ByteArrayOutputStream outStream = new ByteArrayOutputStream()) {
- entity.writeTo(outStream);
- outStream.close();
- return outStream.toString(UTF_8);
+ if (entity != null) {
+ try (ByteArrayOutputStream outStream = new ByteArrayOutputStream()) {
+ entity.writeTo(outStream);
+ outStream.close();
+ return outStream.toString(UTF_8);
+ }
}
+ return "";
}
private <V> ListenableFuture<V> listenableFutureOf(Future<V> future) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java
index 0f4e04c..5ec5f95 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java
@@ -196,7 +196,7 @@
while (!closed.get()) {
if (resetOffset.getAndSet(false)) {
restClient
- .map(getTopicPartitions(), this::seekToBeginning)
+ .mapAsync(getTopicPartitions(), this::seekToBeginning)
.get(restClientTimeoutMs, TimeUnit.MILLISECONDS);
}
@@ -230,13 +230,12 @@
}
}
- private Void seekToBeginning(Set<Integer> partitions) {
+ private ListenableFuture<HttpResponse> seekToBeginning(Set<Integer> partitions) {
ListenableFuture<HttpPost> post =
restClient.map(
kafkaRestConsumerUri,
uri -> restClient.createPostSeekTopicFromBeginning(uri, topic, partitions));
- restClient.map(post, restClient::execute);
- return null;
+ return restClient.mapAsync(post, p -> restClient.execute(p, HttpStatus.SC_NO_CONTENT));
}
private ListenableFuture<Set<Integer>> getTopicPartitions() {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index 6868794..2cbc65a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -123,29 +123,38 @@
public static class TestConsumer implements Consumer<EventMessage> {
public final List<EventMessage> messages = new ArrayList<>();
- private CountDownLatch lock;
+ private CountDownLatch[] locks;
public TestConsumer(int numMessagesExpected) {
resetExpectedMessages(numMessagesExpected);
}
public void resetExpectedMessages(int numMessagesExpected) {
- lock = new CountDownLatch(numMessagesExpected);
+ locks = new CountDownLatch[numMessagesExpected];
+ for (int i = 0; i < numMessagesExpected; i++) {
+ locks[i] = new CountDownLatch(i + 1);
+ }
}
@Override
public void accept(EventMessage message) {
messages.add(message);
- lock.countDown();
+ for (CountDownLatch countDownLatch : locks) {
+ countDownLatch.countDown();
+ }
}
public boolean await() {
- return await(TEST_TIMEOUT, TEST_TIMEOUT_UNIT);
+ return await(locks.length);
}
- public boolean await(long timeout, TimeUnit unit) {
+ public boolean await(int numItems) {
+ return await(numItems, TEST_TIMEOUT, TEST_TIMEOUT_UNIT);
+ }
+
+ public boolean await(int numItems, long timeout, TimeUnit unit) {
try {
- return lock.await(timeout, unit);
+ return locks[numItems - 1].await(timeout, unit);
} catch (InterruptedException e) {
return false;
}
@@ -256,21 +265,20 @@
connectToKafka(new KafkaProperties(false, clientType, getKafkaRestApiUriString()));
KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
String testTopic = "test_topic_reset";
- TestConsumer testConsumer = new TestConsumer(1);
EventMessage testEventMessage = new EventMessage(new TestHeader(), new ProjectCreatedEvent());
+ TestConsumer testConsumer = new TestConsumer(2);
kafkaBrokerApi.receiveAsync(testTopic, testConsumer);
- kafkaBrokerApi.send(testTopic, testEventMessage);
- assertThat(testConsumer.await()).isTrue();
+ kafkaBrokerApi.send(testTopic, testEventMessage);
+ assertThat(testConsumer.await(1)).isTrue();
assertThat(testConsumer.messages).hasSize(1);
assertThat(gson.toJson(testConsumer.messages.get(0))).isEqualTo(gson.toJson(testEventMessage));
kafkaBrokerApi.replayAllEvents(testTopic);
-
- assertThat(testConsumer.await()).isTrue();
- assertThat(testConsumer.messages).hasSize(1);
- assertThat(gson.toJson(testConsumer.messages.get(0))).isEqualTo(gson.toJson(testEventMessage));
+ assertThat(testConsumer.await(2)).isTrue();
+ assertThat(testConsumer.messages).hasSize(2);
+ assertThat(gson.toJson(testConsumer.messages.get(1))).isEqualTo(gson.toJson(testEventMessage));
}
protected String getKafkaRestApiUriString() {
@@ -279,7 +287,7 @@
private void assertNoMoreExpectedMessages(TestConsumer testConsumer) {
testConsumer.resetExpectedMessages(1);
- assertThat(testConsumer.await(TEST_WAIT_FOR_MORE_MESSAGES_TIMEOUT, TEST_TIMEOUT_UNIT))
+ assertThat(testConsumer.await(1, TEST_WAIT_FOR_MORE_MESSAGES_TIMEOUT, TEST_TIMEOUT_UNIT))
.isFalse();
}
}