Merge branch 'stable-3.4'
* stable-3.4:
Fallback to default region chain when no region is configured
Bump up version of localstack from 0.12.8 to 0.12.17.5
Change-Id: I7d1fe7b3f2d9832c234f1032ff666f053374735f
diff --git a/BUILD b/BUILD
index 3456a2e..b201250 100644
--- a/BUILD
+++ b/BUILD
@@ -65,6 +65,7 @@
"@amazon-http-client-spi//jar",
"@amazon-kinesis-client//jar",
"@amazon-kinesis//jar",
+ "@awssdk-kinesis-producer//jar",
"@events-broker//jar",
],
)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProvider.java
index 02b65bf..5c5dc52 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProvider.java
@@ -20,15 +20,18 @@
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
+import software.amazon.awssdk.regions.providers.AwsRegionProviderChain;
@Singleton
public class KinesisProducerProvider implements Provider<KinesisProducer> {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final Configuration configuration;
+ private final AwsRegionProviderChain regionProvider;
@Inject
- KinesisProducerProvider(Configuration configuration) {
+ KinesisProducerProvider(Configuration configuration, AwsRegionProviderChain regionProvider) {
this.configuration = configuration;
+ this.regionProvider = regionProvider;
}
@Override
@@ -39,7 +42,8 @@
.setMaxConnections(1)
.setRequestTimeout(configuration.getPublishSingleRequestTimeoutMs());
- configuration.getRegion().ifPresent(r -> conf.setRegion(r.toString()));
+ conf.setRegion(configuration.getRegion().orElseGet(regionProvider::getRegion).toString());
+
configuration
.getEndpoint()
.ifPresent(
@@ -52,7 +56,7 @@
logger.atInfo().log(
"Kinesis producer configured. Request Timeout (ms):'%s'%s%s",
configuration.getPublishSingleRequestTimeoutMs(),
- configuration.getRegion().map(r -> String.format("|region: '%s'", r.id())).orElse(""),
+ String.format("|region: '%s'", conf.getRegion()),
configuration
.getEndpoint()
.map(e -> String.format("|endpoint: '%s'", e.toASCIIString()))
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
index 8380716..6fcf93f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
@@ -30,6 +30,8 @@
import com.google.inject.TypeLiteral;
import java.util.Set;
import java.util.concurrent.ExecutorService;
+import software.amazon.awssdk.regions.providers.AwsRegionProviderChain;
+import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
@@ -76,6 +78,7 @@
bind(KinesisAsyncClient.class).toProvider(KinesisAsyncClientProvider.class).in(SINGLETON);
bind(DynamoDbAsyncClient.class).toProvider(DynamoDbAsyncClientProvider.class).in(SINGLETON);
bind(CloudWatchAsyncClient.class).toProvider(CloudWatchAsyncClientProvider.class).in(SINGLETON);
+ bind(AwsRegionProviderChain.class).toInstance(new DefaultAwsRegionProviderChain());
factory(SchedulerProvider.Factory.class);
bind(new TypeLiteral<Set<TopicSubscriber>>() {}).toInstance(activeConsumers);
DynamicItem.bind(binder(), BrokerApi.class).to(KinesisBrokerApi.class).in(Scopes.SINGLETON);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
index 6128f3c..7449440 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
@@ -62,7 +62,7 @@
private static final int LOCALSTACK_PORT = 4566;
private LocalStackContainer localstack =
- new LocalStackContainer(DockerImageName.parse("localstack/localstack:0.12.8"))
+ new LocalStackContainer(DockerImageName.parse("localstack/localstack:0.12.17.5"))
.withServices(DYNAMODB, KINESIS, CLOUDWATCH)
.withEnv("USE_SSL", "true")
.withExposedPorts(LOCALSTACK_PORT);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProviderTest.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProviderTest.java
new file mode 100644
index 0000000..1a924c7
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProviderTest.java
@@ -0,0 +1,82 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import java.util.Optional;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.regions.providers.AwsRegionProviderChain;
+
+/**
+ * Unit tests for the region selection performed by {@link KinesisProducerProvider#get()}.
+ *
+ * <p>Both collaborators are Mockito mocks, so each test only checks whether the injected
+ * {@link AwsRegionProviderChain} is consulted, depending on what {@link
+ * Configuration#getRegion()} returns.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class KinesisProducerProviderTest {
+  private KinesisProducerProvider objectUnderTest;
+
+  @Mock Configuration configuration;
+  @Mock AwsRegionProviderChain regionProvider;
+
+  /** Stubs the request timeout read by the provider and creates the provider under test. */
+  @Before
+  public void setup() {
+    long aRequestTimeout = 1000L;
+    when(configuration.getPublishSingleRequestTimeoutMs()).thenReturn(aRequestTimeout);
+    objectUnderTest = new KinesisProducerProvider(configuration, regionProvider);
+  }
+
+  /** With no region in the configuration, the region provider chain must be queried. */
+  @Test
+  public void shouldCallRegionProviderWhenRegionNotExplicitlyConfigured() {
+    when(configuration.getRegion()).thenReturn(Optional.empty());
+    when(regionProvider.getRegion()).thenReturn(Region.US_EAST_1);
+
+    KinesisProducer kinesisProducer = objectUnderTest.get();
+    try {
+      verify(regionProvider).getRegion();
+    } finally {
+      // Release the producer built as a by-product of the call under test so it does not
+      // outlive the test. NOTE(review): assumes get() hands back a live producer - confirm.
+      kinesisProducer.destroy();
+    }
+  }
+
+  /** With a region in the configuration, the region provider chain must not be queried. */
+  @Test
+  public void shouldNotCallRegionProviderWhenRegionIsExplicitlyConfigured() {
+    when(configuration.getRegion()).thenReturn(Optional.of(Region.US_EAST_1));
+
+    KinesisProducer kinesisProducer = objectUnderTest.get();
+    try {
+      verify(regionProvider, never()).getRegion();
+    } finally {
+      // Same clean-up as in the previous test: do not leave the producer behind.
+      kinesisProducer.destroy();
+    }
+  }
+}