Add Kafka REST API authentication

Add basic username/password authentication to all calls to the Kafka
REST API.

Change-Id: I9b91b7750d49fe5664ed4e837ed16cdca633dff1
diff --git a/BUILD b/BUILD
index d9f938b..8889c80 100644
--- a/BUILD
+++ b/BUILD
@@ -28,6 +28,7 @@
 junit_tests(
     name = "events_kafka_tests",
     srcs = glob(["src/test/java/**/*.java"]),
+    resources = glob(["src/test/resources/**/*"]),
     tags = ["events-kafka"],
     timeout = "long",
     deps = [
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index 62330e3..1b29748 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -40,6 +40,8 @@
   private static final String PROPERTY_HTTP_WIRE_LOG = "httpWireLog";
   private static final boolean DEFAULT_HTTP_WIRE_LOG = false;
   private static final String PROPERTY_REST_API_URI = "restApiUri";
+  private static final String PROPERTY_REST_API_USERNAME = "restApiUsername";
+  private static final String PROPERTY_REST_API_PASSWORD = "restApiPassword";
   private static final String PROPERTY_REST_API_TIMEOUT = "restApiTimeout";
   private static final Duration DEFAULT_REST_API_TIMEOUT = Duration.ofSeconds(60);
   private static final String PROPERTY_REST_API_THREADS = "restApiThreads";
@@ -64,6 +66,8 @@
   private final boolean sendAsync;
   private final ClientType clientType;
   private final String restApiUriString;
+  private final String restApiUsername;
+  private final String restApiPassword;
   private final boolean httpWireLog;
   private final Duration restApiTimeout;
   private final int restApiThreads;
@@ -86,6 +90,12 @@
           throw new IllegalArgumentException("Missing REST API URI in Kafka properties");
         }
 
+        restApiUsername = fromGerritConfig.getString(PROPERTY_REST_API_USERNAME);
+        restApiPassword = fromGerritConfig.getString(PROPERTY_REST_API_PASSWORD);
+        if (!Strings.isNullOrEmpty(restApiUsername) && Strings.isNullOrEmpty(restApiPassword)) {
+          throw new IllegalArgumentException("Missing REST API password in kafka properties");
+        }
+
         httpWireLog = fromGerritConfig.getBoolean(PROPERTY_HTTP_WIRE_LOG, DEFAULT_HTTP_WIRE_LOG);
         restApiTimeout =
             Duration.ofMillis(
@@ -99,6 +109,8 @@
       case NATIVE:
       default:
         restApiUriString = null;
+        restApiUsername = null;
+        restApiPassword = null;
         httpWireLog = false;
         restApiTimeout = null;
         restApiThreads = 0;
@@ -111,7 +123,11 @@
 
   @VisibleForTesting
   public KafkaProperties(
-      boolean sendAsync, ClientType clientType, @Nullable String restApiUriString) {
+      boolean sendAsync,
+      ClientType clientType,
+      @Nullable String restApiUriString,
+      @Nullable String restApiUsername,
+      @Nullable String restApiPassword) {
     super();
     setDefaults();
     topic = DEFAULT_STREAM_EVENTS_TOPIC_NAME;
@@ -122,6 +138,8 @@
     this.httpWireLog = false;
     restApiTimeout = DEFAULT_REST_API_TIMEOUT;
     restApiThreads = DEFAULT_REST_API_THREADS;
+    this.restApiUsername = restApiUsername;
+    this.restApiPassword = restApiPassword;
   }
 
   private void setDefaults() {
@@ -176,6 +194,14 @@
     return getRestApiUri("");
   }
 
+  public String getRestApiUsername() {
+    return restApiUsername;
+  }
+
+  public String getRestApiPassword() {
+    return restApiPassword;
+  }
+
   public URI getRestApiUri(String kafkaRestId) throws URISyntaxException {
     return new URI(restApiUriString.replace(REST_API_URI_ID_PLACEHOLDER, kafkaRestId));
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
index 9b4ef8f..7e6a49c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
@@ -45,7 +45,7 @@
   @VisibleForTesting
   public KafkaSubscriberProperties(
       int pollingInterval, String groupId, int numberOfSubscribers, ClientType clientType) {
-    this(pollingInterval, groupId, numberOfSubscribers, clientType, null);
+    this(pollingInterval, groupId, numberOfSubscribers, clientType, null, null, null);
   }
 
   @VisibleForTesting
@@ -54,8 +54,10 @@
       String groupId,
       int numberOfSubscribers,
       ClientType clientType,
-      String restApiUriString) {
-    super(true, clientType, restApiUriString);
+      String restApiUriString,
+      String restApiUsername,
+      String restApiPassword) {
+    super(true, clientType, restApiUriString, restApiUsername, restApiPassword);
     this.pollingInterval = pollingInterval;
     this.groupId = groupId;
     this.numberOfSubscribers = numberOfSubscribers;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/HttpAsyncClientBuilderFactory.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/HttpAsyncClientBuilderFactory.java
new file mode 100644
index 0000000..ec50bf6
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/HttpAsyncClientBuilderFactory.java
@@ -0,0 +1,50 @@
+// Copyright (C) 2011 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.rest;
+
+import com.google.common.base.Strings;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import java.net.URISyntaxException;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+
+/** Looks up a remote's password in secure.config. */
+public class HttpAsyncClientBuilderFactory {
+  private final KafkaProperties config;
+
+  @Inject
+  public HttpAsyncClientBuilderFactory(KafkaProperties config) {
+    this.config = config;
+  }
+
+  public HttpAsyncClientBuilder create() throws URISyntaxException {
+    String user = config.getRestApiUsername();
+    String pass = config.getRestApiPassword();
+    HttpAsyncClientBuilder httpAsyncClientBuilder = HttpAsyncClients.custom();
+    if (!Strings.isNullOrEmpty(user) && !Strings.isNullOrEmpty(pass)) {
+      CredentialsProvider credsProvider = new BasicCredentialsProvider();
+      credsProvider.setCredentials(
+          new AuthScope(config.getRestApiUri().getHost(), config.getRestApiUri().getPort()),
+          new UsernamePasswordCredentials(user, pass));
+      httpAsyncClientBuilder.setDefaultCredentialsProvider(credsProvider);
+    }
+    return httpAsyncClientBuilder;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java
index d06b427..047a96e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java
@@ -50,7 +50,6 @@
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
-import org.apache.http.impl.nio.client.HttpAsyncClients;
 import org.apache.log4j.Appender;
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.Level;
@@ -78,9 +77,11 @@
   public KafkaRestClient(
       HttpHostProxy httpHostProxy,
       @FutureExecutor ExecutorService executor,
-      @Assisted KafkaProperties configuration) {
+      HttpAsyncClientBuilderFactory credentialsFactory,
+      @Assisted KafkaProperties configuration)
+      throws URISyntaxException {
     proxy = httpHostProxy;
-    httpclient = proxy.apply(HttpAsyncClients.custom()).build();
+    httpclient = proxy.apply(credentialsFactory.create()).build();
     httpclient.start();
     this.configuration = configuration;
     kafkaRestApiTimeoutMsec = (int) configuration.getRestApiTimeout().toMillis();
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 24bdc8c..bfac4c5 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -93,3 +93,19 @@
 :	Send messages to Kafka asynchronously, detaching the calling process from the
 	acknowledge of the message being sent.
 	Default: true
+
+secure.config
+--------------------
+
+`plugin.@PLUGIN@.restApiUsername`
+:	Username used for the authentication to the
+	[Confluent REST-API Proxy](https://docs.confluent.io/platform/current/kafka-rest/index.html)
+	for sending/receiving messages through REST-API instead of using the native Kafka client.
+	**NOTE**: when `plugin.@PLUGIN@.restApiUri` is unset or set to `NATIVE`, this setting is ignored.
+	Default: unset
+
+`plugin.@PLUGIN@.restApiPassword`
+:	Password used for the authentication to the
+	[Confluent REST-API Proxy](https://docs.confluent.io/platform/current/kafka-rest/index.html)
+	**NOTE**: when `plugin.@PLUGIN@.restApiUri` is unset or set to `NATIVE`, this setting is ignored.
+	Default: unset
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/KafkaRestContainer.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/KafkaRestContainer.java
index 06c6801..5cbf585 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/KafkaRestContainer.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/KafkaRestContainer.java
@@ -20,6 +20,7 @@
 import java.net.URISyntaxException;
 import java.util.Map;
 import org.junit.Ignore;
+import org.testcontainers.containers.BindMode;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.utility.DockerImageName;
@@ -31,11 +32,12 @@
   public static final int KAFKA_REST_PORT = 8082;
   private final String kafkaRestHostname;
 
-  public KafkaRestContainer(KafkaContainer kafkaContainer) {
-    this(kafkaContainer, null);
+  public KafkaRestContainer(KafkaContainer kafkaContainer, Boolean enableAuthentication) {
+    this(kafkaContainer, null, enableAuthentication);
   }
 
-  public KafkaRestContainer(KafkaContainer kafkaContainer, String kafkaRestId) {
+  public KafkaRestContainer(
+      KafkaContainer kafkaContainer, String kafkaRestId, Boolean enableAuthentication) {
     super(restProxyImageFor(kafkaContainer));
 
     kafkaRestHostname = KAFKA_REST_PROXY_HOSTNAME + Strings.nullToEmpty(kafkaRestId);
@@ -54,6 +56,18 @@
     if (kafkaRestId != null) {
       withEnv("KAFKA_REST_ID", kafkaRestId);
     }
+    if (enableAuthentication) {
+      withEnv("KAFKA_REST_AUTHENTICATION_METHOD", "BASIC");
+      withEnv("KAFKA_REST_AUTHENTICATION_REALM", "KafkaRest");
+      withEnv("KAFKA_REST_AUTHENTICATION_ROLES", "GerritRole");
+      withEnv(
+          "KAFKAREST_OPTS",
+          "-Djava.security.auth.login.config=/etc/kafka-rest/rest-jaas.properties");
+      withClasspathResourceMapping(
+          "rest-jaas.properties", "/etc/kafka-rest/rest-jaas.properties", BindMode.READ_ONLY);
+      withClasspathResourceMapping(
+          "password.properties", "/etc/kafka-rest/password.properties", BindMode.READ_ONLY);
+    }
     withCreateContainerCmdModifier(cmd -> cmd.withHostName(kafkaRestHostname));
   }
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index 37f15da..1991467 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -17,6 +17,7 @@
 import static com.google.common.truth.Truth.assertThat;
 import static org.mockito.Mockito.mock;
 
+import com.google.common.base.Strings;
 import com.google.gerrit.metrics.MetricMaker;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventGson;
@@ -65,6 +66,8 @@
   static KafkaRestContainer kafkaRest;
   static KafkaRestContainer kafkaRestWithId;
   static GenericContainer<?> nginx;
+  static String restApiUsername;
+  static String restApiPassword;
 
   static final int TEST_NUM_SUBSCRIBERS = 1;
   static final String TEST_GROUP_ID = KafkaBrokerApiTest.class.getName();
@@ -167,9 +170,9 @@
   public static void beforeClass() throws Exception {
     kafka = KafkaContainerProvider.get();
     kafka.start();
-    kafkaRestWithId = new KafkaRestContainer(kafka, KAFKA_REST_ID);
+    kafkaRestWithId = new KafkaRestContainer(kafka, KAFKA_REST_ID, isAuthenticationProvided());
     kafkaRestWithId.start();
-    kafkaRest = new KafkaRestContainer(kafka);
+    kafkaRest = new KafkaRestContainer(kafka, isAuthenticationProvided());
     kafkaRest.start();
 
     System.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
@@ -194,6 +197,10 @@
     }
   }
 
+  private static boolean isAuthenticationProvided() {
+    return !Strings.isNullOrEmpty(restApiUsername) && !Strings.isNullOrEmpty(restApiPassword);
+  }
+
   protected TestModule newTestModule(KafkaProperties kafkaProperties) {
     return new TestModule(kafkaProperties);
   }
@@ -221,7 +228,9 @@
 
   @Test
   public void shouldSendSyncAndReceiveToTopic() {
-    connectToKafka(new KafkaProperties(false, clientType, getKafkaRestApiUriString()));
+    connectToKafka(
+        new KafkaProperties(
+            false, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
     KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
     String testTopic = "test_topic_sync";
     TestConsumer testConsumer = new TestConsumer(1);
@@ -240,7 +249,9 @@
 
   @Test
   public void shouldSendAsyncAndReceiveToTopic() {
-    connectToKafka(new KafkaProperties(true, clientType, getKafkaRestApiUriString()));
+    connectToKafka(
+        new KafkaProperties(
+            true, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
     KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
     String testTopic = "test_topic_async";
     TestConsumer testConsumer = new TestConsumer(1);
@@ -259,7 +270,9 @@
 
   @Test
   public void shouldSendToTopicAndResetOffset() {
-    connectToKafka(new KafkaProperties(false, clientType, getKafkaRestApiUriString()));
+    connectToKafka(
+        new KafkaProperties(
+            false, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
     KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
     String testTopic = "test_topic_reset";
     Event testEventMessage = new ProjectCreatedEvent();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTest.java
index c0c2bc7..5be61ab 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTest.java
@@ -56,7 +56,9 @@
                 TEST_GROUP_ID,
                 TEST_NUM_SUBSCRIBERS,
                 ClientType.REST,
-                getKafkaRestApiUriString());
+                getKafkaRestApiUriString(),
+                restApiUsername,
+                restApiPassword);
         bind(KafkaSubscriberProperties.class).toInstance(kafkaSubscriberProperties);
 
         bind(HttpHostProxy.class).toInstance(new HttpHostProxy(null, null, null));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTestBase.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTestBase.java
index 91b6286..cee073f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTestBase.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTestBase.java
@@ -35,9 +35,11 @@
                 + "  listen  [::]:80  default_server;\\n"
                 + "  location     /%s/ {\\n"
                 + "    proxy_pass http://%s:%d/; \\n"
+                + "	   proxy_set_header Authorization \"Basic Z2Vycml0OnNlY3JldA==\";\\n"
                 + "  }\\n"
                 + "  location     / {\\n"
                 + "    proxy_pass http://%s:%d; \\n"
+                + "	   proxy_set_header Authorization \"Basic Z2Vycml0OnNlY3JldA==\";\\n"
                 + "  }\\n"
                 + "}",
             KAFKA_REST_ID,
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithAuthenticationTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithAuthenticationTest.java
new file mode 100644
index 0000000..abd054f
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithAuthenticationTest.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.api;
+
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaBrokerRestApiWithAuthenticationTest extends KafkaBrokerRestApiTest {
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    restApiUsername = "gerrit";
+    restApiPassword = "secret";
+    KafkaBrokerRestApiTest.beforeClass();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithIdPrefixTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithIdPrefixTest.java
index 9125989..6ee06e7 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithIdPrefixTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithIdPrefixTest.java
@@ -56,7 +56,9 @@
                 TEST_GROUP_ID,
                 TEST_NUM_SUBSCRIBERS,
                 ClientType.REST,
-                getApiUriString());
+                getApiUriString(),
+                null,
+                null);
         bind(KafkaSubscriberProperties.class).toInstance(kafkaSubscriberProperties);
 
         bind(HttpHostProxy.class).toInstance(new HttpHostProxy(null, null, null));
diff --git a/src/test/resources/password.properties b/src/test/resources/password.properties
new file mode 100644
index 0000000..bcff92d
--- /dev/null
+++ b/src/test/resources/password.properties
@@ -0,0 +1 @@
+gerrit: secret,GerritRole
\ No newline at end of file
diff --git a/src/test/resources/rest-jaas.properties b/src/test/resources/rest-jaas.properties
new file mode 100644
index 0000000..719a030
--- /dev/null
+++ b/src/test/resources/rest-jaas.properties
@@ -0,0 +1,5 @@
+KafkaRest {
+    org.eclipse.jetty.jaas.spi.PropertyFileLoginModule required
+    debug="true"
+    file="/etc/kafka-rest/password.properties";
+};
\ No newline at end of file