Use Kafka REST Proxy id to subscribe to the correct instance

When using a Kafka REST Proxy with multiple instances behind a
workload balancer, the consumers are allocated on one of the
allocated instances. The Kafka REST client needs to understand
which one is the instance that the client should talk to.

For deployments that can use workload balancers with sticky
sessions, there isn't any logic needed on the client side, because
the cookies allocated and stored on the client would do the job.
However, when using a standard K8s deployment[1], the sticky
session allocation isn't an option and any of the incoming
traffic may reach any of the pods.

Kafka REST Proxy allows with this change to specify the REST_PROXY_ID
with the placeholder ${rest_proxy_id} in the API URL, to help
redirecting the calls to the correct instance.

Example:
[plugin "events-kafka"]
  restApiUri = http://kafka-rest:8080/${kafka_rest_id}

[1] https://kubernetes.io/docs/concepts/workloads/controllers/deployment/

Change-Id: I396c3bc759ee05547e8c16cb4806b4fea63b833c
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index d8e22a0..62330e3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -35,6 +35,8 @@
 
 @Singleton
 public class KafkaProperties extends java.util.Properties {
+  public static final String REST_API_URI_ID_PLACEHOLDER = "${kafka_rest_id}";
+
   private static final String PROPERTY_HTTP_WIRE_LOG = "httpWireLog";
   private static final boolean DEFAULT_HTTP_WIRE_LOG = false;
   private static final String PROPERTY_REST_API_URI = "restApiUri";
@@ -61,7 +63,7 @@
   private final String topic;
   private final boolean sendAsync;
   private final ClientType clientType;
-  private final URI restApiUri;
+  private final String restApiUriString;
   private final boolean httpWireLog;
   private final Duration restApiTimeout;
   private final int restApiThreads;
@@ -79,16 +81,11 @@
 
     switch (clientType) {
       case REST:
-        String restApiUriString = fromGerritConfig.getString(PROPERTY_REST_API_URI);
+        restApiUriString = fromGerritConfig.getString(PROPERTY_REST_API_URI);
         if (Strings.isNullOrEmpty(restApiUriString)) {
           throw new IllegalArgumentException("Missing REST API URI in Kafka properties");
         }
 
-        try {
-          restApiUri = new URI(restApiUriString);
-        } catch (URISyntaxException e) {
-          throw new IllegalArgumentException("Invalid Kafka REST API URI: " + restApiUriString, e);
-        }
         httpWireLog = fromGerritConfig.getBoolean(PROPERTY_HTTP_WIRE_LOG, DEFAULT_HTTP_WIRE_LOG);
         restApiTimeout =
             Duration.ofMillis(
@@ -101,7 +98,7 @@
         break;
       case NATIVE:
       default:
-        restApiUri = null;
+        restApiUriString = null;
         httpWireLog = false;
         restApiTimeout = null;
         restApiThreads = 0;
@@ -113,13 +110,14 @@
   }
 
   @VisibleForTesting
-  public KafkaProperties(boolean sendAsync, ClientType clientType, @Nullable URI restApiURI) {
+  public KafkaProperties(
+      boolean sendAsync, ClientType clientType, @Nullable String restApiUriString) {
     super();
     setDefaults();
     topic = DEFAULT_STREAM_EVENTS_TOPIC_NAME;
     this.sendAsync = sendAsync;
     this.clientType = clientType;
-    this.restApiUri = restApiURI;
+    this.restApiUriString = restApiUriString;
     initDockerizedKafkaServer();
     this.httpWireLog = false;
     restApiTimeout = DEFAULT_REST_API_TIMEOUT;
@@ -174,8 +172,12 @@
     return clientType;
   }
 
-  public URI getRestApiUri() {
-    return restApiUri;
+  public URI getRestApiUri() throws URISyntaxException {
+    return getRestApiUri("");
+  }
+
+  public URI getRestApiUri(String kafkaRestId) throws URISyntaxException {
+    return new URI(restApiUriString.replace(REST_API_URI_ID_PLACEHOLDER, kafkaRestId));
   }
 
   public boolean isHttpWireLog() {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
index 1b0e4db..9b4ef8f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
@@ -19,7 +19,6 @@
 import com.google.gerrit.server.config.PluginConfigFactory;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import java.net.URI;
 
 @Singleton
 public class KafkaSubscriberProperties extends KafkaProperties {
@@ -55,8 +54,8 @@
       String groupId,
       int numberOfSubscribers,
       ClientType clientType,
-      URI restApiURI) {
-    super(true, clientType, restApiURI);
+      String restApiUriString) {
+    super(true, clientType, restApiUriString);
     this.pollingInterval = pollingInterval;
     this.groupId = groupId;
     this.numberOfSubscribers = numberOfSubscribers;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java
index 62021f8..0f7d0c0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java
@@ -30,6 +30,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.net.URLEncoder;
 import java.util.Enumeration;
 import java.util.Set;
@@ -63,9 +64,9 @@
 
   private final HttpHostProxy proxy;
   private final CloseableHttpAsyncClient httpclient;
-  private final URI kafkaRestApiUri;
   private final ExecutorService futureExecutor;
   private final int kafkaRestApiTimeoutMsec;
+  private final KafkaProperties configuration;
 
   private static boolean logConfigured;
 
@@ -81,7 +82,7 @@
     proxy = httpHostProxy;
     httpclient = proxy.apply(HttpAsyncClients.custom()).build();
     httpclient.start();
-    kafkaRestApiUri = configuration.getRestApiUri();
+    this.configuration = configuration;
     kafkaRestApiTimeoutMsec = (int) configuration.getRestApiTimeout().toMillis();
     if (configuration.isHttpWireLog()) {
       enableHttpWireLog();
@@ -136,7 +137,7 @@
   }
 
   public HttpGet createGetTopic(String topic) {
-    HttpGet get = new HttpGet(kafkaRestApiUri.resolve("/topics/" + topic));
+    HttpGet get = new HttpGet(resolveKafkaRestApiUri("/topics/" + topic));
     get.addHeader(HttpHeaders.ACCEPT, KAFKA_V2);
     get.setConfig(createRequestConfig());
     return get;
@@ -152,10 +153,7 @@
   public HttpPost createPostToConsumer(String consumerGroup) {
     HttpPost post =
         new HttpPost(
-            kafkaRestApiUri.resolve(
-                kafkaRestApiUri.getPath()
-                    + "/consumers/"
-                    + URLEncoder.encode(consumerGroup, UTF_8)));
+            resolveKafkaRestApiUri("/consumers/" + URLEncoder.encode(consumerGroup, UTF_8)));
     post.addHeader(HttpHeaders.ACCEPT, MediaType.ANY_TYPE.toString());
     post.setConfig(createRequestConfig());
     post.setEntity(
@@ -192,7 +190,7 @@
 
   public HttpPost createPostToTopic(String topic, HttpEntity postBodyEntity) {
     HttpPost post =
-        new HttpPost(kafkaRestApiUri.resolve("/topics/" + URLEncoder.encode(topic, UTF_8)));
+        new HttpPost(resolveKafkaRestApiUri("/topics/" + URLEncoder.encode(topic, UTF_8)));
     post.addHeader(HttpHeaders.ACCEPT, "*/*");
     post.setConfig(createRequestConfig());
     post.setEntity(postBodyEntity);
@@ -277,7 +275,22 @@
     httpclient.close();
   }
 
-  public URI resolveURI(String path) {
-    return kafkaRestApiUri.resolve(path);
+  public URI resolveKafkaRestApiUri(String path) {
+    try {
+      URI restApiUri = configuration.getRestApiUri();
+      return restApiUri.resolve(path);
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException("Invalid Kafka REST API URI", e);
+    }
+  }
+
+  public URI resolveKafkaRestApiUri(String kafkaRestId, String path) {
+    URI restApiUri;
+    try {
+      restApiUri = configuration.getRestApiUri(kafkaRestId);
+      return restApiUri.resolve(restApiUri.getPath() + path);
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException("Invalid Kafka REST API URI", e);
+    }
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index fbaef6f..69224e7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -19,6 +19,7 @@
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaEventsPublisherMetrics;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.concurrent.Future;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -74,7 +75,13 @@
         break;
 
       case REST:
-        URI kafkaProxyUri = properties.getRestApiUri();
+        URI kafkaProxyUri;
+        try {
+          kafkaProxyUri = properties.getRestApiUri();
+        } catch (URISyntaxException e) {
+          LOGGER.error("Invalid Kafka Proxy URI: session not started", e);
+          return;
+        }
         if (kafkaProxyUri == null) {
           LOGGER.warn("No Kafka Proxy URL property defined: session not started.");
           return;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java
index 1b638a3..0f4e04c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java
@@ -61,6 +61,12 @@
 public class KafkaEventRestSubscriber implements KafkaEventSubscriber {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private static final int DELAY_RECONNECT_AFTER_FAILURE_MSEC = 1000;
+  // Prefix is a length of 'rest-consumer-' string
+  private static final int INSTANCE_ID_PREFIX_LEN = 14;
+  /**
+   * Suffix is a length of a unique identifier for example: '-9836fe85-d838-4722-97c9-4a7b-34e834d'
+   */
+  private static final int INSTANCE_ID_SUFFIX_LEN = 37;
 
   private final OneOffRequestContext oneOffCtx;
   private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -281,12 +287,27 @@
           new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8)) {
         JsonObject responseJson = gson.fromJson(bodyReader, JsonObject.class);
         URI consumerUri = new URI(responseJson.get("base_uri").getAsString());
-        return Futures.immediateFuture(restClient.resolveURI(consumerUri.getPath()));
+        String instanceId = responseJson.get("instance_id").getAsString();
+
+        String restProxyId = getRestProxyId(instanceId);
+        return Futures.immediateFuture(
+            restClient.resolveKafkaRestApiUri(restProxyId, consumerUri.getPath()));
       } catch (UnsupportedOperationException | IOException | URISyntaxException e) {
         return Futures.immediateFailedFuture(e);
       }
     }
 
+    private String getRestProxyId(String instanceId) {
+      int instanceIdLen = instanceId.length();
+      if (instanceIdLen <= INSTANCE_ID_SUFFIX_LEN + INSTANCE_ID_PREFIX_LEN) {
+        // Kafka Rest Proxy instance id is not mandatory
+        return "";
+      }
+
+      return instanceId.substring(
+          INSTANCE_ID_PREFIX_LEN, instanceId.length() - INSTANCE_ID_SUFFIX_LEN);
+    }
+
     private ListenableFuture<ConsumerRecords<byte[], byte[]>> convertRecords(
         HttpResponse response) {
       try (Reader bodyReader = new InputStreamReader(response.getEntity().getContent())) {
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index afe775e..24bdc8c 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -62,6 +62,10 @@
 :	URL of the
 	[Confluent REST-API Proxy](https://docs.confluent.io/platform/current/kafka-rest/index.html)
 	for sending/receiving messages through REST-API instead of using the native Kafka client.
+	The value can use the `${kafka_rest_id}` placeholder which will be replaced at runtime using
+	the `KAFKA_REST_ID` associated with the Kafka REST-API Proxy that is answering the call,
+	typically needed when having multiple proxies behind a workload balancer and sticky session
+	allocation isn't an option.
 	**NOTE**: when `plugin.@PLUGIN@.restApiUri` is unset or set to `NATIVE`, this setting is ignored.
 	Default: unset
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/KafkaRestContainer.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/KafkaRestContainer.java
index dbcc455..06c6801 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/KafkaRestContainer.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/KafkaRestContainer.java
@@ -14,8 +14,11 @@
 
 package com.googlesource.gerrit.plugins.kafka;
 
+import com.github.dockerjava.api.model.ContainerNetwork;
+import com.google.common.base.Strings;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Map;
 import org.junit.Ignore;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.KafkaContainer;
@@ -24,13 +27,19 @@
 @Ignore
 public class KafkaRestContainer extends GenericContainer<KafkaRestContainer> {
 
-  private static final String KAFKA_REST_PROXY_HOSTNAME = "restproxy";
-
+  public static final String KAFKA_REST_PROXY_HOSTNAME = "restproxy";
   public static final int KAFKA_REST_PORT = 8082;
+  private final String kafkaRestHostname;
 
   public KafkaRestContainer(KafkaContainer kafkaContainer) {
+    this(kafkaContainer, null);
+  }
+
+  public KafkaRestContainer(KafkaContainer kafkaContainer, String kafkaRestId) {
     super(restProxyImageFor(kafkaContainer));
 
+    kafkaRestHostname = KAFKA_REST_PROXY_HOSTNAME + Strings.nullToEmpty(kafkaRestId);
+
     withNetwork(kafkaContainer.getNetwork());
 
     withExposedPorts(KAFKA_REST_PORT);
@@ -41,8 +50,11 @@
     withEnv("KAFKA_REST_BOOTSTRAP_SERVERS", bootstrapServers);
     withEnv("KAFKA_REST_LISTENERS", "http://0.0.0.0:" + KAFKA_REST_PORT);
     withEnv("KAFKA_REST_CLIENT_SECURITY_PROTOCOL", "PLAINTEXT");
-    withEnv("KAFKA_REST_HOST_NAME", KAFKA_REST_PROXY_HOSTNAME);
-    withCreateContainerCmdModifier(cmd -> cmd.withHostName(KAFKA_REST_PROXY_HOSTNAME));
+    withEnv("KAFKA_REST_HOST_NAME", kafkaRestHostname);
+    if (kafkaRestId != null) {
+      withEnv("KAFKA_REST_ID", kafkaRestId);
+    }
+    withCreateContainerCmdModifier(cmd -> cmd.withHostName(kafkaRestHostname));
   }
 
   private static DockerImageName restProxyImageFor(KafkaContainer kafkaContainer) {
@@ -58,4 +70,9 @@
       throw new IllegalArgumentException("Invalid Kafka API URI", e);
     }
   }
+
+  public String getKafkaRestContainerIP() {
+    Map<String, ContainerNetwork> networks = getContainerInfo().getNetworkSettings().getNetworks();
+    return networks.values().iterator().next().getIpAddress();
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index 2a2d6d4..6868794 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -40,7 +40,6 @@
 import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -57,6 +56,7 @@
 import org.junit.runner.RunWith;
 import org.mockito.Answers;
 import org.mockito.junit.MockitoJUnitRunner;
+import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.KafkaContainer;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -64,10 +64,13 @@
 
   static KafkaContainer kafka;
   static KafkaRestContainer kafkaRest;
+  static KafkaRestContainer kafkaRestWithId;
+  static GenericContainer<?> nginx;
 
   static final int TEST_NUM_SUBSCRIBERS = 1;
   static final String TEST_GROUP_ID = KafkaBrokerApiTest.class.getName();
   static final int TEST_POLLING_INTERVAL_MSEC = 100;
+  static final String KAFKA_REST_ID = "kafka-rest-instance-0";
   private static final int TEST_THREAD_POOL_SIZE = 10;
   private static final UUID TEST_INSTANCE_ID = UUID.randomUUID();
   private static final TimeUnit TEST_TIMEOUT_UNIT = TimeUnit.SECONDS;
@@ -160,8 +163,11 @@
   public static void beforeClass() throws Exception {
     kafka = KafkaContainerProvider.get();
     kafka.start();
+    kafkaRestWithId = new KafkaRestContainer(kafka, KAFKA_REST_ID);
+    kafkaRestWithId.start();
     kafkaRest = new KafkaRestContainer(kafka);
     kafkaRest.start();
+
     System.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
   }
 
@@ -172,8 +178,15 @@
 
   @AfterClass
   public static void afterClass() {
-    if (kafka != null) {
-      kafka.stop();
+    stopContainer(kafka);
+    stopContainer(kafkaRest);
+    stopContainer(kafkaRestWithId);
+    stopContainer(nginx);
+  }
+
+  private static void stopContainer(GenericContainer<?> container) {
+    if (container != null) {
+      container.stop();
     }
   }
 
@@ -204,7 +217,7 @@
 
   @Test
   public void shouldSendSyncAndReceiveToTopic() {
-    connectToKafka(new KafkaProperties(false, clientType, getKafkaRestApiURI()));
+    connectToKafka(new KafkaProperties(false, clientType, getKafkaRestApiUriString()));
     KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
     String testTopic = "test_topic_sync";
     TestConsumer testConsumer = new TestConsumer(1);
@@ -222,7 +235,7 @@
 
   @Test
   public void shouldSendAsyncAndReceiveToTopic() {
-    connectToKafka(new KafkaProperties(true, clientType, getKafkaRestApiURI()));
+    connectToKafka(new KafkaProperties(true, clientType, getKafkaRestApiUriString()));
     KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
     String testTopic = "test_topic_async";
     TestConsumer testConsumer = new TestConsumer(1);
@@ -240,7 +253,7 @@
 
   @Test
   public void shouldSendToTopicAndResetOffset() {
-    connectToKafka(new KafkaProperties(false, clientType, getKafkaRestApiURI()));
+    connectToKafka(new KafkaProperties(false, clientType, getKafkaRestApiUriString()));
     KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
     String testTopic = "test_topic_reset";
     TestConsumer testConsumer = new TestConsumer(1);
@@ -260,7 +273,7 @@
     assertThat(gson.toJson(testConsumer.messages.get(0))).isEqualTo(gson.toJson(testEventMessage));
   }
 
-  protected URI getKafkaRestApiURI() {
+  protected String getKafkaRestApiUriString() {
     return null;
   }
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTest.java
index d5f3c64..c0c2bc7 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTest.java
@@ -23,7 +23,6 @@
 import com.googlesource.gerrit.plugins.kafka.rest.FutureExecutor;
 import com.googlesource.gerrit.plugins.kafka.rest.HttpHostProxy;
 import com.googlesource.gerrit.plugins.kafka.rest.KafkaRestClient;
-import java.net.URI;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.kafka.clients.producer.Producer;
@@ -32,7 +31,7 @@
 import org.mockito.junit.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
-public class KafkaBrokerRestApiTest extends KafkaBrokerApiTest {
+public class KafkaBrokerRestApiTest extends KafkaBrokerRestApiTestBase {
 
   @Override
   @Before
@@ -57,7 +56,7 @@
                 TEST_GROUP_ID,
                 TEST_NUM_SUBSCRIBERS,
                 ClientType.REST,
-                kafkaRest.getApiURI());
+                getKafkaRestApiUriString());
         bind(KafkaSubscriberProperties.class).toInstance(kafkaSubscriberProperties);
 
         bind(HttpHostProxy.class).toInstance(new HttpHostProxy(null, null, null));
@@ -68,7 +67,7 @@
   }
 
   @Override
-  protected URI getKafkaRestApiURI() {
-    return kafkaRest.getApiURI();
+  protected String getKafkaRestApiUriString() {
+    return kafkaRest.getApiURI().toString();
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTestBase.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTestBase.java
new file mode 100644
index 0000000..91b6286
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTestBase.java
@@ -0,0 +1,67 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.kafka.api;
+
+import com.googlesource.gerrit.plugins.kafka.KafkaRestContainer;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+
+@Ignore
+public class KafkaBrokerRestApiTestBase extends KafkaBrokerApiTest {
+  private static final String NGINX_IMAGE = "nginx:1.21.5";
+
+  @SuppressWarnings("resource")
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    KafkaBrokerApiTest.beforeClass();
+    String nginxKafkaConf =
+        String.format(
+            "server {\\n"
+                + "  listen       80  default_server;\\n"
+                + "  listen  [::]:80  default_server;\\n"
+                + "  location     /%s/ {\\n"
+                + "    proxy_pass http://%s:%d/; \\n"
+                + "  }\\n"
+                + "  location     / {\\n"
+                + "    proxy_pass http://%s:%d; \\n"
+                + "  }\\n"
+                + "}",
+            KAFKA_REST_ID,
+            kafkaRestWithId.getKafkaRestContainerIP(),
+            KafkaRestContainer.KAFKA_REST_PORT,
+            kafkaRestWithId.getKafkaRestContainerIP(),
+            KafkaRestContainer.KAFKA_REST_PORT);
+    nginx =
+        new GenericContainer<>(
+                new ImageFromDockerfile()
+                    .withDockerfileFromBuilder(
+                        builder ->
+                            builder
+                                .from(NGINX_IMAGE)
+                                .run(
+                                    "sh",
+                                    "-c",
+                                    String.format(
+                                        "echo '%s' | tee /etc/nginx/conf.d/default.conf",
+                                        nginxKafkaConf))
+                                .build()))
+            .withExposedPorts(80)
+            .withNetwork(kafkaRestWithId.getNetwork())
+            .waitingFor(new HttpWaitStrategy());
+    nginx.start();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithIdPrefixTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithIdPrefixTest.java
new file mode 100644
index 0000000..9125989
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithIdPrefixTest.java
@@ -0,0 +1,81 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.api;
+
+import com.google.inject.TypeLiteral;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.ClientType;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
+import com.googlesource.gerrit.plugins.kafka.publish.KafkaRestProducer;
+import com.googlesource.gerrit.plugins.kafka.rest.FutureExecutor;
+import com.googlesource.gerrit.plugins.kafka.rest.HttpHostProxy;
+import com.googlesource.gerrit.plugins.kafka.rest.KafkaRestClient;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.kafka.clients.producer.Producer;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaBrokerRestApiWithIdPrefixTest extends KafkaBrokerRestApiTestBase {
+
+  @Override
+  @Before
+  public void setup() {
+    clientType = ClientType.REST;
+  }
+
+  @Override
+  protected TestModule newTestModule(KafkaProperties kafkaProperties) {
+    return new TestModule(kafkaProperties) {
+
+      @Override
+      protected void bindKafkaClientImpl() {
+        bind(new TypeLiteral<Producer<String, String>>() {}).to(KafkaRestProducer.class);
+        bind(ExecutorService.class)
+            .annotatedWith(FutureExecutor.class)
+            .toInstance(Executors.newCachedThreadPool());
+
+        KafkaSubscriberProperties kafkaSubscriberProperties =
+            new KafkaSubscriberProperties(
+                TEST_POLLING_INTERVAL_MSEC,
+                TEST_GROUP_ID,
+                TEST_NUM_SUBSCRIBERS,
+                ClientType.REST,
+                getApiUriString());
+        bind(KafkaSubscriberProperties.class).toInstance(kafkaSubscriberProperties);
+
+        bind(HttpHostProxy.class).toInstance(new HttpHostProxy(null, null, null));
+
+        install(new FactoryModuleBuilder().build(KafkaRestClient.Factory.class));
+      }
+    };
+  }
+
+  @Override
+  protected String getKafkaRestApiUriString() {
+    return getApiUriString();
+  }
+
+  private static String getApiUriString() {
+    return String.format(
+        "http://%s:%d/%s",
+        nginx.getHost(),
+        nginx.getLivenessCheckPortNumbers().iterator().next(),
+        KafkaProperties.REST_API_URI_ID_PLACEHOLDER);
+  }
+}