Initial master commit from gcloud-pubsub-events repo

Change-Id: I22c96402f8a1c38d8d1ce45a9149c9160e157c3c
diff --git a/BUILD b/BUILD
new file mode 100644
index 0000000..9714d33
--- /dev/null
+++ b/BUILD
@@ -0,0 +1,112 @@
+load("//tools/bzl:junit.bzl", "junit_tests")
+load(
+    "//tools/bzl:plugin.bzl",
+    "PLUGIN_DEPS",
+    "PLUGIN_TEST_DEPS",
+    "gerrit_plugin",
+)
+
+gerrit_plugin(
+    name = "events-gcloud-pubsub",
+    srcs = glob(["src/main/java/**/*.java"]),
+    manifest_entries = [
+        "Gerrit-PluginName: events-gcloud-pubsub",
+        "Gerrit-Module: com.googlesource.gerrit.plugins.pubsub.Module",
+        "Implementation-Title: Gerrit events listener to send events to an external GCloud PubSub broker",
+        "Implementation-URL: https://gerrit.googlesource.com/plugins/events-gcloud-pubsub",
+    ],
+    resources = glob(["src/main/resources/**/*"]),
+    deps = [
+        "@api-common//jar",
+        "@events-broker//jar:neverlink",
+        "@gax-grpc//jar",
+        "@gax//jar",
+        "@google-auth-library-credentials//jar",
+        "@google-auth-library-oauth2-http//jar",
+        "@google-cloud-pubsub-proto//jar",
+        "@google-cloud-pubsub//jar",
+        "@google-http-client-gson//jar",
+        "@google-http-client//jar",
+        "@grpc-alts//jar",
+        "@grpc-api//jar",
+        "@grpc-auth//jar",
+        "@grpc-context//jar",
+        "@grpc-core//jar",
+        "@grpc-netty-shaded//jar",
+        "@grpc-protobuf-lite//jar",
+        "@grpc-protobuf//jar",
+        "@grpc-stub//jar",
+        "@opencensus-api//jar",
+        "@opencensus-contrib-http-util//jar",
+        "@perfmark-api//jar",
+        "@proto-google-common-protos//jar",
+        "@proto-google-iam-v1//jar",
+        "@threetenbp//jar",
+    ],
+)
+
+junit_tests(
+    name = "events-gcloud-pubsub_tests",
+    srcs = glob(["src/test/java/**/*.java"]),
+    tags = ["events-gcloud-pubsub"],
+    deps = [
+        ":events-gcloud-pubsub__plugin_test_deps",
+        "//lib/testcontainers",
+        "@api-common//jar",
+        "@events-broker//jar",
+        "@gax-grpc//jar",
+        "@gax//jar",
+        "@google-auth-library-credentials//jar",
+        "@google-auth-library-oauth2-http//jar",
+        "@google-cloud-pubsub-proto//jar",
+        "@google-cloud-pubsub//jar",
+        "@google-http-client-gson//jar",
+        "@google-http-client//jar",
+        "@grpc-alts//jar",
+        "@grpc-api//jar",
+        "@grpc-context//jar",
+        "@grpc-core//jar",
+        "@grpc-netty-shaded//jar",
+        "@grpc-protobuf-lite//jar",
+        "@grpc-protobuf//jar",
+        "@grpc-stub//jar",
+        "@opencensus-api//jar",
+        "@opencensus-contrib-http-util//jar",
+        "@perfmark-api//jar",
+        "@proto-google-common-protos//jar",
+        "@proto-google-iam-v1//jar",
+        "@testcontainers-gcloud//jar",
+        "@threetenbp//jar",
+    ],
+)
+
+java_library(
+    name = "events-gcloud-pubsub__plugin_test_deps",
+    testonly = 1,
+    visibility = ["//visibility:public"],
+    exports = PLUGIN_DEPS + PLUGIN_TEST_DEPS + [
+        ":events-gcloud-pubsub__plugin",
+        "//lib/jackson:jackson-annotations",
+        "//lib/testcontainers",
+        "//lib/testcontainers:docker-java-api",
+        "//lib/testcontainers:docker-java-transport",
+        "@testcontainers-gcloud//jar",
+        "@grpc-api//jar",
+        "@gax-grpc//jar",
+        "@grpc-netty-shaded//jar",
+        "@grpc-core//jar",
+        "@threetenbp//jar",
+        "@grpc-alts//jar",
+        "@grpc-protobuf//jar",
+        "@grpc-protobuf-lite//jar",
+        "@proto-google-iam-v1//jar",
+        "@proto-google-common-protos//jar",
+        "@google-http-client//jar",
+        "@grpc-context//jar",
+        "@grpc-stub//jar",
+        "@perfmark-api//jar",
+        "@google-http-client-gson//jar",
+        "@opencensus-api//jar",
+        "@opencensus-contrib-http-util//jar",
+    ],
+)
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..11069ed
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,201 @@
+                              Apache License
+                        Version 2.0, January 2004
+                     http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+   "License" shall mean the terms and conditions for use, reproduction,
+   and distribution as defined by Sections 1 through 9 of this document.
+
+   "Licensor" shall mean the copyright owner or entity authorized by
+   the copyright owner that is granting the License.
+
+   "Legal Entity" shall mean the union of the acting entity and all
+   other entities that control, are controlled by, or are under common
+   control with that entity. For the purposes of this definition,
+   "control" means (i) the power, direct or indirect, to cause the
+   direction or management of such entity, whether by contract or
+   otherwise, or (ii) ownership of fifty percent (50%) or more of the
+   outstanding shares, or (iii) beneficial ownership of such entity.
+
+   "You" (or "Your") shall mean an individual or Legal Entity
+   exercising permissions granted by this License.
+
+   "Source" form shall mean the preferred form for making modifications,
+   including but not limited to software source code, documentation
+   source, and configuration files.
+
+   "Object" form shall mean any form resulting from mechanical
+   transformation or translation of a Source form, including but
+   not limited to compiled object code, generated documentation,
+   and conversions to other media types.
+
+   "Work" shall mean the work of authorship, whether in Source or
+   Object form, made available under the License, as indicated by a
+   copyright notice that is included in or attached to the work
+   (an example is provided in the Appendix below).
+
+   "Derivative Works" shall mean any work, whether in Source or Object
+   form, that is based on (or derived from) the Work and for which the
+   editorial revisions, annotations, elaborations, or other modifications
+   represent, as a whole, an original work of authorship. For the purposes
+   of this License, Derivative Works shall not include works that remain
+   separable from, or merely link (or bind by name) to the interfaces of,
+   the Work and Derivative Works thereof.
+
+   "Contribution" shall mean any work of authorship, including
+   the original version of the Work and any modifications or additions
+   to that Work or Derivative Works thereof, that is intentionally
+   submitted to Licensor for inclusion in the Work by the copyright owner
+   or by an individual or Legal Entity authorized to submit on behalf of
+   the copyright owner. For the purposes of this definition, "submitted"
+   means any form of electronic, verbal, or written communication sent
+   to the Licensor or its representatives, including but not limited to
+   communication on electronic mailing lists, source code control systems,
+   and issue tracking systems that are managed by, or on behalf of, the
+   Licensor for the purpose of discussing and improving the Work, but
+   excluding communication that is conspicuously marked or otherwise
+   designated in writing by the copyright owner as "Not a Contribution."
+
+   "Contributor" shall mean Licensor and any individual or Legal Entity
+   on behalf of whom a Contribution has been received by Licensor and
+   subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of
+   this License, each Contributor hereby grants to You a perpetual,
+   worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+   copyright license to reproduce, prepare Derivative Works of,
+   publicly display, publicly perform, sublicense, and distribute the
+   Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of
+   this License, each Contributor hereby grants to You a perpetual,
+   worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+   (except as stated in this section) patent license to make, have made,
+   use, offer to sell, sell, import, and otherwise transfer the Work,
+   where such license applies only to those patent claims licensable
+   by such Contributor that are necessarily infringed by their
+   Contribution(s) alone or by combination of their Contribution(s)
+   with the Work to which such Contribution(s) was submitted. If You
+   institute patent litigation against any entity (including a
+   cross-claim or counterclaim in a lawsuit) alleging that the Work
+   or a Contribution incorporated within the Work constitutes direct
+   or contributory patent infringement, then any patent licenses
+   granted to You under this License for that Work shall terminate
+   as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the
+   Work or Derivative Works thereof in any medium, with or without
+   modifications, and in Source or Object form, provided that You
+   meet the following conditions:
+
+   (a) You must give any other recipients of the Work or
+       Derivative Works a copy of this License; and
+
+   (b) You must cause any modified files to carry prominent notices
+       stating that You changed the files; and
+
+   (c) You must retain, in the Source form of any Derivative Works
+       that You distribute, all copyright, patent, trademark, and
+       attribution notices from the Source form of the Work,
+       excluding those notices that do not pertain to any part of
+       the Derivative Works; and
+
+   (d) If the Work includes a "NOTICE" text file as part of its
+       distribution, then any Derivative Works that You distribute must
+       include a readable copy of the attribution notices contained
+       within such NOTICE file, excluding those notices that do not
+       pertain to any part of the Derivative Works, in at least one
+       of the following places: within a NOTICE text file distributed
+       as part of the Derivative Works; within the Source form or
+       documentation, if provided along with the Derivative Works; or,
+       within a display generated by the Derivative Works, if and
+       wherever such third-party notices normally appear. The contents
+       of the NOTICE file are for informational purposes only and
+       do not modify the License. You may add Your own attribution
+       notices within Derivative Works that You distribute, alongside
+       or as an addendum to the NOTICE text from the Work, provided
+       that such additional attribution notices cannot be construed
+       as modifying the License.
+
+   You may add Your own copyright statement to Your modifications and
+   may provide additional or different license terms and conditions
+   for use, reproduction, or distribution of Your modifications, or
+   for any such Derivative Works as a whole, provided Your use,
+   reproduction, and distribution of the Work otherwise complies with
+   the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise,
+   any Contribution intentionally submitted for inclusion in the Work
+   by You to the Licensor shall be under the terms and conditions of
+   this License, without any additional terms or conditions.
+   Notwithstanding the above, nothing herein shall supersede or modify
+   the terms of any separate license agreement you may have executed
+   with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade
+   names, trademarks, service marks, or product names of the Licensor,
+   except as required for reasonable and customary use in describing the
+   origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or
+   agreed to in writing, Licensor provides the Work (and each
+   Contributor provides its Contributions) on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+   implied, including, without limitation, any warranties or conditions
+   of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+   PARTICULAR PURPOSE. You are solely responsible for determining the
+   appropriateness of using or redistributing the Work and assume any
+   risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory,
+   whether in tort (including negligence), contract, or otherwise,
+   unless required by applicable law (such as deliberate and grossly
+   negligent acts) or agreed to in writing, shall any Contributor be
+   liable to You for damages, including any direct, indirect, special,
+   incidental, or consequential damages of any character arising as a
+   result of this License or out of the use or inability to use the
+   Work (including but not limited to damages for loss of goodwill,
+   work stoppage, computer failure or malfunction, or any and all
+   other commercial damages or losses), even if such Contributor
+   has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing
+   the Work or Derivative Works thereof, You may choose to offer,
+   and charge a fee for, acceptance of support, warranty, indemnity,
+   or other liability obligations and/or rights consistent with this
+   License. However, in accepting such obligations, You may act only
+   on Your own behalf and on Your sole responsibility, not on behalf
+   of any other Contributor, and only if You agree to indemnify,
+   defend, and hold each Contributor harmless for any liability
+   incurred by, or claims asserted against, such Contributor by reason
+   of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
+
+APPENDIX: How to apply the Apache License to your work.
+
+   To apply the Apache License to your work, attach the following
+   boilerplate notice, with the fields enclosed by brackets "[]"
+   replaced with your own identifying information. (Don't include
+   the brackets!)  The text should be enclosed in the appropriate
+   comment syntax for the file format. We also recommend that a
+   file or class name and description of purpose be included on the
+   same "printed page" as the copyright notice for easier
+   identification within third-party archives.
+
+Copyright [yyyy] [name of copyright owner]
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..99b9edb
--- /dev/null
+++ b/README.md
@@ -0,0 +1,8 @@
+# events-gcloud-pubsub
+Gerrit event producer and consumer for [GCloud PubSub](https://cloud.google.com/pubsub/)
+
+## Build
+Information on how to build this plugin can be found [here](./src/main/resources/Documentation/build.md)
+
+## Configuration
+Information on how to configure this plugin can be found [here](./src/main/resources/Documentation/config.md)
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
new file mode 100644
index 0000000..3616263
--- /dev/null
+++ b/external_plugin_deps.bzl
@@ -0,0 +1,164 @@
+load("//tools/bzl:maven_jar.bzl", "maven_jar")
+
+def external_plugin_deps():
+    maven_jar(
+        name = "junit-platform",
+        artifact = "org.junit.platform:junit-platform-commons:1.4.0",
+        sha1 = "34d9983705c953b97abb01e1cd04647f47272fe5",
+    )
+
+    maven_jar(
+        name = "google-cloud-pubsub",
+        artifact = "com.google.cloud:google-cloud-pubsub:1.111.4",
+        sha1 = "01988db8241471b09fc317c803d20403d93d6ca5",
+    )
+
+    maven_jar(
+        name = "google-cloud-pubsub-proto",
+        artifact = "com.google.api.grpc:proto-google-cloud-pubsub-v1:1.93.4",
+        sha1 = "167bfae34ec63215ee3b9e95a4deb0b67104c021",
+    )
+
+    maven_jar(
+        name = "api-common",
+        artifact = "com.google.api:api-common:1.10.1",
+        sha1 = "d157681b5909cf959a9fa60ced9bed9da741ffef",
+    )
+
+    maven_jar(
+        name = "google-auth-library-credentials",
+        artifact = "com.google.auth:google-auth-library-credentials:0.24.1",
+        sha1 = "5f43498fae558213e27cd904944626c88cf03d03",
+    )
+
+    maven_jar(
+        name = "google-auth-library-oauth2-http",
+        artifact = "com.google.auth:google-auth-library-oauth2-http:0.24.1",
+        sha1 = "ce4ca632ff44eb9cb3033db590d4862a686db199",
+    )
+
+    maven_jar(
+        name = "gax",
+        artifact = "com.google.api:gax:1.62.0",
+        sha1 = "11e565f1a65f7e2245238ac5c19875c0ddd25b14",
+    )
+
+    maven_jar(
+        name = "events-broker",
+        artifact = "com.gerritforge:events-broker:3.3.1",
+        sha1 = "90775e671946b20e52be3a11277d1ed33973d66e",
+    )
+
+    maven_jar(
+        name = "testcontainers-gcloud",
+        artifact = "org.testcontainers:gcloud:1.15.2",
+        sha1 = "0ad02bb83edc818469e1080995cae409f5d40694",
+    )
+
+    maven_jar(
+        name = "grpc-api",
+        artifact = "io.grpc:grpc-api:1.36.0",
+        sha1 = "5a2c6286f76477a44aaf63c9f4d0f5399652885a",
+    )
+
+    maven_jar(
+        name = "gax-grpc",
+        artifact = "com.google.api:gax-grpc:1.62.0",
+        sha1 = "9c8e22dbceac414c03bfc92abc4399e82208d647",
+    )
+
+    maven_jar(
+        name = "grpc-core",
+        artifact = "io.grpc:grpc-core:1.36.0",
+        sha1 = "17a7f3287439c1d2641fabc4d767e7de4ebb05e5",
+    )
+
+    maven_jar(
+        name = "grpc-netty-shaded",
+        artifact = "io.grpc:grpc-netty-shaded:1.36.0",
+        sha1 = "575d65cd47a8c997f2014e702920d23b9f18d764",
+    )
+
+    maven_jar(
+        name = "threetenbp",
+        artifact = "org.threeten:threetenbp:1.5.0",
+        sha1 = "e18c0fb79ebee3e3907042a7f31c31404f9e546b",
+    )
+
+    maven_jar(
+        name = "grpc-alts",
+        artifact = "io.grpc:grpc-alts:1.36.0",
+        sha1 = "e41ed1f9739daa26752885304855161e8d504fa0",
+    )
+
+    maven_jar(
+        name = "grpc-protobuf",
+        artifact = "io.grpc:grpc-protobuf:1.36.0",
+        sha1 = "b579119c664bb1b50249c652ef139ef30adb68a9",
+    )
+
+    maven_jar(
+        name = "grpc-protobuf-lite",
+        artifact = "io.grpc:grpc-protobuf-lite:1.36.0",
+        sha1 = "78f16d4544b8e83b845cd91ed9c24854f9f78a8c",
+    )
+
+    maven_jar(
+        name = "proto-google-iam-v1",
+        artifact = "com.google.api.grpc:proto-google-iam-v1:1.0.9",
+        sha1 = "7a3ad48a2f9925ec8cd811dc06bca778ba9560cc",
+    )
+
+    maven_jar(
+        name = "proto-google-common-protos",
+        artifact = "com.google.api.grpc:proto-google-common-protos:2.1.0",
+        sha1 = "65234da8719aed4672298f574d7e87c1f025c4dc",
+    )
+
+    maven_jar(
+        name = "google-http-client",
+        artifact = "com.google.http-client:google-http-client:1.39.0",
+        sha1 = "936d09a3afa6911f1255ce039debeaf37928ee75",
+    )
+
+    maven_jar(
+        name = "google-http-client-gson",
+        artifact = "com.google.http-client:google-http-client-gson:1.39.0",
+        sha1 = "24ec436b2b4a27c17ccfa83a9c885a1f582e29b8",
+    )
+
+    maven_jar(
+        name = "grpc-context",
+        artifact = "io.grpc:grpc-context:1.36.0",
+        sha1 = "8d8c6e0f00ae7889f4a435c4ec0c4ad4cb99578d",
+    )
+
+    maven_jar(
+        name = "grpc-stub",
+        artifact = "io.grpc:grpc-stub:1.36.0",
+        sha1 = "7154e1fbdc9a809f158e8990999760a1bab95ed7",
+    )
+
+    maven_jar(
+        name = "perfmark-api",
+        artifact = "io.perfmark:perfmark-api:0.23.0",
+        sha1 = "0b813b7539fae6550541da8caafd6add86d4e22f",
+    )
+
+    maven_jar(
+        name = "opencensus-api",
+        artifact = "io.opencensus:opencensus-api:0.28.0",
+        sha1 = "0fc0d06a9d975a38c581dff59b99cf31db78bd99",
+    )
+
+    maven_jar(
+        name = "opencensus-contrib-http-util",
+        artifact = "io.opencensus:opencensus-contrib-http-util:0.28.0",
+        sha1 = "f6cb276330197d51dd65327fc305a3df7e622705",
+    )
+
+    maven_jar(
+        name = "grpc-auth",
+        artifact = "io.grpc:grpc-auth:1.36.0",
+        sha1 = "d9722016658f8e649111c8bb93b299ea38dc207e",
+    )
\ No newline at end of file
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/ConsumerExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/ConsumerExecutor.java
new file mode 100644
index 0000000..6993db4
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/ConsumerExecutor.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+
+@Retention(RUNTIME)
+@BindingAnnotation
+public @interface ConsumerExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/Manager.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Manager.java
new file mode 100644
index 0000000..01bd857
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Manager.java
@@ -0,0 +1,53 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.util.Set;
+
+@Singleton
+public class Manager implements LifecycleListener {
+
+  private final Set<TopicSubscriber> consumers;
+  private final BrokerApi brokerApi;
+  private final PubSubEventListener pubSubEventListener;
+
+  @Inject
+  public Manager(
+      Set<TopicSubscriber> consumers,
+      BrokerApi brokerApi,
+      PubSubEventListener pubSubEventListener) {
+    this.consumers = consumers;
+    this.brokerApi = brokerApi;
+    this.pubSubEventListener = pubSubEventListener;
+  }
+
+  @Override
+  public void start() {
+    consumers.forEach(
+        topicSubscriber ->
+            brokerApi.receiveAsync(topicSubscriber.topic(), topicSubscriber.consumer()));
+  }
+
+  @Override
+  public void stop() {
+    brokerApi.disconnect();
+    pubSubEventListener.disconnect();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java
new file mode 100644
index 0000000..32110dd
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java
@@ -0,0 +1,66 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.gerrit.extensions.config.FactoryModule;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.EventListener;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.pubsub.local.EnvironmentChecker;
+import com.googlesource.gerrit.plugins.pubsub.local.LocalCredentialsProvider;
+import com.googlesource.gerrit.plugins.pubsub.local.LocalPublisherProvider;
+import com.googlesource.gerrit.plugins.pubsub.local.LocalSubscriberProvider;
+
+class Module extends FactoryModule {
+
+  private PubSubApiModule pubSubApiModule;
+  private EnvironmentChecker environmentChecker;
+
+  @Inject
+  public Module(PubSubApiModule pubSubApiModule, EnvironmentChecker environmentChecker) {
+    this.pubSubApiModule = pubSubApiModule;
+    this.environmentChecker = environmentChecker;
+  }
+
+  @Override
+  protected void configure() {
+    bind(Gson.class).toProvider(EventGsonProvider.class).in(Singleton.class);
+    DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
+    DynamicSet.bind(binder(), EventListener.class).to(PubSubEventListener.class);
+    factory(PubSubPublisher.Factory.class);
+    factory(PubSubEventSubscriber.Factory.class);
+
+    if (environmentChecker.isLocalEnvironment()) {
+      bind(CredentialsProvider.class)
+          .toProvider(LocalCredentialsProvider.class)
+          .in(Scopes.SINGLETON);
+      bind(SubscriberProvider.class).to(LocalSubscriberProvider.class);
+      bind(PublisherProvider.class).to(LocalPublisherProvider.class);
+    } else {
+      bind(CredentialsProvider.class)
+          .toProvider(ServiceAccountCredentialsProvider.class)
+          .in(Scopes.SINGLETON);
+      bind(SubscriberProvider.class);
+      bind(PublisherProvider.class);
+    }
+    install(pubSubApiModule);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubApiModule.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubApiModule.java
new file mode 100644
index 0000000..ca32304
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubApiModule.java
@@ -0,0 +1,67 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.collect.Sets;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Scopes;
+import com.google.inject.TypeLiteral;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class PubSubApiModule extends LifecycleModule {
+  WorkQueue workQueue;
+  PubSubConfiguration configuration;
+
+  private Set<TopicSubscriber> activeConsumers = Sets.newHashSet();
+
+  @Inject
+  public PubSubApiModule(WorkQueue workQueue, PubSubConfiguration configuration) {
+    this.workQueue = workQueue;
+    this.configuration = configuration;
+  }
+
+  /**
+   * Clients(for example multi-site plugin) of events-gcloud-pubsub library are registering
+   * consumers. Because we cannot guarantee that client plugin is loaded after the
+   * events-gcloud-pubsub we have to make sure that already registered consumers are reassigned to
+   * the events-gcloud-pubsub. This injection is optional because if events-gcloud-pubsub plugin is
+   * loaded before the client plugin no consumers are registered yet.
+   *
+   * @param previousBrokerApi
+   */
+  @Inject(optional = true)
+  public void setPreviousBrokerApi(DynamicItem<BrokerApi> previousBrokerApi) {
+    if (previousBrokerApi != null && previousBrokerApi.get() != null) {
+      this.activeConsumers = previousBrokerApi.get().topicSubscribers();
+    }
+  }
+
+  @Override
+  protected void configure() {
+    bind(ScheduledExecutorService.class)
+        .annotatedWith(ConsumerExecutor.class)
+        .toProvider(ScheduledExecutorServiceProvider.class)
+        .in(Scopes.SINGLETON);
+
+    bind(new TypeLiteral<Set<TopicSubscriber>>() {}).toInstance(activeConsumers);
+    DynamicItem.bind(binder(), BrokerApi.class).to(PubSubBrokerApi.class).in(Scopes.SINGLETON);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java
new file mode 100644
index 0000000..c989aa4
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java
@@ -0,0 +1,88 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.flogger.FluentLogger;
+import com.google.inject.Inject;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+class PubSubBrokerApi implements BrokerApi {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private PubSubPublisher.Factory publisherFactory;
+  private PubSubEventSubscriber.Factory subscriberFactory;
+  private Map<String, PubSubPublisher> publishers = new ConcurrentHashMap<>();
+  private Set<PubSubEventSubscriber> subscribers;
+
+  @Inject
+  public PubSubBrokerApi(
+      PubSubPublisher.Factory publisherFactory, PubSubEventSubscriber.Factory subscriberFactory) {
+    this.publisherFactory = publisherFactory;
+    this.subscriberFactory = subscriberFactory;
+    subscribers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+  }
+
+  @Override
+  public boolean send(String topic, EventMessage message) {
+    return publishers.computeIfAbsent(topic, t -> publisherFactory.create(t)).publish(message);
+  }
+
+  @Override
+  public void receiveAsync(String topic, Consumer<EventMessage> eventConsumer) {
+    PubSubEventSubscriber subscriber = subscriberFactory.create(topic, eventConsumer);
+    subscribers.add(subscriber);
+    subscriber.subscribe();
+  }
+
+  @Override
+  public Set<TopicSubscriber> topicSubscribers() {
+    return subscribers.stream()
+        .map(s -> TopicSubscriber.topicSubscriber(s.getTopic(), s.getMessageProcessor()))
+        .collect(Collectors.toSet());
+  }
+
+  @Override
+  public void disconnect() {
+    publishers
+        .values()
+        .forEach(
+            publisher -> {
+              try {
+                publisher.close();
+              } catch (InterruptedException e) {
+                logger.atSevere().withCause(e).log("Disconnect failed");
+              }
+            });
+
+    for (PubSubEventSubscriber subscriber : subscribers) {
+      subscriber.shutdown();
+    }
+    subscribers.clear();
+  }
+
+  @Override
+  public void replayAllEvents(String topic) {
+    subscribers.stream()
+        .filter(subscriber -> topic.equals(subscriber.getTopic()))
+        .forEach(PubSubEventSubscriber::replayMessages);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java
new file mode 100644
index 0000000..54efac3
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java
@@ -0,0 +1,119 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.google.gerrit.common.Nullable;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.config.PluginConfig;
+import com.google.gerrit.server.config.PluginConfigFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+public class PubSubConfiguration {
+  private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
+  private static final String DEFAULT_ACK_DEADLINE_SECONDS = "10";
+  private static final String DEFAULT_SUBSCTIPRION_TIMEOUT = "10";
+  private static final String DEFAULT_SHUTDOWN_TIMEOUT = "10";
+  private static final String DEFAULT_STREAM_EVENTS_TOPIC = "gerrit";
+
+  private final String gcloudProject;
+  private final String subscriptionId;
+  private final Integer numberOfSubscribers;
+  private final Boolean sendAsync;
+  private final String privateKeyLocation;
+  private final Integer ackDeadlineSeconds;
+  private final Long subscribtionTimeoutInSeconds;
+  private final Long shutdownTimeoutInSeconds;
+  private final String streamEventsTopic;
+  private final PluginConfig fromGerritConfig;
+
+  @Inject
+  public PubSubConfiguration(
+      PluginConfigFactory configFactory,
+      @PluginName String pluginName,
+      @Nullable @GerritInstanceId String instanceId) {
+    this.fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
+    this.sendAsync = fromGerritConfig.getBoolean("sendAsync", true);
+    this.gcloudProject = getMandatoryString("gcloudProject");
+    this.subscriptionId = getMandatoryString("subscriptionId", instanceId);
+    this.privateKeyLocation = getMandatoryString("privateKeyLocation");
+    this.streamEventsTopic =
+        fromGerritConfig.getString("streamEventsTopic", DEFAULT_STREAM_EVENTS_TOPIC);
+    this.numberOfSubscribers =
+        Integer.parseInt(
+            fromGerritConfig.getString("numberOfSubscribers", DEFAULT_NUMBER_OF_SUBSCRIBERS));
+    this.ackDeadlineSeconds =
+        Integer.parseInt(
+            fromGerritConfig.getString("ackDeadlineSeconds", DEFAULT_ACK_DEADLINE_SECONDS));
+    this.subscribtionTimeoutInSeconds =
+        Long.parseLong(
+            fromGerritConfig.getString(
+                "subscribtionTimeoutInSeconds", DEFAULT_SUBSCTIPRION_TIMEOUT));
+    this.shutdownTimeoutInSeconds =
+        Long.parseLong(
+            fromGerritConfig.getString("shutdownTimeoutInSeconds", DEFAULT_SHUTDOWN_TIMEOUT));
+  }
+
+  public Boolean isSendAsync() {
+    return sendAsync;
+  }
+
+  public String getGCloudProject() {
+    return gcloudProject;
+  }
+
+  public Integer getNumberOfSubscribers() {
+    return numberOfSubscribers;
+  }
+
+  public String getPrivateKeyLocation() {
+    return privateKeyLocation;
+  }
+
+  public String getSubscriptionId() {
+    return subscriptionId;
+  }
+
+  public Integer getAckDeadlineSeconds() {
+    return ackDeadlineSeconds;
+  }
+
+  public Long getSubscribtionTimeoutInSeconds() {
+    return subscribtionTimeoutInSeconds;
+  }
+
+  public Long getShutdownTimeoutInSeconds() {
+    return shutdownTimeoutInSeconds;
+  }
+
+  public String getStreamEventsTopic() {
+    return streamEventsTopic;
+  }
+
+  private String getMandatoryString(String name) throws IllegalStateException {
+    return getMandatoryString(name, null);
+  }
+
+  private String getMandatoryString(String name, String defaultValue) throws IllegalStateException {
+    String value = fromGerritConfig.getString(name, defaultValue);
+    if (value == null) {
+      throw new IllegalStateException(
+          String.format("Invalid configuration: parameter '%s' is mandatory", name));
+    }
+    return value;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventListener.java
new file mode 100644
index 0000000..29ec263
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventListener.java
@@ -0,0 +1,46 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventListener;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+public class PubSubEventListener implements EventListener {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final PubSubPublisher publisher;
+
+  @Inject
+  public PubSubEventListener(
+      PubSubPublisher.Factory publisherFactory, PubSubConfiguration configuration) {
+    this.publisher = publisherFactory.create(configuration.getStreamEventsTopic());
+  }
+
+  @Override
+  public void onEvent(Event event) {
+    publisher.publish(event);
+  }
+
+  public void disconnect() {
+    try {
+      publisher.close();
+    } catch (InterruptedException e) {
+      logger.atSevere().withCause(e).log("Disconnect failed");
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
new file mode 100644
index 0000000..b8d8c49
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
@@ -0,0 +1,101 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.common.flogger.FluentLogger;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.google.pubsub.v1.PubsubMessage;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+public class PubSubEventSubscriber {
+
+  public interface Factory {
+    public PubSubEventSubscriber create(String topic, Consumer<EventMessage> messageProcessor);
+  }
+
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final Gson gson;
+  private final String topic;
+  private final Consumer<EventMessage> messageProcessor;
+  private final SubscriberProvider subscriberProvider;
+  private final PubSubConfiguration config;
+  private Subscriber subscriber;
+
+  @Inject
+  public PubSubEventSubscriber(
+      Gson gson,
+      SubscriberProvider subscriberProvider,
+      PubSubConfiguration config,
+      @Assisted String topic,
+      @Assisted Consumer<EventMessage> messageProcessor) {
+    this.gson = gson;
+    this.topic = topic;
+    this.messageProcessor = messageProcessor;
+    this.subscriberProvider = subscriberProvider;
+    this.config = config;
+  }
+
+  public void subscribe() {
+    MessageReceiver receiver =
+        (PubsubMessage message, AckReplyConsumer consumer) -> {
+          EventMessage event = gson.fromJson(message.getData().toStringUtf8(), EventMessage.class);
+          messageProcessor.accept(event);
+          consumer.ack();
+        };
+
+    try {
+      subscriber = subscriberProvider.get(topic, receiver);
+      subscriber
+          .startAsync()
+          .awaitRunning(config.getSubscribtionTimeoutInSeconds(), TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      logger.atSevere().withCause(e).log("Timeout during subscribing to the topic %s", topic);
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Exception during subscribing to the topic %s", topic);
+    }
+  }
+
+  public String getTopic() {
+    return topic;
+  }
+
+  public Consumer<EventMessage> getMessageProcessor() {
+    return messageProcessor;
+  }
+
+  public void replayMessages() {
+    subscriberProvider.replayMessages(subscriber.getSubscriptionNameString());
+  }
+
+  public void shutdown() {
+    try {
+      subscriber
+          .stopAsync()
+          .awaitTerminated(config.getShutdownTimeoutInSeconds(), TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      logger.atSevere().withCause(e).log("Timeout during subscriber shutdown");
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
new file mode 100644
index 0000000..24ef602
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
@@ -0,0 +1,94 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.api.core.ApiFuture;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.Event;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class PubSubPublisher {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  public interface Factory {
+    public PubSubPublisher create(String topic);
+  }
+
+  private final Gson gson;
+  private final Publisher publisher;
+  private final PubSubConfiguration pubSubProperties;
+
+  @Inject
+  public PubSubPublisher(
+      PubSubConfiguration pubSubProperties,
+      PublisherProvider publisherProvider,
+      Gson gson,
+      @Assisted String topic)
+      throws IOException {
+    this.gson = gson;
+    this.publisher = publisherProvider.get(topic);
+    this.pubSubProperties = pubSubProperties;
+  }
+
+  public boolean publish(Event event) {
+    return publish(gson.toJson(event));
+  }
+
+  public boolean publish(EventMessage event) {
+    return publish(gson.toJson(event));
+  }
+
+  private boolean publish(String eventPayload) {
+    ByteString data = ByteString.copyFromUtf8(eventPayload);
+    PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
+    if (pubSubProperties.isSendAsync()) {
+      return publishAsync(pubsubMessage) != null;
+    }
+
+    publishSync(pubsubMessage);
+    return true;
+  }
+
+  private ApiFuture<String> publishAsync(PubsubMessage pubsubMessage) {
+    return publisher.publish(pubsubMessage);
+  }
+
+  private void publishSync(PubsubMessage pubsubMessage) {
+    try {
+      ApiFuture<String> messageIdFuture = publishAsync(pubsubMessage);
+      messageIdFuture.get(1000, TimeUnit.SECONDS);
+
+    } catch (InterruptedException | ExecutionException | TimeoutException e) {
+      logger.atSevere().withCause(e).log("Cannot send the message");
+    }
+  }
+
+  public void close() throws InterruptedException {
+    if (publisher != null) {
+      publisher.shutdown();
+      publisher.awaitTermination(1, TimeUnit.MINUTES);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PublisherProvider.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PublisherProvider.java
new file mode 100644
index 0000000..a7096d0
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PublisherProvider.java
@@ -0,0 +1,41 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.google.pubsub.v1.TopicName;
+import java.io.IOException;
+
+@Singleton
+public class PublisherProvider {
+
+  protected CredentialsProvider credentials;
+  protected PubSubConfiguration config;
+
+  @Inject
+  public PublisherProvider(CredentialsProvider credentials, PubSubConfiguration config) {
+    this.credentials = credentials;
+    this.config = config;
+  }
+
+  public Publisher get(String topic) throws IOException {
+    return Publisher.newBuilder(TopicName.of(config.getGCloudProject(), topic))
+        .setCredentialsProvider(credentials)
+        .build();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/ScheduledExecutorServiceProvider.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/ScheduledExecutorServiceProvider.java
new file mode 100644
index 0000000..1e93288
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/ScheduledExecutorServiceProvider.java
@@ -0,0 +1,36 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class ScheduledExecutorServiceProvider implements Provider<ScheduledExecutorService> {
+  private WorkQueue workQueue;
+  private PubSubConfiguration configuration;
+
+  @Inject
+  public ScheduledExecutorServiceProvider(WorkQueue workQueue, PubSubConfiguration configuration) {
+    this.workQueue = workQueue;
+    this.configuration = configuration;
+  }
+
+  @Override
+  public ScheduledExecutorService get() {
+    return workQueue.createQueue(configuration.getNumberOfSubscribers(), "pubsub-subscriber");
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/ServiceAccountCredentialsProvider.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/ServiceAccountCredentialsProvider.java
new file mode 100644
index 0000000..717b2bc
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/ServiceAccountCredentialsProvider.java
@@ -0,0 +1,42 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+public class ServiceAccountCredentialsProvider implements Provider<CredentialsProvider> {
+  private CredentialsProvider credentials;
+
+  @Inject
+  public ServiceAccountCredentialsProvider(PubSubConfiguration pubSubProperties)
+      throws FileNotFoundException, IOException {
+    this.credentials =
+        FixedCredentialsProvider.create(
+            ServiceAccountCredentials.fromStream(
+                new FileInputStream(pubSubProperties.getPrivateKeyLocation())));
+  }
+
+  @Override
+  public CredentialsProvider get() {
+    return credentials;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/SubscriberProvider.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/SubscriberProvider.java
new file mode 100644
index 0000000..31024d8
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/SubscriberProvider.java
@@ -0,0 +1,130 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.FixedExecutorProvider;
+import com.google.api.gax.rpc.NotFoundException;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.common.flogger.FluentLogger;
+import com.google.inject.Inject;
+import com.google.protobuf.Duration;
+import com.google.protobuf.Timestamp;
+import com.google.pubsub.v1.GetSubscriptionRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.SeekRequest;
+import com.google.pubsub.v1.Subscription;
+import com.google.pubsub.v1.TopicName;
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class SubscriberProvider {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  protected CredentialsProvider credentials;
+  protected PubSubConfiguration pubSubProperties;
+  protected ScheduledExecutorService executor;
+
+  @Inject
+  public SubscriberProvider(
+      CredentialsProvider credentials,
+      PubSubConfiguration pubSubProperties,
+      @ConsumerExecutor ScheduledExecutorService executor) {
+    this.credentials = credentials;
+    this.pubSubProperties = pubSubProperties;
+    this.executor = executor;
+  }
+
+  public Subscriber get(String topic, MessageReceiver receiver) throws IOException {
+    return Subscriber.newBuilder(getOrCreateSubscription(topic).getName(), receiver)
+        .setExecutorProvider(FixedExecutorProvider.create(executor))
+        .setCredentialsProvider(credentials)
+        .build();
+  }
+
+  protected SubscriptionAdminSettings createSubscriptionAdminSettings() throws IOException {
+    return SubscriptionAdminSettings.newBuilder().setCredentialsProvider(credentials).build();
+  }
+
+  protected Subscription getOrCreateSubscription(String topicId) throws IOException {
+    try (SubscriptionAdminClient subscriptionAdminClient =
+        SubscriptionAdminClient.create(createSubscriptionAdminSettings())) {
+      String subscriptionName =
+          String.format("%s-%s", pubSubProperties.getSubscriptionId(), topicId);
+      ProjectSubscriptionName projectSubscriptionName =
+          ProjectSubscriptionName.of(pubSubProperties.getGCloudProject(), subscriptionName);
+
+      return getSubscription(subscriptionAdminClient, projectSubscriptionName)
+          .orElseGet(
+              () ->
+                  subscriptionAdminClient.createSubscription(
+                      createSubscriptionRequest(projectSubscriptionName, topicId)));
+    }
+  }
+
+  protected Subscription createSubscriptionRequest(
+      ProjectSubscriptionName projectSubscriptionName, String topicId) {
+    return Subscription.newBuilder()
+        .setName(projectSubscriptionName.toString())
+        .setTopic(TopicName.of(pubSubProperties.getGCloudProject(), topicId).toString())
+        .setAckDeadlineSeconds(pubSubProperties.getAckDeadlineSeconds())
+        .setRetainAckedMessages(true)
+        .build();
+  }
+
+  protected Optional<Subscription> getSubscription(
+      SubscriptionAdminClient subscriptionAdminClient,
+      ProjectSubscriptionName projectSubscriptionName) {
+    try {
+      // we should use subscriptionAdminClient.listSubscriptions but for local setup this method
+      // throws UNKNOWN_EXCEPTION
+      return Optional.of(subscriptionAdminClient.getSubscription(projectSubscriptionName));
+    } catch (NotFoundException e) {
+      return Optional.empty();
+    }
+  }
+
+  public void replayMessages(String subscriptionName) {
+    try (SubscriptionAdminClient subscriptionAdminClient =
+        SubscriptionAdminClient.create(createSubscriptionAdminSettings())) {
+      Duration messageRetentionDuration =
+          subscriptionAdminClient
+              .getSubscription(
+                  GetSubscriptionRequest.newBuilder().setSubscription(subscriptionName).build())
+              .getMessageRetentionDuration();
+      LocalDateTime retentionTime =
+          LocalDateTime.now().minusSeconds(messageRetentionDuration.getSeconds());
+      Timestamp retentionTimeEpoch =
+          Timestamp.newBuilder()
+              .setSeconds(retentionTime.atZone(ZoneOffset.UTC).toEpochSecond())
+              .build();
+
+      SeekRequest request =
+          SeekRequest.newBuilder()
+              .setSubscription(subscriptionName)
+              .setTime(retentionTimeEpoch)
+              .build();
+      subscriptionAdminClient.seek(request);
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Cannot replay messages");
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/EnvironmentChecker.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/EnvironmentChecker.java
new file mode 100644
index 0000000..0587188
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/EnvironmentChecker.java
@@ -0,0 +1,39 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub.local;
+
+import com.google.inject.Singleton;
+import java.util.Optional;
+
+@Singleton
+public class EnvironmentChecker {
+  public static final String PUBSUB_EMULATOR_HOST = "PUBSUB_EMULATOR_HOST";
+
+  private Optional<String> hostPort;
+
+  public EnvironmentChecker() {
+    this.hostPort =
+        Optional.ofNullable(System.getenv(PUBSUB_EMULATOR_HOST))
+            .or(() -> Optional.ofNullable(System.getProperty(PUBSUB_EMULATOR_HOST)));
+  }
+
+  public Optional<String> getLocalHostAndPort() {
+    return hostPort;
+  }
+
+  public Boolean isLocalEnvironment() {
+    return getLocalHostAndPort().isPresent();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalCredentialsProvider.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalCredentialsProvider.java
new file mode 100644
index 0000000..b7c1c94
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalCredentialsProvider.java
@@ -0,0 +1,35 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub.local;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.inject.Provider;
+
+public class LocalCredentialsProvider implements Provider<CredentialsProvider> {
+
+  CredentialsProvider credentials;
+
+  public LocalCredentialsProvider() {
+    this.credentials =
+        FixedCredentialsProvider.create(NoCredentialsProvider.create().getCredentials());
+  }
+
+  @Override
+  public CredentialsProvider get() {
+    return credentials;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalPublisherProvider.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalPublisherProvider.java
new file mode 100644
index 0000000..d5475cb
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalPublisherProvider.java
@@ -0,0 +1,77 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub.local;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.AlreadyExistsException;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.inject.Inject;
+import com.google.pubsub.v1.TopicName;
+import com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration;
+import com.googlesource.gerrit.plugins.pubsub.PublisherProvider;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import java.io.IOException;
+
+public class LocalPublisherProvider extends PublisherProvider {
+  private EnvironmentChecker environmentChecker;
+
+  @Inject
+  public LocalPublisherProvider(
+      CredentialsProvider credentials,
+      PubSubConfiguration pubSubProperties,
+      EnvironmentChecker environmentChecker) {
+    super(credentials, pubSubProperties);
+    this.environmentChecker = environmentChecker;
+  }
+
+  @Override
+  public Publisher get(String topic) throws IOException {
+    ManagedChannel channel =
+        ManagedChannelBuilder.forTarget(environmentChecker.getLocalHostAndPort().get())
+            .usePlaintext()
+            .build();
+    TransportChannelProvider channelProvider =
+        FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
+    createTopic(channel, config.getGCloudProject(), topic);
+    return Publisher.newBuilder(TopicName.of(config.getGCloudProject(), topic))
+        .setChannelProvider(channelProvider)
+        .setCredentialsProvider(credentials)
+        .build();
+  }
+
+  private static void createTopic(ManagedChannel channel, String project, String topicId)
+      throws IOException {
+
+    TopicAdminSettings topicAdminSettings =
+        TopicAdminSettings.newBuilder()
+            .setTransportChannelProvider(
+                FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)))
+            .setCredentialsProvider(NoCredentialsProvider.create())
+            .build();
+    try (TopicAdminClient topicAdminClient = TopicAdminClient.create(topicAdminSettings)) {
+      TopicName topicName = TopicName.of(project, topicId);
+      topicAdminClient.createTopic(topicName);
+    } catch (AlreadyExistsException e) {
+      // topic already exists do nothing
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalSubscriberProvider.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalSubscriberProvider.java
new file mode 100644
index 0000000..a0da1e5
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalSubscriberProvider.java
@@ -0,0 +1,97 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub.local;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.FixedExecutorProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.AlreadyExistsException;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.inject.Inject;
+import com.google.pubsub.v1.TopicName;
+import com.googlesource.gerrit.plugins.pubsub.ConsumerExecutor;
+import com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration;
+import com.googlesource.gerrit.plugins.pubsub.SubscriberProvider;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class LocalSubscriberProvider extends SubscriberProvider {
+  private EnvironmentChecker environmentChecker;
+
+  @Inject
+  public LocalSubscriberProvider(
+      PubSubConfiguration pubSubProperties,
+      CredentialsProvider credentials,
+      EnvironmentChecker environmentChecker,
+      @ConsumerExecutor ScheduledExecutorService executor) {
+    super(credentials, pubSubProperties, executor);
+    this.environmentChecker = environmentChecker;
+  }
+
+  @Override
+  public Subscriber get(String topic, MessageReceiver receiver) throws IOException {
+    TransportChannelProvider channelProvider = createChannelProvider();
+    createTopic(channelProvider, pubSubProperties.getGCloudProject(), topic);
+    return Subscriber.newBuilder(getOrCreateSubscription(topic).getName(), receiver)
+        .setChannelProvider(channelProvider)
+        .setExecutorProvider(FixedExecutorProvider.create(executor))
+        .setCredentialsProvider(credentials)
+        .build();
+  }
+
+  @Override
+  protected SubscriptionAdminSettings createSubscriptionAdminSettings() throws IOException {
+    TransportChannelProvider channelProvider = createChannelProvider();
+    return SubscriptionAdminSettings.newBuilder()
+        .setTransportChannelProvider(channelProvider)
+        .setCredentialsProvider(credentials)
+        .build();
+  }
+
+  private TransportChannelProvider createChannelProvider() {
+    ManagedChannel channel =
+        ManagedChannelBuilder.forTarget(environmentChecker.getLocalHostAndPort().get())
+            .usePlaintext()
+            .build();
+    TransportChannelProvider channelProvider =
+        FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
+    return channelProvider;
+  }
+
+  private static void createTopic(
+      TransportChannelProvider channelProvider, String project, String topicId) throws IOException {
+
+    TopicAdminSettings topicAdminSettings =
+        TopicAdminSettings.newBuilder()
+            .setTransportChannelProvider(channelProvider)
+            .setCredentialsProvider(NoCredentialsProvider.create())
+            .build();
+    try (TopicAdminClient topicAdminClient = TopicAdminClient.create(topicAdminSettings)) {
+      TopicName topicName = TopicName.of(project, topicId);
+      topicAdminClient.createTopic(topicName);
+    } catch (AlreadyExistsException e) {
+      // topic already exists do nothing
+    }
+  }
+}
diff --git a/src/main/resources/Documentation/build.md b/src/main/resources/Documentation/build.md
new file mode 100644
index 0000000..73f1a9f
--- /dev/null
+++ b/src/main/resources/Documentation/build.md
@@ -0,0 +1,42 @@
+# Build
+
+The events-gcloud-pubsub plugin can be built as a regular 'in-tree' plugin. That means
+that is required to clone a Gerrit source tree first and then to have the plugin
+source directory into the `/plugins` path.
+
+Additionally, the `plugins/external_plugin_deps.bzl` file needs to be updated to
+match the events-gcloud-pubsub plugin one.
+
+```shell script
+git clone --recursive https://gerrit.googlesource.com/gerrit
+cd gerrit
+git clone "https://gerrit.googlesource.com/plugins/events-gcloud-pubsub" plugins/events-gcloud-pubsub
+ln -sf ../plugins/events-gcloud-pubsub/external_plugin_deps.bzl plugins/.
+bazelisk build plugins/events-gcloud-pubsub
+```
+
+The output is created in
+
+```
+bazel-bin/plugins/events-gcloud-pubsub/events-gcloud-pubsub.jar
+```
+
+This project can be imported into the Eclipse IDE.
+Add the plugin name to the `CUSTOM_PLUGINS` set in
+Gerrit core in `tools/bzl/plugins.bzl`, and execute:
+
+```
+  ./tools/eclipse/project.py
+```
+
+To execute the tests run either one of:
+
+```
+  bazelisk test --test_tag_filters=@PLUGIN@ //...
+  bazelisk test plugins/@PLUGIN@:@PLUGIN@_tests
+```
+Tests prerequisite:
+* Docker
+
+How to build the Gerrit Plugin API is described in the [Gerrit
+documentation](../../../Documentation/dev-bazel.html#_extension_and_plugin_api_jar_files).
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
new file mode 100644
index 0000000..2e93838
--- /dev/null
+++ b/src/main/resources/Documentation/config.md
@@ -0,0 +1,59 @@
+Configuration
+=========================
+
+The events-gcloud-pubsub plugin is configured by adding a plugin stanza in the
+`gerrit.config` file, for example:
+
+```text
+[plugin "events-gcloud-pubsub"]
+    numberOfSubscribers = 6
+    subscriptionId = instance-1
+    gcloudProject = test_project
+    privateKeyLocation = /var/gerrit/etc/secured_key.json
+
+```
+
+`plugin.events-gcloud-pubsub.gcloudProject`
+:   GCloud [project name](https://cloud.google.com/docs/overview#projects)
+
+`plugin.events-gcloud-pubsub.subscriptionId`
+:   Conditional. This value identifies the subscriber and it must be unique within your
+    Gerrit cluster to allow different Gerrit nodes to consume data from the
+    stream independently. It can be omitted when `gerrit.instanceId` is
+    configured, otherwise it is mandatory.
+    Default: `gerrit.instanceId` value (when defined)
+    See also: [gerrit.instanceId](https://gerrit-review.googlesource.com/Documentation/config-gerrit.html#gerrit.instanceId)
+
+`plugin.events-gcloud-pubsub.privateKeyLocation`
+:   Path to the JSON file that contains service account key. The file
+    should be readable only by the daemon process because it contains information
+    that wouldn’t normally be exposed to everyone.
+
+`plugin.events-gcloud-pubsub.numberOfSubscribers`
+:   Optional. The number of expected events-gcloud-pubsub subscribers. This will be used
+    to allocate a thread pool able to run all subscribers.
+    Default: 6
+
+`plugin.events-gcloud-pubsub.sendAsync`
+:   Optional. Send messages to GCloud PubSub asynchronously, detaching the calling
+    process from the acknowledge of the message being sent.
+    The drawback of the enabling the sendAsync parameter is that the broker would only
+    return the status of the successful invocation of the message send operation and not
+    the actual status received by the broker. This means that when sendAsync is enabled
+    'broker_msg_publisher_failure_counter' metric is not incremented when message send
+    failure occurs.
+    Default: true
+
+`plugin.events-gcloud-pubsub.ackDeadlineSeconds`
+:   Optional. The approximate amount of time (on a best-effort basis) Pub/Sub waits for
+    the subscriber to acknowledge receipt before resending the message.
+    Default: 10
+
+`plugin.events-gcloud-pubsub.subscribtionTimeoutInSeconds`
+:   Optional. Maximum time in seconds to wait for the subscriber to connect to GCloud PubSub topic.
+    Default: 10
+
+`plugin.events-gcloud-pubsub.streamEventsTopic`
+:   Optional. Name of the GCloud PubSub topic for stream events. events-gcloud-pubsub plugin exposes
+    all stream events under this topic name.
+    Default: gerrit
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
new file mode 100644
index 0000000..08a200b
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
@@ -0,0 +1,262 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.util.stream.Collectors.counting;
+import static java.util.stream.Collectors.groupingBy;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.NoHttpd;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.WaitUtil;
+import com.google.gerrit.acceptance.config.GerritConfig;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.PushConfig;
+import com.google.pubsub.v1.ReceivedMessage;
+import com.google.pubsub.v1.TopicName;
+import com.googlesource.gerrit.plugins.pubsub.local.EnvironmentChecker;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.junit.Test;
+import org.testcontainers.containers.PubSubEmulatorContainer;
+import org.testcontainers.utility.DockerImageName;
+
+@NoHttpd
+@TestPlugin(
+    name = "events-gcloud-pubsub",
+    sysModule = "com.googlesource.gerrit.plugins.pubsub.Module")
+public class PubSubBrokerApiIT extends LightweightPluginDaemonTest {
+  private static final String PROJECT_ID = "test_project";
+  private static final String TOPIC_ID = "test_topic";
+  private static final String SUBSCRIPTION_ID = "test_subscription_id";
+
+  private static final Duration TEST_TIMEOUT = Duration.ofSeconds(5);
+  private static final String PRIVATE_KEY_LOCATION = "not used in test";
+
+  @Inject private Gson gson;
+
+  private TransportChannelProvider channelProvider;
+  private NoCredentialsProvider credentialsProvider;
+  private ManagedChannel channel;
+
+  private BrokerApi objectUnderTest;
+
+  public PubSubEmulatorContainer emulator =
+      new PubSubEmulatorContainer(
+          DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:316.0.0-emulators"));
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    emulator.start();
+    String hostport = emulator.getEmulatorEndpoint();
+    System.setProperty(EnvironmentChecker.PUBSUB_EMULATOR_HOST, hostport);
+    channel = ManagedChannelBuilder.forTarget(hostport).usePlaintext().build();
+    channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
+    credentialsProvider = NoCredentialsProvider.create();
+
+    createTopic(TOPIC_ID, channelProvider, credentialsProvider);
+
+    super.setUpTestPlugin();
+
+    objectUnderTest = plugin.getSysInjector().getInstance(BrokerApi.class);
+  }
+
+  @Override
+  public void tearDownTestPlugin() {
+    channel.shutdown();
+    emulator.close();
+    super.tearDownTestPlugin();
+  }
+
+  @Test
+  @GerritConfig(name = "plugin.events-gcloud-pubsub.gcloudProject", value = PROJECT_ID)
+  @GerritConfig(name = "plugin.events-gcloud-pubsub.subscriptionId", value = SUBSCRIPTION_ID)
+  @GerritConfig(
+      name = "plugin.events-gcloud-pubsub.privateKeyLocation",
+      value = PRIVATE_KEY_LOCATION)
+  public void shouldSendEvent() throws IOException {
+    createSubscription(SUBSCRIPTION_ID, TOPIC_ID, channelProvider, credentialsProvider);
+    UUID id = UUID.randomUUID();
+    Event event = new ProjectCreatedEvent();
+    EventMessage eventMessage = new EventMessage(new EventMessage.Header(id, id), event);
+    String expectedMessageJson = gson.toJson(eventMessage);
+
+    objectUnderTest.send(TOPIC_ID, eventMessage);
+
+    readMessageAndValidate(
+        (pullResponse) -> {
+          assertThat(pullResponse.getReceivedMessagesList()).hasSize(1);
+          assertThat(pullResponse.getReceivedMessages(0).getMessage().getData().toStringUtf8())
+              .isEqualTo(expectedMessageJson);
+        });
+  }
+
+  @Test
+  @GerritConfig(name = "plugin.events-gcloud-pubsub.gcloudProject", value = PROJECT_ID)
+  @GerritConfig(name = "plugin.events-gcloud-pubsub.subscriptionId", value = SUBSCRIPTION_ID)
+  @GerritConfig(
+      name = "plugin.events-gcloud-pubsub.privateKeyLocation",
+      value = PRIVATE_KEY_LOCATION)
+  public void shouldProduceStreamEvents() throws Exception {
+    String subscriptionId = "gerrit-subscription-id";
+    String topicId = "gerrit";
+    createSubscription(subscriptionId, topicId, channelProvider, credentialsProvider);
+
+    createChange();
+
+    readMessageAndValidate(
+        (pullResponse) -> {
+          List<ReceivedMessage> messages = pullResponse.getReceivedMessagesList();
+          assertThat(messages).hasSize(4);
+          Map<String, Long> messageTypeCount =
+              messages.stream()
+                  .map(m -> gson.fromJson(m.getMessage().getData().toStringUtf8(), Map.class))
+                  .map(m -> m.get("type").toString())
+                  .collect(groupingBy(t -> t, counting()));
+
+          assertThat(messageTypeCount.get(RefUpdatedEvent.TYPE)).isEqualTo(3);
+          assertThat(messageTypeCount.get("patchset-created")).isEqualTo(1);
+        },
+        PROJECT_ID,
+        subscriptionId);
+  }
+
+  @Test
+  @GerritConfig(name = "plugin.events-gcloud-pubsub.gcloudProject", value = PROJECT_ID)
+  @GerritConfig(name = "plugin.events-gcloud-pubsub.subscriptionId", value = SUBSCRIPTION_ID)
+  @GerritConfig(
+      name = "plugin.events-gcloud-pubsub.privateKeyLocation",
+      value = PRIVATE_KEY_LOCATION)
+  public void shouldConsumeEvent() throws InterruptedException {
+    UUID id = UUID.randomUUID();
+    Event event = new ProjectCreatedEvent();
+    EventMessage eventMessage = new EventMessage(new EventMessage.Header(id, id), event);
+    String expectedMessageJson = gson.toJson(eventMessage);
+    TestConsumer consumer = new TestConsumer();
+
+    objectUnderTest.receiveAsync(TOPIC_ID, consumer);
+
+    objectUnderTest.send(TOPIC_ID, eventMessage);
+
+    WaitUtil.waitUntil(
+        () ->
+            consumer.getMessage() != null
+                && expectedMessageJson.equals(gson.toJson(consumer.getMessage())),
+        TEST_TIMEOUT);
+  }
+
+  private void readMessageAndValidate(Consumer<PullResponse> validate) throws IOException {
+    readMessageAndValidate(validate, PROJECT_ID, SUBSCRIPTION_ID);
+  }
+
+  private void readMessageAndValidate(
+      Consumer<PullResponse> validate, String projectId, String subscriptionId) throws IOException {
+    SubscriberStubSettings subscriberStubSettings =
+        SubscriberStubSettings.newBuilder()
+            .setTransportChannelProvider(channelProvider)
+            .setCredentialsProvider(credentialsProvider)
+            .build();
+    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
+      PullRequest pullRequest =
+          PullRequest.newBuilder()
+              .setMaxMessages(Integer.MAX_VALUE) // make sure that we read all messages
+              .setSubscription(ProjectSubscriptionName.format(projectId, subscriptionId))
+              .build();
+      PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
+
+      validate.accept(pullResponse);
+    }
+  }
+
+  private void createTopic(
+      String topicId,
+      TransportChannelProvider channelProvider,
+      NoCredentialsProvider credentialsProvider)
+      throws IOException {
+    TopicAdminSettings topicAdminSettings =
+        TopicAdminSettings.newBuilder()
+            .setTransportChannelProvider(channelProvider)
+            .setCredentialsProvider(credentialsProvider)
+            .build();
+    try (TopicAdminClient topicAdminClient = TopicAdminClient.create(topicAdminSettings)) {
+      TopicName topicName = TopicName.of(PROJECT_ID, topicId);
+      topicAdminClient.createTopic(topicName);
+    }
+  }
+
+  private void createSubscription(
+      String subscriptionId,
+      String topicId,
+      TransportChannelProvider channelProvider,
+      NoCredentialsProvider credentialsProvider)
+      throws IOException {
+    SubscriptionAdminSettings subscriptionAdminSettings =
+        SubscriptionAdminSettings.newBuilder()
+            .setTransportChannelProvider(channelProvider)
+            .setCredentialsProvider(credentialsProvider)
+            .build();
+    SubscriptionAdminClient subscriptionAdminClient =
+        SubscriptionAdminClient.create(subscriptionAdminSettings);
+    ProjectSubscriptionName subscriptionName =
+        ProjectSubscriptionName.of(PROJECT_ID, subscriptionId);
+    subscriptionAdminClient
+        .createSubscription(
+            subscriptionName,
+            TopicName.of(PROJECT_ID, topicId),
+            PushConfig.getDefaultInstance(),
+            10)
+        .getName();
+  }
+
+  private class TestConsumer implements Consumer<EventMessage> {
+    private EventMessage msg;
+
+    @Override
+    public void accept(EventMessage msg) {
+      this.msg = msg;
+    }
+
+    public EventMessage getMessage() {
+      return msg;
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfigurationTest.java
new file mode 100644
index 0000000..a1834ad
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfigurationTest.java
@@ -0,0 +1,146 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static org.mockito.Mockito.when;
+
+import com.google.gerrit.server.config.PluginConfig;
+import com.google.gerrit.server.config.PluginConfigFactory;
+import org.eclipse.jgit.lib.Config;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PubSubConfigurationTest {
+  private static final String PLUGIN_NAME = "events-gcloud-pubsub";
+  private static final String subscriptionId = "some-subscription-id";
+  private static final String gerritInstanceId = "some-gerrit-id";
+  private static final String gCloudProject = "gcloud-test-project";
+  private static final String privateKeyLocation = "/some/path";
+
+  private PluginConfig.Update pluginConfig;
+  @Mock private PluginConfigFactory pluginConfigFactoryMock;
+
+  @Before
+  public void setup() {
+    pluginConfig = PluginConfig.Update.forTest(PLUGIN_NAME, new Config());
+    pluginConfig.setString("subscriptionId", subscriptionId);
+    pluginConfig.setString("gcloudProject", gCloudProject);
+    pluginConfig.setString("privateKeyLocation", privateKeyLocation);
+  }
+
+  @Test
+  public void shouldUseSubscriptionIdWhenConfiguredEvenIfGerritInstanceIdIsNull() {
+    when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+        .thenReturn(pluginConfig.asPluginConfig());
+
+    PubSubConfiguration configuration =
+        new PubSubConfiguration(pluginConfigFactoryMock, PLUGIN_NAME, null);
+
+    assertThat(configuration.getSubscriptionId()).isEqualTo(subscriptionId);
+  }
+
+  @Test
+  public void shouldUseSubscriptionIdWhenConfiguredEvenIfGerritInstanceIdIsDefined() {
+    when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+        .thenReturn(pluginConfig.asPluginConfig());
+
+    PubSubConfiguration configuration =
+        new PubSubConfiguration(pluginConfigFactoryMock, PLUGIN_NAME, gerritInstanceId);
+
+    assertThat(configuration.getSubscriptionId()).isEqualTo(subscriptionId);
+  }
+
+  @Test
+  public void shouldUseGerritInstanceIdWhenSubscriptionIdIsEmpty() {
+    pluginConfig.setString("subscriptionId", "");
+    when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+        .thenReturn(pluginConfig.asPluginConfig());
+
+    PubSubConfiguration configuration =
+        new PubSubConfiguration(pluginConfigFactoryMock, PLUGIN_NAME, gerritInstanceId);
+
+    assertThat(configuration.getSubscriptionId()).isEqualTo(gerritInstanceId);
+  }
+
+  @Test
+  public void shouldThrowExceptionWhenSubscriptionIdIsNotDefinedAndGerritInstanceIdIsNull() {
+    pluginConfig.setString("subscriptionId", "");
+    when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+        .thenReturn(pluginConfig.asPluginConfig());
+
+    IllegalStateException thrown =
+        assertThrows(
+            IllegalStateException.class,
+            () -> new PubSubConfiguration(pluginConfigFactoryMock, PLUGIN_NAME, null));
+
+    assertThat(thrown).hasMessageThat().contains("parameter 'subscriptionId' is mandatory");
+  }
+
+  @Test
+  public void shouldReadGCloudProjectWhenConfigured() {
+    when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+        .thenReturn(pluginConfig.asPluginConfig());
+
+    PubSubConfiguration configuration =
+        new PubSubConfiguration(pluginConfigFactoryMock, PLUGIN_NAME, gerritInstanceId);
+
+    assertThat(configuration.getGCloudProject()).isEqualTo(gCloudProject);
+  }
+
+  @Test
+  public void shouldThrowExceptionWhenGCloudProjectIsNotDefined() {
+    pluginConfig.setString("gcloudProject", "");
+    when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+        .thenReturn(pluginConfig.asPluginConfig());
+
+    IllegalStateException thrown =
+        assertThrows(
+            IllegalStateException.class,
+            () -> new PubSubConfiguration(pluginConfigFactoryMock, PLUGIN_NAME, gerritInstanceId));
+
+    assertThat(thrown).hasMessageThat().contains("parameter 'gcloudProject' is mandatory");
+  }
+
+  @Test
+  public void shouldReadPrivateKeyLocationWhenConfigured() {
+    when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+        .thenReturn(pluginConfig.asPluginConfig());
+
+    PubSubConfiguration configuration =
+        new PubSubConfiguration(pluginConfigFactoryMock, PLUGIN_NAME, gerritInstanceId);
+
+    assertThat(configuration.getPrivateKeyLocation()).isEqualTo(privateKeyLocation);
+  }
+
+  @Test
+  public void shouldThrowExceptionWhenPrivateKeyLocationIsNotDefined() {
+    pluginConfig.setString("privateKeyLocation", "");
+    when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+        .thenReturn(pluginConfig.asPluginConfig());
+
+    IllegalStateException thrown =
+        assertThrows(
+            IllegalStateException.class,
+            () -> new PubSubConfiguration(pluginConfigFactoryMock, PLUGIN_NAME, gerritInstanceId));
+
+    assertThat(thrown).hasMessageThat().contains("parameter 'privateKeyLocation' is mandatory");
+  }
+}