Merge branch 'stable-3.4'
* stable-3.4:
Send/receive Event object instead of EventMessage
Use EventGsonProvider binding from Gerrit core
Use event deserialization logic from events broker
Implement async send method as per 3.4.0-rc2 API
Deserialize Event and EventMessage
Use EventGsonProvider from Gerrit core
Fix properties in EventConsumerIT tests
Change-Id: Ia534d42980f0949b3954e0eca23b8a74cc3be3ea
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index e059b34..e444876 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -15,6 +15,6 @@
maven_jar(
name = "events-broker",
- artifact = "com.gerritforge:events-broker:3.4.0-rc0",
- sha1 = "8c34c88103d4783eb4c4decde6d93541bc1cf064",
+ artifact = "com.gerritforge:events-broker:3.4.0.4",
+ sha1 = "8d361d863382290e33828116e65698190118d0f1",
)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
index 1be52cb..2768dd3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -14,14 +14,11 @@
package com.googlesource.gerrit.plugins.kafka;
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.events.EventListener;
-import com.google.gson.Gson;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
-import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import com.googlesource.gerrit.plugins.kafka.api.KafkaApiModule;
import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
@@ -39,7 +36,6 @@
@Override
protected void configure() {
- bind(Gson.class).toProvider(EventGsonProvider.class).in(Singleton.class);
DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
DynamicSet.bind(binder(), EventListener.class).to(KafkaPublisher.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
index 73c7509..a128af8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
@@ -15,11 +15,11 @@
package com.googlesource.gerrit.plugins.kafka.api;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
import com.google.common.collect.Sets;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import com.google.inject.Scopes;
@@ -61,7 +61,7 @@
workQueue.createQueue(configuration.getNumberOfSubscribers(), "kafka-subscriber"));
bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
- bind(new TypeLiteral<Deserializer<EventMessage>>() {}).to(KafkaEventDeserializer.class);
+ bind(new TypeLiteral<Deserializer<Event>>() {}).to(KafkaEventDeserializer.class);
bind(new TypeLiteral<Set<TopicSubscriber>>() {}).toInstance(activeConsumers);
DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
index 9a7c66a..3ec21e0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
@@ -15,8 +15,9 @@
package com.googlesource.gerrit.plugins.kafka.api;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
@@ -42,12 +43,12 @@
}
@Override
- public boolean send(String topic, EventMessage event) {
+ public ListenableFuture<Boolean> send(String topic, Event event) {
return publisher.publish(topic, event);
}
@Override
- public void receiveAsync(String topic, Consumer<EventMessage> eventConsumer) {
+ public void receiveAsync(String topic, Consumer<Event> eventConsumer) {
KafkaEventSubscriber subscriber = subscriberProvider.get();
synchronized (subscribers) {
subscribers.add(subscriber);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/GsonProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/GsonProvider.java
deleted file mode 100644
index 2c5c1e7..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/GsonProvider.java
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright (C) 2016 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.kafka.publish;
-
-import com.google.common.base.Supplier;
-import com.google.gerrit.server.events.SupplierSerializer;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.inject.Provider;
-
-public class GsonProvider implements Provider<Gson> {
-
- @Override
- public Gson get() {
- return new GsonBuilder().registerTypeAdapter(Supplier.class, new SupplierSerializer()).create();
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
index cc271b5..e7670cb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
@@ -14,9 +14,10 @@
package com.googlesource.gerrit.plugins.kafka.publish;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGson;
import com.google.gerrit.server.events.EventListener;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
@@ -31,7 +32,7 @@
private final Gson gson;
@Inject
- public KafkaPublisher(KafkaSession kafkaSession, Gson gson) {
+ public KafkaPublisher(KafkaSession kafkaSession, @EventGson Gson gson) {
this.session = kafkaSession;
this.gson = gson;
}
@@ -53,11 +54,11 @@
}
}
- public boolean publish(String topic, EventMessage event) {
+ public ListenableFuture<Boolean> publish(String topic, Event event) {
return session.publish(topic, getPayload(event));
}
- private String getPayload(EventMessage event) {
+ private String getPayload(Event event) {
return gson.toJson(event);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index bb79cb5..0dc29e1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -14,10 +14,16 @@
package com.googlesource.gerrit.plugins.kafka.session;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
import com.googlesource.gerrit.plugins.kafka.publish.KafkaEventsPublisherMetrics;
+import java.util.Objects;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -80,34 +86,35 @@
producer = null;
}
- public void publish(String messageBody) {
- publish(properties.getTopic(), messageBody);
+ public ListenableFuture<Boolean> publish(String messageBody) {
+ return publish(properties.getTopic(), messageBody);
}
- public boolean publish(String topic, String messageBody) {
+ public ListenableFuture<Boolean> publish(String topic, String messageBody) {
if (properties.isSendAsync()) {
return publishAsync(topic, messageBody);
}
return publishSync(topic, messageBody);
}
- private boolean publishSync(String topic, String messageBody) {
-
+ private ListenableFuture<Boolean> publishSync(String topic, String messageBody) {
+ SettableFuture<Boolean> resultF = SettableFuture.create();
try {
Future<RecordMetadata> future =
producer.send(new ProducerRecord<>(topic, "" + System.nanoTime(), messageBody));
RecordMetadata metadata = future.get();
LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
publisherMetrics.incrementBrokerPublishedMessage();
- return true;
+ resultF.set(true);
+ return resultF;
} catch (Throwable e) {
LOGGER.error("Cannot send the message", e);
publisherMetrics.incrementBrokerFailedToPublishMessage();
- return false;
+ return Futures.immediateFailedFuture(e);
}
}
- private boolean publishAsync(String topic, String messageBody) {
+ private ListenableFuture<Boolean> publishAsync(String topic, String messageBody) {
try {
Future<RecordMetadata> future =
producer.send(
@@ -121,11 +128,16 @@
publisherMetrics.incrementBrokerFailedToPublishMessage();
}
});
- return future != null;
+
+ // The transformation is lightweight, so we can afford using a directExecutor
+ return Futures.transform(
+ JdkFutureAdapters.listenInPoolThread(future),
+ Objects::nonNull,
+ MoreExecutors.directExecutor());
} catch (Throwable e) {
LOGGER.error("Cannot send the message", e);
publisherMetrics.incrementBrokerFailedToPublishMessage();
- return false;
+ return Futures.immediateFailedFuture(e);
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
index bab2ad0..cad2f37 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
@@ -14,8 +14,8 @@
package com.googlesource.gerrit.plugins.kafka.subscribe;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
-import com.google.gson.Gson;
+import com.gerritforge.gerrit.eventbroker.EventDeserializer;
+import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.util.Map;
@@ -23,30 +23,27 @@
import org.apache.kafka.common.serialization.StringDeserializer;
@Singleton
-public class KafkaEventDeserializer implements Deserializer<EventMessage> {
+public class KafkaEventDeserializer implements Deserializer<Event> {
private final StringDeserializer stringDeserializer = new StringDeserializer();
- private Gson gson;
+ private EventDeserializer eventDeserializer;
// To be used when providing this deserializer with class name (then need to add a configuration
// entry to set the gson.provider
public KafkaEventDeserializer() {}
@Inject
- public KafkaEventDeserializer(Gson gson) {
- this.gson = gson;
+ public KafkaEventDeserializer(EventDeserializer eventDeserializer) {
+ this.eventDeserializer = eventDeserializer;
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {}
@Override
- public EventMessage deserialize(String topic, byte[] data) {
- final EventMessage result =
- gson.fromJson(stringDeserializer.deserialize(topic, data), EventMessage.class);
- result.validate();
-
- return result;
+ public Event deserialize(String topic, byte[] data) {
+ String json = stringDeserializer.deserialize(topic, data);
+ return eventDeserializer.deserialize(json);
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
index 7ef9d7b..90b82aa 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
@@ -15,8 +15,8 @@
import static java.nio.charset.StandardCharsets.UTF_8;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.util.ManualRequestContext;
import com.google.gerrit.server.util.OneOffRequestContext;
import com.google.inject.Inject;
@@ -39,14 +39,14 @@
private final OneOffRequestContext oneOffCtx;
private final AtomicBoolean closed = new AtomicBoolean(false);
- private final Deserializer<EventMessage> valueDeserializer;
+ private final Deserializer<Event> valueDeserializer;
private final KafkaSubscriberProperties configuration;
private final ExecutorService executor;
private final KafkaEventSubscriberMetrics subscriberMetrics;
private final KafkaConsumerFactory consumerFactory;
private final Deserializer<byte[]> keyDeserializer;
- private java.util.function.Consumer<EventMessage> messageProcessor;
+ private java.util.function.Consumer<Event> messageProcessor;
private String topic;
private AtomicBoolean resetOffset = new AtomicBoolean(false);
@@ -57,7 +57,7 @@
KafkaSubscriberProperties configuration,
KafkaConsumerFactory consumerFactory,
Deserializer<byte[]> keyDeserializer,
- Deserializer<EventMessage> valueDeserializer,
+ Deserializer<Event> valueDeserializer,
OneOffRequestContext oneOffCtx,
@ConsumerExecutor ExecutorService executor,
KafkaEventSubscriberMetrics subscriberMetrics) {
@@ -71,7 +71,7 @@
this.valueDeserializer = valueDeserializer;
}
- public void subscribe(String topic, java.util.function.Consumer<EventMessage> messageProcessor) {
+ public void subscribe(String topic, java.util.function.Consumer<Event> messageProcessor) {
this.topic = topic;
this.messageProcessor = messageProcessor;
logger.atInfo().log(
@@ -97,7 +97,7 @@
receiver.wakeup();
}
- public java.util.function.Consumer<EventMessage> getMessageProcessor() {
+ public java.util.function.Consumer<Event> getMessageProcessor() {
return messageProcessor;
}
@@ -146,7 +146,7 @@
consumerRecords.forEach(
consumerRecord -> {
try (ManualRequestContext ctx = oneOffCtx.open()) {
- EventMessage event =
+ Event event =
valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
messageProcessor.accept(event);
} catch (Exception e) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index 4e27585..bd47223 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -19,8 +19,6 @@
import static org.junit.Assert.fail;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
@@ -33,6 +31,7 @@
import com.google.gerrit.extensions.common.ChangeMessageInfo;
import com.google.gerrit.server.events.CommentAddedEvent;
import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGsonProvider;
import com.google.gerrit.server.events.ProjectCreatedEvent;
import com.google.gson.Gson;
import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
@@ -40,7 +39,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.UUID;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -131,24 +129,22 @@
@Test
@UseLocalDisk
- @GerritConfig(name = "plugin.kafka-events.groupId", value = "test-consumer-group")
+ @GerritConfig(name = "plugin.events-kafka.groupId", value = "test-consumer-group")
@GerritConfig(
- name = "plugin.kafka-events.keyDeserializer",
+ name = "plugin.events-kafka.keyDeserializer",
value = "org.apache.kafka.common.serialization.StringDeserializer")
@GerritConfig(
- name = "plugin.kafka-events.valueDeserializer",
+ name = "plugin.events-kafka.valueDeserializer",
value = "org.apache.kafka.common.serialization.StringDeserializer")
- @GerritConfig(name = "plugin.kafka-events.pollingIntervalMs", value = "500")
+ @GerritConfig(name = "plugin.events-kafka.pollingIntervalMs", value = "500")
public void shouldReplayAllEvents() throws InterruptedException {
String topic = "a_topic";
- EventMessage eventMessage =
- new EventMessage(
- new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID()),
- new ProjectCreatedEvent());
+ Event eventMessage = new ProjectCreatedEvent();
+ eventMessage.instanceId = "test-instance-id";
Duration WAIT_FOR_POLL_TIMEOUT = Duration.ofMillis(1000);
- List<EventMessage> receivedEvents = new ArrayList<>();
+ List<Event> receivedEvents = new ArrayList<>();
BrokerApi kafkaBrokerApi = kafkaBrokerApi();
kafkaBrokerApi.send(topic, eventMessage);
@@ -157,14 +153,12 @@
waitUntil(() -> receivedEvents.size() == 1, WAIT_FOR_POLL_TIMEOUT);
- assertThat(receivedEvents.get(0).getHeader().eventId)
- .isEqualTo(eventMessage.getHeader().eventId);
+ assertThat(receivedEvents.get(0).instanceId).isEqualTo(eventMessage.instanceId);
kafkaBrokerApi.replayAllEvents(topic);
waitUntil(() -> receivedEvents.size() == 2, WAIT_FOR_POLL_TIMEOUT);
- assertThat(receivedEvents.get(1).getHeader().eventId)
- .isEqualTo(eventMessage.getHeader().eventId);
+ assertThat(receivedEvents.get(1).instanceId).isEqualTo(eventMessage.instanceId);
}
private BrokerApi kafkaBrokerApi() {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index 48350f9..d8df198 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -17,10 +17,10 @@
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.mock;
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
-import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGson;
+import com.google.gerrit.server.events.EventGsonProvider;
import com.google.gerrit.server.events.ProjectCreatedEvent;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.util.IdGenerator;
@@ -39,7 +39,6 @@
import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
import java.util.ArrayList;
import java.util.List;
-import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -62,7 +61,7 @@
private static final String TEST_GROUP_ID = KafkaBrokerApiTest.class.getName();
private static final int TEST_POLLING_INTERVAL_MSEC = 100;
private static final int TEST_THREAD_POOL_SIZE = 10;
- private static final UUID TEST_INSTANCE_ID = UUID.randomUUID();
+ private static final String TEST_INSTANCE_ID = "test-instance-id";
private static final TimeUnit TEST_TIMOUT_UNIT = TimeUnit.SECONDS;
private static final int TEST_TIMEOUT = 30;
@@ -87,7 +86,10 @@
@Override
protected void configure() {
- bind(Gson.class).toProvider(EventGsonProvider.class).in(Singleton.class);
+ bind(Gson.class)
+ .annotatedWith(EventGson.class)
+ .toProvider(EventGsonProvider.class)
+ .in(Singleton.class);
bind(MetricMaker.class).toInstance(mock(MetricMaker.class, Answers.RETURNS_DEEP_STUBS));
bind(OneOffRequestContext.class)
.toInstance(mock(OneOffRequestContext.class, Answers.RETURNS_DEEP_STUBS));
@@ -105,8 +107,8 @@
}
}
- public static class TestConsumer implements Consumer<EventMessage> {
- public final List<EventMessage> messages = new ArrayList<>();
+ public static class TestConsumer implements Consumer<Event> {
+ public final List<Event> messages = new ArrayList<>();
private final CountDownLatch lock;
public TestConsumer(int numMessagesExpected) {
@@ -114,7 +116,7 @@
}
@Override
- public void accept(EventMessage message) {
+ public void accept(Event message) {
messages.add(message);
lock.countDown();
}
@@ -128,13 +130,6 @@
}
}
- public static class TestHeader extends Header {
-
- public TestHeader() {
- super(UUID.randomUUID(), TEST_INSTANCE_ID);
- }
- }
-
@BeforeClass
public static void beforeClass() throws Exception {
kafka = new KafkaContainer();
@@ -176,7 +171,8 @@
KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
String testTopic = "test_topic_sync";
TestConsumer testConsumer = new TestConsumer(1);
- EventMessage testEventMessage = new EventMessage(new TestHeader(), new ProjectCreatedEvent());
+ Event testEventMessage = new ProjectCreatedEvent();
+ testEventMessage.instanceId = TEST_INSTANCE_ID;
kafkaBrokerApi.receiveAsync(testTopic, testConsumer);
kafkaBrokerApi.send(testTopic, testEventMessage);
@@ -192,7 +188,8 @@
KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
String testTopic = "test_topic_async";
TestConsumer testConsumer = new TestConsumer(1);
- EventMessage testEventMessage = new EventMessage(new TestHeader(), new ProjectCreatedEvent());
+ Event testEventMessage = new ProjectCreatedEvent();
+ testEventMessage.instanceId = TEST_INSTANCE_ID;
kafkaBrokerApi.send(testTopic, testEventMessage);
kafkaBrokerApi.receiveAsync(testTopic, testConsumer);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
deleted file mode 100644
index e456a2a..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.kafka.subscribe;
-
-import static com.google.common.truth.Truth.assertThat;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
-import com.google.gson.Gson;
-import java.util.UUID;
-import org.junit.Before;
-import org.junit.Test;
-
-public class KafkaEventDeserializerTest {
- private KafkaEventDeserializer deserializer;
-
- @Before
- public void setUp() {
- final Gson gson = new EventGsonProvider().get();
- deserializer = new KafkaEventDeserializer(gson);
- }
-
- @Test
- public void kafkaEventDeserializerShouldParseAKafkaEvent() {
- final UUID eventId = UUID.randomUUID();
- final String eventType = "event-type";
- final UUID sourceInstanceId = UUID.randomUUID();
- final long eventCreatedOn = 10L;
- final String eventJson =
- String.format(
- "{ "
- + "\"header\": { \"eventId\": \"%s\", \"eventType\": \"%s\", \"sourceInstanceId\": \"%s\", \"eventCreatedOn\": %d },"
- + "\"body\": { \"type\": \"project-created\" }"
- + "}",
- eventId, eventType, sourceInstanceId, eventCreatedOn);
- final EventMessage event = deserializer.deserialize("ignored", eventJson.getBytes(UTF_8));
-
- assertThat(event.getHeader().eventId).isEqualTo(eventId);
- assertThat(event.getHeader().sourceInstanceId).isEqualTo(sourceInstanceId);
- }
-
- @Test(expected = RuntimeException.class)
- public void kafkaEventDeserializerShouldFailForInvalidJson() {
- deserializer.deserialize("ignored", "this is not a JSON string".getBytes(UTF_8));
- }
-
- @Test(expected = RuntimeException.class)
- public void kafkaEventDeserializerShouldFailForInvalidObjectButValidJSON() {
- deserializer.deserialize("ignored", "{}".getBytes(UTF_8));
- }
-}