Initial master commit from kinesis-events repo

Change-Id: Ie3798159ca8eaadc209911354c1df72bdac1331f
diff --git a/BUILD b/BUILD
new file mode 100644
index 0000000..e9368d5
--- /dev/null
+++ b/BUILD
@@ -0,0 +1,93 @@
+load("@rules_java//java:defs.bzl", "java_library")
+load("//tools/bzl:junit.bzl", "junit_tests")
+load(
+    "//tools/bzl:plugin.bzl",
+    "PLUGIN_DEPS",
+    "PLUGIN_TEST_DEPS",
+    "gerrit_plugin",
+)
+
+gerrit_plugin(
+    name = "events-aws-kinesis",
+    srcs = glob(["src/main/java/**/*.java"]),
+    manifest_entries = [
+        "Gerrit-PluginName: events-aws-kinesis",
+        "Gerrit-Module: com.googlesource.gerrit.plugins.kinesis.Module",
+        "Implementation-Title: Gerrit events listener to send events to AWS Kinesis broker",
+        "Implementation-URL: https://gerrit.googlesource.com/plugins/events-aws-kinesis",
+    ],
+    resources = glob(["src/main/resources/**/*"]),
+    deps = [
+        "@amazon-auth//jar",
+        "@amazon-aws-core//jar",
+        "@amazon-cloudwatch//jar",
+        "@amazon-dynamodb//jar",
+        "@amazon-http-client-spi//jar",
+        "@amazon-kinesis-client//jar",
+        "@amazon-kinesis//jar",
+        "@amazon-netty-nio-client//jar",
+        "@amazon-profiles//jar",
+        "@amazon-regions//jar",
+        "@amazon-sdk-core//jar",
+        "@amazon-utils//jar",
+        "@apache-commons-io//jar",
+        "@apache-commons-lang3//jar",
+        "@aws-glue-schema-serde//jar",
+        "@aws-java-sdk-core//jar",
+        "@awssdk-cbor-protocol//jar",
+        "@awssdk-json-protocol//jar",
+        "@awssdk-kinesis-producer//jar",
+        "@awssdk-metrics-spi//jar",
+        "@awssdk-protocol-core//jar",
+        "@awssdk-query-protocol//jar",
+        "@commons-codec//jar",
+        "@events-broker//jar:neverlink",
+        "@io-netty-all//jar",
+        "@jackson-annotations//jar",
+        "@jackson-core//jar",
+        "@jackson-databind//jar",
+        "@jackson-dataformat-cbor//jar",
+        "@javax-xml-bind//jar",
+        "@reactive-streams//jar",
+        "@reactor-core//jar",
+        "@rxjava//jar",
+    ],
+)
+
+junit_tests(
+    name = "kinesis_events_tests",
+    timeout = "long",
+    srcs = glob(["src/test/java/**/*.java"]),
+    tags = ["events-aws-kinesis"],
+    deps = [
+        ":events-aws-kinesis__plugin_test_deps",
+        "//lib/testcontainers",
+        "@amazon-http-client-spi//jar",
+        "@amazon-kinesis-client//jar",
+        "@amazon-kinesis//jar",
+        "@events-broker//jar",
+        "@testcontainer-localstack//jar",
+    ],
+)
+
+java_library(
+    name = "events-aws-kinesis__plugin_test_deps",
+    testonly = 1,
+    visibility = ["//visibility:public"],
+    exports = PLUGIN_DEPS + PLUGIN_TEST_DEPS + [
+        ":events-aws-kinesis__plugin",
+        "//lib/jackson:jackson-annotations",
+        "//lib/testcontainers",
+        "//lib/testcontainers:docker-java-api",
+        "//lib/testcontainers:docker-java-transport",
+        "@amazon-regions//jar",
+        "@amazon-auth//jar",
+        "@amazon-kinesis//jar",
+        "@amazon-aws-core//jar",
+        "@amazon-sdk-core//jar",
+        "@amazon-profiles//jar",
+        "@aws-java-sdk-core//jar",
+        "@awssdk-url-connection-client//jar",
+        "@amazon-dynamodb//jar",
+    ],
+)
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..11069ed
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,201 @@
+                              Apache License
+                        Version 2.0, January 2004
+                     http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+   "License" shall mean the terms and conditions for use, reproduction,
+   and distribution as defined by Sections 1 through 9 of this document.
+
+   "Licensor" shall mean the copyright owner or entity authorized by
+   the copyright owner that is granting the License.
+
+   "Legal Entity" shall mean the union of the acting entity and all
+   other entities that control, are controlled by, or are under common
+   control with that entity. For the purposes of this definition,
+   "control" means (i) the power, direct or indirect, to cause the
+   direction or management of such entity, whether by contract or
+   otherwise, or (ii) ownership of fifty percent (50%) or more of the
+   outstanding shares, or (iii) beneficial ownership of such entity.
+
+   "You" (or "Your") shall mean an individual or Legal Entity
+   exercising permissions granted by this License.
+
+   "Source" form shall mean the preferred form for making modifications,
+   including but not limited to software source code, documentation
+   source, and configuration files.
+
+   "Object" form shall mean any form resulting from mechanical
+   transformation or translation of a Source form, including but
+   not limited to compiled object code, generated documentation,
+   and conversions to other media types.
+
+   "Work" shall mean the work of authorship, whether in Source or
+   Object form, made available under the License, as indicated by a
+   copyright notice that is included in or attached to the work
+   (an example is provided in the Appendix below).
+
+   "Derivative Works" shall mean any work, whether in Source or Object
+   form, that is based on (or derived from) the Work and for which the
+   editorial revisions, annotations, elaborations, or other modifications
+   represent, as a whole, an original work of authorship. For the purposes
+   of this License, Derivative Works shall not include works that remain
+   separable from, or merely link (or bind by name) to the interfaces of,
+   the Work and Derivative Works thereof.
+
+   "Contribution" shall mean any work of authorship, including
+   the original version of the Work and any modifications or additions
+   to that Work or Derivative Works thereof, that is intentionally
+   submitted to Licensor for inclusion in the Work by the copyright owner
+   or by an individual or Legal Entity authorized to submit on behalf of
+   the copyright owner. For the purposes of this definition, "submitted"
+   means any form of electronic, verbal, or written communication sent
+   to the Licensor or its representatives, including but not limited to
+   communication on electronic mailing lists, source code control systems,
+   and issue tracking systems that are managed by, or on behalf of, the
+   Licensor for the purpose of discussing and improving the Work, but
+   excluding communication that is conspicuously marked or otherwise
+   designated in writing by the copyright owner as "Not a Contribution."
+
+   "Contributor" shall mean Licensor and any individual or Legal Entity
+   on behalf of whom a Contribution has been received by Licensor and
+   subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of
+   this License, each Contributor hereby grants to You a perpetual,
+   worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+   copyright license to reproduce, prepare Derivative Works of,
+   publicly display, publicly perform, sublicense, and distribute the
+   Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of
+   this License, each Contributor hereby grants to You a perpetual,
+   worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+   (except as stated in this section) patent license to make, have made,
+   use, offer to sell, sell, import, and otherwise transfer the Work,
+   where such license applies only to those patent claims licensable
+   by such Contributor that are necessarily infringed by their
+   Contribution(s) alone or by combination of their Contribution(s)
+   with the Work to which such Contribution(s) was submitted. If You
+   institute patent litigation against any entity (including a
+   cross-claim or counterclaim in a lawsuit) alleging that the Work
+   or a Contribution incorporated within the Work constitutes direct
+   or contributory patent infringement, then any patent licenses
+   granted to You under this License for that Work shall terminate
+   as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the
+   Work or Derivative Works thereof in any medium, with or without
+   modifications, and in Source or Object form, provided that You
+   meet the following conditions:
+
+   (a) You must give any other recipients of the Work or
+       Derivative Works a copy of this License; and
+
+   (b) You must cause any modified files to carry prominent notices
+       stating that You changed the files; and
+
+   (c) You must retain, in the Source form of any Derivative Works
+       that You distribute, all copyright, patent, trademark, and
+       attribution notices from the Source form of the Work,
+       excluding those notices that do not pertain to any part of
+       the Derivative Works; and
+
+   (d) If the Work includes a "NOTICE" text file as part of its
+       distribution, then any Derivative Works that You distribute must
+       include a readable copy of the attribution notices contained
+       within such NOTICE file, excluding those notices that do not
+       pertain to any part of the Derivative Works, in at least one
+       of the following places: within a NOTICE text file distributed
+       as part of the Derivative Works; within the Source form or
+       documentation, if provided along with the Derivative Works; or,
+       within a display generated by the Derivative Works, if and
+       wherever such third-party notices normally appear. The contents
+       of the NOTICE file are for informational purposes only and
+       do not modify the License. You may add Your own attribution
+       notices within Derivative Works that You distribute, alongside
+       or as an addendum to the NOTICE text from the Work, provided
+       that such additional attribution notices cannot be construed
+       as modifying the License.
+
+   You may add Your own copyright statement to Your modifications and
+   may provide additional or different license terms and conditions
+   for use, reproduction, or distribution of Your modifications, or
+   for any such Derivative Works as a whole, provided Your use,
+   reproduction, and distribution of the Work otherwise complies with
+   the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise,
+   any Contribution intentionally submitted for inclusion in the Work
+   by You to the Licensor shall be under the terms and conditions of
+   this License, without any additional terms or conditions.
+   Notwithstanding the above, nothing herein shall supersede or modify
+   the terms of any separate license agreement you may have executed
+   with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade
+   names, trademarks, service marks, or product names of the Licensor,
+   except as required for reasonable and customary use in describing the
+   origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or
+   agreed to in writing, Licensor provides the Work (and each
+   Contributor provides its Contributions) on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+   implied, including, without limitation, any warranties or conditions
+   of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+   PARTICULAR PURPOSE. You are solely responsible for determining the
+   appropriateness of using or redistributing the Work and assume any
+   risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory,
+   whether in tort (including negligence), contract, or otherwise,
+   unless required by applicable law (such as deliberate and grossly
+   negligent acts) or agreed to in writing, shall any Contributor be
+   liable to You for damages, including any direct, indirect, special,
+   incidental, or consequential damages of any character arising as a
+   result of this License or out of the use or inability to use the
+   Work (including but not limited to damages for loss of goodwill,
+   work stoppage, computer failure or malfunction, or any and all
+   other commercial damages or losses), even if such Contributor
+   has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing
+   the Work or Derivative Works thereof, You may choose to offer,
+   and charge a fee for, acceptance of support, warranty, indemnity,
+   or other liability obligations and/or rights consistent with this
+   License. However, in accepting such obligations, You may act only
+   on Your own behalf and on Your sole responsibility, not on behalf
+   of any other Contributor, and only if You agree to indemnify,
+   defend, and hold each Contributor harmless for any liability
+   incurred by, or claims asserted against, such Contributor by reason
+   of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
+
+APPENDIX: How to apply the Apache License to your work.
+
+   To apply the Apache License to your work, attach the following
+   boilerplate notice, with the fields enclosed by brackets "[]"
+   replaced with your own identifying information. (Don't include
+   the brackets!)  The text should be enclosed in the appropriate
+   comment syntax for the file format. We also recommend that a
+   file or class name and description of purpose be included on the
+   same "printed page" as the copyright notice for easier
+   identification within third-party archives.
+
+Copyright [yyyy] [name of copyright owner]
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..564c6cc
--- /dev/null
+++ b/README.md
@@ -0,0 +1,9 @@
+# events-aws-kinesis
+Provide producer and consumer of events streamed over
+[AWS kinesis](https://aws.amazon.com/kinesis/).
+
+## Build
+Information on how to build this plugin can be found [here](./src/main/resources/Documentation/Build.md)
+
+## Configuration
+Information on how to configure this plugin can be found [here](./src/main/resources/Documentation/Config.md)
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
new file mode 100644
index 0000000..b219489
--- /dev/null
+++ b/external_plugin_deps.bzl
@@ -0,0 +1,204 @@
+load("//tools/bzl:maven_jar.bzl", "maven_jar")
+
+AWS_SDK_VER = "2.16.19"
+AWS_KINESIS_VER = "2.3.4"
+JACKSON_VER = "2.10.4"
+
+def external_plugin_deps():
+    maven_jar(
+        name = "junit-platform",
+        artifact = "org.junit.platform:junit-platform-commons:1.4.0",
+        sha1 = "34d9983705c953b97abb01e1cd04647f47272fe5",
+    )
+
+    maven_jar(
+        name = "amazon-kinesis-client",
+        artifact = "software.amazon.kinesis:amazon-kinesis-client:" + AWS_KINESIS_VER,
+        sha1 = "6bb6fcbc5a0f6fd6085f3b1589e738485b0b7867",
+    )
+
+    maven_jar(
+        name = "amazon-kinesis",
+        artifact = "software.amazon.awssdk:kinesis:" + AWS_SDK_VER,
+        sha1 = "bec13fc5ef9225d1a10f13fbe1de8cb114448cf8",
+    )
+
+    maven_jar(
+        name = "amazon-dynamodb",
+        artifact = "software.amazon.awssdk:dynamodb:" + AWS_SDK_VER,
+        sha1 = "33ec7d291973658779b5777db2a0214a5c469e81",
+    )
+
+    maven_jar(
+        name = "amazon-cloudwatch",
+        artifact = "software.amazon.awssdk:cloudwatch:" + AWS_SDK_VER,
+        sha1 = "7585fbe349a92e0a9f040e4194ac89ca32e7983d",
+    )
+
+    maven_jar(
+        name = "amazon-regions",
+        artifact = "software.amazon.awssdk:regions:" + AWS_SDK_VER,
+        sha1 = "089f4f3d3ef20b2486f09e71da638c03100eab64",
+    )
+
+    maven_jar(
+        name = "amazon-netty-nio-client",
+        artifact = "software.amazon.awssdk:netty-nio-client:" + AWS_SDK_VER,
+        sha1 = "bb674feda8417513a647c7aa8cba9a537068d099",
+    )
+
+    maven_jar(
+        name = "amazon-utils",
+        artifact = "software.amazon.awssdk:utils:" + AWS_SDK_VER,
+        sha1 = "53edaa1f884682ac3091293eff3eb024ed0e36bb",
+    )
+
+    maven_jar(
+        name = "amazon-sdk-core",
+        artifact = "software.amazon.awssdk:sdk-core:" + AWS_SDK_VER,
+        sha1 = "02a60fd9c138048272ef8b6c80ae67491dd386a9",
+    )
+
+    maven_jar(
+        name = "amazon-aws-core",
+        artifact = "software.amazon.awssdk:aws-core:" + AWS_SDK_VER,
+        sha1 = "0f50f5cf2698a0de7d2d77322cbf3fb13f76187f",
+    )
+
+    maven_jar(
+        name = "amazon-http-client-spi",
+        artifact = "software.amazon.awssdk:http-client-spi:" + AWS_SDK_VER,
+        sha1 = "e4027e7e0cb064602100b34e19f131983f76f872",
+    )
+
+    maven_jar(
+        name = "amazon-auth",
+        artifact = "software.amazon.awssdk:auth:" + AWS_SDK_VER,
+        sha1 = "4163754b2a0eadcb569a35f0666fd5d859e43ef8",
+    )
+
+    maven_jar(
+        name = "reactive-streams",
+        artifact = "org.reactivestreams:reactive-streams:1.0.2",
+        sha1 = "323964c36556eb0e6209f65c1cef72b53b461ab8",
+    )
+
+    maven_jar(
+        name = "reactor-core",
+        artifact = "io.projectreactor:reactor-core:3.4.3",
+        sha1 = "df23dbdf95f892f7a04292d040fd8b308bd66602",
+    )
+
+    maven_jar(
+        name = "rxjava",
+        artifact = "io.reactivex.rxjava2:rxjava:2.1.14",
+        sha1 = "20dbf7496e417da474eda12717bf4653dbbd5a6b",
+    )
+
+    maven_jar(
+        name = "jackson-databind",
+        artifact = "com.fasterxml.jackson.core:jackson-databind:" + JACKSON_VER,
+        sha1 = "76e9152e93d4cf052f93a64596f633ba5b1c8ed9",
+    )
+
+    maven_jar(
+        name = "jackson-dataformat-cbor",
+        artifact = "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:" + JACKSON_VER,
+        sha1 = "c854bb2d46138198cb5d4aae86ef6c04b8bc1e70",
+    )
+
+    maven_jar(
+        name = "events-broker",
+        artifact = "com.gerritforge:events-broker:3.3.0-rc7",
+        sha1 = "5efe1c4a0f7c385b0ec95b8f9897248049c4173c",
+    )
+
+    maven_jar(
+        name = 'io-netty-all',
+        artifact = 'io.netty:netty-all:4.1.51.Final',
+        sha1 = '5e5f741acc4c211ac4572c31c7e5277ec465e4e4',
+    )
+
+    maven_jar(
+        name = 'awssdk-query-protocol',
+        artifact = 'software.amazon.awssdk:aws-query-protocol:' + AWS_SDK_VER,
+        sha1 = '4c88c66daa5039813e879b324636d15fa2802787',
+    )
+
+    maven_jar(
+        name = 'awssdk-protocol-core',
+        artifact = 'software.amazon.awssdk:protocol-core:' + AWS_SDK_VER,
+        sha1 = '6200c1617f87eed0216c6afab35bab2403da140c',
+    )
+
+    maven_jar(
+        name = 'awssdk-json-protocol',
+        artifact = 'software.amazon.awssdk:aws-json-protocol:' + AWS_SDK_VER,
+        sha1 = '16449e555f61607b917dc7f242c1928298de9bdd',
+    )
+
+    maven_jar(
+        name = 'awssdk-cbor-protocol',
+        artifact = 'software.amazon.awssdk:aws-cbor-protocol:' + AWS_SDK_VER,
+        sha1 = '7353a868437576b9e4911779ae66a85ef6be0d9e',
+    )
+
+    maven_jar(
+        name = 'awssdk-metrics-spi',
+        artifact = 'software.amazon.awssdk:metrics-spi:' + AWS_SDK_VER,
+        sha1 = 'd8669974b412766751b5eaf9c1edad908bfe5c38',
+    )
+
+    maven_jar(
+        name = 'amazon-profiles',
+        artifact = 'software.amazon.awssdk:profiles:' + AWS_SDK_VER,
+        sha1 = '5add2a843de43bd0acf45e1ab8c2b94c3638dd66',
+    )
+
+    maven_jar(
+        name = 'apache-commons-lang3',
+        artifact = 'org.apache.commons:commons-lang3:3.12.0',
+        sha1 = 'c6842c86792ff03b9f1d1fe2aab8dc23aa6c6f0e',
+    )
+
+    maven_jar(
+        name = 'testcontainer-localstack',
+        artifact = 'org.testcontainers:localstack:1.15.2',
+        sha1 = 'ae3c4717bc5f37410abbb490cb46d349a77990a0',
+    )
+
+    maven_jar(
+        name = 'aws-java-sdk-core',
+        artifact = 'com.amazonaws:aws-java-sdk-core:1.11.960',
+        sha1 = '18b6b2a5cb83a0e2e33a593302b5dbe0ca2ade64',
+    )
+
+    maven_jar(
+        name = 'awssdk-url-connection-client',
+        artifact = 'software.amazon.awssdk:url-connection-client:' + AWS_SDK_VER,
+        sha1 = 'b84ac8bae45841bc65af3c4f55164d9a3399b653',
+    )
+
+    maven_jar(
+        name = 'awssdk-kinesis-producer',
+        artifact = 'com.amazonaws:amazon-kinesis-producer:0.14.6',
+        sha1 = '7f83582df816dccc5217f05ece309a5cd8c7a9a5',
+    )
+
+    maven_jar(
+        name = 'aws-glue-schema-serde',
+        artifact = 'software.amazon.glue:schema-registry-serde:1.0.0',
+        sha1 = '30815b670f89876465caa69b47e6df6fd6875d0f',
+    )
+
+    maven_jar(
+        name = 'apache-commons-io',
+        artifact = 'commons-io:commons-io:2.4',
+        sha1 = 'b1b6ea3b7e4aa4f492509a4952029cd8e48019ad',
+    )
+
+    maven_jar(
+        name = 'javax-xml-bind',
+        artifact = 'javax.xml.bind:jaxb-api:2.3.1',
+        sha1 = '8531ad5ac454cc2deb9d4d32c40c4d7451939b5d',
+    )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/AWSLogLevelListener.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/AWSLogLevelListener.java
new file mode 100644
index 0000000..cff8637
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/AWSLogLevelListener.java
@@ -0,0 +1,39 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.inject.Inject;
+import java.util.stream.Stream;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class AWSLogLevelListener implements LifecycleListener {
+  private final Level awsLibLogLevel;
+
+  @Inject
+  AWSLogLevelListener(Configuration configuration) {
+    this.awsLibLogLevel = configuration.getAwsLibLogLevel();
+  }
+
+  @Override
+  public void start() {
+    Stream.of("software.amazon", "com.amazonaws")
+        .forEach(s -> Logger.getLogger(s).setLevel(awsLibLogLevel));
+  }
+
+  @Override
+  public void stop() {}
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/CheckpointResetter.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/CheckpointResetter.java
new file mode 100644
index 0000000..86fad53
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/CheckpointResetter.java
@@ -0,0 +1,124 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.cosumerLeaseName;
+import static software.amazon.kinesis.common.InitialPositionInStream.TRIM_HORIZON;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
+import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
+import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse;
+
+@Singleton
+class CheckpointResetter {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private static final String LEASE_KEY_ATTRIBUTE_NAME = "leaseKey";
+  private static final String LEASE_CHECKPOINT_ATTRIBUTE_VAUE = "checkpoint";
+  private static final Integer DYNAMODB_RESPONSE_TIMEOUT_SECS = 5;
+
+  private final Configuration configuration;
+  private final DynamoDbAsyncClient dynamoDbAsyncClient;
+
+  @Inject
+  CheckpointResetter(Configuration configuration, DynamoDbAsyncClient dynamoDbAsyncClient) {
+    this.configuration = configuration;
+    this.dynamoDbAsyncClient = dynamoDbAsyncClient;
+  }
+
+  public void setAllShardsToBeginning(String streamName) {
+    String leaseTable = cosumerLeaseName(configuration.getApplicationName(), streamName);
+
+    try {
+      for (String shard : getAllShards(leaseTable)) {
+        logger.atInfo().log("[%s - %s] Resetting checkpoint", leaseTable, shard);
+
+        Map<String, AttributeValue> updateKey = new HashMap<>();
+        updateKey.put(LEASE_KEY_ATTRIBUTE_NAME, AttributeValue.builder().s(shard).build());
+
+        Map<String, AttributeValueUpdate> updateValues = new HashMap<>();
+        updateValues.put(
+            LEASE_CHECKPOINT_ATTRIBUTE_VAUE,
+            AttributeValueUpdate.builder()
+                .value(AttributeValue.builder().s(TRIM_HORIZON.name().toUpperCase()).build())
+                .build());
+
+        UpdateItemResponse updateItemResponse =
+            dynamoDbAsyncClient
+                .updateItem(
+                    UpdateItemRequest.builder()
+                        .tableName(leaseTable)
+                        .key(updateKey)
+                        .attributeUpdates(updateValues)
+                        .returnValues(ReturnValue.ALL_OLD)
+                        .build())
+                .get(DYNAMODB_RESPONSE_TIMEOUT_SECS, TimeUnit.SECONDS);
+
+        logger.atInfo().log(
+            "[%s - %s] Successfully reset checkpoints. old value: %s",
+            leaseTable, shard, updateItemResponse);
+      }
+    } catch (InterruptedException e) {
+      logger.atWarning().log("%s resetOffset: interrupted", leaseTable);
+    } catch (ExecutionException e) {
+      logger.atSevere().withCause(e).log("%s resetOffset: Error", leaseTable);
+    } catch (TimeoutException e) {
+      logger.atSevere().withCause(e).log("%s resetOffset: Timeout", leaseTable);
+    }
+  }
+
+  private Set<String> getAllShards(String leaseTable)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    try {
+      ScanRequest scanRequest =
+          ScanRequest.builder()
+              .tableName(leaseTable)
+              .attributesToGet(LEASE_KEY_ATTRIBUTE_NAME)
+              .build();
+
+      ScanResponse scanResponse =
+          dynamoDbAsyncClient
+              .scan(scanRequest)
+              .get(DYNAMODB_RESPONSE_TIMEOUT_SECS, TimeUnit.SECONDS);
+      return scanResponse.items().stream()
+          .map(i -> i.get(LEASE_KEY_ATTRIBUTE_NAME).s())
+          .collect(Collectors.toSet());
+    } catch (Exception e) {
+      if (e.getCause() != null && e.getCause() instanceof ResourceNotFoundException) {
+        logger.atWarning().log(
+            "%s resetOffset: lease table does not exist, nothing to reset.", leaseTable);
+        return Collections.emptySet();
+      }
+      throw e;
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/CloudWatchAsyncClientProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/CloudWatchAsyncClientProvider.java
new file mode 100644
index 0000000..e511313
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/CloudWatchAsyncClientProvider.java
@@ -0,0 +1,40 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClientBuilder;
+
+@Singleton
+class CloudWatchAsyncClientProvider implements Provider<CloudWatchAsyncClient> {
+  private final Configuration configuration;
+
+  @Inject
+  CloudWatchAsyncClientProvider(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public CloudWatchAsyncClient get() {
+    CloudWatchAsyncClientBuilder builder = CloudWatchAsyncClient.builder();
+    configuration.getRegion().ifPresent(builder::region);
+    configuration.getEndpoint().ifPresent(builder::endpointOverride);
+
+    return builder.build();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
new file mode 100644
index 0000000..ef7e698
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Configuration.java
@@ -0,0 +1,182 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import com.google.common.base.Strings;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.server.config.PluginConfig;
+import com.google.gerrit.server.config.PluginConfigFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.net.URI;
+import java.util.Optional;
+import org.apache.log4j.Level;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.kinesis.common.InitialPositionInStream;
+
+@Singleton
+class Configuration {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
+  private static final String DEFAULT_STREAM_EVENTS_TOPIC = "gerrit";
+  private static final String DEFAULT_INITIAL_POSITION = "latest";
+  private static final Long DEFAULT_POLLING_INTERVAL_MS = 1000L;
+  private static final Integer DEFAULT_MAX_RECORDS = 100;
+  private static final Long DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS = 6000L;
+  private static final Long DEFAULT_PUBLISH_TIMEOUT_MS = 6000L;
+  private static final Long DEFAULT_SHUTDOWN_TIMEOUT_MS = 20000L;
+  private static final Level DEFAULT_AWS_LIB_LOG_LEVEL = Level.WARN;
+  private static final Boolean DEFAULT_SEND_ASYNC = true;
+
+  private final String applicationName;
+  private final String streamEventsTopic;
+  private final int numberOfSubscribers;
+  private final InitialPositionInStream initialPosition;
+  private final Optional<Region> region;
+  private final Optional<URI> endpoint;
+  private final Long pollingIntervalMs;
+  private final Integer maxRecords;
+  private final Long publishTimeoutMs;
+  private final Long publishSingleRequestTimeoutMs;
+  private final Long shutdownTimeoutMs;
+  private final Level awsLibLogLevel;
+  private final Boolean sendAsync;
+
+  @Inject
+  public Configuration(PluginConfigFactory configFactory, @PluginName String pluginName) {
+    PluginConfig pluginConfig = configFactory.getFromGerritConfig(pluginName);
+
+    this.region = Optional.ofNullable(getStringParam(pluginConfig, "region", null)).map(Region::of);
+    this.endpoint =
+        Optional.ofNullable(getStringParam(pluginConfig, "endpoint", null)).map(URI::create);
+    this.streamEventsTopic = getStringParam(pluginConfig, "topic", DEFAULT_STREAM_EVENTS_TOPIC);
+    this.numberOfSubscribers =
+        Integer.parseInt(
+            getStringParam(pluginConfig, "numberOfSubscribers", DEFAULT_NUMBER_OF_SUBSCRIBERS));
+    this.applicationName = getStringParam(pluginConfig, "applicationName", pluginName);
+
+    this.initialPosition =
+        InitialPositionInStream.valueOf(
+            getStringParam(pluginConfig, "initialPosition", DEFAULT_INITIAL_POSITION)
+                .toUpperCase());
+
+    this.pollingIntervalMs =
+        Optional.ofNullable(getStringParam(pluginConfig, "pollingIntervalMs", null))
+            .map(Long::parseLong)
+            .orElse(DEFAULT_POLLING_INTERVAL_MS);
+
+    this.maxRecords =
+        Optional.ofNullable(getStringParam(pluginConfig, "maxRecords", null))
+            .map(Integer::parseInt)
+            .orElse(DEFAULT_MAX_RECORDS);
+
+    this.publishSingleRequestTimeoutMs =
+        Optional.ofNullable(getStringParam(pluginConfig, "publishSingleRequestTimeoutMs", null))
+            .map(Long::parseLong)
+            .orElse(DEFAULT_PUBLISH_SINGLE_REQUEST_TIMEOUT_MS);
+
+    this.publishTimeoutMs =
+        Optional.ofNullable(getStringParam(pluginConfig, "publishTimeoutMs", null))
+            .map(Long::parseLong)
+            .orElse(DEFAULT_PUBLISH_TIMEOUT_MS);
+
+    this.shutdownTimeoutMs =
+        Optional.ofNullable(getStringParam(pluginConfig, "shutdownTimeoutMs", null))
+            .map(Long::parseLong)
+            .orElse(DEFAULT_SHUTDOWN_TIMEOUT_MS);
+
+    this.awsLibLogLevel =
+        Optional.ofNullable(getStringParam(pluginConfig, "awsLibLogLevel", null))
+            .map(l -> Level.toLevel(l, DEFAULT_AWS_LIB_LOG_LEVEL))
+            .orElse(DEFAULT_AWS_LIB_LOG_LEVEL);
+
+    this.sendAsync =
+        Optional.ofNullable(getStringParam(pluginConfig, "sendAsync", null))
+            .map(Boolean::new)
+            .orElse(DEFAULT_SEND_ASYNC);
+
+    logger.atInfo().log(
+        "Kinesis client. Application:'%s'|PollingInterval: %s|maxRecords: %s%s%s",
+        applicationName,
+        pollingIntervalMs,
+        maxRecords,
+        region.map(r -> String.format("|region: %s", r.id())).orElse(""),
+        endpoint.map(e -> String.format("|endpoint: %s", e.toASCIIString())).orElse(""));
+  }
+
+  public String getStreamEventsTopic() {
+    return streamEventsTopic;
+  }
+
+  public int getNumberOfSubscribers() {
+    return numberOfSubscribers;
+  }
+
+  public String getApplicationName() {
+    return applicationName;
+  }
+
+  public Long getPublishTimeoutMs() {
+    return publishTimeoutMs;
+  }
+
+  public Optional<Region> getRegion() {
+    return region;
+  }
+
+  public Optional<URI> getEndpoint() {
+    return endpoint;
+  }
+
+  public Long getPublishSingleRequestTimeoutMs() {
+    return publishSingleRequestTimeoutMs;
+  }
+
+  public Long getPollingIntervalMs() {
+    return pollingIntervalMs;
+  }
+
+  public Integer getMaxRecords() {
+    return maxRecords;
+  }
+
+  public InitialPositionInStream getInitialPosition() {
+    return initialPosition;
+  }
+
+  private static String getStringParam(
+      PluginConfig pluginConfig, String name, String defaultValue) {
+    return Strings.isNullOrEmpty(System.getProperty(name))
+        ? pluginConfig.getString(name, defaultValue)
+        : System.getProperty(name);
+  }
+
+  public static String cosumerLeaseName(String applicationName, String streamName) {
+    return String.format("%s-%s", applicationName, streamName);
+  }
+
+  public Long getShutdownTimeoutMs() {
+    return shutdownTimeoutMs;
+  }
+
+  public Level getAwsLibLogLevel() {
+    return awsLibLogLevel;
+  }
+
+  public Boolean isSendAsync() {
+    return sendAsync;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/ConsumerExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/ConsumerExecutor.java
new file mode 100644
index 0000000..406f05f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/ConsumerExecutor.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+
+@Retention(RUNTIME)
+@BindingAnnotation
+public @interface ConsumerExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/ConsumerExecutorProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/ConsumerExecutorProvider.java
new file mode 100644
index 0000000..34034b1
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/ConsumerExecutorProvider.java
@@ -0,0 +1,36 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import java.util.concurrent.ExecutorService;
+
+class ConsumerExecutorProvider implements Provider<ExecutorService> {
+  private final WorkQueue workQueue;
+  private final Configuration configuration;
+
+  @Inject
+  ConsumerExecutorProvider(WorkQueue workQueue, Configuration configuration) {
+    this.workQueue = workQueue;
+    this.configuration = configuration;
+  }
+
+  @Override
+  public ExecutorService get() {
+    return workQueue.createQueue(configuration.getNumberOfSubscribers(), "kinesis-consumers");
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/DynamoDbAsyncClientProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/DynamoDbAsyncClientProvider.java
new file mode 100644
index 0000000..f3812df
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/DynamoDbAsyncClientProvider.java
@@ -0,0 +1,40 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
+
+@Singleton
+class DynamoDbAsyncClientProvider implements Provider<DynamoDbAsyncClient> {
+  private final Configuration configuration;
+
+  @Inject
+  DynamoDbAsyncClientProvider(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public DynamoDbAsyncClient get() {
+    DynamoDbAsyncClientBuilder builder = DynamoDbAsyncClient.builder();
+    configuration.getRegion().ifPresent(builder::region);
+    configuration.getEndpoint().ifPresent(builder::endpointOverride);
+
+    return builder.build();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisAsyncClientProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisAsyncClientProvider.java
new file mode 100644
index 0000000..c983d60
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisAsyncClientProvider.java
@@ -0,0 +1,41 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
+import software.amazon.kinesis.common.KinesisClientUtil;
+
+@Singleton
+class KinesisAsyncClientProvider implements Provider<KinesisAsyncClient> {
+  private final Configuration configuration;
+
+  @Inject
+  KinesisAsyncClientProvider(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public KinesisAsyncClient get() {
+    KinesisAsyncClientBuilder builder = KinesisAsyncClient.builder();
+    configuration.getRegion().ifPresent(builder::region);
+    configuration.getEndpoint().ifPresent(builder::endpointOverride);
+
+    return KinesisClientUtil.createKinesisAsyncClient(builder);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
new file mode 100644
index 0000000..4e4ca3f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerApi.java
@@ -0,0 +1,80 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+class KinesisBrokerApi implements BrokerApi {
+  private final KinesisConsumer.Factory consumerFactory;
+
+  private final Gson gson;
+  private final KinesisPublisher kinesisPublisher;
+  private final Set<KinesisConsumer> consumers;
+
+  @Inject
+  public KinesisBrokerApi(
+      Gson gson, KinesisPublisher kinesisPublisher, KinesisConsumer.Factory consumerFactory) {
+    this.gson = gson;
+    this.kinesisPublisher = kinesisPublisher;
+    this.consumerFactory = consumerFactory;
+    this.consumers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+  }
+
+  @Override
+  public boolean send(String streamName, EventMessage event) {
+    return sendWithResult(streamName, event).isSuccess();
+  }
+
+  PublishResult sendWithResult(String streamName, EventMessage event) {
+    return kinesisPublisher.publish(
+        streamName, gson.toJson(event), event.getHeader().sourceInstanceId.toString());
+  }
+
+  @Override
+  public void receiveAsync(String streamName, Consumer<EventMessage> eventConsumer) {
+    KinesisConsumer consumer = consumerFactory.create(streamName, eventConsumer);
+    consumers.add(consumer);
+    consumer.subscribe(streamName, eventConsumer);
+  }
+
+  @Override
+  public Set<TopicSubscriber> topicSubscribers() {
+    return consumers.stream()
+        .map(s -> TopicSubscriber.topicSubscriber(s.getStreamName(), s.getMessageProcessor()))
+        .collect(Collectors.toSet());
+  }
+
+  @Override
+  public void disconnect() {
+    consumers.parallelStream().forEach(KinesisConsumer::shutdown);
+    consumers.clear();
+  }
+
+  @Override
+  public void replayAllEvents(String topic) {
+    consumers.stream()
+        .filter(subscriber -> topic.equals(subscriber.getStreamName()))
+        .forEach(KinesisConsumer::resetOffset);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerLifeCycleManager.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerLifeCycleManager.java
new file mode 100644
index 0000000..67678dc
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisBrokerLifeCycleManager.java
@@ -0,0 +1,46 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.util.Set;
+
+@Singleton
+class KinesisBrokerLifeCycleManager implements LifecycleListener {
+  private final Set<TopicSubscriber> consumers;
+  private final BrokerApi brokerApi;
+
+  @Inject
+  public KinesisBrokerLifeCycleManager(Set<TopicSubscriber> consumers, BrokerApi brokerApi) {
+    this.consumers = consumers;
+    this.brokerApi = brokerApi;
+  }
+
+  @Override
+  public void start() {
+    consumers.forEach(
+        topicSubscriber ->
+            brokerApi.receiveAsync(topicSubscriber.topic(), topicSubscriber.consumer()));
+  }
+
+  @Override
+  public void stop() {
+    brokerApi.disconnect();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
new file mode 100644
index 0000000..4a610b5
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisConsumer.java
@@ -0,0 +1,102 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.common.flogger.FluentLogger;
+import com.google.inject.Inject;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import software.amazon.kinesis.coordinator.Scheduler;
+
+class KinesisConsumer {
+  interface Factory {
+    KinesisConsumer create(String topic, Consumer<EventMessage> messageProcessor);
+  }
+
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private final SchedulerProvider.Factory schedulerFactory;
+  private final CheckpointResetter checkpointResetter;
+  private final Configuration configuration;
+  private final ExecutorService executor;
+  private Scheduler kinesisScheduler;
+
+  private java.util.function.Consumer<EventMessage> messageProcessor;
+  private String streamName;
+  private AtomicBoolean resetOffset = new AtomicBoolean(false);
+
+  @Inject
+  public KinesisConsumer(
+      SchedulerProvider.Factory schedulerFactory,
+      CheckpointResetter checkpointResetter,
+      Configuration configuration,
+      @ConsumerExecutor ExecutorService executor) {
+    this.schedulerFactory = schedulerFactory;
+    this.checkpointResetter = checkpointResetter;
+    this.configuration = configuration;
+    this.executor = executor;
+  }
+
+  public void subscribe(
+      String streamName, java.util.function.Consumer<EventMessage> messageProcessor) {
+    this.streamName = streamName;
+    this.messageProcessor = messageProcessor;
+
+    logger.atInfo().log("Subscribe kinesis consumer to stream [%s]", streamName);
+    runReceiver(messageProcessor);
+  }
+
+  private void runReceiver(java.util.function.Consumer<EventMessage> messageProcessor) {
+    this.kinesisScheduler =
+        schedulerFactory.create(streamName, resetOffset.getAndSet(false), messageProcessor).get();
+    executor.execute(kinesisScheduler);
+  }
+
+  public void shutdown() {
+    Future<Boolean> gracefulShutdownFuture = kinesisScheduler.startGracefulShutdown();
+    logger.atInfo().log(
+        "Waiting up to '%s' milliseconds to complete shutdown of kinesis consumer of stream '%s'",
+        configuration.getShutdownTimeoutMs(), getStreamName());
+    try {
+      gracefulShutdownFuture.get(configuration.getShutdownTimeoutMs(), TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log(
+          "Error caught when shutting down kinesis consumer for stream %s", getStreamName());
+    }
+    logger.atInfo().log("Shutdown kinesis consumer of stream %s completed.", getStreamName());
+  }
+
+  public java.util.function.Consumer<EventMessage> getMessageProcessor() {
+    return messageProcessor;
+  }
+
+  public String getStreamName() {
+    return streamName;
+  }
+
+  public void resetOffset() {
+    // Move all checkpoints (if any) to TRIM_HORIZON, so that the consumer
+    // scheduler will start consuming from beginning.
+    checkpointResetter.setAllShardsToBeginning(streamName);
+
+    // Even when no checkpoints have been persisted, instruct the consumer
+    // scheduler to start from TRIM_HORIZON, irrespective of 'initialPosition'
+    // configuration.
+    resetOffset.set(true);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProvider.java
new file mode 100644
index 0000000..02b65bf
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisProducerProvider.java
@@ -0,0 +1,63 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.google.common.flogger.FluentLogger;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+
+@Singleton
+public class KinesisProducerProvider implements Provider<KinesisProducer> {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private final Configuration configuration;
+
+  @Inject
+  KinesisProducerProvider(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public KinesisProducer get() {
+    KinesisProducerConfiguration conf =
+        new KinesisProducerConfiguration()
+            .setAggregationEnabled(false)
+            .setMaxConnections(1)
+            .setRequestTimeout(configuration.getPublishSingleRequestTimeoutMs());
+
+    configuration.getRegion().ifPresent(r -> conf.setRegion(r.toString()));
+    configuration
+        .getEndpoint()
+        .ifPresent(
+            uri ->
+                conf.setKinesisEndpoint(uri.getHost())
+                    .setKinesisPort(uri.getPort())
+                    .setCloudwatchEndpoint(uri.getHost())
+                    .setCloudwatchPort(uri.getPort())
+                    .setVerifyCertificate(false));
+    logger.atInfo().log(
+        "Kinesis producer configured. Request Timeout (ms):'%s'%s%s",
+        configuration.getPublishSingleRequestTimeoutMs(),
+        configuration.getRegion().map(r -> String.format("|region: '%s'", r.id())).orElse(""),
+        configuration
+            .getEndpoint()
+            .map(e -> String.format("|endpoint: '%s'", e.toASCIIString()))
+            .orElse(""));
+
+    return new KinesisProducer(conf);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
new file mode 100644
index 0000000..9547191
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisPublisher.java
@@ -0,0 +1,173 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import com.amazonaws.services.kinesis.producer.Attempt;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import com.amazonaws.services.kinesis.producer.UserRecordResult;
+import com.google.common.flogger.FluentLogger;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventListener;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+@Singleton
+class KinesisPublisher implements EventListener {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final KinesisProducer kinesisProducer;
+  private final Configuration configuration;
+  private final ExecutorService callBackExecutor;
+
+  private final Gson gson;
+
+  @Inject
+  public KinesisPublisher(
+      Gson gson,
+      KinesisProducer kinesisProducer,
+      Configuration configuration,
+      @ProducerCallbackExecutor ExecutorService callBackExecutor) {
+    this.gson = gson;
+    this.kinesisProducer = kinesisProducer;
+    this.configuration = configuration;
+    this.callBackExecutor = callBackExecutor;
+  }
+
+  @Override
+  public void onEvent(Event event) {
+    publish(configuration.getStreamEventsTopic(), gson.toJson(event), event.getType());
+  }
+
+  PublishResult publish(String streamName, String stringEvent, String partitionKey) {
+    if (configuration.isSendAsync()) {
+      return publishAsync(streamName, stringEvent, partitionKey);
+    }
+    return publishSync(streamName, stringEvent, partitionKey);
+  }
+
+  private PublishResult publishSync(String streamName, String stringEvent, String partitionKey) {
+    logger.atFiner().log(
+        "KINESIS PRODUCER - Attempt to publish event %s to stream %s [PK: %s]",
+        stringEvent, streamName, partitionKey);
+
+    UserRecordResult result = null;
+    try {
+      result =
+          kinesisProducer
+              .addUserRecord(streamName, partitionKey, ByteBuffer.wrap(stringEvent.getBytes()))
+              .get(configuration.getPublishTimeoutMs(), TimeUnit.MILLISECONDS);
+
+      List<Attempt> attemptsDetails = result.getAttempts();
+      int numberOfAttempts = attemptsDetails.size();
+      if (result.isSuccessful()) {
+        logger.atFine().log(
+            "KINESIS PRODUCER - Successfully published event '%s' to shardId '%s' [PK: %s] [Sequence: %s] after %s attempt(s)",
+            stringEvent,
+            result.getShardId(),
+            partitionKey,
+            result.getSequenceNumber(),
+            numberOfAttempts);
+        return PublishResult.success(numberOfAttempts);
+      } else {
+        int currentIdx = numberOfAttempts - 1;
+        int previousIdx = currentIdx - 1;
+        Attempt current = attemptsDetails.get(currentIdx);
+        if (previousIdx >= 0) {
+          Attempt previous = attemptsDetails.get(previousIdx);
+          logger.atSevere().log(
+              String.format(
+                  "KINESIS PRODUCER - Failed publishing event '%s' [PK: %s] - %s : %s. Previous failure - %s : %s",
+                  stringEvent,
+                  partitionKey,
+                  current.getErrorCode(),
+                  current.getErrorMessage(),
+                  previous.getErrorCode(),
+                  previous.getErrorMessage()));
+        } else {
+          logger.atSevere().log(
+              String.format(
+                  "KINESIS PRODUCER - Failed publishing event '%s' [PK: %s] - %s : %s.",
+                  stringEvent, partitionKey, current.getErrorCode(), current.getErrorMessage()));
+        }
+      }
+    } catch (InterruptedException e) {
+      logger.atSevere().withCause(e).log(
+          String.format(
+              "KINESIS PRODUCER - Interrupted publishing event '%s' [PK: %s]",
+              stringEvent, partitionKey));
+    } catch (ExecutionException e) {
+      logger.atSevere().withCause(e).log(
+          String.format(
+              "KINESIS PRODUCER - Error when publishing event '%s' [PK: %s]",
+              stringEvent, partitionKey));
+    } catch (TimeoutException e) {
+      logger.atSevere().withCause(e).log(
+          String.format(
+              "KINESIS PRODUCER - Timeout when publishing event '%s' [PK: %s]",
+              stringEvent, partitionKey));
+    }
+
+    return PublishResult.failure(
+        Optional.ofNullable(result).map(r -> r.getAttempts().size()).orElse(0));
+  }
+
+  private PublishResult publishAsync(String streamName, String stringEvent, String partitionKey) {
+    try {
+      ListenableFuture<UserRecordResult> publishF =
+          kinesisProducer.addUserRecord(
+              streamName, partitionKey, ByteBuffer.wrap(stringEvent.getBytes()));
+
+      Futures.addCallback(
+          publishF,
+          new FutureCallback<UserRecordResult>() {
+            @Override
+            public void onSuccess(UserRecordResult result) {
+              logger.atFine().log(
+                  "KINESIS PRODUCER - Successfully published event '%s' to shardId '%s' [PK: %s] [Sequence: %s] after %s attempt(s)",
+                  stringEvent,
+                  result.getShardId(),
+                  partitionKey,
+                  result.getSequenceNumber(),
+                  result.getAttempts().size());
+            }
+
+            @Override
+            public void onFailure(Throwable e) {
+              logger.atSevere().withCause(e).log(
+                  "KINESIS PRODUCER - Failed publishing event %s [PK: %s]",
+                  stringEvent, partitionKey);
+            }
+          },
+          callBackExecutor);
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log(
+          "KINESIS PRODUCER - Error when publishing event %s [PK: %s]", stringEvent, partitionKey);
+      return PublishResult.failure(1);
+    }
+
+    return PublishResult.success(1);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
new file mode 100644
index 0000000..a57aa3f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java
@@ -0,0 +1,110 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import java.util.function.Consumer;
+import software.amazon.kinesis.exceptions.InvalidStateException;
+import software.amazon.kinesis.exceptions.ShutdownException;
+import software.amazon.kinesis.lifecycle.events.InitializationInput;
+import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
+import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
+import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
+import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
+import software.amazon.kinesis.processor.ShardRecordProcessor;
+
+class KinesisRecordProcessor implements ShardRecordProcessor {
+  interface Factory {
+    KinesisRecordProcessor create(Consumer<EventMessage> recordProcessor);
+  }
+
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private final Consumer<EventMessage> recordProcessor;
+  private final OneOffRequestContext oneOffCtx;
+  private final Gson gson;
+
+  @Inject
+  KinesisRecordProcessor(
+      @Assisted Consumer<EventMessage> recordProcessor, OneOffRequestContext oneOffCtx, Gson gson) {
+    this.recordProcessor = recordProcessor;
+    this.oneOffCtx = oneOffCtx;
+    this.gson = gson;
+  }
+
+  @Override
+  public void initialize(InitializationInput initializationInput) {
+    logger.atInfo().log(
+        "Initializing @ Sequence: %s", initializationInput.extendedSequenceNumber());
+  }
+
+  @Override
+  public void processRecords(ProcessRecordsInput processRecordsInput) {
+    try {
+      logger.atInfo().log("Processing %s record(s)", processRecordsInput.records().size());
+      processRecordsInput
+          .records()
+          .forEach(
+              consumerRecord -> {
+                logger.atFiner().log(
+                    "GERRIT > Processing record pk: %s -- %s",
+                    consumerRecord.partitionKey(), consumerRecord.sequenceNumber());
+                byte[] byteRecord = new byte[consumerRecord.data().remaining()];
+                consumerRecord.data().get(byteRecord);
+                String jsonMessage = new String(byteRecord);
+                logger.atFiner().log("Kinesis consumed event: '%s'", jsonMessage);
+                try (ManualRequestContext ctx = oneOffCtx.open()) {
+                  EventMessage eventMessage = gson.fromJson(jsonMessage, EventMessage.class);
+                  recordProcessor.accept(eventMessage);
+                } catch (Exception e) {
+                  logger.atSevere().withCause(e).log("Could not process event '%s'", jsonMessage);
+                }
+              });
+    } catch (Throwable t) {
+      logger.atSevere().withCause(t).log("Caught throwable while processing records. Aborting.");
+    }
+  }
+
+  @Override
+  public void leaseLost(LeaseLostInput leaseLostInput) {
+    logger.atInfo().log("Lost lease, so terminating.");
+  }
+
+  @Override
+  public void shardEnded(ShardEndedInput shardEndedInput) {
+    try {
+      logger.atInfo().log("Reached shard end checkpointing.");
+      shardEndedInput.checkpointer().checkpoint();
+    } catch (ShutdownException | InvalidStateException e) {
+      logger.atSevere().withCause(e).log("Exception while checkpointing at shard end. Giving up.");
+    }
+  }
+
+  @Override
+  public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
+    try {
+      logger.atInfo().log("Scheduler is shutting down, checkpointing.");
+      shutdownRequestedInput.checkpointer().checkpoint();
+    } catch (ShutdownException | InvalidStateException e) {
+      logger.atSevere().withCause(e).log(
+          "Exception while checkpointing at requested shutdown. Giving up.");
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorFactory.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorFactory.java
new file mode 100644
index 0000000..557ad0f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessorFactory.java
@@ -0,0 +1,43 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import java.util.function.Consumer;
+import software.amazon.kinesis.processor.ShardRecordProcessor;
+import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
+
+class KinesisRecordProcessorFactory implements ShardRecordProcessorFactory {
+  interface Factory {
+    KinesisRecordProcessorFactory create(Consumer<EventMessage> recordProcessor);
+  }
+
+  private final Consumer<EventMessage> recordProcessor;
+  private final KinesisRecordProcessor.Factory processorFactory;
+
+  @Inject
+  KinesisRecordProcessorFactory(
+      @Assisted Consumer<EventMessage> recordProcessor,
+      KinesisRecordProcessor.Factory processorFactory) {
+    this.recordProcessor = recordProcessor;
+    this.processorFactory = processorFactory;
+  }
+
+  public ShardRecordProcessor shardRecordProcessor() {
+    return processorFactory.create(recordProcessor);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
new file mode 100644
index 0000000..2629fe7
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/Module.java
@@ -0,0 +1,85 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import static com.google.inject.Scopes.SINGLETON;
+
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.collect.Sets;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.events.EventListener;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import com.google.inject.TypeLiteral;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+
+public class Module extends LifecycleModule {
+  private Set<TopicSubscriber> activeConsumers = Sets.newHashSet();
+
+  /**
+   * By default the events-broker library (loaded directly by the multi-site) registers a noop
+   * implementation, which itself registers a list of topic subscribers. Since we have no control
+   * over which plugin is loaded first (multi-site or events-aws-kinesis), we deal with the
+   * possibility that a broker api was already registered and, in that case, reassign its consumers
+   * to the events-aws-kinesis plugin. The injection is optional because if the events-aws-kinesis
+   * plugin is loaded first, then there will be no previously registered broker API.
+   *
+   * @param previousBrokerApi
+   */
+  @Inject(optional = true)
+  public void setPreviousBrokerApi(DynamicItem<BrokerApi> previousBrokerApi) {
+    if (previousBrokerApi != null && previousBrokerApi.get() != null) {
+      this.activeConsumers = previousBrokerApi.get().topicSubscribers();
+    }
+  }
+
+  @Override
+  protected void configure() {
+    factory(KinesisRecordProcessor.Factory.class);
+    factory(KinesisRecordProcessorFactory.Factory.class);
+    bind(ExecutorService.class)
+        .annotatedWith(ConsumerExecutor.class)
+        .toProvider(ConsumerExecutorProvider.class)
+        .in(SINGLETON);
+    bind(ExecutorService.class)
+        .annotatedWith(ProducerCallbackExecutor.class)
+        .toProvider(ProducerCallbackExecutorProvider.class)
+        .in(SINGLETON);
+    bind(KinesisProducer.class).toProvider(KinesisProducerProvider.class).in(Scopes.SINGLETON);
+    bind(KinesisAsyncClient.class).toProvider(KinesisAsyncClientProvider.class).in(SINGLETON);
+    bind(DynamoDbAsyncClient.class).toProvider(DynamoDbAsyncClientProvider.class).in(SINGLETON);
+    bind(CloudWatchAsyncClient.class).toProvider(CloudWatchAsyncClientProvider.class).in(SINGLETON);
+    factory(SchedulerProvider.Factory.class);
+    bind(new TypeLiteral<Set<TopicSubscriber>>() {}).toInstance(activeConsumers);
+    DynamicItem.bind(binder(), BrokerApi.class).to(KinesisBrokerApi.class).in(Scopes.SINGLETON);
+    bind(Gson.class).toProvider(EventGsonProvider.class).in(Singleton.class);
+    DynamicSet.bind(binder(), LifecycleListener.class).to(KinesisBrokerLifeCycleManager.class);
+    factory(KinesisConsumer.Factory.class);
+    DynamicSet.bind(binder(), EventListener.class).to(KinesisPublisher.class);
+    listener().to(AWSLogLevelListener.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/ProducerCallbackExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/ProducerCallbackExecutor.java
new file mode 100644
index 0000000..8cd58f9
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/ProducerCallbackExecutor.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+
+@Retention(RUNTIME)
+@BindingAnnotation
+public @interface ProducerCallbackExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/ProducerCallbackExecutorProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/ProducerCallbackExecutorProvider.java
new file mode 100644
index 0000000..df9c7ed
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/ProducerCallbackExecutorProvider.java
@@ -0,0 +1,36 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gerrit.server.logging.LoggingContextAwareExecutorService;
+import com.google.inject.Provider;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+class ProducerCallbackExecutorProvider implements Provider<ExecutorService> {
+
+  @Override
+  public ExecutorService get() {
+    // Currently, we use 1 thread only when publishing, so it makes sense to
+    // have the same when processing the related callbacks.
+    return new LoggingContextAwareExecutorService(
+        Executors.newFixedThreadPool(
+            1,
+            new ThreadFactoryBuilder()
+                .setNameFormat("kinesis-producer-callback-executor-%d")
+                .build()));
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/PublishResult.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/PublishResult.java
new file mode 100644
index 0000000..202f3e9
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/PublishResult.java
@@ -0,0 +1,32 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import com.google.auto.value.AutoValue;
+
+@AutoValue
+abstract class PublishResult {
+  public abstract boolean isSuccess();
+
+  public abstract int attempts();
+
+  public static PublishResult success(int attempts) {
+    return new AutoValue_PublishResult(true, attempts);
+  }
+
+  public static PublishResult failure(int attempts) {
+    return new AutoValue_PublishResult(false, attempts);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
new file mode 100644
index 0000000..19079bf
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kinesis/SchedulerProvider.java
@@ -0,0 +1,98 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import static com.googlesource.gerrit.plugins.kinesis.Configuration.cosumerLeaseName;
+
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.inject.Provider;
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.AssistedInject;
+import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.kinesis.common.ConfigsBuilder;
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.coordinator.Scheduler;
+import software.amazon.kinesis.retrieval.RetrievalConfig;
+import software.amazon.kinesis.retrieval.polling.PollingConfig;
+
+class SchedulerProvider implements Provider<Scheduler> {
+  interface Factory {
+    SchedulerProvider create(
+        String streamName,
+        boolean fromBeginning,
+        java.util.function.Consumer<EventMessage> messageProcessor);
+  }
+
+  private final ConfigsBuilder configsBuilder;
+  private final Configuration configuration;
+  private final KinesisAsyncClient kinesisAsyncClient;
+  private final String streamName;
+  private final boolean fromBeginning;
+
+  @AssistedInject
+  SchedulerProvider(
+      Configuration configuration,
+      KinesisAsyncClient kinesisAsyncClient,
+      DynamoDbAsyncClient dynamoDbAsyncClient,
+      CloudWatchAsyncClient cloudWatchAsyncClient,
+      KinesisRecordProcessorFactory.Factory kinesisRecordProcessorFactory,
+      @Assisted String streamName,
+      @Assisted boolean fromBeginning,
+      @Assisted java.util.function.Consumer<EventMessage> messageProcessor) {
+    this.configuration = configuration;
+    this.kinesisAsyncClient = kinesisAsyncClient;
+    this.streamName = streamName;
+    this.fromBeginning = fromBeginning;
+    this.configsBuilder =
+        new ConfigsBuilder(
+            streamName,
+            cosumerLeaseName(configuration.getApplicationName(), streamName),
+            kinesisAsyncClient,
+            dynamoDbAsyncClient,
+            cloudWatchAsyncClient,
+            String.format("klc-worker-%s-%s", configuration.getApplicationName(), streamName),
+            kinesisRecordProcessorFactory.create(messageProcessor));
+  }
+
+  private RetrievalConfig getRetrievalConfig() {
+    PollingConfig polling =
+        new PollingConfig(streamName, kinesisAsyncClient)
+            .idleTimeBetweenReadsInMillis(configuration.getPollingIntervalMs())
+            .maxRecords(configuration.getMaxRecords());
+    RetrievalConfig retrievalConfig =
+        configsBuilder.retrievalConfig().retrievalSpecificConfig(polling);
+    retrievalConfig.initialPositionInStreamExtended(
+        InitialPositionInStreamExtended.newInitialPosition(
+            fromBeginning
+                ? InitialPositionInStream.TRIM_HORIZON
+                : configuration.getInitialPosition()));
+    return retrievalConfig;
+  }
+
+  @Override
+  public Scheduler get() {
+    return new Scheduler(
+        configsBuilder.checkpointConfig(),
+        configsBuilder.coordinatorConfig(),
+        configsBuilder.leaseManagementConfig(),
+        configsBuilder.lifecycleConfig(),
+        configsBuilder.metricsConfig(),
+        configsBuilder.processorConfig(),
+        getRetrievalConfig());
+  }
+}
diff --git a/src/main/resources/Documentation/Build.md b/src/main/resources/Documentation/Build.md
new file mode 100644
index 0000000..ffb8b0c
--- /dev/null
+++ b/src/main/resources/Documentation/Build.md
@@ -0,0 +1,22 @@
+# Build
+
+The events-aws-kinesis plugin can be build as a regular 'in-tree' plugin. That means
+that is required to clone a Gerrit source tree first and then to have the plugin
+source directory into the `/plugins` path.
+
+Additionally, the `plugins/external_plugin_deps.bzl` file needs to be updated to
+match the events-aws-kinesis plugin one.
+
+```shell script
+git clone --recursive https://gerrit.googlesource.com/gerrit
+cd gerrit
+git clone "https://gerrit.googlesource.com/plugins/events-aws-kinesis" plugins/events-aws-kinesis
+ln -sf plugins/events-aws-kinesis/external_plugin_deps.bzl plugins/.
+bazelisk build plugins/events-aws-kinesis
+```
+
+The output is created in
+
+```
+bazel-genfiles/plugins/events-aws-kinesis/events-aws-kinesis.jar
+```
\ No newline at end of file
diff --git a/src/main/resources/Documentation/Config.md b/src/main/resources/Documentation/Config.md
new file mode 100644
index 0000000..03e3650
--- /dev/null
+++ b/src/main/resources/Documentation/Config.md
@@ -0,0 +1,112 @@
+Configuration
+=========================
+
+The events-aws-kinesis plugin is configured by adding a plugin stanza in the
+`gerrit.config` file, for example:
+
+```text
+[plugin "events-aws-kinesis"]
+    numberOfSubscribers = 6
+    pollingIntervalMs = 500
+    maxRecords = 99
+    region = us-east-1
+    endpoint = http://localhost:4566
+    applicationName = instance-1
+    initialPosition = trim_horizon
+```
+
+`plugin.events-aws-kinesis.numberOfSubscribers`
+:   Optional. The number of expected kinesis subscribers. This will be used to allocate
+    a thread pool able to run all subscribers.
+    Default: 6
+
+`plugin.events-aws-kinesis.pollingIntervalMs`
+:   Optional. How often, in milliseconds, to poll Kinesis shards to retrieve
+    records. Please note that setting this value too low might incur in
+    `ProvisionedThroughputExceededException`.
+    See [AWS docs](https://docs.aws.amazon.com/streams/latest/dev/kinesis-low-latency.html)
+    for more details on this.
+    Default: 1000
+
+`plugin.events-aws-kinesis.maxRecords`
+:   Optional. The maximum number of records to fetch from the kinesis stream
+    Default: 100
+
+`plugin.events-aws-kinesis.region`
+:   Optional. Which AWS region to connect to.
+    Default: When not specified this value is provided via the default Region
+    Provider Chain, as explained [here](https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html)
+
+`plugin.events-aws-kinesis.endpoint`
+:   Optional. When defined, it will override the default kinesis endpoint and it
+    will connect to it, rather than connecting to AWS. This is useful when
+    developing or testing, in order to connect locally.
+    See [localstack](https://github.com/localstack/localstack) to understand
+    more about how run kinesis stack outside AWS.
+    Default: <empty>
+
+`plugin.events-aws-kinesis.applicationName`
+:   Optional. This value identifies the application and it must be unique within your
+    gerrit cluster to allow different gerrit nodes to have different checkpoints
+    and to consume data from the stream independently.
+    Default: events-aws-kinesis
+
+`plugin.events-aws-kinesis.initialPosition`
+:   Optional. Which point in the stream the consumer should start consuming from.
+    Note that this only applies to consumers that *do not* have any persisted
+    checkpoint (i.e. first deployments). When checkpoints exist they always
+    override this value.
+
+    Needs to be one of these values:
+
+* TRIM_HORIZON: Start streaming at the last untrimmed record in the shard, which is the oldest data record in the shard.
+* LATEST: Start streaming just after the most recent record in the shard, so that you always read the most recent data in the shard.
+
+    Default: "LATEST"
+
+`plugin.events-aws-kinesis.publishSingleRequestTimeoutMs`
+: Optional. The maximum total time (milliseconds) elapsed between when a publish
+  request started and the receiving a response. If it goes over, the request
+  will be timed-out and possibly tried again, if `publishTimeoutMs` allows.
+  Default: 6000
+
+`plugin.events-aws-kinesis.publishTimeoutMs`
+: Optional. The maximum total time (milliseconds) waiting for publishing a record
+  to kinesis, including retries.
+  If it goes over, the request will be timed-out and not attempted again.
+  Default: 6000
+
+`plugin.events-aws-kinesis.shutdownTimeoutMs`
+: Optional. The maximum total time (milliseconds) waiting when shutting down
+  kinesis consumers.
+  Default: 20000
+
+`plugin.events-aws-kinesis.awsLibLogLevel`
+: Optional. Which level AWS libraries should log at.
+  This plugin delegates most complex tasks associated to the production and
+  consumption of data to libraries developed and maintained directly by AWS.
+  This configuration specifies how verbose those libraries are allowed to be when
+  logging.
+
+  Default: WARN
+  Allowed values:OFF|FATAL|ERROR|WARN|INFO|DEBUG|TRACE|ALL
+
+`plugin.events-aws-kinesis.streamEventsTopic`
+:   Optional. Name of the kinesis topic for stream events. events-aws-kinesis
+    plugin exposes all stream events under this topic name.
+    Default: gerrit
+
+`plugin.events-aws-kinesis.sendAsync`
+:   Optional. Whether to send messages to Kinesis asynchronously, without
+    waiting for the result of the operation.
+    Note that in this case, retries will still be attempted by the producer, but
+    in a separate thread, so that Gerrit will not be blocked waiting for the
+    publishing to terminate.
+    The overall result of the operation, once available, will be logged.
+    Default: true
+
+Overrides
+=========================
+
+Note that System properties always override and take priority over the above
+gerrit.config configuration.
\ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/ConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/ConfigurationTest.java
new file mode 100644
index 0000000..888c7f1
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/ConfigurationTest.java
@@ -0,0 +1,94 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.when;
+
+import com.google.gerrit.server.config.PluginConfig;
+import com.google.gerrit.server.config.PluginConfigFactory;
+import org.apache.log4j.Level;
+import org.eclipse.jgit.lib.Config;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ConfigurationTest {
+  private static final String PLUGIN_NAME = "events-aws-kinesis";
+
+  @Mock private PluginConfigFactory pluginConfigFactoryMock;
+  private PluginConfig.Update pluginConfig;
+
+  @Before
+  public void setup() {
+    pluginConfig = PluginConfig.Update.forTest(PLUGIN_NAME, new Config());
+  }
+
+  @Test
+  public void shouldReadDefaultAWSLibLogLevel() {
+    when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+        .thenReturn(pluginConfig.asPluginConfig());
+
+    Configuration configuration = new Configuration(pluginConfigFactoryMock, PLUGIN_NAME);
+
+    assertThat(configuration.getAwsLibLogLevel()).isEqualTo(Level.WARN);
+  }
+
+  @Test
+  public void shouldConfigureSpecificAWSLibLogLevel() {
+    pluginConfig.setString("awsLibLogLevel", "debug");
+    when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+        .thenReturn(pluginConfig.asPluginConfig());
+
+    Configuration configuration = new Configuration(pluginConfigFactoryMock, PLUGIN_NAME);
+
+    assertThat(configuration.getAwsLibLogLevel()).isEqualTo(Level.DEBUG);
+  }
+
+  @Test
+  public void shouldFallBackToDefaultWhenMisConfigured() {
+    pluginConfig.setString("awsLibLogLevel", "foobar");
+    when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+        .thenReturn(pluginConfig.asPluginConfig());
+
+    Configuration configuration = new Configuration(pluginConfigFactoryMock, PLUGIN_NAME);
+
+    assertThat(configuration.getAwsLibLogLevel()).isEqualTo(Level.WARN);
+  }
+
+  @Test
+  public void shouldDefaultToAsynchronousPublishing() {
+    when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+        .thenReturn(pluginConfig.asPluginConfig());
+
+    Configuration configuration = new Configuration(pluginConfigFactoryMock, PLUGIN_NAME);
+
+    assertThat(configuration.isSendAsync()).isEqualTo(true);
+  }
+
+  @Test
+  public void shouldConfigureSynchronousPublishing() {
+    pluginConfig.setBoolean("sendAsync", false);
+    when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+        .thenReturn(pluginConfig.asPluginConfig());
+
+    Configuration configuration = new Configuration(pluginConfigFactoryMock, PLUGIN_NAME);
+
+    assertThat(configuration.isSendAsync()).isEqualTo(false);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
new file mode 100644
index 0000000..7b5dded
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/kinesis/KinesisEventsIT.java
@@ -0,0 +1,238 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kinesis;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.testcontainers.containers.localstack.LocalStackContainer.Service.CLOUDWATCH;
+import static org.testcontainers.containers.localstack.LocalStackContainer.Service.DYNAMODB;
+import static org.testcontainers.containers.localstack.LocalStackContainer.Service.KINESIS;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.WaitUtil;
+import com.google.gerrit.acceptance.config.GerritConfig;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.junit.Before;
+import org.junit.Test;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.StreamStatus;
+
+@TestPlugin(
+    name = "events-aws-kinesis",
+    sysModule = "com.googlesource.gerrit.plugins.kinesis.Module")
+public class KinesisEventsIT extends LightweightPluginDaemonTest {
+  // This timeout is quite high to allow the kinesis coordinator to acquire a
+  // lease on the newly created stream
+  private static final Duration WAIT_FOR_CONSUMPTION = Duration.ofSeconds(120);
+  private static final Duration STREAM_CREATION_TIMEOUT = Duration.ofSeconds(10);
+
+  private static final int LOCALSTACK_PORT = 4566;
+  private LocalStackContainer localstack =
+      new LocalStackContainer(DockerImageName.parse("localstack/localstack:0.12.8"))
+          .withServices(DYNAMODB, KINESIS, CLOUDWATCH)
+          .withEnv("USE_SSL", "true")
+          .withExposedPorts(LOCALSTACK_PORT);
+
+  private KinesisClient kinesisClient;
+
+  @Before
+  public void setUpTestPlugin() throws Exception {
+    localstack.start();
+
+    kinesisClient =
+        KinesisClient.builder()
+            .endpointOverride(localstack.getEndpointOverride(KINESIS))
+            .credentialsProvider(
+                StaticCredentialsProvider.create(
+                    AwsBasicCredentials.create(
+                        localstack.getAccessKey(), localstack.getSecretKey())))
+            .region(Region.of(localstack.getRegion()))
+            .build();
+
+    System.setProperty("endpoint", localstack.getEndpointOverride(KINESIS).toASCIIString());
+    System.setProperty("region", localstack.getRegion());
+    System.setProperty("aws.accessKeyId", localstack.getAccessKey());
+
+    // The secret key property name has changed from aws-sdk 1.11.x and 2.x [1]
+    // Export both names so that default credential provider chains work for both
+    // Kinesis Consumer Library (uses V2) and Kinesis Producer Library (uses v1)
+    // [1]  https://docs.aws.amazon.com/sdk-for-java/latest/migration-guide/client-credential.html
+    System.setProperty("aws.secretKey", localstack.getSecretKey());
+    System.setProperty("aws.secretAccessKey", localstack.getSecretKey());
+
+    super.setUpTestPlugin();
+  }
+
+  @Override
+  public void tearDownTestPlugin() {
+    localstack.close();
+
+    super.tearDownTestPlugin();
+  }
+
+  @Test
+  @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
+  @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
+  public void shouldConsumeAnEventPublishedToATopic() throws Exception {
+    String streamName = UUID.randomUUID().toString();
+    createStreamAndWait(streamName, STREAM_CREATION_TIMEOUT);
+
+    EventConsumerCounter eventConsumerCounter = new EventConsumerCounter();
+    kinesisBroker().receiveAsync(streamName, eventConsumerCounter);
+
+    kinesisBroker().send(streamName, eventMessage());
+    WaitUtil.waitUntil(
+        () -> eventConsumerCounter.getConsumedMessages().size() == 1, WAIT_FOR_CONSUMPTION);
+    assertThat(eventConsumerCounter.getConsumedMessages().get(0).getHeader().eventId)
+        .isEqualTo(eventConsumerCounter.getConsumedMessages().get(0).getHeader().eventId);
+  }
+
+  @Test
+  @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
+  @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
+  public void shouldReplayMessages() throws Exception {
+    String streamName = UUID.randomUUID().toString();
+    createStreamAndWait(streamName, STREAM_CREATION_TIMEOUT);
+
+    EventConsumerCounter eventConsumerCounter = new EventConsumerCounter();
+    kinesisBroker().receiveAsync(streamName, eventConsumerCounter);
+
+    EventMessage event = eventMessage();
+    kinesisBroker().send(streamName, event);
+
+    WaitUtil.waitUntil(
+        () -> eventConsumerCounter.getConsumedMessages().size() == 1, WAIT_FOR_CONSUMPTION);
+    assertThat(eventConsumerCounter.getConsumedMessages().get(0).getHeader().eventId)
+        .isEqualTo(event.getHeader().eventId);
+
+    eventConsumerCounter.clear();
+    kinesisBroker().disconnect();
+    kinesisBroker().receiveAsync(streamName, eventConsumerCounter);
+    kinesisBroker().replayAllEvents(streamName);
+
+    WaitUtil.waitUntil(
+        () -> eventConsumerCounter.getConsumedMessages().size() == 1, WAIT_FOR_CONSUMPTION);
+    assertThat(eventConsumerCounter.getConsumedMessages().get(0).getHeader().eventId)
+        .isEqualTo(event.getHeader().eventId);
+  }
+
+  @Test
+  @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
+  @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
+  @GerritConfig(name = "plugin.events-aws-kinesis.publishTimeoutMs", value = "10000")
+  @GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "false")
+  public void sendingSynchronouslyShouldRetryUntilSuccessful() {
+    String streamName = UUID.randomUUID().toString();
+    createStreamAsync(streamName);
+
+    PublishResult publishResult = kinesisBroker().sendWithResult(streamName, eventMessage());
+    assertThat(publishResult.isSuccess()).isTrue();
+    assertThat(publishResult.attempts()).isGreaterThan(1);
+  }
+
+  @Test
+  @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
+  @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
+  @GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "false")
+  public void sendingSynchronouslyShouldBeUnsuccessfulWhenTimingOut() {
+    String streamName = "not-existing-stream";
+
+    PublishResult publishResult = kinesisBroker().sendWithResult(streamName, eventMessage());
+    assertThat(publishResult.isSuccess()).isFalse();
+  }
+
+  @Test
+  @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
+  @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
+  @GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "true")
+  public void sendingAsynchronouslyShouldBeImmediatelySuccessfulEvenWhenStreamDoesNotExist() {
+    String streamName = "not-existing-stream";
+
+    PublishResult publishResult = kinesisBroker().sendWithResult(streamName, eventMessage());
+    assertThat(publishResult.isSuccess()).isTrue();
+  }
+
+  @Test
+  @GerritConfig(name = "plugin.events-aws-kinesis.applicationName", value = "test-consumer")
+  @GerritConfig(name = "plugin.events-aws-kinesis.initialPosition", value = "trim_horizon")
+  @GerritConfig(name = "plugin.events-aws-kinesis.sendAsync", value = "true")
+  public void sendingAsynchronouslyShouldBeImmediatelySuccessful() {
+    String streamName = UUID.randomUUID().toString();
+    createStreamAsync(streamName);
+
+    PublishResult publishResult = kinesisBroker().sendWithResult(streamName, eventMessage());
+    assertThat(publishResult.isSuccess()).isTrue();
+    assertThat(publishResult.attempts()).isEqualTo(1);
+  }
+
+  public KinesisBrokerApi kinesisBroker() {
+    return (KinesisBrokerApi) plugin.getSysInjector().getInstance(BrokerApi.class);
+  }
+
+  private void createStreamAndWait(String streamName, Duration timeout)
+      throws InterruptedException {
+    createStreamAsync(streamName);
+
+    WaitUtil.waitUntil(
+        () ->
+            kinesisClient
+                .describeStream(DescribeStreamRequest.builder().streamName(streamName).build())
+                .streamDescription()
+                .streamStatus()
+                .equals(StreamStatus.ACTIVE),
+        timeout);
+  }
+
+  private void createStreamAsync(String streamName) {
+    kinesisClient.createStream(
+        CreateStreamRequest.builder().streamName(streamName).shardCount(1).build());
+  }
+
+  private EventMessage eventMessage() {
+    return new EventMessage(
+        new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID()), new ProjectCreatedEvent());
+  }
+
+  private static class EventConsumerCounter implements Consumer<EventMessage> {
+    List<EventMessage> consumedMessages = new ArrayList<>();
+
+    @Override
+    public void accept(EventMessage eventMessage) {
+      consumedMessages.add(eventMessage);
+    }
+
+    public List<EventMessage> getConsumedMessages() {
+      return consumedMessages;
+    }
+
+    public void clear() {
+      consumedMessages.clear();
+    }
+  }
+}