Merge branch 'stable-3.3'
* stable-3.3:
Add message content validation
Use events-broker 3.3.2
Change-Id: I963bde4e53eb11faee8077cc1784b0af020caa73
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 0006f37..7aa154e 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -109,8 +109,8 @@
maven_jar(
name = "events-broker",
- artifact = "com.gerritforge:events-broker:3.3.2",
- sha1 = "d8bcb77047cc12dd7c623b5b4de70a25499d3d6c",
+ artifact = "com.gerritforge:events-broker:3.4.0-rc2",
+ sha1 = "f72b4166e6d785fd1a41c997a4ffb14461dd7d87",
)
maven_jar(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
index 4e4ca3f..1c053b4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
@@ -17,6 +17,7 @@
import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.inject.Inject;
import java.util.Collections;
@@ -42,11 +43,7 @@
}
@Override
- public boolean send(String streamName, EventMessage event) {
- return sendWithResult(streamName, event).isSuccess();
- }
-
- PublishResult sendWithResult(String streamName, EventMessage event) {
+ public ListenableFuture<Boolean> send(String streamName, EventMessage event) {
return kinesisPublisher.publish(
streamName, gson.toJson(event), event.getHeader().sourceInstanceId.toString());
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
index 9547191..225649a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
@@ -14,21 +14,20 @@
package com.googlesource.gerrit.plugins.kinesis;
-import com.amazonaws.services.kinesis.producer.Attempt;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.flogger.FluentLogger;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventListener;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Optional;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -61,80 +60,45 @@
publish(configuration.getStreamEventsTopic(), gson.toJson(event), event.getType());
}
- PublishResult publish(String streamName, String stringEvent, String partitionKey) {
+ ListenableFuture<Boolean> publish(String streamName, String stringEvent, String partitionKey) {
if (configuration.isSendAsync()) {
return publishAsync(streamName, stringEvent, partitionKey);
}
return publishSync(streamName, stringEvent, partitionKey);
}
- private PublishResult publishSync(String streamName, String stringEvent, String partitionKey) {
- logger.atFiner().log(
- "KINESIS PRODUCER - Attempt to publish event %s to stream %s [PK: %s]",
- stringEvent, streamName, partitionKey);
-
- UserRecordResult result = null;
+ /**
+ * Publishes the event and blocks until the asynchronous publish completes or the configured
+ * publish timeout elapses.
+ *
+ * @return an already-completed future holding the publish outcome; {@code false} when waiting
+ *     for the result failed, timed out, was cancelled or was interrupted
+ */
+ private ListenableFuture<Boolean> publishSync(
+ String streamName, String stringEvent, String partitionKey) {
+ SettableFuture<Boolean> resultFuture = SettableFuture.create();
try {
- result =
- kinesisProducer
- .addUserRecord(streamName, partitionKey, ByteBuffer.wrap(stringEvent.getBytes()))
- .get(configuration.getPublishTimeoutMs(), TimeUnit.MILLISECONDS);
-
- List<Attempt> attemptsDetails = result.getAttempts();
- int numberOfAttempts = attemptsDetails.size();
- if (result.isSuccessful()) {
- logger.atFine().log(
- "KINESIS PRODUCER - Successfully published event '%s' to shardId '%s' [PK: %s] [Sequence: %s] after %s attempt(s)",
- stringEvent,
- result.getShardId(),
- partitionKey,
- result.getSequenceNumber(),
- numberOfAttempts);
- return PublishResult.success(numberOfAttempts);
- } else {
- int currentIdx = numberOfAttempts - 1;
- int previousIdx = currentIdx - 1;
- Attempt current = attemptsDetails.get(currentIdx);
- if (previousIdx >= 0) {
- Attempt previous = attemptsDetails.get(previousIdx);
- logger.atSevere().log(
- String.format(
- "KINESIS PRODUCER - Failed publishing event '%s' [PK: %s] - %s : %s. Previous failure - %s : %s",
- stringEvent,
- partitionKey,
- current.getErrorCode(),
- current.getErrorMessage(),
- previous.getErrorCode(),
- previous.getErrorMessage()));
- } else {
- logger.atSevere().log(
- String.format(
- "KINESIS PRODUCER - Failed publishing event '%s' [PK: %s] - %s : %s.",
- stringEvent, partitionKey, current.getErrorCode(), current.getErrorMessage()));
- }
- }
- } catch (InterruptedException e) {
- logger.atSevere().withCause(e).log(
- String.format(
- "KINESIS PRODUCER - Interrupted publishing event '%s' [PK: %s]",
- stringEvent, partitionKey));
- } catch (ExecutionException e) {
- logger.atSevere().withCause(e).log(
- String.format(
- "KINESIS PRODUCER - Error when publishing event '%s' [PK: %s]",
- stringEvent, partitionKey));
- } catch (TimeoutException e) {
- logger.atSevere().withCause(e).log(
- String.format(
- "KINESIS PRODUCER - Timeout when publishing event '%s' [PK: %s]",
- stringEvent, partitionKey));
+ resultFuture.set(
+ publishAsync(streamName, stringEvent, partitionKey)
+ .get(configuration.getPublishTimeoutMs(), TimeUnit.MILLISECONDS));
+ } catch (CancellationException
+ | ExecutionException
+ | InterruptedException
+ | TimeoutException futureException) {
+ if (futureException instanceof InterruptedException) {
+ // Catching InterruptedException clears the interrupt flag; re-assert it for our callers.
+ Thread.currentThread().interrupt();
+ }
+ logger.atSevere().withCause(futureException).log(
+ "KINESIS PRODUCER - Failed publishing event %s [PK: %s]", stringEvent, partitionKey);
+ resultFuture.set(false);
}
- return PublishResult.failure(
- Optional.ofNullable(result).map(r -> r.getAttempts().size()).orElse(0));
+ return resultFuture;
}
- private PublishResult publishAsync(String streamName, String stringEvent, String partitionKey) {
+ private ListenableFuture<Boolean> publishAsync(
+ String streamName, String stringEvent, String partitionKey) {
try {
ListenableFuture<UserRecordResult> publishF =
kinesisProducer.addUserRecord(
@@ -162,12 +126,13 @@
}
},
callBackExecutor);
+
+ return Futures.transform(
+ publishF, res -> res != null && res.isSuccessful(), callBackExecutor);
} catch (Exception e) {
logger.atSevere().withCause(e).log(
"KINESIS PRODUCER - Error when publishing event %s [PK: %s]", stringEvent, partitionKey);
- return PublishResult.failure(1);
+ return Futures.immediateFailedFuture(e);
}
-
- return PublishResult.success(1);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
index 2629fe7..23a8a1d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
@@ -18,13 +18,13 @@
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
-import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
import com.google.common.collect.Sets;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.events.EventGsonProvider;
import com.google.gerrit.server.events.EventListener;
import com.google.gson.Gson;
import com.google.inject.Inject;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/PublishResult.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/PublishResult.java
deleted file mode 100644
index 202f3e9..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/PublishResult.java
+++ /dev/null
@@ -1,32 +0,0 @@
-// Copyright (C) 2021 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.kinesis;
-
-import com.google.auto.value.AutoValue;
-
-@AutoValue
-abstract class PublishResult {
- public abstract boolean isSuccess();
-
- public abstract int attempts();
-
- public static PublishResult success(int attempts) {
- return new AutoValue_PublishResult(true, attempts);
- }
-
- public static PublishResult failure(int attempts) {
- return new AutoValue_PublishResult(false, attempts);
- }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
index 7b5dded..d7e4d07 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
@@ -15,12 +15,14 @@
package com.googlesource.gerrit.plugins.kinesis;
import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.CLOUDWATCH;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.DYNAMODB;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.KINESIS;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
import com.google.gerrit.acceptance.TestPlugin;
import com.google.gerrit.acceptance.WaitUtil;
@@ -30,6 +32,9 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Test;
@@ -51,8 +56,10 @@
// lease on the newly created stream
private static final Duration WAIT_FOR_CONSUMPTION = Duration.ofSeconds(120);
private static final Duration STREAM_CREATION_TIMEOUT = Duration.ofSeconds(10);
+ private static final long SEND_TIMEOUT_MILLIS = 200;
private static final int LOCALSTACK_PORT = 4566;
+
private LocalStackContainer localstack =
new LocalStackContainer(DockerImageName.parse("localstack/localstack:0.12.8"))
.withServices(DYNAMODB, KINESIS, CLOUDWATCH)
@@ -147,48 +154,52 @@
@GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
@GerritConfig(name = "plugin.events-aws-kinesis.publishTimeoutMs", value = "10000")
@GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "false")
- public void sendingSynchronouslyShouldRetryUntilSuccessful() {
+ public void sendingSynchronouslyShouldBeSuccessful()
+ throws InterruptedException, ExecutionException {
String streamName = UUID.randomUUID().toString();
createStreamAsync(streamName);
- PublishResult publishResult = kinesisBroker().sendWithResult(streamName, eventMessage());
- assertThat(publishResult.isSuccess()).isTrue();
- assertThat(publishResult.attempts()).isGreaterThan(1);
+ ListenableFuture<Boolean> result = kinesisBroker().send(streamName, eventMessage());
+ assertThat(result.get()).isTrue();
}
@Test
@GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
@GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
@GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "false")
- public void sendingSynchronouslyShouldBeUnsuccessfulWhenTimingOut() {
+ public void sendingSynchronouslyShouldBeUnsuccessfulWhenTimingOut()
+ throws InterruptedException, ExecutionException, TimeoutException {
String streamName = "not-existing-stream";
- PublishResult publishResult = kinesisBroker().sendWithResult(streamName, eventMessage());
- assertThat(publishResult.isSuccess()).isFalse();
+ ListenableFuture<Boolean> result = kinesisBroker().send(streamName, eventMessage());
+ assertThat(result.get(SEND_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)).isFalse();
}
@Test
@GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
@GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
@GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "true")
- public void sendingAsynchronouslyShouldBeImmediatelySuccessfulEvenWhenStreamDoesNotExist() {
+ public void sendingAsynchronouslyShouldFailWhenStreamDoesNotExist() {
String streamName = "not-existing-stream";
- PublishResult publishResult = kinesisBroker().sendWithResult(streamName, eventMessage());
- assertThat(publishResult.isSuccess()).isTrue();
+ ListenableFuture<Boolean> result = kinesisBroker().send(streamName, eventMessage());
+ ExecutionException executionException = assertThrows(ExecutionException.class, result::get);
+ assertThat(executionException)
+ .hasMessageThat()
+ .contains("com.amazonaws.services.kinesis.producer.UserRecordFailedException");
}
@Test
@GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
@GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
@GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "true")
- public void sendingAsynchronouslyShouldBeImmediatelySuccessful() {
+ public void sendingAsynchronouslyShouldBeSuccessful()
+ throws InterruptedException, ExecutionException {
String streamName = UUID.randomUUID().toString();
createStreamAsync(streamName);
- PublishResult publishResult = kinesisBroker().sendWithResult(streamName, eventMessage());
- assertThat(publishResult.isSuccess()).isTrue();
- assertThat(publishResult.attempts()).isEqualTo(1);
+ ListenableFuture<Boolean> result = kinesisBroker().send(streamName, eventMessage());
+ assertThat(result.get()).isTrue();
}
public KinesisBrokerApi kinesisBroker() {