Merge branch 'stable-3.3' into stable-3.4
* stable-3.3:
Deserialize Event and EventMessage
Change-Id: I80ad1d967b164e2284cd2bcbc63190630fbbd4e2
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index bd4656f..4b24917 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -15,6 +15,6 @@
maven_jar(
name = "events-broker",
- artifact = "com.gerritforge:events-broker:3.3.2",
- sha1 = "d8bcb77047cc12dd7c623b5b4de70a25499d3d6c",
+ artifact = "com.gerritforge:events-broker:3.4.0-rc2",
+ sha1 = "f72b4166e6d785fd1a41c997a4ffb14461dd7d87",
)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
index 1be52cb..2940b5e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -14,9 +14,9 @@
package com.googlesource.gerrit.plugins.kafka;
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.EventGsonProvider;
import com.google.gerrit.server.events.EventListener;
import com.google.gson.Gson;
import com.google.inject.AbstractModule;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
index 9a7c66a..7eabf2d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
@@ -17,6 +17,7 @@
import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
@@ -42,7 +43,7 @@
}
@Override
- public boolean send(String topic, EventMessage event) {
+ public ListenableFuture<Boolean> send(String topic, EventMessage event) {
return publisher.publish(topic, event);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
index cc271b5..7b8d480 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
@@ -16,6 +16,7 @@
import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventListener;
import com.google.gson.Gson;
@@ -53,7 +54,7 @@
}
}
- public boolean publish(String topic, EventMessage event) {
+ public ListenableFuture<Boolean> publish(String topic, EventMessage event) {
return session.publish(topic, getPayload(event));
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index bb79cb5..0dc29e1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -14,10 +14,16 @@
package com.googlesource.gerrit.plugins.kafka.session;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
import com.googlesource.gerrit.plugins.kafka.publish.KafkaEventsPublisherMetrics;
+import java.util.Objects;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -80,34 +86,35 @@
producer = null;
}
- public void publish(String messageBody) {
- publish(properties.getTopic(), messageBody);
+ public ListenableFuture<Boolean> publish(String messageBody) {
+ return publish(properties.getTopic(), messageBody);
}
- public boolean publish(String topic, String messageBody) {
+ public ListenableFuture<Boolean> publish(String topic, String messageBody) {
if (properties.isSendAsync()) {
return publishAsync(topic, messageBody);
}
return publishSync(topic, messageBody);
}
- private boolean publishSync(String topic, String messageBody) {
-
+ private ListenableFuture<Boolean> publishSync(String topic, String messageBody) {
+ SettableFuture<Boolean> resultF = SettableFuture.create();
try {
Future<RecordMetadata> future =
producer.send(new ProducerRecord<>(topic, "" + System.nanoTime(), messageBody));
RecordMetadata metadata = future.get();
LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
publisherMetrics.incrementBrokerPublishedMessage();
- return true;
+ resultF.set(true);
+ return resultF;
} catch (Throwable e) {
LOGGER.error("Cannot send the message", e);
publisherMetrics.incrementBrokerFailedToPublishMessage();
- return false;
+ return Futures.immediateFailedFuture(e);
}
}
- private boolean publishAsync(String topic, String messageBody) {
+ private ListenableFuture<Boolean> publishAsync(String topic, String messageBody) {
try {
Future<RecordMetadata> future =
producer.send(
@@ -121,11 +128,16 @@
publisherMetrics.incrementBrokerFailedToPublishMessage();
}
});
- return future != null;
+
+ // The transformation is lightweight, so we can afford using a directExecutor
+ return Futures.transform(
+ JdkFutureAdapters.listenInPoolThread(future),
+ Objects::nonNull,
+ MoreExecutors.directExecutor());
} catch (Throwable e) {
LOGGER.error("Cannot send the message", e);
publisherMetrics.incrementBrokerFailedToPublishMessage();
- return false;
+ return Futures.immediateFailedFuture(e);
}
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index 118a868..7ae9279 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -19,7 +19,6 @@
import static org.junit.Assert.fail;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
@@ -33,6 +32,7 @@
import com.google.gerrit.extensions.common.ChangeMessageInfo;
import com.google.gerrit.server.events.CommentAddedEvent;
import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGsonProvider;
import com.google.gerrit.server.events.ProjectCreatedEvent;
import com.google.gson.Gson;
import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index 48350f9..75f3330 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -17,10 +17,10 @@
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.mock;
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.server.events.EventGsonProvider;
import com.google.gerrit.server.events.ProjectCreatedEvent;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.util.IdGenerator;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
index 4074919..f5b6861 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
@@ -17,8 +17,8 @@
import static com.google.common.truth.Truth.assertThat;
import static java.nio.charset.StandardCharsets.UTF_8;
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gerrit.server.events.EventGsonProvider;
import com.google.gson.Gson;
import java.util.UUID;
import org.junit.Before;