Merge branch 'stable-3.4' into stable-3.5

* stable-3.4:
  Register Kafka consumers with external groupId when plugin starts
  Make KafkaBrokerApi class implement ExtendedBrokerApi interface
  Consume events-broker from source
  Add validation dependency on events-broker module
  Add .gitignore file in the project
  Pass correct amount of arguments to Malformed event log line
  Add Kafka REST API authentication
  Fix the topic events replay Kafka REST-API
  Use Kafka REST Proxy id to subscribe to the correct instance
  Fix Kafka REST Proxy accepts header for topic meta-data
  Kafka REST Client: avoid clashes between clients
  Fix threshold of HTTP wire logging
  Delete subscription at the end of ReceiverJob
  Update kafka-client 2.1.0 -> 2.1.1
  Increase patience to 30s for shouldReplayAllEvents test
  Remove unused RequestConfigProvider
  REST ClientType: Make thread pool and timeouts configuration
  Extract configuration properties into constants
  Manage Kafka clientType when starting session
  Receive messages through Kafka REST API
  Send messages through Kafka REST API
  Abstract Publisher/Subscriber into generic interfaces
  Wait at most for 5s for an empty topic
  Assert that messages are acknowledged in KafkaBrokerApiTest
  Add Kafka REST-API container in test
  Remove access to deprecated poll(long) method
  Use explicit Kafka image:tag in tests
  Do not connect KafkaSession without bootstrap servers

Change-Id: I120c9fffe052195f31b1132e0c6fc0fd35680840
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..9ad228d
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,91 @@
+# Created by https://www.toptal.com/developers/gitignore/api/intellij+iml
+# Edit at https://www.toptal.com/developers/gitignore?templates=intellij+iml
+
+### Intellij+iml ###
+# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
+# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
+
+# User-specific stuff
+.idea/**/workspace.xml
+.idea/**/tasks.xml
+.idea/**/usage.statistics.xml
+.idea/**/dictionaries
+.idea/**/shelf
+
+# AWS User-specific
+.idea/**/aws.xml
+
+# Generated files
+.idea/**/contentModel.xml
+
+# Sensitive or high-churn files
+.idea/**/dataSources/
+.idea/**/dataSources.ids
+.idea/**/dataSources.local.xml
+.idea/**/sqlDataSources.xml
+.idea/**/dynamic.xml
+.idea/**/uiDesigner.xml
+.idea/**/dbnavigator.xml
+
+# Gradle
+.idea/**/gradle.xml
+.idea/**/libraries
+
+# Gradle and Maven with auto-import
+# When using Gradle or Maven with auto-import, you should exclude module files,
+# since they will be recreated, and may cause churn.  Uncomment if using
+# auto-import.
+# .idea/artifacts
+# .idea/compiler.xml
+# .idea/jarRepositories.xml
+# .idea/modules.xml
+# .idea/*.iml
+# .idea/modules
+# *.iml
+# *.ipr
+
+# CMake
+cmake-build-*/
+
+# Mongo Explorer plugin
+.idea/**/mongoSettings.xml
+
+# File-based project format
+*.iws
+
+# IntelliJ
+out/
+
+# mpeltonen/sbt-idea plugin
+.idea_modules/
+
+# JIRA plugin
+atlassian-ide-plugin.xml
+
+# Cursive Clojure plugin
+.idea/replstate.xml
+
+# SonarLint plugin
+.idea/sonarlint/
+
+# Crashlytics plugin (for Android Studio and IntelliJ)
+com_crashlytics_export_strings.xml
+crashlytics.properties
+crashlytics-build.properties
+fabric.properties
+
+# Editor-based Rest Client
+.idea/httpRequests
+
+# Android studio 3.1+ serialized cache file
+.idea/caches/build_file_checksums.ser
+
+### Intellij+iml Patch ###
+# Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-249601023
+
+*.iml
+modules.xml
+.idea/misc.xml
+*.ipr
+
+# End of https://www.toptal.com/developers/gitignore/api/intellij+iml
\ No newline at end of file
diff --git a/BUILD b/BUILD
index b169571..9a517b6 100644
--- a/BUILD
+++ b/BUILD
@@ -18,19 +18,26 @@
     ],
     resources = glob(["src/main/resources/**/*"]),
     deps = [
-        "@events-broker//jar",
+        ":events-broker-neverlink",
+        "//lib/httpcomponents:httpclient",
+        "@httpasyncclient//jar",
+        "@httpcore-nio//jar",
         "@kafka-client//jar",
     ],
 )
 
 junit_tests(
     name = "events_kafka_tests",
+    timeout = "long",
     srcs = glob(["src/test/java/**/*.java"]),
+    resources = glob(["src/test/resources/**/*"]),
     tags = ["events-kafka"],
     deps = [
         ":events-kafka__plugin_test_deps",
-        "@events-broker//jar",
+        "//plugins/events-broker",
         "@kafka-client//jar",
+        "@testcontainers-kafka//jar",
+        "@testcontainers//jar",
     ],
 )
 
@@ -50,3 +57,9 @@
         "@jna//jar",
     ],
 )
+
+java_library(
+    name = "events-broker-neverlink",
+    neverlink = 1,
+    exports = ["//plugins/events-broker"],
+)
diff --git a/Jenkinsfile b/Jenkinsfile
index b26b19f..589453c 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -1,2 +1,3 @@
 pluginPipeline(formatCheckId: 'gerritforge:plugins-events-kafka-code-style',
-                buildCheckId: 'gerritforge:plugins-events-kafka-build-test')
+                buildCheckId: 'gerritforge:plugins-events-kafka-build-test',
+                extraModules: ['events-broker'])
diff --git a/README.md b/README.md
index ae1184d..432d7d8 100644
--- a/README.md
+++ b/README.md
@@ -29,11 +29,15 @@
 ---------------------
 Kafka plugin can be build as a regular 'in-tree' plugin. That means that is required to
 clone a Gerrit source tree first and then to have the Kafka plugin source directory into
-the /plugins path. Additionally, the plugins/external_plugin_deps.bzl file needs to be
+the /plugins path. The plugin depends on [events-broker](https://gerrit.googlesource.com/modules/events-broker)
+which is linked directly from source with the same 'in-tree' plugin structure.
+
+Additionally, the plugins/external_plugin_deps.bzl file needs to be
 updated to match the Kafka plugin one.
 
     git clone --recursive https://gerrit.googlesource.com/gerrit
     git clone https://gerrit.googlesource.com/plugins/events-kafka gerrit/plugins/events-kafka
+    git clone https://gerrit.googlesource.com/modules/events-broker gerrit/plugins/events-broker
     cd gerrit
     rm plugins/external_plugin_deps.bzl
     ln -s ./events-kafka/external_plugin_deps.bzl plugins/.
@@ -45,6 +49,7 @@
 The output is created in
 
     bazel-genfiles/plugins/events-kafka/events-kafka.jar
+    bazel-genfiles/plugins/events-broker/events-broker.jar
 
 Minimum Configuration
 ---------------------
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index db6d899..1f23015 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -7,6 +7,18 @@
         sha1 = "a7b72831768ccfd69128385130409ae1a0e52f5f",
     )
 
+    maven_jar(
+        name = "httpcore-nio",
+        artifact = "org.apache.httpcomponents:httpcore-nio:4.4.12",
+        sha1 = "84cd29eca842f31db02987cfedea245af020198b",
+    )
+
+    maven_jar(
+        name = "httpasyncclient",
+        artifact = "org.apache.httpcomponents:httpasyncclient:4.1.4",
+        sha1 = "f3a3240681faae3fa46b573a4c7e50cec9db0d86",
+    )
+
     TESTCONTAINERS_VERSION = "1.15.3"
 
     maven_jar(
@@ -58,9 +70,3 @@
         artifact = "com.fasterxml.jackson.core:jackson-annotations:2.10.3",
         sha1 = "0f63b3b1da563767d04d2e4d3fc1ae0cdeffebe7",
     )
-
-    maven_jar(
-        name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.5.0-alpha-202108041529",
-        sha1 = "309fe8cc08c46593d9990d4e5c448cc85e5a62b0",
-    )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
index 4dc394b..9ed5165 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
@@ -15,7 +15,9 @@
 package com.googlesource.gerrit.plugins.kafka;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -27,13 +29,19 @@
 
   private final KafkaPublisher publisher;
   private final Set<TopicSubscriber> consumers;
-  private final BrokerApi brokerApi;
+  private final Set<TopicSubscriberWithGroupId> consumersWithGroupId;
+  private final ExtendedBrokerApi brokerApi;
 
   @Inject
-  public Manager(KafkaPublisher publisher, Set<TopicSubscriber> consumers, BrokerApi brokerApi) {
+  public Manager(
+      KafkaPublisher publisher,
+      Set<TopicSubscriber> consumers,
+      Set<TopicSubscriberWithGroupId> consumersWithGroupId,
+      BrokerApi brokerApi) {
     this.publisher = publisher;
     this.consumers = consumers;
-    this.brokerApi = brokerApi;
+    this.brokerApi = (ExtendedBrokerApi) brokerApi;
+    this.consumersWithGroupId = consumersWithGroupId;
   }
 
   @Override
@@ -42,6 +50,15 @@
     consumers.forEach(
         topicSubscriber ->
             brokerApi.receiveAsync(topicSubscriber.topic(), topicSubscriber.consumer()));
+
+    consumersWithGroupId.forEach(
+        topicSubscriberWithGroupId -> {
+          TopicSubscriber topicSubscriber = topicSubscriberWithGroupId.topicSubscriber();
+          brokerApi.receiveAsync(
+              topicSubscriber.topic(),
+              topicSubscriberWithGroupId.groupId(),
+              topicSubscriber.consumer());
+        });
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
index 618fea8..0a1ff35 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -17,24 +17,42 @@
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.events.EventListener;
+import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
+import com.google.inject.Scopes;
 import com.google.inject.TypeLiteral;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.googlesource.gerrit.plugins.kafka.api.KafkaApiModule;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.ClientType;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaPublisherProperties;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
+import com.googlesource.gerrit.plugins.kafka.publish.KafkaRestProducer;
+import com.googlesource.gerrit.plugins.kafka.rest.FutureExecutor;
+import com.googlesource.gerrit.plugins.kafka.rest.HttpHostProxy;
+import com.googlesource.gerrit.plugins.kafka.rest.HttpHostProxyProvider;
+import com.googlesource.gerrit.plugins.kafka.rest.KafkaRestClient;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import java.util.concurrent.ExecutorService;
+import org.apache.kafka.clients.producer.Producer;
 
 class Module extends AbstractModule {
-
   private final KafkaApiModule kafkaBrokerModule;
+  private final KafkaProperties kafkaConf;
+  private final WorkQueue workQueue;
   private final KafkaPublisherProperties configuration;
 
   @Inject
-  public Module(KafkaApiModule kafkaBrokerModule, KafkaPublisherProperties configuration) {
+  public Module(
+      KafkaApiModule kafkaBrokerModule,
+      KafkaPublisherProperties configuration,
+      KafkaProperties kafkaConf,
+      WorkQueue workQueue) {
     this.kafkaBrokerModule = kafkaBrokerModule;
     this.configuration = configuration;
+    this.kafkaConf = kafkaConf;
+    this.workQueue = workQueue;
   }
 
   @Override
@@ -45,8 +63,25 @@
       DynamicSet.bind(binder(), EventListener.class).to(KafkaPublisher.class);
     }
 
-    bind(new TypeLiteral<KafkaProducer<String, String>>() {})
-        .toProvider(KafkaProducerProvider.class);
+    ClientType clientType = kafkaConf.getClientType();
+    switch (clientType) {
+      case NATIVE:
+        bind(new TypeLiteral<Producer<String, String>>() {})
+            .toProvider(KafkaProducerProvider.class);
+        break;
+      case REST:
+        bind(ExecutorService.class)
+            .annotatedWith(FutureExecutor.class)
+            .toInstance(
+                workQueue.createQueue(
+                    kafkaConf.getRestApiThreads(), "KafkaRestClientThreadPool", true));
+        bind(HttpHostProxy.class).toProvider(HttpHostProxyProvider.class).in(Scopes.SINGLETON);
+        bind(new TypeLiteral<Producer<String, String>>() {}).to(KafkaRestProducer.class);
+        install(new FactoryModuleBuilder().build(KafkaRestClient.Factory.class));
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported Kafka client type " + clientType);
+    }
 
     install(kafkaBrokerModule);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
index a128af8..d3ee3f1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
@@ -15,7 +15,9 @@
 package com.googlesource.gerrit.plugins.kafka.api;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId;
 import com.google.common.collect.Sets;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.lifecycle.LifecycleModule;
@@ -25,9 +27,14 @@
 import com.google.inject.Scopes;
 import com.google.inject.Singleton;
 import com.google.inject.TypeLiteral;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.googlesource.gerrit.plugins.kafka.broker.ConsumerExecutor;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.ClientType;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
 import com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventDeserializer;
+import com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventNativeSubscriber;
+import com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventRestSubscriber;
+import com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -36,6 +43,7 @@
 @Singleton
 public class KafkaApiModule extends LifecycleModule {
   private Set<TopicSubscriber> activeConsumers = Sets.newHashSet();
+  private Set<TopicSubscriberWithGroupId> activeConsumersWithGroupId = Sets.newHashSet();
   private WorkQueue workQueue;
   private KafkaSubscriberProperties configuration;
 
@@ -48,12 +56,33 @@
   @Inject(optional = true)
   public void setPreviousBrokerApi(DynamicItem<BrokerApi> previousBrokerApi) {
     if (previousBrokerApi != null && previousBrokerApi.get() != null) {
-      this.activeConsumers = previousBrokerApi.get().topicSubscribers();
+      BrokerApi api = previousBrokerApi.get();
+      if (api instanceof ExtendedBrokerApi) {
+        this.activeConsumersWithGroupId = ((ExtendedBrokerApi) api).topicSubscribersWithGroupId();
+      }
+      this.activeConsumers = api.topicSubscribers();
     }
   }
 
   @Override
   protected void configure() {
+    ClientType clientType = configuration.getClientType();
+    switch (clientType) {
+      case NATIVE:
+        install(
+            new FactoryModuleBuilder()
+                .implement(KafkaEventSubscriber.class, KafkaEventNativeSubscriber.class)
+                .build(KafkaEventSubscriber.Factory.class));
+        break;
+      case REST:
+        install(
+            new FactoryModuleBuilder()
+                .implement(KafkaEventSubscriber.class, KafkaEventRestSubscriber.class)
+                .build(KafkaEventSubscriber.Factory.class));
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported Kafka client type " + clientType);
+    }
 
     bind(ExecutorService.class)
         .annotatedWith(ConsumerExecutor.class)
@@ -63,6 +92,8 @@
     bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
     bind(new TypeLiteral<Deserializer<Event>>() {}).to(KafkaEventDeserializer.class);
     bind(new TypeLiteral<Set<TopicSubscriber>>() {}).toInstance(activeConsumers);
+    bind(new TypeLiteral<Set<TopicSubscriberWithGroupId>>() {})
+        .toInstance(activeConsumersWithGroupId);
 
     DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
index 3ec21e0..d1d6961 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
@@ -14,31 +14,32 @@
 
 package com.googlesource.gerrit.plugins.kafka.api;
 
-import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
-import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher;
 import com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-public class KafkaBrokerApi implements BrokerApi {
+public class KafkaBrokerApi implements ExtendedBrokerApi {
 
   private final KafkaPublisher publisher;
-  private final Provider<KafkaEventSubscriber> subscriberProvider;
+  private final KafkaEventSubscriber.Factory kafkaEventSubscriberFactory;
   private List<KafkaEventSubscriber> subscribers;
 
   @Inject
   public KafkaBrokerApi(
-      KafkaPublisher publisher, Provider<KafkaEventSubscriber> subscriberProvider) {
+      KafkaPublisher publisher, KafkaEventSubscriber.Factory kafkaEventSubscriberFactory) {
     this.publisher = publisher;
-    this.subscriberProvider = subscriberProvider;
+    this.kafkaEventSubscriberFactory = kafkaEventSubscriberFactory;
     subscribers = new ArrayList<>();
   }
 
@@ -49,11 +50,12 @@
 
   @Override
   public void receiveAsync(String topic, Consumer<Event> eventConsumer) {
-    KafkaEventSubscriber subscriber = subscriberProvider.get();
-    synchronized (subscribers) {
-      subscribers.add(subscriber);
-    }
-    subscriber.subscribe(topic, eventConsumer);
+    receiveAsync(topic, eventConsumer, Optional.empty());
+  }
+
+  @Override
+  public void receiveAsync(String topic, String groupId, Consumer<Event> eventConsumer) {
+    receiveAsync(topic, eventConsumer, Optional.ofNullable(groupId));
   }
 
   @Override
@@ -67,14 +69,36 @@
   @Override
   public Set<TopicSubscriber> topicSubscribers() {
     return subscribers.stream()
+        .filter(s -> !s.getExternalGroupId().isPresent())
         .map(s -> TopicSubscriber.topicSubscriber(s.getTopic(), s.getMessageProcessor()))
         .collect(Collectors.toSet());
   }
 
   @Override
+  public Set<TopicSubscriberWithGroupId> topicSubscribersWithGroupId() {
+    return subscribers.stream()
+        .filter(s -> s.getExternalGroupId().isPresent())
+        .map(
+            s ->
+                TopicSubscriberWithGroupId.topicSubscriberWithGroupId(
+                    s.getExternalGroupId().get(),
+                    TopicSubscriber.topicSubscriber(s.getTopic(), s.getMessageProcessor())))
+        .collect(Collectors.toSet());
+  }
+
+  @Override
   public void replayAllEvents(String topic) {
     subscribers.stream()
         .filter(subscriber -> topic.equals(subscriber.getTopic()))
         .forEach(subscriber -> subscriber.resetOffset());
   }
+
+  private void receiveAsync(
+      String topic, Consumer<Event> eventConsumer, Optional<String> externalGroupId) {
+    KafkaEventSubscriber subscriber = kafkaEventSubscriberFactory.create(externalGroupId);
+    synchronized (subscribers) {
+      subscribers.add(subscriber);
+    }
+    subscriber.subscribe(topic, eventConsumer);
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index 0c58c1b..05cda98 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -17,18 +17,42 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.CaseFormat;
 import com.google.common.base.Strings;
+import com.google.gerrit.common.Nullable;
 import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.server.config.ConfigUtil;
 import com.google.gerrit.server.config.PluginConfig;
 import com.google.gerrit.server.config.PluginConfigFactory;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 @Singleton
 public class KafkaProperties extends java.util.Properties {
+  public static final String REST_API_URI_ID_PLACEHOLDER = "${kafka_rest_id}";
+
+  private static final String PROPERTY_HTTP_WIRE_LOG = "httpWireLog";
+  private static final boolean DEFAULT_HTTP_WIRE_LOG = false;
+  private static final String PROPERTY_REST_API_URI = "restApiUri";
+  private static final String PROPERTY_REST_API_USERNAME = "restApiUsername";
+  private static final String PROPERTY_REST_API_PASSWORD = "restApiPassword";
+  private static final String PROPERTY_REST_API_TIMEOUT = "restApiTimeout";
+  private static final Duration DEFAULT_REST_API_TIMEOUT = Duration.ofSeconds(60);
+  private static final String PROPERTY_REST_API_THREADS = "restApiThreads";
+  private static final int DEFAULT_REST_API_THREADS = 10;
+  private static final String PROPERTY_CLIENT_TYPE = "clientType";
+  private static final ClientType DEFAULT_CLIENT_TYPE = ClientType.NATIVE;
+  private static final String PROPERTY_SEND_ASYNC = "sendAsync";
+  private static final boolean DEFAULT_SEND_ASYNC = true;
+  private static final String PROPERTY_STREAM_EVENTS_TOPIC_NAME = "topic";
+  private static final String DEFAULT_STREAM_EVENTS_TOPIC_NAME = "gerrit";
+
   private static final long serialVersionUID = 0L;
   public static final String SEND_STREAM_EVENTS_FIELD = "sendStreamEvents";
   public static final String STREAM_EVENTS_TOPIC_FIELD = "topic";
@@ -40,31 +64,93 @@
 
   public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
 
+  public enum ClientType {
+    NATIVE,
+    REST;
+  }
+
   private final String topic;
   private final boolean sendAsync;
   private final boolean sendStreamEvents;
+  private final ClientType clientType;
+  private final String restApiUriString;
+  private final String restApiUsername;
+  private final String restApiPassword;
+  private final boolean httpWireLog;
+  private final Duration restApiTimeout;
+  private final int restApiThreads;
 
   @Inject
   public KafkaProperties(PluginConfigFactory configFactory, @PluginName String pluginName) {
     super();
     setDefaults();
     PluginConfig fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
-    topic = fromGerritConfig.getString(STREAM_EVENTS_TOPIC_FIELD, STREAM_EVENTS_TOPIC_DEFAULT);
-    sendAsync = fromGerritConfig.getBoolean(SEND_ASYNC_FIELD, SEND_ASYNC_DEFAULT);
     sendStreamEvents =
         fromGerritConfig.getBoolean(SEND_STREAM_EVENTS_FIELD, SEND_STREAM_EVENTS_DEFAULT);
+    topic =
+        fromGerritConfig.getString(
+            PROPERTY_STREAM_EVENTS_TOPIC_NAME, DEFAULT_STREAM_EVENTS_TOPIC_NAME);
+    sendAsync = fromGerritConfig.getBoolean(PROPERTY_SEND_ASYNC, DEFAULT_SEND_ASYNC);
+    clientType = fromGerritConfig.getEnum(PROPERTY_CLIENT_TYPE, DEFAULT_CLIENT_TYPE);
+
+    switch (clientType) {
+      case REST:
+        restApiUriString = fromGerritConfig.getString(PROPERTY_REST_API_URI);
+        if (Strings.isNullOrEmpty(restApiUriString)) {
+          throw new IllegalArgumentException("Missing REST API URI in Kafka properties");
+        }
+
+        restApiUsername = fromGerritConfig.getString(PROPERTY_REST_API_USERNAME);
+        restApiPassword = fromGerritConfig.getString(PROPERTY_REST_API_PASSWORD);
+        if (!Strings.isNullOrEmpty(restApiUsername) && Strings.isNullOrEmpty(restApiPassword)) {
+          throw new IllegalArgumentException("Missing REST API password in kafka properties");
+        }
+
+        httpWireLog = fromGerritConfig.getBoolean(PROPERTY_HTTP_WIRE_LOG, DEFAULT_HTTP_WIRE_LOG);
+        restApiTimeout =
+            Duration.ofMillis(
+                ConfigUtil.getTimeUnit(
+                    fromGerritConfig.getString(PROPERTY_REST_API_TIMEOUT),
+                    DEFAULT_REST_API_TIMEOUT.toMillis(),
+                    TimeUnit.MILLISECONDS));
+        restApiThreads =
+            fromGerritConfig.getInt(PROPERTY_REST_API_THREADS, DEFAULT_REST_API_THREADS);
+        break;
+      case NATIVE:
+      default:
+        restApiUriString = null;
+        restApiUsername = null;
+        restApiPassword = null;
+        httpWireLog = false;
+        restApiTimeout = null;
+        restApiThreads = 0;
+        break;
+    }
+
     applyConfig(fromGerritConfig);
     initDockerizedKafkaServer();
   }
 
   @VisibleForTesting
-  public KafkaProperties(boolean sendAsync) {
+  public KafkaProperties(
+      boolean sendAsync,
+      ClientType clientType,
+      @Nullable String restApiUriString,
+      @Nullable String restApiUsername,
+      @Nullable String restApiPassword) {
     super();
     setDefaults();
-    topic = "gerrit";
+    topic = DEFAULT_STREAM_EVENTS_TOPIC_NAME;
     this.sendAsync = sendAsync;
     this.sendStreamEvents = true;
+    this.clientType = clientType;
+    this.restApiUriString = restApiUriString;
     initDockerizedKafkaServer();
+    this.httpWireLog = false;
+    restApiTimeout = DEFAULT_REST_API_TIMEOUT;
+    restApiThreads = DEFAULT_REST_API_THREADS;
+    this.restApiUsername = restApiUsername;
+    this.restApiPassword = restApiPassword;
   }
 
   private void setDefaults() {
@@ -114,4 +200,36 @@
   public boolean isSendStreamEvents() {
     return sendStreamEvents;
   }
+
+  public ClientType getClientType() {
+    return clientType;
+  }
+
+  public URI getRestApiUri() throws URISyntaxException {
+    return getRestApiUri("");
+  }
+
+  public String getRestApiUsername() {
+    return restApiUsername;
+  }
+
+  public String getRestApiPassword() {
+    return restApiPassword;
+  }
+
+  public URI getRestApiUri(String kafkaRestId) throws URISyntaxException {
+    return new URI(restApiUriString.replace(REST_API_URI_ID_PLACEHOLDER, kafkaRestId));
+  }
+
+  public boolean isHttpWireLog() {
+    return httpWireLog;
+  }
+
+  public Duration getRestApiTimeout() {
+    return restApiTimeout;
+  }
+
+  public int getRestApiThreads() {
+    return restApiThreads;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
index 94e8e62..f9100ef 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaSubscriberProperties.java
@@ -43,8 +43,21 @@
   }
 
   @VisibleForTesting
-  public KafkaSubscriberProperties(int pollingInterval, String groupId, int numberOfSubscribers) {
-    super(true);
+  public KafkaSubscriberProperties(
+      int pollingInterval, String groupId, int numberOfSubscribers, ClientType clientType) {
+    this(pollingInterval, groupId, numberOfSubscribers, clientType, null, null, null);
+  }
+
+  @VisibleForTesting
+  public KafkaSubscriberProperties(
+      int pollingInterval,
+      String groupId,
+      int numberOfSubscribers,
+      ClientType clientType,
+      String restApiUriString,
+      String restApiUsername,
+      String restApiPassword) {
+    super(true, clientType, restApiUriString, restApiUsername, restApiPassword);
     this.pollingInterval = pollingInterval;
     this.groupId = groupId;
     this.numberOfSubscribers = numberOfSubscribers;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaRestProducer.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaRestProducer.java
new file mode 100644
index 0000000..8799cef
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaRestProducer.java
@@ -0,0 +1,137 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.publish;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.common.util.concurrent.Futures;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import com.googlesource.gerrit.plugins.kafka.rest.KafkaRestClient;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
+
+public class KafkaRestProducer implements Producer<String, String> {
+  private static final RecordMetadata ZEROS_RECORD_METADATA =
+      new RecordMetadata(null, 0, 0, 0, null, 0, 0);
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final String KAFKA_V2_JSON = "application/vnd.kafka.json.v2+json";
+  private final KafkaRestClient restClient;
+
+  @Inject
+  public KafkaRestProducer(KafkaProperties kafkaConf, KafkaRestClient.Factory restClientFactory) {
+    restClient = restClientFactory.create(kafkaConf);
+  }
+
+  @Override
+  public void initTransactions() {
+    unsupported();
+  }
+
+  @Override
+  public void beginTransaction() throws ProducerFencedException {
+    unsupported();
+  }
+
+  @Override
+  public void sendOffsetsToTransaction(
+      Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)
+      throws ProducerFencedException {
+    unsupported();
+  }
+
+  @Override
+  public void commitTransaction() throws ProducerFencedException {
+    unsupported();
+  }
+
+  @Override
+  public void abortTransaction() throws ProducerFencedException {
+    unsupported();
+  }
+
+  @Override
+  public Future<RecordMetadata> send(ProducerRecord<String, String> record) {
+    return send(record, null);
+  }
+
+  @Override
+  public Future<RecordMetadata> send(ProducerRecord<String, String> record, Callback callback) {
+    HttpPost post =
+        restClient.createPostToTopic(
+            record.topic(),
+            new StringEntity(
+                getRecordAsJson(record),
+                ContentType.create(KAFKA_V2_JSON, StandardCharsets.UTF_8)));
+    return restClient.mapAsync(
+        restClient.execute(post, HttpStatus.SC_OK),
+        (res) -> Futures.immediateFuture(ZEROS_RECORD_METADATA));
+  }
+
+  @Override
+  public void flush() {
+    unsupported();
+  }
+
+  @Override
+  public List<PartitionInfo> partitionsFor(String topic) {
+    return unsupported();
+  }
+
+  @Override
+  public Map<MetricName, ? extends Metric> metrics() {
+    return unsupported();
+  }
+
+  @Override
+  public void close() {
+    try {
+      restClient.close();
+    } catch (IOException e) {
+      logger.atWarning().withCause(e).log("Unable to close httpclient");
+    }
+  }
+
+  @Override
+  public void close(long timeout, TimeUnit unit) {
+    close();
+  }
+
+  private String getRecordAsJson(ProducerRecord<String, String> record) {
+    return String.format(
+        "{\"records\":[{\"key\":\"%s\",\"value\":%s}]}", record.key(), record.value());
+  }
+
+  private <T> T unsupported() {
+    throw new IllegalArgumentException("Unsupported method");
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/FutureExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/FutureExecutor.java
new file mode 100644
index 0000000..2ff0840
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/FutureExecutor.java
@@ -0,0 +1,23 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.rest;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+@Retention(RetentionPolicy.RUNTIME)
+@BindingAnnotation
+public @interface FutureExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/HttpAsyncClientBuilderFactory.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/HttpAsyncClientBuilderFactory.java
new file mode 100644
index 0000000..ec50bf6
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/HttpAsyncClientBuilderFactory.java
@@ -0,0 +1,50 @@
+// Copyright (C) 2011 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.rest;
+
+import com.google.common.base.Strings;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import java.net.URISyntaxException;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+
+/** Looks up a remote's password in secure.config. */
+public class HttpAsyncClientBuilderFactory {
+  private final KafkaProperties config;
+
+  @Inject
+  public HttpAsyncClientBuilderFactory(KafkaProperties config) {
+    this.config = config;
+  }
+
+  public HttpAsyncClientBuilder create() throws URISyntaxException {
+    String user = config.getRestApiUsername();
+    String pass = config.getRestApiPassword();
+    HttpAsyncClientBuilder httpAsyncClientBuilder = HttpAsyncClients.custom();
+    if (!Strings.isNullOrEmpty(user) && !Strings.isNullOrEmpty(pass)) {
+      CredentialsProvider credsProvider = new BasicCredentialsProvider();
+      credsProvider.setCredentials(
+          new AuthScope(config.getRestApiUri().getHost(), config.getRestApiUri().getPort()),
+          new UsernamePasswordCredentials(user, pass));
+      httpAsyncClientBuilder.setDefaultCredentialsProvider(credsProvider);
+    }
+    return httpAsyncClientBuilder;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/HttpHostProxy.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/HttpHostProxy.java
new file mode 100644
index 0000000..595b95a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/HttpHostProxy.java
@@ -0,0 +1,56 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.rest;
+
+import com.google.gerrit.common.Nullable;
+import java.net.URL;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.RequestConfig.Builder;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+
+public class HttpHostProxy {
+  private final URL proxyUrl;
+  private final String username;
+  private final String password;
+
+  public HttpHostProxy(URL proxyUrl, @Nullable String username, @Nullable String password) {
+    this.proxyUrl = proxyUrl;
+    this.username = username;
+    this.password = password;
+  }
+
+  public Builder apply(Builder clientBuilder) {
+    if (proxyUrl != null) {
+      clientBuilder.setProxy(
+          new HttpHost(proxyUrl.getHost(), proxyUrl.getPort(), proxyUrl.getProtocol()));
+    }
+    return clientBuilder;
+  }
+
+  public HttpAsyncClientBuilder apply(HttpAsyncClientBuilder custom) {
+    if (proxyUrl != null && username != null && password != null) {
+      CredentialsProvider credsProvider = new BasicCredentialsProvider();
+      credsProvider.setCredentials(
+          new AuthScope(proxyUrl.getHost(), proxyUrl.getPort()),
+          new UsernamePasswordCredentials(username, password));
+      custom.setDefaultCredentialsProvider(credsProvider);
+    }
+    return custom;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/HttpHostProxyProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/HttpHostProxyProvider.java
new file mode 100644
index 0000000..7c82ecd
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/HttpHostProxyProvider.java
@@ -0,0 +1,54 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.rest;
+
+import com.google.common.base.Strings;
+import com.google.gerrit.server.config.GerritServerConfig;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import java.net.MalformedURLException;
+import java.net.URL;
+import org.eclipse.jgit.lib.Config;
+
+public class HttpHostProxyProvider implements Provider<HttpHostProxy> {
+  private URL proxyUrl;
+  private String proxyUser;
+  private String proxyPassword;
+
+  @Inject
+  HttpHostProxyProvider(@GerritServerConfig Config config) throws MalformedURLException {
+    String proxyUrlStr = config.getString("http", null, "proxy");
+    if (!Strings.isNullOrEmpty(proxyUrlStr)) {
+      proxyUrl = new URL(proxyUrlStr);
+      proxyUser = config.getString("http", null, "proxyUsername");
+      proxyPassword = config.getString("http", null, "proxyPassword");
+      String userInfo = proxyUrl.getUserInfo();
+      if (userInfo != null) {
+        int c = userInfo.indexOf(':');
+        if (0 < c) {
+          proxyUser = userInfo.substring(0, c);
+          proxyPassword = userInfo.substring(c + 1);
+        } else {
+          proxyUser = userInfo;
+        }
+      }
+    }
+  }
+
+  @Override
+  public HttpHostProxy get() {
+    return new HttpHostProxy(proxyUrl, proxyUser, proxyPassword);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java
new file mode 100644
index 0000000..047a96e
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/rest/KafkaRestClient.java
@@ -0,0 +1,301 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.rest;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Function;
+import com.google.common.flogger.FluentLogger;
+import com.google.common.net.MediaType;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gerrit.common.Nullable;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.util.Enumeration;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.config.RequestConfig.Builder;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.log4j.Appender;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+public class KafkaRestClient {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final String KAFKA_V2_JSON = "application/vnd.kafka.json.v2+json";
+  private static final String KAFKA_V2 = "application/vnd.kafka.v2+json";
+
+  private final HttpHostProxy proxy;
+  private final CloseableHttpAsyncClient httpclient;
+  private final ExecutorService futureExecutor;
+  private final int kafkaRestApiTimeoutMsec;
+  private final KafkaProperties configuration;
+
+  private static boolean logConfigured;
+
+  public interface Factory {
+    KafkaRestClient create(KafkaProperties configuration);
+  }
+
+  @Inject
+  public KafkaRestClient(
+      HttpHostProxy httpHostProxy,
+      @FutureExecutor ExecutorService executor,
+      HttpAsyncClientBuilderFactory credentialsFactory,
+      @Assisted KafkaProperties configuration)
+      throws URISyntaxException {
+    proxy = httpHostProxy;
+    httpclient = proxy.apply(credentialsFactory.create()).build();
+    httpclient.start();
+    this.configuration = configuration;
+    kafkaRestApiTimeoutMsec = (int) configuration.getRestApiTimeout().toMillis();
+    if (configuration.isHttpWireLog()) {
+      enableHttpWireLog();
+    }
+    this.futureExecutor = executor;
+  }
+
+  public static void enableHttpWireLog() {
+    if (!logConfigured) {
+      Logger httpWireLoggger = Logger.getLogger("org.apache.http.wire");
+      httpWireLoggger.setLevel(Level.DEBUG);
+
+      @SuppressWarnings("rawtypes")
+      Enumeration rootLoggerAppenders = LogManager.getRootLogger().getAllAppenders();
+      while (rootLoggerAppenders.hasMoreElements()) {
+        Appender logAppender = (Appender) rootLoggerAppenders.nextElement();
+        if (logAppender instanceof AppenderSkeleton) {
+          ((AppenderSkeleton) logAppender).setThreshold(Level.DEBUG);
+        }
+        httpWireLoggger.addAppender(logAppender);
+      }
+
+      logConfigured = true;
+    }
+  }
+
+  public ListenableFuture<HttpResponse> execute(HttpRequestBase request, int... expectedStatuses) {
+    return Futures.transformAsync(
+        listenableFutureOf(httpclient.execute(request, null)),
+        (res) -> {
+          IOException exc =
+              getResponseException(
+                  String.format("HTTP %s %s FAILED", request.getMethod(), request.getURI()),
+                  res,
+                  expectedStatuses);
+          if (exc == null) {
+            return Futures.immediateFuture(res);
+          }
+          return Futures.immediateFailedFuture(exc);
+        },
+        futureExecutor);
+  }
+
+  public <I, O> ListenableFuture<O> mapAsync(
+      ListenableFuture<I> inputFuture, AsyncFunction<? super I, ? extends O> mapFunction) {
+    return Futures.transformAsync(inputFuture, mapFunction, futureExecutor);
+  }
+
+  public <I, O> ListenableFuture<O> map(
+      ListenableFuture<I> inputFuture, Function<? super I, ? extends O> mapFunction) {
+    return Futures.transform(inputFuture, mapFunction, futureExecutor);
+  }
+
+  public HttpGet createGetTopic(String topic) {
+    HttpGet get = new HttpGet(resolveKafkaRestApiUri("/topics/" + topic));
+    get.addHeader(HttpHeaders.ACCEPT, KAFKA_V2);
+    get.setConfig(createRequestConfig());
+    return get;
+  }
+
+  public HttpGet createGetRecords(URI consumerUri) {
+    HttpGet get = new HttpGet(consumerUri.resolve(consumerUri.getPath() + "/records"));
+    get.addHeader(HttpHeaders.ACCEPT, KAFKA_V2_JSON);
+    get.setConfig(createRequestConfig());
+    return get;
+  }
+
+  public HttpPost createPostToConsumer(String consumerGroup) {
+    HttpPost post =
+        new HttpPost(
+            resolveKafkaRestApiUri("/consumers/" + URLEncoder.encode(consumerGroup, UTF_8)));
+    post.addHeader(HttpHeaders.ACCEPT, MediaType.ANY_TYPE.toString());
+    post.setConfig(createRequestConfig());
+    post.setEntity(
+        new StringEntity(
+            "{\"format\": \"json\",\"auto.offset.reset\": \"earliest\", \"auto.commit.enable\":\"true\", \"consumer.request.timeout.ms\": \"1000\"}",
+            ContentType.create(KAFKA_V2, UTF_8)));
+    return post;
+  }
+
+  public HttpDelete createDeleteToConsumer(URI consumerUri) {
+    HttpDelete delete = new HttpDelete(consumerUri);
+    delete.addHeader(HttpHeaders.ACCEPT, "*/*");
+    delete.setConfig(createRequestConfig());
+    return delete;
+  }
+
+  public HttpDelete createDeleteToConsumerSubscriptions(URI consumerUri) {
+    URI subscriptionUri = consumerUri.resolve("subscription");
+    HttpDelete delete = new HttpDelete(subscriptionUri);
+    delete.addHeader(HttpHeaders.ACCEPT, "*/*");
+    delete.setConfig(createRequestConfig());
+    return delete;
+  }
+
+  public HttpPost createPostToSubscribe(URI consumerUri, String topic) {
+    HttpPost post = new HttpPost(consumerUri.resolve(consumerUri.getPath() + "/subscription"));
+    post.addHeader(HttpHeaders.ACCEPT, "*/*");
+    post.setConfig(createRequestConfig());
+    post.setEntity(
+        new StringEntity(
+            String.format("{\"topics\":[\"%s\"]}", topic), ContentType.create(KAFKA_V2, UTF_8)));
+    return post;
+  }
+
+  public HttpPost createPostToTopic(String topic, HttpEntity postBodyEntity) {
+    HttpPost post =
+        new HttpPost(resolveKafkaRestApiUri("/topics/" + URLEncoder.encode(topic, UTF_8)));
+    post.addHeader(HttpHeaders.ACCEPT, "*/*");
+    post.setConfig(createRequestConfig());
+    post.setEntity(postBodyEntity);
+    return post;
+  }
+
+  public HttpPost createPostSeekTopicFromBeginning(
+      URI consumerUri, String topic, Set<Integer> partitions) {
+    HttpPost post =
+        new HttpPost(consumerUri.resolve(consumerUri.getPath() + "/positions/beginning"));
+    post.addHeader(HttpHeaders.ACCEPT, "*/*");
+    post.setConfig(createRequestConfig());
+    post.setEntity(
+        new StringEntity(
+            String.format(
+                "{\"partitions\":[%s]}",
+                partitions.stream()
+                    .map(
+                        partition ->
+                            String.format("{\"topic\":\"%s\",\"partition\":%d}", topic, partition))
+                    .collect(Collectors.joining(","))),
+            ContentType.create(KAFKA_V2, UTF_8)));
+    return post;
+  }
+
+  @Nullable
+  public IOException getResponseException(
+      String errorMessage, HttpResponse response, int... okHttpStatuses) {
+    int responseHttpStatus = response.getStatusLine().getStatusCode();
+    if (okHttpStatuses.length == 0) {
+      okHttpStatuses =
+          new int[] {HttpStatus.SC_OK, HttpStatus.SC_CREATED, HttpStatus.SC_NO_CONTENT};
+    }
+    for (int httpStatus : okHttpStatuses) {
+      if (responseHttpStatus == httpStatus) {
+        return null;
+      }
+    }
+
+    String responseBody = "";
+    try {
+      responseBody = getStringEntity(response);
+    } catch (IOException e) {
+      logger.atWarning().withCause(e).log(
+          "Unable to extrace the string entity for response %d (%s)",
+          response.getStatusLine().getStatusCode(), response.getStatusLine().getReasonPhrase());
+    }
+
+    return new IOException(
+        String.format(
+            "%s\nHTTP status %d (%s)\n%s",
+            errorMessage,
+            response.getStatusLine().getStatusCode(),
+            response.getStatusLine().getReasonPhrase(),
+            responseBody));
+  }
+
+  protected String getStringEntity(HttpResponse response) throws IOException {
+    HttpEntity entity = response.getEntity();
+    if (entity != null) {
+      try (ByteArrayOutputStream outStream = new ByteArrayOutputStream()) {
+        entity.writeTo(outStream);
+        outStream.close();
+        return outStream.toString(UTF_8);
+      }
+    }
+    return "";
+  }
+
+  private <V> ListenableFuture<V> listenableFutureOf(Future<V> future) {
+    return JdkFutureAdapters.listenInPoolThread(future, futureExecutor);
+  }
+
+  private RequestConfig createRequestConfig() {
+    Builder configBuilder =
+        RequestConfig.custom()
+            .setConnectionRequestTimeout(kafkaRestApiTimeoutMsec)
+            .setConnectTimeout(kafkaRestApiTimeoutMsec)
+            .setSocketTimeout(kafkaRestApiTimeoutMsec);
+    configBuilder = proxy.apply(configBuilder);
+    RequestConfig config = configBuilder.build();
+    return config;
+  }
+
+  public void close() throws IOException {
+    httpclient.close();
+  }
+
+  public URI resolveKafkaRestApiUri(String path) {
+    try {
+      URI restApiUri = configuration.getRestApiUri();
+      return restApiUri.resolve(path);
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException("Invalid Kafka REST API URI", e);
+    }
+  }
+
+  public URI resolveKafkaRestApiUri(String kafkaRestId, String path) {
+    URI restApiUri;
+    try {
+      restApiUri = configuration.getRestApiUri(kafkaRestId);
+      return restApiUri.resolve(restApiUri.getPath() + path);
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException("Invalid Kafka REST API URI", e);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaProducerProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaProducerProvider.java
index b1f11f7..4fb98b2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaProducerProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaProducerProvider.java
@@ -18,8 +18,9 @@
 import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 
-public class KafkaProducerProvider implements Provider<KafkaProducer<String, String>> {
+public class KafkaProducerProvider implements Provider<Producer<String, String>> {
   private final KafkaProperties properties;
 
   @Inject
@@ -28,7 +29,7 @@
   }
 
   @Override
-  public KafkaProducer<String, String> get() {
+  public Producer<String, String> get() {
     return new KafkaProducer<>(properties);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index 0dc29e1..a0df313 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -23,9 +23,10 @@
 import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
 import com.googlesource.gerrit.plugins.kafka.publish.KafkaEventsPublisherMetrics;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Objects;
 import java.util.concurrent.Future;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -36,13 +37,13 @@
 
   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSession.class);
   private final KafkaProperties properties;
-  private final Provider<KafkaProducer<String, String>> producerProvider;
+  private final Provider<Producer<String, String>> producerProvider;
   private final KafkaEventsPublisherMetrics publisherMetrics;
   private volatile Producer<String, String> producer;
 
   @Inject
   public KafkaSession(
-      Provider<KafkaProducer<String, String>> producerProvider,
+      Provider<Producer<String, String>> producerProvider,
       KafkaProperties properties,
       KafkaEventsPublisherMetrics publisherMetrics) {
     this.producerProvider = producerProvider;
@@ -63,12 +64,43 @@
       return;
     }
 
-    LOGGER.info("Connect to {}...", properties.getProperty("bootstrap.servers"));
-    /* Need to make sure that the thread of the running connection uses
-     * the correct class loader otherwize you can endup with hard to debug
-     * ClassNotFoundExceptions
-     */
-    setConnectionClassLoader();
+    switch (properties.getClientType()) {
+      case NATIVE:
+        String bootstrapServers = properties.getProperty("bootstrap.servers");
+        if (bootstrapServers == null) {
+          LOGGER.warn("No Kafka bootstrap.servers property defined: session not started.");
+          return;
+        }
+
+        LOGGER.info("Connect to {}...", bootstrapServers);
+        /* Need to make sure that the thread of the running connection uses
+         * the correct class loader otherwise you can end up with hard to debug
+         * ClassNotFoundExceptions
+         */
+        setConnectionClassLoader();
+        break;
+
+      case REST:
+        URI kafkaProxyUri;
+        try {
+          kafkaProxyUri = properties.getRestApiUri();
+        } catch (URISyntaxException e) {
+          LOGGER.error("Invalid Kafka Proxy URI: session not started", e);
+          return;
+        }
+        if (kafkaProxyUri == null) {
+          LOGGER.warn("No Kafka Proxy URL property defined: session not started.");
+          return;
+        }
+
+        LOGGER.info("Connect to {}...", kafkaProxyUri);
+        break;
+
+      default:
+        LOGGER.error("Unsupported Kafka Client Type %s", properties.getClientType());
+        return;
+    }
+
     producer = producerProvider.get();
     LOGGER.info("Connection established.");
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaConsumerFactory.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaConsumerFactory.java
index 9ec109d..43bdc73 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaConsumerFactory.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaConsumerFactory.java
@@ -32,6 +32,11 @@
   }
 
   public Consumer<byte[], byte[]> create(Deserializer<byte[]> keyDeserializer) {
+    return create(config, keyDeserializer);
+  }
+
+  public Consumer<byte[], byte[]> create(
+      KafkaSubscriberProperties config, Deserializer<byte[]> keyDeserializer) {
     return new KafkaConsumer<>(config, keyDeserializer, new ByteArrayDeserializer());
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventNativeSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventNativeSubscriber.java
new file mode 100644
index 0000000..f8809a6
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventNativeSubscriber.java
@@ -0,0 +1,218 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.kafka.subscribe;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.kafka.broker.ConsumerExecutor;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+public class KafkaEventNativeSubscriber implements KafkaEventSubscriber {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final int DELAY_RECONNECT_AFTER_FAILURE_MSEC = 1000;
+
+  private final OneOffRequestContext oneOffCtx;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  private final Deserializer<Event> valueDeserializer;
+  private final KafkaSubscriberProperties configuration;
+  private final ExecutorService executor;
+  private final KafkaEventSubscriberMetrics subscriberMetrics;
+  private final KafkaConsumerFactory consumerFactory;
+  private final Deserializer<byte[]> keyDeserializer;
+
+  private java.util.function.Consumer<Event> messageProcessor;
+  private String topic;
+  private AtomicBoolean resetOffset = new AtomicBoolean(false);
+
+  private volatile ReceiverJob receiver;
+  private final Optional<String> externalGroupId;
+
+  @Inject
+  public KafkaEventNativeSubscriber(
+      KafkaSubscriberProperties configuration,
+      KafkaConsumerFactory consumerFactory,
+      Deserializer<byte[]> keyDeserializer,
+      Deserializer<Event> valueDeserializer,
+      OneOffRequestContext oneOffCtx,
+      @ConsumerExecutor ExecutorService executor,
+      KafkaEventSubscriberMetrics subscriberMetrics,
+      @Assisted Optional<String> externalGroupId) {
+
+    this.oneOffCtx = oneOffCtx;
+    this.executor = executor;
+    this.subscriberMetrics = subscriberMetrics;
+    this.consumerFactory = consumerFactory;
+    this.keyDeserializer = keyDeserializer;
+    this.valueDeserializer = valueDeserializer;
+    this.externalGroupId = externalGroupId;
+    this.configuration = (KafkaSubscriberProperties) configuration.clone();
+    externalGroupId.ifPresent(gid -> this.configuration.setProperty("group.id", gid));
+  }
+
+  /* (non-Javadoc)
+   * @see com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber#subscribe(java.lang.String, java.util.function.Consumer)
+   */
+  @Override
+  public void subscribe(String topic, java.util.function.Consumer<Event> messageProcessor) {
+    this.topic = topic;
+    this.messageProcessor = messageProcessor;
+    logger.atInfo().log(
+        "Kafka consumer subscribing to topic alias [%s] for event topic [%s] with groupId [%s]",
+        topic, topic, configuration.getGroupId());
+    runReceiver(consumerFactory.create(configuration, keyDeserializer));
+  }
+
+  private void runReceiver(Consumer<byte[], byte[]> consumer) {
+    final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread()
+          .setContextClassLoader(KafkaEventNativeSubscriber.class.getClassLoader());
+      consumer.subscribe(Collections.singleton(topic));
+      receiver = new ReceiverJob(consumer);
+      executor.execute(receiver);
+    } finally {
+      Thread.currentThread().setContextClassLoader(previousClassLoader);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber#shutdown()
+   */
+  @Override
+  public void shutdown() {
+    closed.set(true);
+    receiver.wakeup();
+  }
+
+  /* (non-Javadoc)
+   * @see com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber#getMessageProcessor()
+   */
+  @Override
+  public java.util.function.Consumer<Event> getMessageProcessor() {
+    return messageProcessor;
+  }
+
+  /* (non-Javadoc)
+   * @see com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber#getTopic()
+   */
+  @Override
+  public String getTopic() {
+    return topic;
+  }
+
+  /* (non-Javadoc)
+   * @see com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber#resetOffset()
+   */
+  @Override
+  public void resetOffset() {
+    resetOffset.set(true);
+  }
+
+  @Override
+  public Optional<String> getExternalGroupId() {
+    return externalGroupId;
+  }
+
+  private class ReceiverJob implements Runnable {
+    private final Consumer<byte[], byte[]> consumer;
+
+    public ReceiverJob(Consumer<byte[], byte[]> consumer) {
+      this.consumer = consumer;
+    }
+
+    public void wakeup() {
+      consumer.wakeup();
+    }
+
+    @Override
+    public void run() {
+      try {
+        consume();
+      } catch (Exception e) {
+        logger.atSevere().withCause(e).log("Consumer loop of topic %s ended", topic);
+      }
+    }
+
+    private void consume() throws InterruptedException {
+      try {
+        while (!closed.get()) {
+          if (resetOffset.getAndSet(false)) {
+            // Make sure there is an assignment for this consumer
+            while (consumer.assignment().isEmpty() && !closed.get()) {
+              logger.atInfo().log(
+                  "Resetting offset: no partitions assigned to the consumer, request assignment.");
+              consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
+            }
+            consumer.seekToBeginning(consumer.assignment());
+          }
+          ConsumerRecords<byte[], byte[]> consumerRecords =
+              consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
+          consumerRecords.forEach(
+              consumerRecord -> {
+                try (ManualRequestContext ctx = oneOffCtx.open()) {
+                  Event event =
+                      valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
+                  messageProcessor.accept(event);
+                } catch (Exception e) {
+                  logger.atSevere().withCause(e).log(
+                      "Malformed event '%s': [Exception: %s]",
+                      new String(consumerRecord.value(), UTF_8), e.toString());
+                  subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
+                }
+              });
+        }
+      } catch (WakeupException e) {
+        // Ignore exception if closing
+        if (!closed.get()) {
+          logger.atSevere().withCause(e).log("Consumer loop of topic %s interrupted", topic);
+          reconnectAfterFailure();
+        }
+      } catch (Exception e) {
+        subscriberMetrics.incrementSubscriberFailedToPollMessages();
+        logger.atSevere().withCause(e).log(
+            "Existing consumer loop of topic %s because of a non-recoverable exception", topic);
+        reconnectAfterFailure();
+      } finally {
+        consumer.close();
+      }
+    }
+
+    private void reconnectAfterFailure() throws InterruptedException {
+      // Random delay with average of DELAY_RECONNECT_AFTER_FAILURE_MSEC
+      // for avoiding hammering exactly at the same interval in case of failure
+      long reconnectDelay =
+          DELAY_RECONNECT_AFTER_FAILURE_MSEC / 2
+              + new Random().nextInt(DELAY_RECONNECT_AFTER_FAILURE_MSEC);
+      Thread.sleep(reconnectDelay);
+      runReceiver(consumerFactory.create(configuration, keyDeserializer));
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java
new file mode 100644
index 0000000..476be1e
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventRestSubscriber.java
@@ -0,0 +1,375 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.kafka.subscribe;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.kafka.broker.ConsumerExecutor;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
+import com.googlesource.gerrit.plugins.kafka.rest.KafkaRestClient;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+
+public class KafkaEventRestSubscriber implements KafkaEventSubscriber {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final int DELAY_RECONNECT_AFTER_FAILURE_MSEC = 1000;
+  // Prefix is a length of 'rest-consumer-' string
+  private static final int INSTANCE_ID_PREFIX_LEN = 14;
+  /**
+   * Suffix is a length of a unique identifier for example: '-9836fe85-d838-4722-97c9-4a7b-34e834d'
+   */
+  private static final int INSTANCE_ID_SUFFIX_LEN = 37;
+
+  private final OneOffRequestContext oneOffCtx;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  private final Deserializer<Event> valueDeserializer;
+  private final KafkaSubscriberProperties configuration;
+  private final ExecutorService executor;
+  private final KafkaEventSubscriberMetrics subscriberMetrics;
+  private final Gson gson;
+
+  private java.util.function.Consumer<Event> messageProcessor;
+  private String topic;
+  private final KafkaRestClient restClient;
+  private final AtomicBoolean resetOffset;
+  private final long restClientTimeoutMs;
+  private volatile ReceiverJob receiver;
+  private final Optional<String> externalGroupId;
+
+  @Inject
+  public KafkaEventRestSubscriber(
+      KafkaSubscriberProperties configuration,
+      Deserializer<Event> valueDeserializer,
+      OneOffRequestContext oneOffCtx,
+      @ConsumerExecutor ExecutorService executor,
+      KafkaEventSubscriberMetrics subscriberMetrics,
+      KafkaRestClient.Factory restClientFactory,
+      @Assisted Optional<String> externalGroupId) {
+
+    this.oneOffCtx = oneOffCtx;
+    this.executor = executor;
+    this.subscriberMetrics = subscriberMetrics;
+    this.valueDeserializer = valueDeserializer;
+    this.externalGroupId = externalGroupId;
+    this.configuration = (KafkaSubscriberProperties) configuration.clone();
+    externalGroupId.ifPresent(gid -> this.configuration.setProperty("group.id", gid));
+
+    gson = new Gson();
+    restClient = restClientFactory.create(configuration);
+    resetOffset = new AtomicBoolean(false);
+    restClientTimeoutMs = configuration.getRestApiTimeout().toMillis();
+  }
+
+  /* (non-Javadoc)
+   * @see com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber#subscribe(java.lang.String, java.util.function.Consumer)
+   */
+  @Override
+  public void subscribe(String topic, java.util.function.Consumer<Event> messageProcessor) {
+    this.topic = topic;
+    this.messageProcessor = messageProcessor;
+    logger.atInfo().log(
+        "Kafka consumer subscribing to topic alias [%s] for event topic [%s] with groupId [%s]",
+        topic, topic, configuration.getGroupId());
+    try {
+      runReceiver();
+    } catch (InterruptedException | ExecutionException | TimeoutException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private void runReceiver() throws InterruptedException, ExecutionException, TimeoutException {
+    receiver = new ReceiverJob(configuration.getGroupId());
+    executor.execute(receiver);
+  }
+
+  /* (non-Javadoc)
+   * @see com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber#shutdown()
+   */
+  @Override
+  public void shutdown() {
+    try {
+      closed.set(true);
+      receiver.close();
+    } catch (InterruptedException | ExecutionException | IOException | TimeoutException e) {
+      logger.atWarning().withCause(e).log("Unable to close receiver for topic=%s", topic);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber#getMessageProcessor()
+   */
+  @Override
+  public java.util.function.Consumer<Event> getMessageProcessor() {
+    return messageProcessor;
+  }
+
+  /* (non-Javadoc)
+   * @see com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber#getTopic()
+   */
+  @Override
+  public String getTopic() {
+    return topic;
+  }
+
+  /* (non-Javadoc)
+   * @see com.googlesource.gerrit.plugins.kafka.subscribe.KafkaEventSubscriber#resetOffset()
+   */
+  @Override
+  public void resetOffset() {
+    resetOffset.set(true);
+  }
+
+  @Override
+  public Optional<String> getExternalGroupId() {
+    return externalGroupId;
+  }
+
+  private class ReceiverJob implements Runnable {
+    private final ListenableFuture<URI> kafkaRestConsumerUri;
+    private final ListenableFuture<?> kafkaSubscriber;
+
+    public ReceiverJob(String consumerGroup)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      kafkaRestConsumerUri = createConsumer(consumerGroup);
+      kafkaSubscriber = restClient.mapAsync(kafkaRestConsumerUri, this::subscribeToTopic);
+      kafkaSubscriber.get(restClientTimeoutMs, TimeUnit.MILLISECONDS);
+    }
+
+    public void close()
+        throws InterruptedException, ExecutionException, IOException, TimeoutException {
+      restClient
+          .mapAsync(kafkaRestConsumerUri, this::deleteConsumer)
+          .get(restClientTimeoutMs, TimeUnit.MILLISECONDS);
+      restClient.close();
+    }
+
+    @Override
+    public void run() {
+      try {
+        consume();
+      } catch (Exception e) {
+        logger.atSevere().withCause(e).log("Consumer loop of topic %s ended", topic);
+      }
+    }
+
+    private void consume() throws InterruptedException, ExecutionException, TimeoutException {
+      try {
+        while (!closed.get()) {
+          if (resetOffset.getAndSet(false)) {
+            restClient
+                .mapAsync(getTopicPartitions(), this::seekToBeginning)
+                .get(restClientTimeoutMs, TimeUnit.MILLISECONDS);
+          }
+
+          ConsumerRecords<byte[], byte[]> records =
+              restClient
+                  .mapAsync(kafkaRestConsumerUri, this::getRecords)
+                  .get(restClientTimeoutMs, TimeUnit.MILLISECONDS);
+          records.forEach(
+              consumerRecord -> {
+                try (ManualRequestContext ctx = oneOffCtx.open()) {
+                  Event event =
+                      valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
+                  messageProcessor.accept(event);
+                } catch (Exception e) {
+                  logger.atSevere().withCause(e).log(
+                      "Malformed event '%s'", new String(consumerRecord.value(), UTF_8));
+                  subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
+                }
+              });
+        }
+      } catch (Exception e) {
+        subscriberMetrics.incrementSubscriberFailedToPollMessages();
+        logger.atSevere().withCause(e).log(
+            "Existing consumer loop of topic %s because of a non-recoverable exception", topic);
+        reconnectAfterFailure();
+      } finally {
+        restClient
+            .mapAsync(kafkaRestConsumerUri, this::deleteSubscription)
+            .get(restClientTimeoutMs, TimeUnit.MILLISECONDS);
+      }
+    }
+
+    private ListenableFuture<HttpResponse> seekToBeginning(Set<Integer> partitions) {
+      ListenableFuture<HttpPost> post =
+          restClient.map(
+              kafkaRestConsumerUri,
+              uri -> restClient.createPostSeekTopicFromBeginning(uri, topic, partitions));
+      return restClient.mapAsync(post, p -> restClient.execute(p, HttpStatus.SC_NO_CONTENT));
+    }
+
+    private ListenableFuture<Set<Integer>> getTopicPartitions() {
+      HttpGet getTopic = restClient.createGetTopic(topic);
+      return restClient.mapAsync(
+          restClient.execute(getTopic, HttpStatus.SC_OK), this::getPartitions);
+    }
+
+    private ListenableFuture<ConsumerRecords<byte[], byte[]>> getRecords(URI consumerUri) {
+      HttpGet getRecords = restClient.createGetRecords(consumerUri);
+      return restClient.mapAsync(
+          restClient.execute(getRecords, HttpStatus.SC_OK), this::convertRecords);
+    }
+
+    private ListenableFuture<HttpResponse> subscribeToTopic(URI consumerUri) {
+      HttpPost post = restClient.createPostToSubscribe(consumerUri, topic);
+      return restClient.execute(post);
+    }
+
+    private ListenableFuture<?> deleteConsumer(URI consumerUri) {
+      HttpDelete delete = restClient.createDeleteToConsumer(consumerUri);
+      return restClient.execute(delete);
+    }
+
+    private ListenableFuture<?> deleteSubscription(URI consumerUri) {
+      HttpDelete delete = restClient.createDeleteToConsumerSubscriptions(consumerUri);
+      return restClient.execute(delete);
+    }
+
+    private ListenableFuture<URI> createConsumer(String consumerGroup) {
+      HttpPost post = restClient.createPostToConsumer(consumerGroup + "-" + topic);
+      return restClient.mapAsync(restClient.execute(post, HttpStatus.SC_OK), this::getConsumerUri);
+    }
+
+    private ListenableFuture<Set<Integer>> getPartitions(HttpResponse response) {
+      try (Reader bodyReader =
+          new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8)) {
+        JsonObject responseJson = gson.fromJson(bodyReader, JsonObject.class);
+        Set<Integer> partitions = extractPartitions(responseJson);
+        return Futures.immediateFuture(partitions);
+      } catch (IOException e) {
+        return Futures.immediateFailedFuture(e);
+      }
+    }
+
+    private ListenableFuture<URI> getConsumerUri(HttpResponse response) {
+      try (Reader bodyReader =
+          new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8)) {
+        JsonObject responseJson = gson.fromJson(bodyReader, JsonObject.class);
+        URI consumerUri = new URI(responseJson.get("base_uri").getAsString());
+        String instanceId = responseJson.get("instance_id").getAsString();
+
+        String restProxyId = getRestProxyId(instanceId);
+        return Futures.immediateFuture(
+            restClient.resolveKafkaRestApiUri(restProxyId, consumerUri.getPath()));
+      } catch (UnsupportedOperationException | IOException | URISyntaxException e) {
+        return Futures.immediateFailedFuture(e);
+      }
+    }
+
+    private String getRestProxyId(String instanceId) {
+      int instanceIdLen = instanceId.length();
+      if (instanceIdLen <= INSTANCE_ID_SUFFIX_LEN + INSTANCE_ID_PREFIX_LEN) {
+        // Kafka Rest Proxy instance id is not mandatory
+        return "";
+      }
+
+      return instanceId.substring(
+          INSTANCE_ID_PREFIX_LEN, instanceId.length() - INSTANCE_ID_SUFFIX_LEN);
+    }
+
+    private ListenableFuture<ConsumerRecords<byte[], byte[]>> convertRecords(
+        HttpResponse response) {
+      try (Reader bodyReader = new InputStreamReader(response.getEntity().getContent())) {
+        JsonArray jsonRecords = gson.fromJson(bodyReader, JsonArray.class);
+        if (jsonRecords.size() == 0) {
+          return Futures.immediateFuture(new ConsumerRecords<>(Collections.emptyMap()));
+        }
+
+        Stream<ConsumerRecord<byte[], byte[]>> jsonObjects =
+            StreamSupport.stream(jsonRecords.spliterator(), false)
+                .map(JsonElement::getAsJsonObject)
+                .map(this::jsonToConsumerRecords);
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records =
+            jsonObjects.collect(Collectors.groupingBy(this::jsonRecordPartition));
+        return Futures.immediateFuture(new ConsumerRecords<>(records));
+      } catch (IOException e) {
+        subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
+        return Futures.immediateFailedFuture(e);
+      }
+    }
+
+    private ConsumerRecord<byte[], byte[]> jsonToConsumerRecords(JsonObject jsonRecord) {
+      return new ConsumerRecord<>(
+          jsonRecord.get("topic").getAsString(),
+          jsonRecord.get("partition").getAsInt(),
+          jsonRecord.get("offset").getAsLong(),
+          jsonRecord.get("key").toString().getBytes(),
+          jsonRecord.get("value").toString().getBytes());
+    }
+
+    private Set<Integer> extractPartitions(JsonObject jsonRecord) {
+      return StreamSupport.stream(
+              jsonRecord.get("partitions").getAsJsonArray().spliterator(), false)
+          .map(jsonElem -> jsonElem.getAsJsonObject().get("partition"))
+          .map(JsonElement::getAsInt)
+          .collect(Collectors.toSet());
+    }
+
+    private TopicPartition jsonRecordPartition(ConsumerRecord<byte[], byte[]> consumerRecord) {
+      return new TopicPartition(topic, consumerRecord.partition());
+    }
+
+    private void reconnectAfterFailure()
+        throws InterruptedException, ExecutionException, TimeoutException {
+      // Random delay with average of DELAY_RECONNECT_AFTER_FAILURE_MSEC
+      // for avoiding hammering exactly at the same interval in case of failure
+      long reconnectDelay =
+          DELAY_RECONNECT_AFTER_FAILURE_MSEC / 2
+              + new Random().nextInt(DELAY_RECONNECT_AFTER_FAILURE_MSEC);
+      Thread.sleep(reconnectDelay);
+      runReceiver();
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
index 1f0b0aa..1530be7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
@@ -1,4 +1,4 @@
-// Copyright (C) 2019 The Android Open Source Project
+// Copyright (C) 2021 The Android Open Source Project
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -11,175 +11,52 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
+
 package com.googlesource.gerrit.plugins.kafka.subscribe;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.util.ManualRequestContext;
-import com.google.gerrit.server.util.OneOffRequestContext;
-import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.kafka.broker.ConsumerExecutor;
-import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.Deserializer;
+import java.util.Optional;
 
-public class KafkaEventSubscriber {
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
-  private static final int DELAY_RECONNECT_AFTER_FAILURE_MSEC = 1000;
+/** Generic interface to a Kafka topic subscriber. */
+public interface KafkaEventSubscriber {
 
-  private final OneOffRequestContext oneOffCtx;
-  private final AtomicBoolean closed = new AtomicBoolean(false);
-
-  private final Deserializer<Event> valueDeserializer;
-  private final KafkaSubscriberProperties configuration;
-  private final ExecutorService executor;
-  private final KafkaEventSubscriberMetrics subscriberMetrics;
-  private final KafkaConsumerFactory consumerFactory;
-  private final Deserializer<byte[]> keyDeserializer;
-
-  private java.util.function.Consumer<Event> messageProcessor;
-  private String topic;
-  private AtomicBoolean resetOffset = new AtomicBoolean(false);
-
-  private volatile ReceiverJob receiver;
-
-  @Inject
-  public KafkaEventSubscriber(
-      KafkaSubscriberProperties configuration,
-      KafkaConsumerFactory consumerFactory,
-      Deserializer<byte[]> keyDeserializer,
-      Deserializer<Event> valueDeserializer,
-      OneOffRequestContext oneOffCtx,
-      @ConsumerExecutor ExecutorService executor,
-      KafkaEventSubscriberMetrics subscriberMetrics) {
-
-    this.configuration = configuration;
-    this.oneOffCtx = oneOffCtx;
-    this.executor = executor;
-    this.subscriberMetrics = subscriberMetrics;
-    this.consumerFactory = consumerFactory;
-    this.keyDeserializer = keyDeserializer;
-    this.valueDeserializer = valueDeserializer;
+  public interface Factory {
+    KafkaEventSubscriber create(Optional<String> externalGroupId);
   }
 
-  public void subscribe(String topic, java.util.function.Consumer<Event> messageProcessor) {
-    this.topic = topic;
-    this.messageProcessor = messageProcessor;
-    logger.atInfo().log(
-        "Kafka consumer subscribing to topic alias [%s] for event topic [%s]", topic, topic);
-    runReceiver();
-  }
+  /**
+   * Subscribe to a topic and receive messages asynchronously.
+   *
+   * @param topic Kafka topic name
+   * @param messageProcessor consumer function for processing incoming messages
+   */
+  void subscribe(String topic, java.util.function.Consumer<Event> messageProcessor);
 
-  private void runReceiver() {
-    final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
-    try {
-      Thread.currentThread().setContextClassLoader(KafkaEventSubscriber.class.getClassLoader());
-      Consumer<byte[], byte[]> consumer = consumerFactory.create(keyDeserializer);
-      consumer.subscribe(Collections.singleton(topic));
-      receiver = new ReceiverJob(consumer);
-      executor.execute(receiver);
-    } finally {
-      Thread.currentThread().setContextClassLoader(previousClassLoader);
-    }
-  }
+  /** Shutdown Kafka consumer. */
+  void shutdown();
 
-  public void shutdown() {
-    closed.set(true);
-    receiver.wakeup();
-  }
+  /**
+   * Returns the current consumer function for the subscribed topic.
+   *
+   * @return the default topic consumer function.
+   */
+  java.util.function.Consumer<Event> getMessageProcessor();
 
-  public java.util.function.Consumer<Event> getMessageProcessor() {
-    return messageProcessor;
-  }
+  /**
+   * Returns the current subscribed topic name.
+   *
+   * @return Kafka topic name.
+   */
+  String getTopic();
 
-  public String getTopic() {
-    return topic;
-  }
+  /** Reset the offset for reading incoming Kafka messages of the topic. */
+  void resetOffset();
 
-  public void resetOffset() {
-    resetOffset.set(true);
-  }
-
-  private class ReceiverJob implements Runnable {
-    private final Consumer<byte[], byte[]> consumer;
-
-    public ReceiverJob(Consumer<byte[], byte[]> consumer) {
-      this.consumer = consumer;
-    }
-
-    public void wakeup() {
-      consumer.wakeup();
-    }
-
-    @Override
-    public void run() {
-      try {
-        consume();
-      } catch (Exception e) {
-        logger.atSevere().withCause(e).log("Consumer loop of topic %s ended", topic);
-      }
-    }
-
-    private void consume() throws InterruptedException {
-      try {
-        while (!closed.get()) {
-          if (resetOffset.getAndSet(false)) {
-            // Make sure there is an assignment for this consumer
-            while (consumer.assignment().isEmpty() && !closed.get()) {
-              logger.atInfo().log(
-                  "Resetting offset: no partitions assigned to the consumer, request assignment.");
-              consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
-            }
-            consumer.seekToBeginning(consumer.assignment());
-          }
-          ConsumerRecords<byte[], byte[]> consumerRecords =
-              consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
-          consumerRecords.forEach(
-              consumerRecord -> {
-                try (ManualRequestContext ctx = oneOffCtx.open()) {
-                  Event event =
-                      valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
-                  messageProcessor.accept(event);
-                } catch (Exception e) {
-                  logger.atSevere().withCause(e).log(
-                      "Malformed event '%s'", new String(consumerRecord.value(), UTF_8));
-                  subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
-                }
-              });
-        }
-      } catch (WakeupException e) {
-        // Ignore exception if closing
-        if (!closed.get()) {
-          logger.atSevere().withCause(e).log("Consumer loop of topic %s interrupted", topic);
-          reconnectAfterFailure();
-        }
-      } catch (Exception e) {
-        subscriberMetrics.incrementSubscriberFailedToPollMessages();
-        logger.atSevere().withCause(e).log(
-            "Existing consumer loop of topic %s because of a non-recoverable exception", topic);
-        reconnectAfterFailure();
-      } finally {
-        consumer.close();
-      }
-    }
-
-    private void reconnectAfterFailure() throws InterruptedException {
-      // Random delay with average of DELAY_RECONNECT_AFTER_FAILURE_MSEC
-      // for avoiding hammering exactly at the same interval in case of failure
-      long reconnectDelay =
-          DELAY_RECONNECT_AFTER_FAILURE_MSEC / 2
-              + new Random().nextInt(DELAY_RECONNECT_AFTER_FAILURE_MSEC);
-      Thread.sleep(reconnectDelay);
-      runReceiver();
-    }
-  }
+  /**
+   * Returns the external consumer's group id when it is defined.
+   *
+   * @return Optional instance with external consumer's group id otherwise an empty Optional
+   *     instance
+   */
+  Optional<String> getExternalGroupId();
 }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index a10004f..621d769 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -36,10 +36,24 @@
 Additional properties
 ---------------------
 
+`plugin.@PLUGIN@.clientType`
+:	Client stack for connecting to Kafka broker:
+    - `NATIVE` for using the Kafka client to connect to the broker directory
+    - `REST` for using a simple HTTP client to connect to
+      [Confluent REST-API Proxy](https://docs.confluent.io/platform/current/kafka-rest/index.html).
+      **NOTE**: `plugin.@PLUGIN@.restApiUri` is mandatory when using a `REST` client type.
+	Default: `NATIVE`
+
 `plugin.@PLUGIN@.groupId`
 :	Kafka consumer group for receiving messages.
 	Default: Gerrit instance-id
 
+`plugin.@PLUGIN@.httpWireLog`
+:	Enable the HTTP wire protocol logging in error_log for all the communication with
+	the [Confluent REST-API Proxy](https://docs.confluent.io/platform/current/kafka-rest/index.html).
+	**NOTE**: when `plugin.@PLUGIN@.restApiUri` is unset or set to `NATIVE`, this setting is ignored.
+	Default: false
+
 `plugin.@PLUGIN@.pollingIntervalMs`
 :	Polling interval in msec for receiving messages from Kafka topic subscription.
 	Default: 1000
@@ -52,6 +66,37 @@
     `streamEventTopic`, `gerritTopic`, `projectListEventTopic`,
     `cacheEventTopic`, `indexEventTopic`
 
+`plugin.@PLUGIN@.restApiUri`
+:	URL of the
+	[Confluent REST-API Proxy](https://docs.confluent.io/platform/current/kafka-rest/index.html)
+	for sending/receiving messages through REST-API instead of using the native Kafka client.
+	The value can use the `${kafka_rest_id}` placeholder which will be replaced at runtime using
+	the `KAFKA_REST_ID` associated with the Kafka REST-API Proxy that is answering the call,
+	typically needed when having multiple proxies behind a workload balancer and sticky session
+	allocation isn't an option.
+	**NOTE**: when `plugin.@PLUGIN@.restApiUri` is unset or set to `NATIVE`, this setting is ignored.
+	Default: unset
+
+`plugin.@PLUGIN@.restApiThreads`
+:	Maximum number of concurrent client calls to the
+	[Confluent REST-API Proxy](https://docs.confluent.io/platform/current/kafka-rest/index.html)
+	for sending/receiving messages.
+	**NOTE**: when `plugin.@PLUGIN@.restApiUri` is unset or set to `NATIVE`, this setting is ignored.
+	Default: 10
+
+`plugin.@PLUGIN@.restApiTimeout`
+:	Maximum time to wait for a client call to
+	[Confluent REST-API Proxy](https://docs.confluent.io/platform/current/kafka-rest/index.html)
+	to complete. This setting is also applied as TCP socket connection and read/write timeout
+	for the outgoing HTTP calls.
+	The value is expressed using the `N unit` format of all other Gerrit time expressions, using
+	one of the following units:
+	- s, sec, second, seconds
+	- m, min, minute, minutes
+	- h, hr, hour, hours
+	**NOTE**: when `plugin.@PLUGIN@.restApiUri` is unset or set to `NATIVE`, this setting is ignored.
+	Default: 60 sec
+
 `plugin.@PLUGIN@.sendAsync`
 :	Send messages to Kafka asynchronously, detaching the calling process from the
 	acknowledge of the message being sent.
@@ -82,3 +127,19 @@
 Number of subscribers          [6]: 6
 Consumer group                 [my_group_id]: my_group_id
 ```
+
+secure.config
+--------------------
+
+`plugin.@PLUGIN@.restApiUsername`
+:	Username used for the authentication to the
+	[Confluent REST-API Proxy](https://docs.confluent.io/platform/current/kafka-rest/index.html)
+	for sending/receiving messages through REST-API instead of using the native Kafka client.
+	**NOTE**: when `plugin.@PLUGIN@.restApiUri` is unset or set to `NATIVE`, this setting is ignored.
+	Default: unset
+
+`plugin.@PLUGIN@.restApiPassword`
+:	Password used for the authentication to the
+	[Confluent REST-API Proxy](https://docs.confluent.io/platform/current/kafka-rest/index.html)
+	**NOTE**: when `plugin.@PLUGIN@.restApiUri` is unset or set to `NATIVE`, this setting is ignored.
+	Default: unset
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
index 6e654bf..028d589 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -19,6 +19,7 @@
 import static org.junit.Assert.fail;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Iterables;
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
@@ -51,14 +52,15 @@
 @NoHttpd
 @TestPlugin(name = "events-kafka", sysModule = "com.googlesource.gerrit.plugins.kafka.Module")
 public class EventConsumerIT extends LightweightPluginDaemonTest {
-  static final long KAFKA_POLL_TIMEOUT = 10000L;
-
+  static final Duration KAFKA_POLL_TIMEOUT = Duration.ofSeconds(10);
+  private static final Duration WAIT_FOR_POLL_TIMEOUT = Duration.ofSeconds(30);
   private KafkaContainer kafka;
+  private final Gson gson = new EventGsonProvider().get();
 
   @Override
   public void setUpTestPlugin() throws Exception {
     try {
-      kafka = new KafkaContainer();
+      kafka = KafkaContainerProvider.get();
       kafka.start();
 
       System.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
@@ -101,7 +103,6 @@
     assertThat(events).hasSize(6);
     String commentAddedEventJson = Iterables.getLast(events);
 
-    Gson gson = new EventGsonProvider().get();
     Event event = gson.fromJson(commentAddedEventJson, Event.class);
     assertThat(event).isInstanceOf(CommentAddedEvent.class);
 
@@ -135,13 +136,36 @@
       name = "plugin.events-kafka.valueDeserializer",
       value = "org.apache.kafka.common.serialization.StringDeserializer")
   @GerritConfig(name = "plugin.events-kafka.pollingIntervalMs", value = "500")
+  public void consumeEventsWithExternalGroupId() throws Exception {
+    String topic = "a_topic";
+    String consumerGroup1 = "consumer-group-1";
+    Event eventMessage = new ProjectCreatedEvent();
+    eventMessage.instanceId = "test-instance-id-1";
+    List<Event> receivedEventsWithGroupId1 = new ArrayList<>();
+
+    ExtendedBrokerApi kafkaBrokerApi = ((ExtendedBrokerApi) kafkaBrokerApi());
+    kafkaBrokerApi.send(topic, eventMessage);
+    kafkaBrokerApi.receiveAsync(topic, consumerGroup1, receivedEventsWithGroupId1::add);
+
+    waitUntil(() -> receivedEventsWithGroupId1.size() == 1, WAIT_FOR_POLL_TIMEOUT);
+    assertThat(gson.toJson(receivedEventsWithGroupId1.get(0))).isEqualTo(gson.toJson(eventMessage));
+  }
+
+  @Test
+  @UseLocalDisk
+  @GerritConfig(name = "plugin.events-kafka.groupId", value = "test-consumer-group")
+  @GerritConfig(
+      name = "plugin.events-kafka.keyDeserializer",
+      value = "org.apache.kafka.common.serialization.StringDeserializer")
+  @GerritConfig(
+      name = "plugin.events-kafka.valueDeserializer",
+      value = "org.apache.kafka.common.serialization.StringDeserializer")
+  @GerritConfig(name = "plugin.events-kafka.pollingIntervalMs", value = "500")
   public void shouldReplayAllEvents() throws InterruptedException {
     String topic = "a_topic";
     Event eventMessage = new ProjectCreatedEvent();
     eventMessage.instanceId = "test-instance-id";
 
-    Duration WAIT_FOR_POLL_TIMEOUT = Duration.ofMillis(1000);
-
     List<Event> receivedEvents = new ArrayList<>();
 
     BrokerApi kafkaBrokerApi = kafkaBrokerApi();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/KafkaContainerProvider.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/KafkaContainerProvider.java
new file mode 100644
index 0000000..859d7ee
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/KafkaContainerProvider.java
@@ -0,0 +1,50 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka;
+
+import java.util.Map;
+import org.junit.Ignore;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+
+@Ignore
+public class KafkaContainerProvider {
+  public static int KAFKA_PORT_INTERNAL = KafkaContainer.KAFKA_PORT + 1;
+  private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka";
+  private static final String KAFKA_IMAGE_TAG = "5.4.3";
+
+  public static KafkaContainer get() {
+    KafkaContainer kafkaContainer =
+        new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME).withTag(KAFKA_IMAGE_TAG)) {
+
+          @Override
+          public String getBootstrapServers() {
+            return String.format(
+                    "INTERNAL://%s:%s,", getNetworkAliases().get(0), KAFKA_PORT_INTERNAL)
+                + super.getBootstrapServers();
+          }
+        };
+
+    Map<String, String> kafkaEnv = kafkaContainer.getEnvMap();
+    String kafkaListeners = kafkaEnv.get("KAFKA_LISTENERS");
+    String kafkaProtocolMap = kafkaEnv.get("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP");
+
+    return kafkaContainer
+        .withNetwork(Network.newNetwork())
+        .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", kafkaProtocolMap + ",INTERNAL:PLAINTEXT")
+        .withEnv("KAFKA_LISTENERS", kafkaListeners + ",INTERNAL://0.0.0.0:" + KAFKA_PORT_INTERNAL);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/KafkaRestContainer.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/KafkaRestContainer.java
new file mode 100644
index 0000000..5cbf585
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/KafkaRestContainer.java
@@ -0,0 +1,92 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka;
+
+import com.github.dockerjava.api.model.ContainerNetwork;
+import com.google.common.base.Strings;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import org.junit.Ignore;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+@Ignore
+public class KafkaRestContainer extends GenericContainer<KafkaRestContainer> {
+
+  public static final String KAFKA_REST_PROXY_HOSTNAME = "restproxy";
+  public static final int KAFKA_REST_PORT = 8082;
+  private final String kafkaRestHostname;
+
+  public KafkaRestContainer(KafkaContainer kafkaContainer, Boolean enableAuthentication) {
+    this(kafkaContainer, null, enableAuthentication);
+  }
+
+  public KafkaRestContainer(
+      KafkaContainer kafkaContainer, String kafkaRestId, Boolean enableAuthentication) {
+    super(restProxyImageFor(kafkaContainer));
+
+    kafkaRestHostname = KAFKA_REST_PROXY_HOSTNAME + Strings.nullToEmpty(kafkaRestId);
+
+    withNetwork(kafkaContainer.getNetwork());
+
+    withExposedPorts(KAFKA_REST_PORT);
+    String bootstrapServers =
+        String.format(
+            "PLAINTEXT://%s:%s",
+            kafkaContainer.getNetworkAliases().get(0), KafkaContainerProvider.KAFKA_PORT_INTERNAL);
+    withEnv("KAFKA_REST_BOOTSTRAP_SERVERS", bootstrapServers);
+    withEnv("KAFKA_REST_LISTENERS", "http://0.0.0.0:" + KAFKA_REST_PORT);
+    withEnv("KAFKA_REST_CLIENT_SECURITY_PROTOCOL", "PLAINTEXT");
+    withEnv("KAFKA_REST_HOST_NAME", kafkaRestHostname);
+    if (kafkaRestId != null) {
+      withEnv("KAFKA_REST_ID", kafkaRestId);
+    }
+    if (enableAuthentication) {
+      withEnv("KAFKA_REST_AUTHENTICATION_METHOD", "BASIC");
+      withEnv("KAFKA_REST_AUTHENTICATION_REALM", "KafkaRest");
+      withEnv("KAFKA_REST_AUTHENTICATION_ROLES", "GerritRole");
+      withEnv(
+          "KAFKAREST_OPTS",
+          "-Djava.security.auth.login.config=/etc/kafka-rest/rest-jaas.properties");
+      withClasspathResourceMapping(
+          "rest-jaas.properties", "/etc/kafka-rest/rest-jaas.properties", BindMode.READ_ONLY);
+      withClasspathResourceMapping(
+          "password.properties", "/etc/kafka-rest/password.properties", BindMode.READ_ONLY);
+    }
+    withCreateContainerCmdModifier(cmd -> cmd.withHostName(kafkaRestHostname));
+  }
+
+  private static DockerImageName restProxyImageFor(KafkaContainer kafkaContainer) {
+    String[] kafkaImageNameParts = kafkaContainer.getDockerImageName().split(":");
+    return DockerImageName.parse(kafkaImageNameParts[0] + "-rest").withTag(kafkaImageNameParts[1]);
+  }
+
+  public URI getApiURI() {
+    try {
+      return new URI(
+          String.format("http://%s:%d", getContainerIpAddress(), getMappedPort(KAFKA_REST_PORT)));
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException("Invalid Kafka API URI", e);
+    }
+  }
+
+  public String getKafkaRestContainerIP() {
+    Map<String, ContainerNetwork> networks = getContainerInfo().getNetworkSettings().getNetworks();
+    return networks.values().iterator().next().getIpAddress();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index d8df198..3f83e92 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -14,9 +14,12 @@
 
 package com.googlesource.gerrit.plugins.kafka.api;
 
+import static com.gerritforge.gerrit.eventbroker.TopicSubscriber.topicSubscriber;
+import static com.gerritforge.gerrit.eventbroker.TopicSubscriberWithGroupId.topicSubscriberWithGroupId;
 import static com.google.common.truth.Truth.assertThat;
 import static org.mockito.Mockito.mock;
 
+import com.google.common.base.Strings;
 import com.google.gerrit.metrics.MetricMaker;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventGson;
@@ -33,7 +36,10 @@
 import com.google.inject.Scopes;
 import com.google.inject.Singleton;
 import com.google.inject.TypeLiteral;
+import com.googlesource.gerrit.plugins.kafka.KafkaContainerProvider;
+import com.googlesource.gerrit.plugins.kafka.KafkaRestContainer;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.ClientType;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
@@ -42,32 +48,47 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
 import org.mockito.Answers;
 import org.mockito.junit.MockitoJUnitRunner;
+import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.KafkaContainer;
 
 @RunWith(MockitoJUnitRunner.class)
 public class KafkaBrokerApiTest {
-  private static KafkaContainer kafka;
 
-  private static final int TEST_NUM_SUBSCRIBERS = 1;
-  private static final String TEST_GROUP_ID = KafkaBrokerApiTest.class.getName();
-  private static final int TEST_POLLING_INTERVAL_MSEC = 100;
+  static KafkaContainer kafka;
+  static KafkaRestContainer kafkaRest;
+  static KafkaRestContainer kafkaRestWithId;
+  static GenericContainer<?> nginx;
+  static String restApiUsername;
+  static String restApiPassword;
+
+  static final int TEST_NUM_SUBSCRIBERS = 1;
+  static final String TEST_GROUP_ID = KafkaBrokerApiTest.class.getName();
+  static final int TEST_POLLING_INTERVAL_MSEC = 100;
+  static final String KAFKA_REST_ID = "kafka-rest-instance-0";
   private static final int TEST_THREAD_POOL_SIZE = 10;
   private static final String TEST_INSTANCE_ID = "test-instance-id";
-  private static final TimeUnit TEST_TIMOUT_UNIT = TimeUnit.SECONDS;
+  private static final TimeUnit TEST_TIMEOUT_UNIT = TimeUnit.SECONDS;
   private static final int TEST_TIMEOUT = 30;
+  private static final int TEST_WAIT_FOR_MORE_MESSAGES_TIMEOUT = 5;
 
   private Injector injector;
   private KafkaSession session;
   private Gson gson;
+  protected ClientType clientType;
+
+  @Rule public TestName name = new TestName();
 
   public static class TestWorkQueue extends WorkQueue {
 
@@ -96,34 +117,55 @@
 
       bind(KafkaProperties.class).toInstance(kafkaProperties);
       bind(KafkaSession.class).in(Scopes.SINGLETON);
-      KafkaSubscriberProperties kafkaSubscriberProperties =
-          new KafkaSubscriberProperties(
-              TEST_POLLING_INTERVAL_MSEC, TEST_GROUP_ID, TEST_NUM_SUBSCRIBERS);
-      bind(KafkaSubscriberProperties.class).toInstance(kafkaSubscriberProperties);
-      bind(new TypeLiteral<KafkaProducer<String, String>>() {})
-          .toProvider(KafkaProducerProvider.class);
+
+      bindKafkaClientImpl();
 
       bind(WorkQueue.class).to(TestWorkQueue.class);
     }
+
+    protected void bindKafkaClientImpl() {
+      bind(new TypeLiteral<Producer<String, String>>() {}).toProvider(KafkaProducerProvider.class);
+      KafkaSubscriberProperties kafkaSubscriberProperties =
+          new KafkaSubscriberProperties(
+              TEST_POLLING_INTERVAL_MSEC, TEST_GROUP_ID, TEST_NUM_SUBSCRIBERS, ClientType.NATIVE);
+      bind(KafkaSubscriberProperties.class).toInstance(kafkaSubscriberProperties);
+    }
   }
 
   public static class TestConsumer implements Consumer<Event> {
     public final List<Event> messages = new ArrayList<>();
-    private final CountDownLatch lock;
+    private CountDownLatch[] locks;
 
     public TestConsumer(int numMessagesExpected) {
-      lock = new CountDownLatch(numMessagesExpected);
+      resetExpectedMessages(numMessagesExpected);
+    }
+
+    public void resetExpectedMessages(int numMessagesExpected) {
+      locks = new CountDownLatch[numMessagesExpected];
+      for (int i = 0; i < numMessagesExpected; i++) {
+        locks[i] = new CountDownLatch(i + 1);
+      }
     }
 
     @Override
     public void accept(Event message) {
       messages.add(message);
-      lock.countDown();
+      for (CountDownLatch countDownLatch : locks) {
+        countDownLatch.countDown();
+      }
     }
 
     public boolean await() {
+      return await(locks.length);
+    }
+
+    public boolean await(int numItems) {
+      return await(numItems, TEST_TIMEOUT, TEST_TIMEOUT_UNIT);
+    }
+
+    public boolean await(int numItems, long timeout, TimeUnit unit) {
       try {
-        return lock.await(TEST_TIMEOUT, TEST_TIMOUT_UNIT);
+        return locks[numItems - 1].await(timeout, unit);
       } catch (InterruptedException e) {
         return false;
       }
@@ -132,20 +174,45 @@
 
   @BeforeClass
   public static void beforeClass() throws Exception {
-    kafka = new KafkaContainer();
+    kafka = KafkaContainerProvider.get();
     kafka.start();
+    kafkaRestWithId = new KafkaRestContainer(kafka, KAFKA_REST_ID, isAuthenticationProvided());
+    kafkaRestWithId.start();
+    kafkaRest = new KafkaRestContainer(kafka, isAuthenticationProvided());
+    kafkaRest.start();
+
     System.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
   }
 
+  @Before
+  public void setup() {
+    clientType = ClientType.NATIVE;
+  }
+
   @AfterClass
   public static void afterClass() {
-    if (kafka != null) {
-      kafka.stop();
+    stopContainer(kafka);
+    stopContainer(kafkaRest);
+    stopContainer(kafkaRestWithId);
+    stopContainer(nginx);
+  }
+
+  private static void stopContainer(GenericContainer<?> container) {
+    if (container != null) {
+      container.stop();
     }
   }
 
+  private static boolean isAuthenticationProvided() {
+    return !Strings.isNullOrEmpty(restApiUsername) && !Strings.isNullOrEmpty(restApiPassword);
+  }
+
+  protected TestModule newTestModule(KafkaProperties kafkaProperties) {
+    return new TestModule(kafkaProperties);
+  }
+
   public void connectToKafka(KafkaProperties kafkaProperties) {
-    Injector baseInjector = Guice.createInjector(new TestModule(kafkaProperties));
+    Injector baseInjector = Guice.createInjector(newTestModule(kafkaProperties));
     WorkQueue testWorkQueue = baseInjector.getInstance(WorkQueue.class);
     KafkaSubscriberProperties kafkaSubscriberProperties =
         baseInjector.getInstance(KafkaSubscriberProperties.class);
@@ -167,9 +234,11 @@
 
   @Test
   public void shouldSendSyncAndReceiveToTopic() {
-    connectToKafka(new KafkaProperties(false));
+    connectToKafka(
+        new KafkaProperties(
+            false, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
     KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
-    String testTopic = "test_topic_sync";
+    String testTopic = testTopic();
     TestConsumer testConsumer = new TestConsumer(1);
     Event testEventMessage = new ProjectCreatedEvent();
     testEventMessage.instanceId = TEST_INSTANCE_ID;
@@ -180,13 +249,21 @@
     assertThat(testConsumer.await()).isTrue();
     assertThat(testConsumer.messages).hasSize(1);
     assertThat(gson.toJson(testConsumer.messages.get(0))).isEqualTo(gson.toJson(testEventMessage));
+
+    assertNoMoreExpectedMessages(testConsumer);
+  }
+
+  private String testTopic() {
+    return "test_topic_" + name.getMethodName();
   }
 
   @Test
   public void shouldSendAsyncAndReceiveToTopic() {
-    connectToKafka(new KafkaProperties(true));
+    connectToKafka(
+        new KafkaProperties(
+            true, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
     KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
-    String testTopic = "test_topic_async";
+    String testTopic = testTopic();
     TestConsumer testConsumer = new TestConsumer(1);
     Event testEventMessage = new ProjectCreatedEvent();
     testEventMessage.instanceId = TEST_INSTANCE_ID;
@@ -197,5 +274,162 @@
     assertThat(testConsumer.await()).isTrue();
     assertThat(testConsumer.messages).hasSize(1);
     assertThat(gson.toJson(testConsumer.messages.get(0))).isEqualTo(gson.toJson(testEventMessage));
+
+    assertNoMoreExpectedMessages(testConsumer);
+  }
+
+  @Test
+  public void shouldSendToTopicAndResetOffset() {
+    connectToKafka(
+        new KafkaProperties(
+            false, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
+    KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
+    String testTopic = testTopic();
+    Event testEventMessage = new ProjectCreatedEvent();
+
+    TestConsumer testConsumer = new TestConsumer(2);
+    kafkaBrokerApi.receiveAsync(testTopic, testConsumer);
+
+    kafkaBrokerApi.send(testTopic, testEventMessage);
+    assertThat(testConsumer.await(1)).isTrue();
+    assertThat(testConsumer.messages).hasSize(1);
+    assertThat(gson.toJson(testConsumer.messages.get(0))).isEqualTo(gson.toJson(testEventMessage));
+
+    kafkaBrokerApi.replayAllEvents(testTopic);
+    assertThat(testConsumer.await(2)).isTrue();
+    assertThat(testConsumer.messages).hasSize(2);
+    assertThat(gson.toJson(testConsumer.messages.get(1))).isEqualTo(gson.toJson(testEventMessage));
+  }
+
+  @Test
+  public void shouldConsumerWithGroupIdConsumeMessage() {
+    connectToKafka(
+        new KafkaProperties(
+            true, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
+    KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
+    String testTopic = testTopic();
+    TestConsumer testConsumer = new TestConsumer(1);
+    Event testEventMessage = new ProjectCreatedEvent();
+    testEventMessage.instanceId = TEST_INSTANCE_ID;
+
+    kafkaBrokerApi.send(testTopic, testEventMessage);
+    kafkaBrokerApi.receiveAsync(testTopic, "group-id-1", testConsumer);
+
+    assertThat(testConsumer.await()).isTrue();
+    assertThat(testConsumer.messages).hasSize(1);
+    assertThat(gson.toJson(testConsumer.messages.get(0))).isEqualTo(gson.toJson(testEventMessage));
+
+    assertNoMoreExpectedMessages(testConsumer);
+  }
+
+  @Test
+  public void shouldRegisterConsumerWithoutExternalGroupId() {
+    connectToKafka(
+        new KafkaProperties(
+            false, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
+    KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
+    String testTopic = testTopic();
+    TestConsumer testConsumer = new TestConsumer(1);
+
+    assertThat(kafkaBrokerApi.topicSubscribers()).isEmpty();
+    assertThat(kafkaBrokerApi.topicSubscribersWithGroupId()).isEmpty();
+    kafkaBrokerApi.receiveAsync(testTopic, testConsumer);
+    assertThat(kafkaBrokerApi.topicSubscribers())
+        .containsExactly(topicSubscriber(testTopic, testConsumer));
+    assertThat(kafkaBrokerApi.topicSubscribersWithGroupId()).isEmpty();
+  }
+
+  @Test
+  public void shouldRegisterConsumerWithExternalGroupId() {
+    connectToKafka(
+        new KafkaProperties(
+            false, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
+    KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
+    String testTopic = testTopic();
+    String groupId = "group_id_1";
+    TestConsumer testConsumer = new TestConsumer(1);
+
+    assertThat(kafkaBrokerApi.topicSubscribers()).isEmpty();
+    assertThat(kafkaBrokerApi.topicSubscribersWithGroupId()).isEmpty();
+    kafkaBrokerApi.receiveAsync(testTopic, groupId, testConsumer);
+    assertThat(kafkaBrokerApi.topicSubscribers()).isEmpty();
+    assertThat(kafkaBrokerApi.topicSubscribersWithGroupId())
+        .containsExactly(
+            topicSubscriberWithGroupId(groupId, topicSubscriber(testTopic, testConsumer)));
+  }
+
+  @Test
+  public void shouldRegisterDifferentConsumersWithTheSameExternalGroupId() {
+    connectToKafka(
+        new KafkaProperties(
+            false, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
+    KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
+    String testTopic = testTopic();
+    String groupId = "group_id_1";
+    TestConsumer testConsumer1 = new TestConsumer(1);
+    TestConsumer testConsumer2 = new TestConsumer(1);
+
+    assertThat(kafkaBrokerApi.topicSubscribers()).isEmpty();
+    assertThat(kafkaBrokerApi.topicSubscribersWithGroupId()).isEmpty();
+    kafkaBrokerApi.receiveAsync(testTopic, groupId, testConsumer1);
+    kafkaBrokerApi.receiveAsync(testTopic, groupId, testConsumer2);
+    assertThat(kafkaBrokerApi.topicSubscribers()).isEmpty();
+    assertThat(kafkaBrokerApi.topicSubscribersWithGroupId())
+        .containsExactly(
+            topicSubscriberWithGroupId(groupId, topicSubscriber(testTopic, testConsumer1)),
+            topicSubscriberWithGroupId(groupId, topicSubscriber(testTopic, testConsumer2)));
+  }
+
+  @Test
+  public void shouldRegisterConsumerWithConfiguredGroupIdAndConsumerWithExternalGroupId() {
+    connectToKafka(
+        new KafkaProperties(
+            false, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
+    KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
+    String testTopic = testTopic();
+    String groupId = "group_id_1";
+    TestConsumer testConsumer1 = new TestConsumer(1);
+    TestConsumer testConsumer2 = new TestConsumer(1);
+
+    assertThat(kafkaBrokerApi.topicSubscribers()).isEmpty();
+    assertThat(kafkaBrokerApi.topicSubscribersWithGroupId()).isEmpty();
+    kafkaBrokerApi.receiveAsync(testTopic, testConsumer1);
+    kafkaBrokerApi.receiveAsync(testTopic, groupId, testConsumer2);
+    assertThat(kafkaBrokerApi.topicSubscribers())
+        .containsExactly(topicSubscriber(testTopic, testConsumer1));
+
+    assertThat(kafkaBrokerApi.topicSubscribersWithGroupId())
+        .containsExactly(
+            topicSubscriberWithGroupId(groupId, topicSubscriber(testTopic, testConsumer2)));
+  }
+
+  @Test
+  public void shouldNotRegisterTheSameConsumerWithExternalGroupIdTwicePerTopic() {
+    connectToKafka(
+        new KafkaProperties(
+            false, clientType, getKafkaRestApiUriString(), restApiUsername, restApiPassword));
+    KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class);
+    String testTopic = testTopic();
+    String groupId = "group_id_1";
+    TestConsumer testConsumer = new TestConsumer(1);
+
+    assertThat(kafkaBrokerApi.topicSubscribers()).isEmpty();
+    assertThat(kafkaBrokerApi.topicSubscribersWithGroupId()).isEmpty();
+    kafkaBrokerApi.receiveAsync(testTopic, groupId, testConsumer);
+    kafkaBrokerApi.receiveAsync(testTopic, groupId, testConsumer);
+    assertThat(kafkaBrokerApi.topicSubscribers()).isEmpty();
+    assertThat(kafkaBrokerApi.topicSubscribersWithGroupId())
+        .containsExactly(
+            topicSubscriberWithGroupId(groupId, topicSubscriber(testTopic, testConsumer)));
+  }
+
+  protected String getKafkaRestApiUriString() {
+    return null;
+  }
+
+  private void assertNoMoreExpectedMessages(TestConsumer testConsumer) {
+    testConsumer.resetExpectedMessages(1);
+    assertThat(testConsumer.await(1, TEST_WAIT_FOR_MORE_MESSAGES_TIMEOUT, TEST_TIMEOUT_UNIT))
+        .isFalse();
   }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTest.java
new file mode 100644
index 0000000..5be61ab
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTest.java
@@ -0,0 +1,75 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.api;
+
+import com.google.inject.TypeLiteral;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.ClientType;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
+import com.googlesource.gerrit.plugins.kafka.publish.KafkaRestProducer;
+import com.googlesource.gerrit.plugins.kafka.rest.FutureExecutor;
+import com.googlesource.gerrit.plugins.kafka.rest.HttpHostProxy;
+import com.googlesource.gerrit.plugins.kafka.rest.KafkaRestClient;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.kafka.clients.producer.Producer;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaBrokerRestApiTest extends KafkaBrokerRestApiTestBase {
+
+  @Override
+  @Before
+  public void setup() {
+    clientType = ClientType.REST;
+  }
+
+  @Override
+  protected TestModule newTestModule(KafkaProperties kafkaProperties) {
+    return new TestModule(kafkaProperties) {
+
+      @Override
+      protected void bindKafkaClientImpl() {
+        bind(new TypeLiteral<Producer<String, String>>() {}).to(KafkaRestProducer.class);
+        bind(ExecutorService.class)
+            .annotatedWith(FutureExecutor.class)
+            .toInstance(Executors.newCachedThreadPool());
+
+        KafkaSubscriberProperties kafkaSubscriberProperties =
+            new KafkaSubscriberProperties(
+                TEST_POLLING_INTERVAL_MSEC,
+                TEST_GROUP_ID,
+                TEST_NUM_SUBSCRIBERS,
+                ClientType.REST,
+                getKafkaRestApiUriString(),
+                restApiUsername,
+                restApiPassword);
+        bind(KafkaSubscriberProperties.class).toInstance(kafkaSubscriberProperties);
+
+        bind(HttpHostProxy.class).toInstance(new HttpHostProxy(null, null, null));
+
+        install(new FactoryModuleBuilder().build(KafkaRestClient.Factory.class));
+      }
+    };
+  }
+
+  @Override
+  protected String getKafkaRestApiUriString() {
+    return kafkaRest.getApiURI().toString();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTestBase.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTestBase.java
new file mode 100644
index 0000000..cee073f
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiTestBase.java
@@ -0,0 +1,69 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.kafka.api;
+
+import com.googlesource.gerrit.plugins.kafka.KafkaRestContainer;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+
+@Ignore
+public class KafkaBrokerRestApiTestBase extends KafkaBrokerApiTest {
+  private static final String NGINX_IMAGE = "nginx:1.21.5";
+
+  @SuppressWarnings("resource")
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    KafkaBrokerApiTest.beforeClass();
+    String nginxKafkaConf =
+        String.format(
+            "server {\\n"
+                + "  listen       80  default_server;\\n"
+                + "  listen  [::]:80  default_server;\\n"
+                + "  location     /%s/ {\\n"
+                + "    proxy_pass http://%s:%d/; \\n"
+                + "	   proxy_set_header Authorization \"Basic Z2Vycml0OnNlY3JldA==\";\\n"
+                + "  }\\n"
+                + "  location     / {\\n"
+                + "    proxy_pass http://%s:%d; \\n"
+                + "	   proxy_set_header Authorization \"Basic Z2Vycml0OnNlY3JldA==\";\\n"
+                + "  }\\n"
+                + "}",
+            KAFKA_REST_ID,
+            kafkaRestWithId.getKafkaRestContainerIP(),
+            KafkaRestContainer.KAFKA_REST_PORT,
+            kafkaRestWithId.getKafkaRestContainerIP(),
+            KafkaRestContainer.KAFKA_REST_PORT);
+    nginx =
+        new GenericContainer<>(
+                new ImageFromDockerfile()
+                    .withDockerfileFromBuilder(
+                        builder ->
+                            builder
+                                .from(NGINX_IMAGE)
+                                .run(
+                                    "sh",
+                                    "-c",
+                                    String.format(
+                                        "echo '%s' | tee /etc/nginx/conf.d/default.conf",
+                                        nginxKafkaConf))
+                                .build()))
+            .withExposedPorts(80)
+            .withNetwork(kafkaRestWithId.getNetwork())
+            .waitingFor(new HttpWaitStrategy());
+    nginx.start();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithAuthenticationTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithAuthenticationTest.java
new file mode 100644
index 0000000..abd054f
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithAuthenticationTest.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.api;
+
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaBrokerRestApiWithAuthenticationTest extends KafkaBrokerRestApiTest {
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    restApiUsername = "gerrit";
+    restApiPassword = "secret";
+    KafkaBrokerRestApiTest.beforeClass();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithIdPrefixTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithIdPrefixTest.java
new file mode 100644
index 0000000..6ee06e7
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerRestApiWithIdPrefixTest.java
@@ -0,0 +1,83 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.api;
+
+import com.google.inject.TypeLiteral;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.ClientType;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
+import com.googlesource.gerrit.plugins.kafka.publish.KafkaRestProducer;
+import com.googlesource.gerrit.plugins.kafka.rest.FutureExecutor;
+import com.googlesource.gerrit.plugins.kafka.rest.HttpHostProxy;
+import com.googlesource.gerrit.plugins.kafka.rest.KafkaRestClient;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.kafka.clients.producer.Producer;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaBrokerRestApiWithIdPrefixTest extends KafkaBrokerRestApiTestBase {
+
+  @Override
+  @Before
+  public void setup() {
+    clientType = ClientType.REST;
+  }
+
+  @Override
+  protected TestModule newTestModule(KafkaProperties kafkaProperties) {
+    return new TestModule(kafkaProperties) {
+
+      @Override
+      protected void bindKafkaClientImpl() {
+        bind(new TypeLiteral<Producer<String, String>>() {}).to(KafkaRestProducer.class);
+        bind(ExecutorService.class)
+            .annotatedWith(FutureExecutor.class)
+            .toInstance(Executors.newCachedThreadPool());
+
+        KafkaSubscriberProperties kafkaSubscriberProperties =
+            new KafkaSubscriberProperties(
+                TEST_POLLING_INTERVAL_MSEC,
+                TEST_GROUP_ID,
+                TEST_NUM_SUBSCRIBERS,
+                ClientType.REST,
+                getApiUriString(),
+                null,
+                null);
+        bind(KafkaSubscriberProperties.class).toInstance(kafkaSubscriberProperties);
+
+        bind(HttpHostProxy.class).toInstance(new HttpHostProxy(null, null, null));
+
+        install(new FactoryModuleBuilder().build(KafkaRestClient.Factory.class));
+      }
+    };
+  }
+
+  @Override
+  protected String getKafkaRestApiUriString() {
+    return getApiUriString();
+  }
+
+  private static String getApiUriString() {
+    return String.format(
+        "http://%s:%d/%s",
+        nginx.getHost(),
+        nginx.getLivenessCheckPortNumbers().iterator().next(),
+        KafkaProperties.REST_API_URI_ID_PLACEHOLDER);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
index 5aa9ca8..59e7ca2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.kafka.publish;
 
+import static com.google.common.truth.Truth.assertThat;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.only;
 import static org.mockito.Mockito.verify;
@@ -21,10 +22,11 @@
 
 import com.google.common.util.concurrent.Futures;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.ClientType;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
 import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.Before;
@@ -38,7 +40,7 @@
 @RunWith(MockitoJUnitRunner.class)
 public class KafkaSessionTest {
   KafkaSession objectUnderTest;
-  @Mock KafkaProducer<String, String> kafkaProducer;
+  @Mock Producer<String, String> kafkaProducer;
   @Mock KafkaProducerProvider producerProvider;
   @Mock KafkaProperties properties;
   @Mock KafkaEventsPublisherMetrics publisherMetrics;
@@ -52,17 +54,19 @@
   public void setUp() {
     when(producerProvider.get()).thenReturn(kafkaProducer);
     when(properties.getTopic()).thenReturn(topic);
+    when(properties.getProperty("bootstrap.servers")).thenReturn("localhost:9092");
+    when(properties.getClientType()).thenReturn(ClientType.NATIVE);
 
     recordMetadata = new RecordMetadata(new TopicPartition(topic, 0), 0L, 0L, 0L, 0L, 0, 0);
 
     objectUnderTest = new KafkaSession(producerProvider, properties, publisherMetrics);
-    objectUnderTest.connect();
   }
 
   @Test
   public void shouldIncrementBrokerMetricCounterWhenMessagePublishedInSyncMode() {
     when(properties.isSendAsync()).thenReturn(false);
     when(kafkaProducer.send(any())).thenReturn(Futures.immediateFuture(recordMetadata));
+    objectUnderTest.connect();
     objectUnderTest.publish(message);
     verify(publisherMetrics, only()).incrementBrokerPublishedMessage();
   }
@@ -71,6 +75,7 @@
   public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailedInSyncMode() {
     when(properties.isSendAsync()).thenReturn(false);
     when(kafkaProducer.send(any())).thenReturn(Futures.immediateFailedFuture(new Exception()));
+    objectUnderTest.connect();
     objectUnderTest.publish(message);
     verify(publisherMetrics, only()).incrementBrokerFailedToPublishMessage();
   }
@@ -80,6 +85,7 @@
     when(properties.isSendAsync()).thenReturn(false);
     when(kafkaProducer.send(any())).thenThrow(new RuntimeException("Unexpected runtime exception"));
     try {
+      objectUnderTest.connect();
       objectUnderTest.publish(message);
     } catch (RuntimeException e) {
       // expected
@@ -92,6 +98,7 @@
     when(properties.isSendAsync()).thenReturn(true);
     when(kafkaProducer.send(any(), any())).thenReturn(Futures.immediateFuture(recordMetadata));
 
+    objectUnderTest.connect();
     objectUnderTest.publish(message);
 
     verify(kafkaProducer).send(any(), callbackCaptor.capture());
@@ -105,6 +112,7 @@
     when(kafkaProducer.send(any(), any()))
         .thenReturn(Futures.immediateFailedFuture(new Exception()));
 
+    objectUnderTest.connect();
     objectUnderTest.publish(message);
 
     verify(kafkaProducer).send(any(), callbackCaptor.capture());
@@ -118,10 +126,18 @@
     when(kafkaProducer.send(any(), any()))
         .thenThrow(new RuntimeException("Unexpected runtime exception"));
     try {
+      objectUnderTest.connect();
       objectUnderTest.publish(message);
     } catch (RuntimeException e) {
       // expected
     }
     verify(publisherMetrics, only()).incrementBrokerFailedToPublishMessage();
   }
+
+  @Test
+  public void shouldNotConnectKafkaSessionWhenBoostrapServersAreNotSet() {
+    when(properties.getProperty("bootstrap.servers")).thenReturn(null);
+    objectUnderTest.connect();
+    assertThat(objectUnderTest.isOpen()).isFalse();
+  }
 }
diff --git a/src/test/resources/password.properties b/src/test/resources/password.properties
new file mode 100644
index 0000000..bcff92d
--- /dev/null
+++ b/src/test/resources/password.properties
@@ -0,0 +1 @@
+gerrit: secret,GerritRole
\ No newline at end of file
diff --git a/src/test/resources/rest-jaas.properties b/src/test/resources/rest-jaas.properties
new file mode 100644
index 0000000..719a030
--- /dev/null
+++ b/src/test/resources/rest-jaas.properties
@@ -0,0 +1,5 @@
+KafkaRest {
+    org.eclipse.jetty.jaas.spi.PropertyFileLoginModule required
+    debug="true"
+    file="/etc/kafka-rest/password.properties";
+};
\ No newline at end of file