Merge branch 'stable-3.3' into stable-3.4
* stable-3.3:
Fall back to default region chain when no region is configured
Bump up version of localstack from 0.12.8 to 0.12.17.5
Change-Id: I7d84df9d1c657a64dac8377c0b273e3ef912423f
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 0006f37..388a15e 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -109,8 +109,8 @@
maven_jar(
name = "events-broker",
- artifact = "com.gerritforge:events-broker:3.3.2",
- sha1 = "d8bcb77047cc12dd7c623b5b4de70a25499d3d6c",
+ artifact = "com.gerritforge:events-broker:3.4.0.4",
+ sha1 = "8d361d863382290e33828116e65698190118d0f1",
)
maven_jar(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
index 4e4ca3f..0316c06 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
@@ -15,9 +15,9 @@
package com.googlesource.gerrit.plugins.kinesis;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
-import com.google.gson.Gson;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import java.util.Collections;
import java.util.Set;
@@ -28,31 +28,24 @@
class KinesisBrokerApi implements BrokerApi {
private final KinesisConsumer.Factory consumerFactory;
- private final Gson gson;
private final KinesisPublisher kinesisPublisher;
private final Set<KinesisConsumer> consumers;
@Inject
public KinesisBrokerApi(
- Gson gson, KinesisPublisher kinesisPublisher, KinesisConsumer.Factory consumerFactory) {
- this.gson = gson;
+ KinesisPublisher kinesisPublisher, KinesisConsumer.Factory consumerFactory) {
this.kinesisPublisher = kinesisPublisher;
this.consumerFactory = consumerFactory;
this.consumers = Collections.newSetFromMap(new ConcurrentHashMap<>());
}
@Override
- public boolean send(String streamName, EventMessage event) {
- return sendWithResult(streamName, event).isSuccess();
- }
-
- PublishResult sendWithResult(String streamName, EventMessage event) {
- return kinesisPublisher.publish(
- streamName, gson.toJson(event), event.getHeader().sourceInstanceId.toString());
+ public ListenableFuture<Boolean> send(String streamName, Event event) {
+ return kinesisPublisher.publish(streamName, event);
}
@Override
- public void receiveAsync(String streamName, Consumer<EventMessage> eventConsumer) {
+ public void receiveAsync(String streamName, Consumer<Event> eventConsumer) {
KinesisConsumer consumer = consumerFactory.create(streamName, eventConsumer);
consumers.add(consumer);
consumer.subscribe(streamName, eventConsumer);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
index 4a610b5..b563e26 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
@@ -14,8 +14,8 @@
package com.googlesource.gerrit.plugins.kinesis;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -26,7 +26,7 @@
class KinesisConsumer {
interface Factory {
- KinesisConsumer create(String topic, Consumer<EventMessage> messageProcessor);
+ KinesisConsumer create(String topic, Consumer<Event> messageProcessor);
}
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@@ -36,7 +36,7 @@
private final ExecutorService executor;
private Scheduler kinesisScheduler;
- private java.util.function.Consumer<EventMessage> messageProcessor;
+ private java.util.function.Consumer<Event> messageProcessor;
private String streamName;
private AtomicBoolean resetOffset = new AtomicBoolean(false);
@@ -52,8 +52,7 @@
this.executor = executor;
}
- public void subscribe(
- String streamName, java.util.function.Consumer<EventMessage> messageProcessor) {
+ public void subscribe(String streamName, java.util.function.Consumer<Event> messageProcessor) {
this.streamName = streamName;
this.messageProcessor = messageProcessor;
@@ -61,7 +60,7 @@
runReceiver(messageProcessor);
}
- private void runReceiver(java.util.function.Consumer<EventMessage> messageProcessor) {
+ private void runReceiver(java.util.function.Consumer<Event> messageProcessor) {
this.kinesisScheduler =
schedulerFactory.create(streamName, resetOffset.getAndSet(false), messageProcessor).get();
executor.execute(kinesisScheduler);
@@ -81,7 +80,7 @@
logger.atInfo().log("Shutdown kinesis consumer of stream %s completed.", getStreamName());
}
- public java.util.function.Consumer<EventMessage> getMessageProcessor() {
+ public java.util.function.Consumer<Event> getMessageProcessor() {
return messageProcessor;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
index 9547191..158f8ac 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
@@ -14,21 +14,21 @@
package com.googlesource.gerrit.plugins.kinesis;
-import com.amazonaws.services.kinesis.producer.Attempt;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.flogger.FluentLogger;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGson;
import com.google.gerrit.server.events.EventListener;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Optional;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -46,7 +46,7 @@
@Inject
public KinesisPublisher(
- Gson gson,
+ @EventGson Gson gson,
KinesisProducer kinesisProducer,
Configuration configuration,
@ProducerCallbackExecutor ExecutorService callBackExecutor) {
@@ -58,83 +58,37 @@
@Override
public void onEvent(Event event) {
- publish(configuration.getStreamEventsTopic(), gson.toJson(event), event.getType());
+ publish(configuration.getStreamEventsTopic(), event);
}
- PublishResult publish(String streamName, String stringEvent, String partitionKey) {
+ ListenableFuture<Boolean> publish(String streamName, Event event) {
if (configuration.isSendAsync()) {
- return publishAsync(streamName, stringEvent, partitionKey);
+ return publishAsync(streamName, gson.toJson(event), event.getType());
}
- return publishSync(streamName, stringEvent, partitionKey);
+ return publishSync(streamName, gson.toJson(event), event.getType());
}
- private PublishResult publishSync(String streamName, String stringEvent, String partitionKey) {
- logger.atFiner().log(
- "KINESIS PRODUCER - Attempt to publish event %s to stream %s [PK: %s]",
- stringEvent, streamName, partitionKey);
-
- UserRecordResult result = null;
+ private ListenableFuture<Boolean> publishSync(
+ String streamName, String stringEvent, String partitionKey) {
+ SettableFuture<Boolean> resultFuture = SettableFuture.create();
try {
- result =
- kinesisProducer
- .addUserRecord(streamName, partitionKey, ByteBuffer.wrap(stringEvent.getBytes()))
- .get(configuration.getPublishTimeoutMs(), TimeUnit.MILLISECONDS);
-
- List<Attempt> attemptsDetails = result.getAttempts();
- int numberOfAttempts = attemptsDetails.size();
- if (result.isSuccessful()) {
- logger.atFine().log(
- "KINESIS PRODUCER - Successfully published event '%s' to shardId '%s' [PK: %s] [Sequence: %s] after %s attempt(s)",
- stringEvent,
- result.getShardId(),
- partitionKey,
- result.getSequenceNumber(),
- numberOfAttempts);
- return PublishResult.success(numberOfAttempts);
- } else {
- int currentIdx = numberOfAttempts - 1;
- int previousIdx = currentIdx - 1;
- Attempt current = attemptsDetails.get(currentIdx);
- if (previousIdx >= 0) {
- Attempt previous = attemptsDetails.get(previousIdx);
- logger.atSevere().log(
- String.format(
- "KINESIS PRODUCER - Failed publishing event '%s' [PK: %s] - %s : %s. Previous failure - %s : %s",
- stringEvent,
- partitionKey,
- current.getErrorCode(),
- current.getErrorMessage(),
- previous.getErrorCode(),
- previous.getErrorMessage()));
- } else {
- logger.atSevere().log(
- String.format(
- "KINESIS PRODUCER - Failed publishing event '%s' [PK: %s] - %s : %s.",
- stringEvent, partitionKey, current.getErrorCode(), current.getErrorMessage()));
- }
- }
- } catch (InterruptedException e) {
- logger.atSevere().withCause(e).log(
- String.format(
- "KINESIS PRODUCER - Interrupted publishing event '%s' [PK: %s]",
- stringEvent, partitionKey));
- } catch (ExecutionException e) {
- logger.atSevere().withCause(e).log(
- String.format(
- "KINESIS PRODUCER - Error when publishing event '%s' [PK: %s]",
- stringEvent, partitionKey));
- } catch (TimeoutException e) {
- logger.atSevere().withCause(e).log(
- String.format(
- "KINESIS PRODUCER - Timeout when publishing event '%s' [PK: %s]",
- stringEvent, partitionKey));
+ resultFuture.set(
+ publishAsync(streamName, stringEvent, partitionKey)
+ .get(configuration.getPublishTimeoutMs(), TimeUnit.MILLISECONDS));
+ } catch (CancellationException
+ | ExecutionException
+ | InterruptedException
+ | TimeoutException futureException) {
+ logger.atSevere().withCause(futureException).log(
+ "KINESIS PRODUCER - Failed publishing event %s [PK: %s]", stringEvent, partitionKey);
+ resultFuture.set(false);
}
- return PublishResult.failure(
- Optional.ofNullable(result).map(r -> r.getAttempts().size()).orElse(0));
+ return resultFuture;
}
- private PublishResult publishAsync(String streamName, String stringEvent, String partitionKey) {
+ private ListenableFuture<Boolean> publishAsync(
+ String streamName, String stringEvent, String partitionKey) {
try {
ListenableFuture<UserRecordResult> publishF =
kinesisProducer.addUserRecord(
@@ -162,12 +116,13 @@
}
},
callBackExecutor);
+
+ return Futures.transform(
+ publishF, res -> res != null && res.isSuccessful(), callBackExecutor);
} catch (Exception e) {
logger.atSevere().withCause(e).log(
"KINESIS PRODUCER - Error when publishing event %s [PK: %s]", stringEvent, partitionKey);
- return PublishResult.failure(1);
+ return Futures.immediateFailedFuture(e);
}
-
- return PublishResult.success(1);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
index b92e8db..5b93b4c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
@@ -14,18 +14,13 @@
package com.googlesource.gerrit.plugins.kinesis;
-import static java.util.Objects.requireNonNull;
-
-import com.gerritforge.gerrit.eventbroker.EventMessage;
-import com.gerritforge.gerrit.eventbroker.EventMessage.Header;
+import com.gerritforge.gerrit.eventbroker.EventDeserializer;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.util.ManualRequestContext;
import com.google.gerrit.server.util.OneOffRequestContext;
-import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
-import java.util.UUID;
import java.util.function.Consumer;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
@@ -38,20 +33,22 @@
class KinesisRecordProcessor implements ShardRecordProcessor {
interface Factory {
- KinesisRecordProcessor create(Consumer<EventMessage> recordProcessor);
+ KinesisRecordProcessor create(Consumer<Event> recordProcessor);
}
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
- private final Consumer<EventMessage> recordProcessor;
+ private final Consumer<Event> recordProcessor;
private final OneOffRequestContext oneOffCtx;
- private final Gson gson;
+ private final EventDeserializer eventDeserializer;
@Inject
KinesisRecordProcessor(
- @Assisted Consumer<EventMessage> recordProcessor, OneOffRequestContext oneOffCtx, Gson gson) {
+ @Assisted Consumer<Event> recordProcessor,
+ OneOffRequestContext oneOffCtx,
+ EventDeserializer eventDeserializer) {
this.recordProcessor = recordProcessor;
this.oneOffCtx = oneOffCtx;
- this.gson = gson;
+ this.eventDeserializer = eventDeserializer;
}
@Override
@@ -76,7 +73,7 @@
String jsonMessage = new String(byteRecord);
logger.atFiner().log("Kinesis consumed event: '%s'", jsonMessage);
try (ManualRequestContext ctx = oneOffCtx.open()) {
- EventMessage eventMessage = deserialise(jsonMessage);
+ Event eventMessage = eventDeserializer.deserialize(jsonMessage);
recordProcessor.accept(eventMessage);
} catch (Exception e) {
logger.atSevere().withCause(e).log("Could not process event '%s'", jsonMessage);
@@ -87,23 +84,6 @@
}
}
- private EventMessage deserialise(String json) {
- EventMessage result = gson.fromJson(json, EventMessage.class);
- if (result.getEvent() == null && result.getHeader() == null) {
- Event event = deserialiseEvent(json);
- result = new EventMessage(new Header(UUID.randomUUID(), event.instanceId), event);
- }
- result.validate();
- return result;
- }
-
- private Event deserialiseEvent(String json) {
- Event event = gson.fromJson(json, Event.class);
- requireNonNull(event.type, "Event type cannot be null");
- requireNonNull(event.instanceId, "Event instance id cannot be null");
- return event;
- }
-
@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
logger.atInfo().log("Lost lease, so terminating.");
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorFactory.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorFactory.java
index 557ad0f..8571a1f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorFactory.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorFactory.java
@@ -14,7 +14,7 @@
package com.googlesource.gerrit.plugins.kinesis;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import java.util.function.Consumer;
@@ -23,16 +23,15 @@
class KinesisRecordProcessorFactory implements ShardRecordProcessorFactory {
interface Factory {
- KinesisRecordProcessorFactory create(Consumer<EventMessage> recordProcessor);
+ KinesisRecordProcessorFactory create(Consumer<Event> recordProcessor);
}
- private final Consumer<EventMessage> recordProcessor;
+ private final Consumer<Event> recordProcessor;
private final KinesisRecordProcessor.Factory processorFactory;
@Inject
KinesisRecordProcessorFactory(
- @Assisted Consumer<EventMessage> recordProcessor,
- KinesisRecordProcessor.Factory processorFactory) {
+ @Assisted Consumer<Event> recordProcessor, KinesisRecordProcessor.Factory processorFactory) {
this.recordProcessor = recordProcessor;
this.processorFactory = processorFactory;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
index b503d0e..10d752c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
@@ -18,7 +18,6 @@
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
import com.google.common.collect.Sets;
import com.google.gerrit.extensions.events.LifecycleListener;
@@ -26,10 +25,8 @@
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gerrit.server.events.EventListener;
-import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Scopes;
-import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@@ -79,7 +76,6 @@
factory(SchedulerProvider.Factory.class);
bind(new TypeLiteral<Set<TopicSubscriber>>() {}).toInstance(activeConsumers);
DynamicItem.bind(binder(), BrokerApi.class).to(KinesisBrokerApi.class).in(Scopes.SINGLETON);
- bind(Gson.class).toProvider(EventGsonProvider.class).in(Singleton.class);
DynamicSet.bind(binder(), LifecycleListener.class).to(KinesisBrokerLifeCycleManager.class);
factory(KinesisConsumer.Factory.class);
DynamicSet.bind(binder(), EventListener.class).to(KinesisPublisher.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/PublishResult.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/PublishResult.java
deleted file mode 100644
index 202f3e9..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/PublishResult.java
+++ /dev/null
@@ -1,32 +0,0 @@
-// Copyright (C) 2021 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.kinesis;
-
-import com.google.auto.value.AutoValue;
-
-@AutoValue
-abstract class PublishResult {
- public abstract boolean isSuccess();
-
- public abstract int attempts();
-
- public static PublishResult success(int attempts) {
- return new AutoValue_PublishResult(true, attempts);
- }
-
- public static PublishResult failure(int attempts) {
- return new AutoValue_PublishResult(false, attempts);
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
index 19079bf..4c59b49 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
@@ -16,7 +16,7 @@
import static com.googlesource.gerrit.plugins.kinesis.Configuration.cosumerLeaseName;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gerrit.server.events.Event;
import com.google.inject.Provider;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
@@ -35,7 +35,7 @@
SchedulerProvider create(
String streamName,
boolean fromBeginning,
- java.util.function.Consumer<EventMessage> messageProcessor);
+ java.util.function.Consumer<Event> messageProcessor);
}
private final ConfigsBuilder configsBuilder;
@@ -53,7 +53,7 @@
KinesisRecordProcessorFactory.Factory kinesisRecordProcessorFactory,
@Assisted String streamName,
@Assisted boolean fromBeginning,
- @Assisted java.util.function.Consumer<EventMessage> messageProcessor) {
+ @Assisted java.util.function.Consumer<Event> messageProcessor) {
this.configuration = configuration;
this.kinesisAsyncClient = kinesisAsyncClient;
this.streamName = streamName;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
index 1ff7faa..f70a5be 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
@@ -15,21 +15,26 @@
package com.googlesource.gerrit.plugins.kinesis;
import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.CLOUDWATCH;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.DYNAMODB;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.KINESIS;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
import com.google.gerrit.acceptance.TestPlugin;
import com.google.gerrit.acceptance.WaitUtil;
import com.google.gerrit.acceptance.config.GerritConfig;
+import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.ProjectCreatedEvent;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Test;
@@ -51,8 +56,10 @@
// lease on the newly created stream
private static final Duration WAIT_FOR_CONSUMPTION = Duration.ofSeconds(120);
private static final Duration STREAM_CREATION_TIMEOUT = Duration.ofSeconds(10);
+ private static final long SEND_TIMEOUT_MILLIS = 200;
private static final int LOCALSTACK_PORT = 4566;
+
private LocalStackContainer localstack =
new LocalStackContainer(DockerImageName.parse("localstack/localstack:0.12.17.5"))
.withServices(DYNAMODB, KINESIS, CLOUDWATCH)
@@ -106,11 +113,30 @@
EventConsumerCounter eventConsumerCounter = new EventConsumerCounter();
kinesisBroker().receiveAsync(streamName, eventConsumerCounter);
- kinesisBroker().send(streamName, eventMessage());
+ Event event = eventMessage();
+ kinesisBroker().send(streamName, event);
WaitUtil.waitUntil(
() -> eventConsumerCounter.getConsumedMessages().size() == 1, WAIT_FOR_CONSUMPTION);
- assertThat(eventConsumerCounter.getConsumedMessages().get(0).getHeader().eventId)
- .isEqualTo(eventConsumerCounter.getConsumedMessages().get(0).getHeader().eventId);
+ compareEvents(eventConsumerCounter.getConsumedMessages().get(0), event);
+ }
+
+ @Test
+ @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
+ @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
+ public void shouldConsumeAnEventWithoutInstanceId() throws Exception {
+ String streamName = UUID.randomUUID().toString();
+ createStreamAndWait(streamName, STREAM_CREATION_TIMEOUT);
+
+ EventConsumerCounter eventConsumerCounter = new EventConsumerCounter();
+ kinesisBroker().receiveAsync(streamName, eventConsumerCounter);
+
+ Event event = eventMessage();
+ event.instanceId = null;
+
+ kinesisBroker().send(streamName, event);
+ WaitUtil.waitUntil(
+ () -> eventConsumerCounter.getConsumedMessages().size() == 1, WAIT_FOR_CONSUMPTION);
+ compareEvents(eventConsumerCounter.getConsumedMessages().get(0), event);
}
@Test
@@ -123,13 +149,12 @@
EventConsumerCounter eventConsumerCounter = new EventConsumerCounter();
kinesisBroker().receiveAsync(streamName, eventConsumerCounter);
- EventMessage event = eventMessage();
+ Event event = eventMessage();
kinesisBroker().send(streamName, event);
WaitUtil.waitUntil(
() -> eventConsumerCounter.getConsumedMessages().size() == 1, WAIT_FOR_CONSUMPTION);
- assertThat(eventConsumerCounter.getConsumedMessages().get(0).getHeader().eventId)
- .isEqualTo(event.getHeader().eventId);
+ compareEvents(eventConsumerCounter.getConsumedMessages().get(0), event);
eventConsumerCounter.clear();
kinesisBroker().disconnect();
@@ -138,8 +163,7 @@
WaitUtil.waitUntil(
() -> eventConsumerCounter.getConsumedMessages().size() == 1, WAIT_FOR_CONSUMPTION);
- assertThat(eventConsumerCounter.getConsumedMessages().get(0).getHeader().eventId)
- .isEqualTo(event.getHeader().eventId);
+ compareEvents(eventConsumerCounter.getConsumedMessages().get(0), event);
}
@Test
@@ -147,48 +171,52 @@
@GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
@GerritConfig(name = "plugin.events-aws-kinesis.publishTimeoutMs", value = "10000")
@GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "false")
- public void sendingSynchronouslyShouldRetryUntilSuccessful() {
+ public void sendingSynchronouslyShouldBeSuccessful()
+ throws InterruptedException, ExecutionException {
String streamName = UUID.randomUUID().toString();
createStreamAsync(streamName);
- PublishResult publishResult = kinesisBroker().sendWithResult(streamName, eventMessage());
- assertThat(publishResult.isSuccess()).isTrue();
- assertThat(publishResult.attempts()).isGreaterThan(1);
+ ListenableFuture<Boolean> result = kinesisBroker().send(streamName, eventMessage());
+ assertThat(result.get()).isTrue();
}
@Test
@GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
@GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
@GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "false")
- public void sendingSynchronouslyShouldBeUnsuccessfulWhenTimingOut() {
+ public void sendingSynchronouslyShouldBeUnsuccessfulWhenTimingOut()
+ throws InterruptedException, ExecutionException, TimeoutException {
String streamName = "not-existing-stream";
- PublishResult publishResult = kinesisBroker().sendWithResult(streamName, eventMessage());
- assertThat(publishResult.isSuccess()).isFalse();
+ ListenableFuture<Boolean> result = kinesisBroker().send(streamName, eventMessage());
+ assertThat(result.get(SEND_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)).isFalse();
}
@Test
@GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
@GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
@GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "true")
- public void sendingAsynchronouslyShouldBeImmediatelySuccessfulEvenWhenStreamDoesNotExist() {
+ public void sendingAsynchronouslyShouldFailWhenStreamDoesNotExist() {
String streamName = "not-existing-stream";
- PublishResult publishResult = kinesisBroker().sendWithResult(streamName, eventMessage());
- assertThat(publishResult.isSuccess()).isTrue();
+ ListenableFuture<Boolean> result = kinesisBroker().send(streamName, eventMessage());
+ ExecutionException executionException = assertThrows(ExecutionException.class, result::get);
+ assertThat(executionException)
+ .hasMessageThat()
+ .contains("com.amazonaws.services.kinesis.producer.UserRecordFailedException");
}
@Test
@GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
@GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
@GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "true")
- public void sendingAsynchronouslyShouldBeImmediatelySuccessful() {
+ public void sendingAsynchronouslyShouldBeSuccessful()
+ throws InterruptedException, ExecutionException {
String streamName = UUID.randomUUID().toString();
createStreamAsync(streamName);
- PublishResult publishResult = kinesisBroker().sendWithResult(streamName, eventMessage());
- assertThat(publishResult.isSuccess()).isTrue();
- assertThat(publishResult.attempts()).isEqualTo(1);
+ ListenableFuture<Boolean> result = kinesisBroker().send(streamName, eventMessage());
+ assertThat(result.get()).isTrue();
}
public KinesisBrokerApi kinesisBroker() {
@@ -214,20 +242,27 @@
CreateStreamRequest.builder().streamName(streamName).shardCount(1).build());
}
- private EventMessage eventMessage() {
- return new EventMessage(
- new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID()), new ProjectCreatedEvent());
+ private Event eventMessage() {
+ Event event = new ProjectCreatedEvent();
+ event.instanceId = "instance-id";
+ return event;
}
- private static class EventConsumerCounter implements Consumer<EventMessage> {
- List<EventMessage> consumedMessages = new ArrayList<>();
+ private void compareEvents(Event event, Event expectedEvent) {
+ assertThat(event.type).isEqualTo(expectedEvent.type);
+ assertThat(event.eventCreatedOn).isEqualTo(expectedEvent.eventCreatedOn);
+ assertThat(event.instanceId).isEqualTo(expectedEvent.instanceId);
+ }
+
+ private static class EventConsumerCounter implements Consumer<Event> {
+ List<Event> consumedMessages = new ArrayList<>();
@Override
- public void accept(EventMessage eventMessage) {
+ public void accept(Event eventMessage) {
consumedMessages.add(eventMessage);
}
- public List<EventMessage> getConsumedMessages() {
+ public List<Event> getConsumedMessages() {
return consumedMessages;
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
index 2401034..d488ab8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorTest.java
@@ -18,12 +18,13 @@
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
-import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.EventDeserializer;
import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGsonProvider;
import com.google.gerrit.server.events.ProjectCreatedEvent;
import com.google.gerrit.server.util.ManualRequestContext;
import com.google.gerrit.server.util.OneOffRequestContext;
@@ -47,29 +48,29 @@
public class KinesisRecordProcessorTest {
private KinesisRecordProcessor objectUnderTest;
private Gson gson = new EventGsonProvider().get();
+ private EventDeserializer eventDeserializer = new EventDeserializer(gson);
- @Mock Consumer<EventMessage> succeedingConsumer;
- @Captor ArgumentCaptor<EventMessage> eventMessageCaptor;
+ @Mock Consumer<Event> succeedingConsumer;
+ @Captor ArgumentCaptor<Event> eventMessageCaptor;
@Mock OneOffRequestContext oneOffCtx;
@Mock ManualRequestContext requestContext;
@Before
public void setup() {
when(oneOffCtx.open()).thenReturn(requestContext);
- objectUnderTest = new KinesisRecordProcessor(succeedingConsumer, oneOffCtx, gson);
+ objectUnderTest = new KinesisRecordProcessor(succeedingConsumer, oneOffCtx, eventDeserializer);
}
@Test
public void shouldSkipEventWithoutSourceInstanceId() {
Event event = new ProjectCreatedEvent();
- EventMessage messageWithoutSourceInstanceId =
- new EventMessage(new EventMessage.Header(UUID.randomUUID(), (String) null), event);
+ event.instanceId = UUID.randomUUID().toString();
- ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(messageWithoutSourceInstanceId));
+ ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
objectUnderTest.processRecords(kinesisInput);
- verify(succeedingConsumer, never()).accept(messageWithoutSourceInstanceId);
+ verify(succeedingConsumer, never()).accept(event);
}
@Test
@@ -84,19 +85,19 @@
verify(succeedingConsumer, only()).accept(eventMessageCaptor.capture());
- EventMessage result = eventMessageCaptor.getValue();
- assertThat(result.getHeader().sourceInstanceId).isEqualTo(instanceId);
+ Event result = eventMessageCaptor.getValue();
+ assertThat(result.instanceId).isEqualTo(instanceId);
}
@Test
- public void shouldSkipEventObjectWithoutInstanceId() {
+ public void shouldProcessEventObjectWithoutInstanceId() {
Event event = new ProjectCreatedEvent();
event.instanceId = null;
ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
objectUnderTest.processRecords(kinesisInput);
- verify(succeedingConsumer, never()).accept(any());
+ verify(succeedingConsumer, times(1)).accept(any());
}
@Test
@@ -141,7 +142,7 @@
ProcessRecordsInput kinesisInput = sampleMessage(gson.toJson(event));
objectUnderTest.processRecords(kinesisInput);
- verify(succeedingConsumer, only()).accept(any(EventMessage.class));
+ verify(succeedingConsumer, only()).accept(any(Event.class));
}
private ProcessRecordsInput sampleMessage(String message) {