Implement async send method as per 3.4.0-rc2 API
Bug: Issue 14411
Change-Id: If44260c12d924ba62a9c066058f2b7aa9fc93d69
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index e059b34..4b24917 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -15,6 +15,6 @@
maven_jar(
name = "events-broker",
- artifact = "com.gerritforge:events-broker:3.4.0-rc0",
- sha1 = "8c34c88103d4783eb4c4decde6d93541bc1cf064",
+ artifact = "com.gerritforge:events-broker:3.4.0-rc2",
+ sha1 = "f72b4166e6d785fd1a41c997a4ffb14461dd7d87",
)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
index 9a7c66a..7eabf2d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
@@ -17,6 +17,7 @@
import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
@@ -42,7 +43,7 @@
}
@Override
- public boolean send(String topic, EventMessage event) {
+ public ListenableFuture<Boolean> send(String topic, EventMessage event) {
return publisher.publish(topic, event);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
index cc271b5..7b8d480 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
@@ -16,6 +16,7 @@
import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventListener;
import com.google.gson.Gson;
@@ -53,7 +54,7 @@
}
}
- public boolean publish(String topic, EventMessage event) {
+ public ListenableFuture<Boolean> publish(String topic, EventMessage event) {
return session.publish(topic, getPayload(event));
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index bb79cb5..0dc29e1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -14,10 +14,16 @@
package com.googlesource.gerrit.plugins.kafka.session;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
import com.googlesource.gerrit.plugins.kafka.publish.KafkaEventsPublisherMetrics;
+import java.util.Objects;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -80,34 +86,35 @@
producer = null;
}
- public void publish(String messageBody) {
- publish(properties.getTopic(), messageBody);
+ public ListenableFuture<Boolean> publish(String messageBody) {
+ return publish(properties.getTopic(), messageBody);
}
- public boolean publish(String topic, String messageBody) {
+ public ListenableFuture<Boolean> publish(String topic, String messageBody) {
if (properties.isSendAsync()) {
return publishAsync(topic, messageBody);
}
return publishSync(topic, messageBody);
}
- private boolean publishSync(String topic, String messageBody) {
-
+ private ListenableFuture<Boolean> publishSync(String topic, String messageBody) {
+ SettableFuture<Boolean> resultF = SettableFuture.create();
try {
Future<RecordMetadata> future =
producer.send(new ProducerRecord<>(topic, "" + System.nanoTime(), messageBody));
RecordMetadata metadata = future.get();
LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
publisherMetrics.incrementBrokerPublishedMessage();
- return true;
+ resultF.set(true);
+ return resultF;
} catch (Throwable e) {
LOGGER.error("Cannot send the message", e);
publisherMetrics.incrementBrokerFailedToPublishMessage();
- return false;
+ return Futures.immediateFailedFuture(e);
}
}
- private boolean publishAsync(String topic, String messageBody) {
+ private ListenableFuture<Boolean> publishAsync(String topic, String messageBody) {
try {
Future<RecordMetadata> future =
producer.send(
@@ -121,11 +128,16 @@
publisherMetrics.incrementBrokerFailedToPublishMessage();
}
});
- return future != null;
+
+ // The transformation is lightweight, so we can afford using a directExecutor
+ return Futures.transform(
+ JdkFutureAdapters.listenInPoolThread(future),
+ Objects::nonNull,
+ MoreExecutors.directExecutor());
} catch (Throwable e) {
LOGGER.error("Cannot send the message", e);
publisherMetrics.incrementBrokerFailedToPublishMessage();
- return false;
+ return Futures.immediateFailedFuture(e);
}
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
index 211dd4f..8ea5f3f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
@@ -37,7 +37,7 @@
public void kafkaEventDeserializerShouldParseAKafkaEvent() {
final UUID eventId = UUID.randomUUID();
final String eventType = "event-type";
- final UUID sourceInstanceId = UUID.randomUUID();
+ final String sourceInstanceId = UUID.randomUUID().toString();
final long eventCreatedOn = 10L;
final String eventJson =
String.format(