Add Kafka REST API authentication
Add basic username/password authentication to all calls to the Kafka
REST API.
Change-Id: I9b91b7750d49fe5664ed4e837ed16cdca633dff1
diff --git a/BUILD b/BUILD
index d9f938b..8889c80 100644
--- a/BUILD
+++ b/BUILD
@@ -28,6 +28,7 @@
junit_tests(
name = "events_kafka_tests",
srcs = glob(["src/test/java/**/*.java"]),
+ resources = glob(["src/test/resources/**/*"]),
tags = ["events-kafka"],
timeout = "long",
deps = [
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index 62330e3..1b29748 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -40,6 +40,8 @@
private static final String PROPERTY_HTTP_WIRE_LOG = "httpWireLog";
private static final boolean DEFAULT_HTTP_WIRE_LOG = false;
private static final String PROPERTY_REST_API_URI = "restApiUri";
+ private static final String PROPERTY_REST_API_USERNAME = "restApiUsername";
+ private static final String PROPERTY_REST_API_PASSWORD = "restApiPassword";
private static final String PROPERTY_REST_API_TIMEOUT = "restApiTimeout";
private static final Duration DEFAULT_REST_API_TIMEOUT = Duration.ofSeconds(60);
private static final String PROPERTY_REST_API_THREADS = "restApiThreads";
@@ -64,6 +66,8 @@
private final boolean sendAsync;
private final ClientType clientType;
private final String restApiUriString;
+ private final String restApiUsername;
+ private final String restApiPassword;
private final boolean httpWireLog;
private final Duration restApiTimeout;
private final int restApiThreads;
@@ -86,6 +90,12 @@
throw new IllegalArgumentException("Missing REST API URI in Kafka properties");
}
+ restApiUsername = fromGerritConfig.getString(PROPERTY_REST_API_USERNAME);
+ restApiPassword = fromGerritConfig.getString(PROPERTY_REST_API_PASSWORD);
+ if (!Strings.isNullOrEmpty(restApiUsername) && Strings.isNullOrEmpty(restApiPassword)) {
+ throw new IllegalArgumentException("Missing REST API password in kafka properties");
+ }
+
httpWireLog = fromGerritConfig.getBoolean(PROPERTY_HTTP_WIRE_LOG, DEFAULT_HTTP_WIRE_LOG);
restApiTimeout =
Duration.ofMillis(
@@ -99,6 +109,8 @@
case NATIVE:
default:
restApiUriString = null;
+ restApiUsername = null;
+ restApiPassword = null;
httpWireLog = false;
restApiTimeout = null;
restApiThreads = 0;
@@ -111,7 +123,11 @@
@VisibleForTesting
public KafkaProperties(
- boolean sendAsync, ClientType clientType, @Nullable String restApiUriString) {
+ boolean sendAsync,
+ ClientType clientType,
+ @Nullable String restApiUriString,
+ @Nullable String restApiUsername,
+ @Nullable String restApiPassword) {
super();
setDefaults();
topic = DEFAULT_STREAM_EVENTS_TOPIC_NAME;
@@ -122,6 +138,8 @@
this.httpWireLog = false;
restApiTimeout = DEFAULT_REST_API_TIMEOUT;
restApiThreads = DEFAULT_REST_API_THREADS;
+ this.restApiUsername = restApiUsername;
+ this.restApiPassword = restApiPassword;
}
private void setDefaults() {
@@ -176,6 +194,14 @@
return getRestApiUri("");
}
+ public String getRestApiUsername() {
+ return restApiUsername;
+ }
+
+ public String getRestApiPassword() {
+ return restApiPassword;
+ }
+
public URI getRestApiUri(String kafkaRestId) throws URISyntaxException {
return new URI(restApiUriString.replace(REST_API_URI_ID_PLACEHOLDER, kafkaRestId));
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
index 9b4ef8f..7e6a49c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
@@ -45,7 +45,7 @@
@VisibleForTesting
public KafkaSubscriberProperties(
int pollingInterval, String groupId, int numberOfSubscribers, ClientType clientType) {
- this(pollingInterval, groupId, numberOfSubscribers, clientType, null);
+ this(pollingInterval, groupId, numberOfSubscribers, clientType, null, null, null);
}
@VisibleForTesting
@@ -54,8 +54,10 @@
String groupId,
int numberOfSubscribers,
ClientType clientType,
- String restApiUriString) {
- super(true, clientType, restApiUriString);
+ String restApiUriString,
+ String restApiUsername,
+ String restApiPassword) {
+ super(true, clientType, restApiUriString, restApiUsername, restApiPassword);
this.pollingInterval = pollingInterval;
this.groupId = groupId;
this.numberOfSubscribers = numberOfSubscribers;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/HttpAsyncClientBuilderFactory.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/HttpAsyncClientBuilderFactory.java
new file mode 100644
index 0000000..ec50bf6
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/HttpAsyncClientBuilderFactory.java
@@ -0,0 +1,50 @@
+// Copyright (C) 2011 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.rest;
+
+import com.google.common.base.Strings;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import java.net.URISyntaxException;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+
+/** Looks up a remote's password in secure.config. */
+public class HttpAsyncClientBuilderFactory {
+ private final KafkaProperties config;
+
+ @Inject
+ public HttpAsyncClientBuilderFactory(KafkaProperties config) {
+ this.config = config;
+ }
+
+ public HttpAsyncClientBuilder create() throws URISyntaxException {
+ String user = config.getRestApiUsername();
+ String pass = config.getRestApiPassword();
+ HttpAsyncClientBuilder httpAsyncClientBuilder = HttpAsyncClients.custom();
+ if (!Strings.isNullOrEmpty(user) && !Strings.isNullOrEmpty(pass)) {
+ CredentialsProvider credsProvider = new BasicCredentialsProvider();
+ credsProvider.setCredentials(
+ new AuthScope(config.getRestApiUri().getHost(), config.getRestApiUri().getPort()),
+ new UsernamePasswordCredentials(user, pass));
+ httpAsyncClientBuilder.setDefaultCredentialsProvider(credsProvider);
+ }
+ return httpAsyncClientBuilder;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java
index d06b427..047a96e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java
@@ -50,7 +50,6 @@
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
-import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.log4j.Appender;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
@@ -78,9 +77,11 @@
public KafkaRestClient(
HttpHostProxy httpHostProxy,
@FutureExecutor ExecutorService executor,
- @Assisted KafkaProperties configuration) {
+ HttpAsyncClientBuilderFactory credentialsFactory,
+ @Assisted KafkaProperties configuration)
+ throws URISyntaxException {
proxy = httpHostProxy;
- httpclient = proxy.apply(HttpAsyncClients.custom()).build();
+ httpclient = proxy.apply(credentialsFactory.create()).build();
httpclient.start();
this.configuration = configuration;
kafkaRestApiTimeoutMsec = (int) configuration.getRestApiTimeout().toMillis();
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 24bdc8c..bfac4c5 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -93,3 +93,19 @@
: Send messages to Kafka asynchronously, detaching the calling process from the
acknowledge of the message being sent.
Default: true
+
+secure.config
+--------------------
+
+`plugin.@PLUGIN@.restApiUsername`
+: Username used for the authentication to the
+ [Confluent REST-API Proxy](https://docs.confluent.io/platform/current/kafka-rest/index.html)
+ for sending/receiving messages through REST-API instead of using the native Kafka client.
+ **NOTE**: when `plugin.@PLUGIN@.restApiUri` is unset or set to `NATIVE`, this setting is ignored.
+ Default: unset
+
+`plugin.@PLUGIN@.restApiPassword`
+: Password used for the authentication to the
+ [Confluent REST-API Proxy](https://docs.confluent.io/platform/current/kafka-rest/index.html)
+ **NOTE**: when `plugin.@PLUGIN@.restApiUri` is unset or set to `NATIVE`, this setting is ignored.
+ Default: unset
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/KafkaRestContainer.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/KafkaRestContainer.java
index 06c6801..5cbf585 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/KafkaRestContainer.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/KafkaRestContainer.java
@@ -20,6 +20,7 @@
import java.net.URISyntaxException;
import java.util.Map;
import org.junit.Ignore;
+import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
@@ -31,11 +32,12 @@
public static final int KAFKA_REST_PORT = 8082;
private final String kafkaRestHostname;
- public KafkaRestContainer(KafkaContainer kafkaContainer) {
- this(kafkaContainer, null);
+ public KafkaRestContainer(KafkaContainer kafkaContainer, Boolean enableAuthentication) {
+ this(kafkaContainer, null, enableAuthentication);
}
- public KafkaRestContainer(KafkaContainer kafkaContainer, String kafkaRestId) {
+ public KafkaRestContainer(
+ KafkaContainer kafkaContainer, String kafkaRestId, Boolean enableAuthentication) {
super(restProxyImageFor(kafkaContainer));
kafkaRestHostname = KAFKA_REST_PROXY_HOSTNAME + Strings.nullToEmpty(kafkaRestId);
@@ -54,6 +56,18 @@
if (kafkaRestId != null) {
withEnv("KAFKA_REST_ID", kafkaRestId);
}
+ if (enableAuthentication) {
+ withEnv("KAFKA_REST_AUTHENTICATION_METHOD", "BASIC");
+ withEnv("KAFKA_REST_AUTHENTICATION_REALM", "KafkaRest");
+ withEnv("KAFKA_REST_AUTHENTICATION_ROLES", "GerritRole");
+ withEnv(
+ "KAFKAREST_OPTS",
+ "-Djava.security.auth.login.config=/etc/kafka-rest/rest-jaas.properties");
+ withClasspathResourceMapping(
+ "rest-jaas.properties", "/etc/kafka-rest/rest-jaas.properties", BindMode.READ_ONLY);
+ withClasspathResourceMapping(
+ "password.properties", "/etc/kafka-rest/password.properties", BindMode.READ_ONLY);
+ }
withCreateContainerCmdModifier(cmd -> cmd.withHostName(kafkaRestHostname));
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index 37f15da..1991467 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -17,6 +17,7 @@
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.mock;
+import com.google.common.base.Strings;
import com.google.gerrit.metrics.MetricMaker;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventGson;
@@ -65,6 +66,8 @@
static KafkaRestContainer kafkaRest;
static KafkaRestContainer kafkaRestWithId;
static GenericContainer<?> nginx;
+ static String restApiUsername;
+ static String restApiPassword;
static final int TEST_NUM_SUBSCRIBERS = 1;
static final String TEST_GROUP_ID = KafkaBrokerApiTest.class.getName();
@@ -167,9 +170,9 @@
public static void beforeClass() throws Exception {
kafka = KafkaContainerProvider.get();
kafka.start();
- kafkaRestWithId = new KafkaRestContainer(kafka, KAFKA_REST_ID);
+ kafkaRestWithId = new KafkaRestContainer(kafka, KAFKA_REST_ID, isAuthenticationProvided());
kafkaRestWithId.start();
- kafkaRest = new KafkaRestContainer(kafka);
+ kafkaRest = new KafkaRestContainer(kafka, isAuthenticationProvided());
kafkaRest.start();
System.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
@@ -194,6 +197,10 @@
}
}
+ private static boolean isAuthenticationProvided() {
+ return !Strings.isNullOrEmpty(restApiUsername) && !Strings.isNullOrEmpty(restApiPassword);
+ }
+
protected TestModule newTestModule(KafkaProperties kafkaProperties) {
return new TestModule(kafkaProperties);
}
@@ -221,7 +228,9 @@
@Test
public void shouldSendSyncAndReceiveToTopic() {
- connectToKafka(new KafkaProperties(false, clientType, getKafkaRestApiUriString()));
+ connectToKafka(
+ new KafkaProperties(
+ false, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
String testTopic = "test_topic_sync";
TestConsumer testConsumer = new TestConsumer(1);
@@ -240,7 +249,9 @@
@Test
public void shouldSendAsyncAndReceiveToTopic() {
- connectToKafka(new KafkaProperties(true, clientType, getKafkaRestApiUriString()));
+ connectToKafka(
+ new KafkaProperties(
+ true, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
String testTopic = "test_topic_async";
TestConsumer testConsumer = new TestConsumer(1);
@@ -259,7 +270,9 @@
@Test
public void shouldSendToTopicAndResetOffset() {
- connectToKafka(new KafkaProperties(false, clientType, getKafkaRestApiUriString()));
+ connectToKafka(
+ new KafkaProperties(
+ false, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
String testTopic = "test_topic_reset";
Event testEventMessage = new ProjectCreatedEvent();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTest.java
index c0c2bc7..5be61ab 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTest.java
@@ -56,7 +56,9 @@
TEST_GROUP_ID,
TEST_NUM_SUBSCRIBERS,
ClientType.REST,
- getKafkaRestApiUriString());
+ getKafkaRestApiUriString(),
+ restApiUsername,
+ restApiPassword);
bind(KafkaSubscriberProperties.class).toInstance(kafkaSubscriberProperties);
bind(HttpHostProxy.class).toInstance(new HttpHostProxy(null, null, null));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTestBase.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTestBase.java
index 91b6286..cee073f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTestBase.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTestBase.java
@@ -35,9 +35,11 @@
+ " listen [::]:80 default_server;\\n"
+ " location /%s/ {\\n"
+ " proxy_pass http://%s:%d/; \\n"
+ + " proxy_set_header Authorization \"Basic Z2Vycml0OnNlY3JldA==\";\\n"
+ " }\\n"
+ " location / {\\n"
+ " proxy_pass http://%s:%d; \\n"
+ + " proxy_set_header Authorization \"Basic Z2Vycml0OnNlY3JldA==\";\\n"
+ " }\\n"
+ "}",
KAFKA_REST_ID,
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithAuthenticationTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithAuthenticationTest.java
new file mode 100644
index 0000000..abd054f
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithAuthenticationTest.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.api;
+
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaBrokerRestApiWithAuthenticationTest extends KafkaBrokerRestApiTest {
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ restApiUsername = "gerrit";
+ restApiPassword = "secret";
+ KafkaBrokerRestApiTest.beforeClass();
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithIdPrefixTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithIdPrefixTest.java
index 9125989..6ee06e7 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithIdPrefixTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithIdPrefixTest.java
@@ -56,7 +56,9 @@
TEST_GROUP_ID,
TEST_NUM_SUBSCRIBERS,
ClientType.REST,
- getApiUriString());
+ getApiUriString(),
+ null,
+ null);
bind(KafkaSubscriberProperties.class).toInstance(kafkaSubscriberProperties);
bind(HttpHostProxy.class).toInstance(new HttpHostProxy(null, null, null));
diff --git a/src/test/resources/password.properties b/src/test/resources/password.properties
new file mode 100644
index 0000000..bcff92d
--- /dev/null
+++ b/src/test/resources/password.properties
@@ -0,0 +1 @@
+gerrit: secret,GerritRole
\ No newline at end of file
diff --git a/src/test/resources/rest-jaas.properties b/src/test/resources/rest-jaas.properties
new file mode 100644
index 0000000..719a030
--- /dev/null
+++ b/src/test/resources/rest-jaas.properties
@@ -0,0 +1,5 @@
+KafkaRest {
+ org.eclipse.jetty.jaas.spi.PropertyFileLoginModule required
+ debug="true"
+ file="/etc/kafka-rest/password.properties";
+};
\ No newline at end of file