Initial master commit from gcloud-pubsub-events repo
Change-Id: I22c96402f8a1c38d8d1ce45a9149c9160e157c3c
diff --git a/BUILD b/BUILD
new file mode 100644
index 0000000..9714d33
--- /dev/null
+++ b/BUILD
@@ -0,0 +1,112 @@
+load("//tools/bzl:junit.bzl", "junit_tests")
+load(
+ "//tools/bzl:plugin.bzl",
+ "PLUGIN_DEPS",
+ "PLUGIN_TEST_DEPS",
+ "gerrit_plugin",
+)
+
+gerrit_plugin(
+ name = "events-gcloud-pubsub",
+ srcs = glob(["src/main/java/**/*.java"]),
+ manifest_entries = [
+ "Gerrit-PluginName: events-gcloud-pubsub",
+ "Gerrit-Module: com.googlesource.gerrit.plugins.pubsub.Module",
+ "Implementation-Title: Gerrit events listener to send events to an external GCloud PubSub broker",
+ "Implementation-URL: https://gerrit.googlesource.com/plugins/events-gcloud-pubsub",
+ ],
+ resources = glob(["src/main/resources/**/*"]),
+ deps = [
+ "@api-common//jar",
+ "@events-broker//jar:neverlink",
+ "@gax-grpc//jar",
+ "@gax//jar",
+ "@google-auth-library-credentials//jar",
+ "@google-auth-library-oauth2-http//jar",
+ "@google-cloud-pubsub-proto//jar",
+ "@google-cloud-pubsub//jar",
+ "@google-http-client-gson//jar",
+ "@google-http-client//jar",
+ "@grpc-alts//jar",
+ "@grpc-api//jar",
+ "@grpc-auth//jar",
+ "@grpc-context//jar",
+ "@grpc-core//jar",
+ "@grpc-netty-shaded//jar",
+ "@grpc-protobuf-lite//jar",
+ "@grpc-protobuf//jar",
+ "@grpc-stub//jar",
+ "@opencensus-api//jar",
+ "@opencensus-contrib-http-util//jar",
+ "@perfmark-api//jar",
+ "@proto-google-common-protos//jar",
+ "@proto-google-iam-v1//jar",
+ "@threetenbp//jar",
+ ],
+)
+
+junit_tests(
+ name = "events-gcloud-pubsub_tests",
+ srcs = glob(["src/test/java/**/*.java"]),
+ tags = ["events-gcloud-pubsub"],
+ deps = [
+ ":events-gcloud-pubsub__plugin_test_deps",
+ "//lib/testcontainers",
+ "@api-common//jar",
+ "@events-broker//jar",
+ "@gax-grpc//jar",
+ "@gax//jar",
+ "@google-auth-library-credentials//jar",
+ "@google-auth-library-oauth2-http//jar",
+ "@google-cloud-pubsub-proto//jar",
+ "@google-cloud-pubsub//jar",
+ "@google-http-client-gson//jar",
+ "@google-http-client//jar",
+ "@grpc-alts//jar",
+ "@grpc-api//jar",
+ "@grpc-context//jar",
+ "@grpc-core//jar",
+ "@grpc-netty-shaded//jar",
+ "@grpc-protobuf-lite//jar",
+ "@grpc-protobuf//jar",
+ "@grpc-stub//jar",
+ "@opencensus-api//jar",
+ "@opencensus-contrib-http-util//jar",
+ "@perfmark-api//jar",
+ "@proto-google-common-protos//jar",
+ "@proto-google-iam-v1//jar",
+ "@testcontainers-gcloud//jar",
+ "@threetenbp//jar",
+ ],
+)
+
+java_library(
+ name = "events-gcloud-pubsub__plugin_test_deps",
+ testonly = 1,
+ visibility = ["//visibility:public"],
+ exports = PLUGIN_DEPS + PLUGIN_TEST_DEPS + [
+ ":events-gcloud-pubsub__plugin",
+ "//lib/jackson:jackson-annotations",
+ "//lib/testcontainers",
+ "//lib/testcontainers:docker-java-api",
+ "//lib/testcontainers:docker-java-transport",
+ "@testcontainers-gcloud//jar",
+ "@grpc-api//jar",
+ "@gax-grpc//jar",
+ "@grpc-netty-shaded//jar",
+ "@grpc-core//jar",
+ "@threetenbp//jar",
+ "@grpc-alts//jar",
+ "@grpc-protobuf//jar",
+ "@grpc-protobuf-lite//jar",
+ "@proto-google-iam-v1//jar",
+ "@proto-google-common-protos//jar",
+ "@google-http-client//jar",
+ "@grpc-context//jar",
+ "@grpc-stub//jar",
+ "@perfmark-api//jar",
+ "@google-http-client-gson//jar",
+ "@opencensus-api//jar",
+ "@opencensus-contrib-http-util//jar",
+ ],
+)
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..11069ed
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
+
+APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+Copyright [yyyy] [name of copyright owner]
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..99b9edb
--- /dev/null
+++ b/README.md
@@ -0,0 +1,8 @@
+# events-gcloud-pubsub
+Gerrit event producer and consumer for [GCloud PubSub](https://cloud.google.com/pubsub/)
+
+## Build
+Information on how to build this plugin can be found [here](./src/main/resources/Documentation/build.md)
+
+## Configuration
+Information on how to configure this plugin can be found [here](./src/main/resources/Documentation/config.md)
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
new file mode 100644
index 0000000..3616263
--- /dev/null
+++ b/external_plugin_deps.bzl
@@ -0,0 +1,164 @@
+load("//tools/bzl:maven_jar.bzl", "maven_jar")
+
+def external_plugin_deps():
+ maven_jar(
+ name = "junit-platform",
+ artifact = "org.junit.platform:junit-platform-commons:1.4.0",
+ sha1 = "34d9983705c953b97abb01e1cd04647f47272fe5",
+ )
+
+ maven_jar(
+ name = "google-cloud-pubsub",
+ artifact = "com.google.cloud:google-cloud-pubsub:1.111.4",
+ sha1 = "01988db8241471b09fc317c803d20403d93d6ca5",
+ )
+
+ maven_jar(
+ name = "google-cloud-pubsub-proto",
+ artifact = "com.google.api.grpc:proto-google-cloud-pubsub-v1:1.93.4",
+ sha1 = "167bfae34ec63215ee3b9e95a4deb0b67104c021",
+ )
+
+ maven_jar(
+ name = "api-common",
+ artifact = "com.google.api:api-common:1.10.1",
+ sha1 = "d157681b5909cf959a9fa60ced9bed9da741ffef",
+ )
+
+ maven_jar(
+ name = "google-auth-library-credentials",
+ artifact = "com.google.auth:google-auth-library-credentials:0.24.1",
+ sha1 = "5f43498fae558213e27cd904944626c88cf03d03",
+ )
+
+ maven_jar(
+ name = "google-auth-library-oauth2-http",
+ artifact = "com.google.auth:google-auth-library-oauth2-http:0.24.1",
+ sha1 = "ce4ca632ff44eb9cb3033db590d4862a686db199",
+ )
+
+ maven_jar(
+ name = "gax",
+ artifact = "com.google.api:gax:1.62.0",
+ sha1 = "11e565f1a65f7e2245238ac5c19875c0ddd25b14",
+ )
+
+ maven_jar(
+ name = "events-broker",
+ artifact = "com.gerritforge:events-broker:3.3.1",
+ sha1 = "90775e671946b20e52be3a11277d1ed33973d66e",
+ )
+
+ maven_jar(
+ name = "testcontainers-gcloud",
+ artifact = "org.testcontainers:gcloud:1.15.2",
+ sha1 = "0ad02bb83edc818469e1080995cae409f5d40694",
+ )
+
+ maven_jar(
+ name = "grpc-api",
+ artifact = "io.grpc:grpc-api:1.36.0",
+ sha1 = "5a2c6286f76477a44aaf63c9f4d0f5399652885a",
+ )
+
+ maven_jar(
+ name = "gax-grpc",
+ artifact = "com.google.api:gax-grpc:1.62.0",
+ sha1 = "9c8e22dbceac414c03bfc92abc4399e82208d647",
+ )
+
+ maven_jar(
+ name = "grpc-core",
+ artifact = "io.grpc:grpc-core:1.36.0",
+ sha1 = "17a7f3287439c1d2641fabc4d767e7de4ebb05e5",
+ )
+
+ maven_jar(
+ name = "grpc-netty-shaded",
+ artifact = "io.grpc:grpc-netty-shaded:1.36.0",
+ sha1 = "575d65cd47a8c997f2014e702920d23b9f18d764",
+ )
+
+ maven_jar(
+ name = "threetenbp",
+ artifact = "org.threeten:threetenbp:1.5.0",
+ sha1 = "e18c0fb79ebee3e3907042a7f31c31404f9e546b",
+ )
+
+ maven_jar(
+ name = "grpc-alts",
+ artifact = "io.grpc:grpc-alts:1.36.0",
+ sha1 = "e41ed1f9739daa26752885304855161e8d504fa0",
+ )
+
+ maven_jar(
+ name = "grpc-protobuf",
+ artifact = "io.grpc:grpc-protobuf:1.36.0",
+ sha1 = "b579119c664bb1b50249c652ef139ef30adb68a9",
+ )
+
+ maven_jar(
+ name = "grpc-protobuf-lite",
+ artifact = "io.grpc:grpc-protobuf-lite:1.36.0",
+ sha1 = "78f16d4544b8e83b845cd91ed9c24854f9f78a8c",
+ )
+
+ maven_jar(
+ name = "proto-google-iam-v1",
+ artifact = "com.google.api.grpc:proto-google-iam-v1:1.0.9",
+ sha1 = "7a3ad48a2f9925ec8cd811dc06bca778ba9560cc",
+ )
+
+ maven_jar(
+ name = "proto-google-common-protos",
+ artifact = "com.google.api.grpc:proto-google-common-protos:2.1.0",
+ sha1 = "65234da8719aed4672298f574d7e87c1f025c4dc",
+ )
+
+ maven_jar(
+ name = "google-http-client",
+ artifact = "com.google.http-client:google-http-client:1.39.0",
+ sha1 = "936d09a3afa6911f1255ce039debeaf37928ee75",
+ )
+
+ maven_jar(
+ name = "google-http-client-gson",
+ artifact = "com.google.http-client:google-http-client-gson:1.39.0",
+ sha1 = "24ec436b2b4a27c17ccfa83a9c885a1f582e29b8",
+ )
+
+ maven_jar(
+ name = "grpc-context",
+ artifact = "io.grpc:grpc-context:1.36.0",
+ sha1 = "8d8c6e0f00ae7889f4a435c4ec0c4ad4cb99578d",
+ )
+
+ maven_jar(
+ name = "grpc-stub",
+ artifact = "io.grpc:grpc-stub:1.36.0",
+ sha1 = "7154e1fbdc9a809f158e8990999760a1bab95ed7",
+ )
+
+ maven_jar(
+ name = "perfmark-api",
+ artifact = "io.perfmark:perfmark-api:0.23.0",
+ sha1 = "0b813b7539fae6550541da8caafd6add86d4e22f",
+ )
+
+ maven_jar(
+ name = "opencensus-api",
+ artifact = "io.opencensus:opencensus-api:0.28.0",
+ sha1 = "0fc0d06a9d975a38c581dff59b99cf31db78bd99",
+ )
+
+ maven_jar(
+ name = "opencensus-contrib-http-util",
+ artifact = "io.opencensus:opencensus-contrib-http-util:0.28.0",
+ sha1 = "f6cb276330197d51dd65327fc305a3df7e622705",
+ )
+
+ maven_jar(
+ name = "grpc-auth",
+ artifact = "io.grpc:grpc-auth:1.36.0",
+ sha1 = "d9722016658f8e649111c8bb93b299ea38dc207e",
+ )
\ No newline at end of file
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/ConsumerExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/ConsumerExecutor.java
new file mode 100644
index 0000000..6993db4
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/ConsumerExecutor.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+
+@Retention(RUNTIME)
+@BindingAnnotation
+public @interface ConsumerExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/Manager.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Manager.java
new file mode 100644
index 0000000..01bd857
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Manager.java
@@ -0,0 +1,53 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.util.Set;
+
+@Singleton
+public class Manager implements LifecycleListener {
+
+ private final Set<TopicSubscriber> consumers;
+ private final BrokerApi brokerApi;
+ private final PubSubEventListener pubSubEventListener;
+
+ @Inject
+ public Manager(
+ Set<TopicSubscriber> consumers,
+ BrokerApi brokerApi,
+ PubSubEventListener pubSubEventListener) {
+ this.consumers = consumers;
+ this.brokerApi = brokerApi;
+ this.pubSubEventListener = pubSubEventListener;
+ }
+
+ @Override
+ public void start() {
+ consumers.forEach(
+ topicSubscriber ->
+ brokerApi.receiveAsync(topicSubscriber.topic(), topicSubscriber.consumer()));
+ }
+
+ @Override
+ public void stop() {
+ brokerApi.disconnect();
+ pubSubEventListener.disconnect();
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java
new file mode 100644
index 0000000..32110dd
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Module.java
@@ -0,0 +1,66 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.gerrit.extensions.config.FactoryModule;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.EventListener;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.pubsub.local.EnvironmentChecker;
+import com.googlesource.gerrit.plugins.pubsub.local.LocalCredentialsProvider;
+import com.googlesource.gerrit.plugins.pubsub.local.LocalPublisherProvider;
+import com.googlesource.gerrit.plugins.pubsub.local.LocalSubscriberProvider;
+
+class Module extends FactoryModule {
+
+ private PubSubApiModule pubSubApiModule;
+ private EnvironmentChecker environmentChecker;
+
+ @Inject
+ public Module(PubSubApiModule pubSubApiModule, EnvironmentChecker environmentChecker) {
+ this.pubSubApiModule = pubSubApiModule;
+ this.environmentChecker = environmentChecker;
+ }
+
+ @Override
+ protected void configure() {
+ bind(Gson.class).toProvider(EventGsonProvider.class).in(Singleton.class);
+ DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
+ DynamicSet.bind(binder(), EventListener.class).to(PubSubEventListener.class);
+ factory(PubSubPublisher.Factory.class);
+ factory(PubSubEventSubscriber.Factory.class);
+
+ if (environmentChecker.isLocalEnvironment()) {
+ bind(CredentialsProvider.class)
+ .toProvider(LocalCredentialsProvider.class)
+ .in(Scopes.SINGLETON);
+ bind(SubscriberProvider.class).to(LocalSubscriberProvider.class);
+ bind(PublisherProvider.class).to(LocalPublisherProvider.class);
+ } else {
+ bind(CredentialsProvider.class)
+ .toProvider(ServiceAccountCredentialsProvider.class)
+ .in(Scopes.SINGLETON);
+ bind(SubscriberProvider.class);
+ bind(PublisherProvider.class);
+ }
+ install(pubSubApiModule);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubApiModule.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubApiModule.java
new file mode 100644
index 0000000..ca32304
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubApiModule.java
@@ -0,0 +1,67 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.collect.Sets;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Scopes;
+import com.google.inject.TypeLiteral;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class PubSubApiModule extends LifecycleModule {
+ WorkQueue workQueue;
+ PubSubConfiguration configuration;
+
+ private Set<TopicSubscriber> activeConsumers = Sets.newHashSet();
+
+ @Inject
+ public PubSubApiModule(WorkQueue workQueue, PubSubConfiguration configuration) {
+ this.workQueue = workQueue;
+ this.configuration = configuration;
+ }
+
+ /**
+ * Clients(for example multi-site plugin) of events-gcloud-pubsub library are registering
+ * consumers. Because we cannot guarantee that client plugin is loaded after the
+ * events-gcloud-pubsub we have to make sure that already registered consumers are reassigned to
+ * the events-gcloud-pubsub. This injection is optional because if events-gcloud-pubsub plugin is
+ * loaded before the client plugin no consumers are registered yet.
+ *
+ * @param previousBrokerApi
+ */
+ @Inject(optional = true)
+ public void setPreviousBrokerApi(DynamicItem<BrokerApi> previousBrokerApi) {
+ if (previousBrokerApi != null && previousBrokerApi.get() != null) {
+ this.activeConsumers = previousBrokerApi.get().topicSubscribers();
+ }
+ }
+
+ @Override
+ protected void configure() {
+ bind(ScheduledExecutorService.class)
+ .annotatedWith(ConsumerExecutor.class)
+ .toProvider(ScheduledExecutorServiceProvider.class)
+ .in(Scopes.SINGLETON);
+
+ bind(new TypeLiteral<Set<TopicSubscriber>>() {}).toInstance(activeConsumers);
+ DynamicItem.bind(binder(), BrokerApi.class).to(PubSubBrokerApi.class).in(Scopes.SINGLETON);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java
new file mode 100644
index 0000000..c989aa4
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java
@@ -0,0 +1,88 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.flogger.FluentLogger;
+import com.google.inject.Inject;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+class PubSubBrokerApi implements BrokerApi {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+ private PubSubPublisher.Factory publisherFactory;
+ private PubSubEventSubscriber.Factory subscriberFactory;
+ private Map<String, PubSubPublisher> publishers = new ConcurrentHashMap<>();
+ private Set<PubSubEventSubscriber> subscribers;
+
+ @Inject
+ public PubSubBrokerApi(
+ PubSubPublisher.Factory publisherFactory, PubSubEventSubscriber.Factory subscriberFactory) {
+ this.publisherFactory = publisherFactory;
+ this.subscriberFactory = subscriberFactory;
+ subscribers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ }
+
+ @Override
+ public boolean send(String topic, EventMessage message) {
+ return publishers.computeIfAbsent(topic, t -> publisherFactory.create(t)).publish(message);
+ }
+
+ @Override
+ public void receiveAsync(String topic, Consumer<EventMessage> eventConsumer) {
+ PubSubEventSubscriber subscriber = subscriberFactory.create(topic, eventConsumer);
+ subscribers.add(subscriber);
+ subscriber.subscribe();
+ }
+
+ @Override
+ public Set<TopicSubscriber> topicSubscribers() {
+ return subscribers.stream()
+ .map(s -> TopicSubscriber.topicSubscriber(s.getTopic(), s.getMessageProcessor()))
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public void disconnect() {
+ publishers
+ .values()
+ .forEach(
+ publisher -> {
+ try {
+ publisher.close();
+ } catch (InterruptedException e) {
+ logger.atSevere().withCause(e).log("Disconnect failed");
+ }
+ });
+
+ for (PubSubEventSubscriber subscriber : subscribers) {
+ subscriber.shutdown();
+ }
+ subscribers.clear();
+ }
+
+ @Override
+ public void replayAllEvents(String topic) {
+ subscribers.stream()
+ .filter(subscriber -> topic.equals(subscriber.getTopic()))
+ .forEach(PubSubEventSubscriber::replayMessages);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java
new file mode 100644
index 0000000..54efac3
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java
@@ -0,0 +1,119 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.google.gerrit.common.Nullable;
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.server.config.GerritInstanceId;
+import com.google.gerrit.server.config.PluginConfig;
+import com.google.gerrit.server.config.PluginConfigFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+public class PubSubConfiguration {
+ private static final String DEFAULT_NUMBER_OF_SUBSCRIBERS = "6";
+ private static final String DEFAULT_ACK_DEADLINE_SECONDS = "10";
+ private static final String DEFAULT_SUBSCTIPRION_TIMEOUT = "10";
+ private static final String DEFAULT_SHUTDOWN_TIMEOUT = "10";
+ private static final String DEFAULT_STREAM_EVENTS_TOPIC = "gerrit";
+
+ private final String gcloudProject;
+ private final String subscriptionId;
+ private final Integer numberOfSubscribers;
+ private final Boolean sendAsync;
+ private final String privateKeyLocation;
+ private final Integer ackDeadlineSeconds;
+ private final Long subscribtionTimeoutInSeconds;
+ private final Long shutdownTimeoutInSeconds;
+ private final String streamEventsTopic;
+ private final PluginConfig fromGerritConfig;
+
+ @Inject
+ public PubSubConfiguration(
+ PluginConfigFactory configFactory,
+ @PluginName String pluginName,
+ @Nullable @GerritInstanceId String instanceId) {
+ this.fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
+ this.sendAsync = fromGerritConfig.getBoolean("sendAsync", true);
+ this.gcloudProject = getMandatoryString("gcloudProject");
+ this.subscriptionId = getMandatoryString("subscriptionId", instanceId);
+ this.privateKeyLocation = getMandatoryString("privateKeyLocation");
+ this.streamEventsTopic =
+ fromGerritConfig.getString("streamEventsTopic", DEFAULT_STREAM_EVENTS_TOPIC);
+ this.numberOfSubscribers =
+ Integer.parseInt(
+ fromGerritConfig.getString("numberOfSubscribers", DEFAULT_NUMBER_OF_SUBSCRIBERS));
+ this.ackDeadlineSeconds =
+ Integer.parseInt(
+ fromGerritConfig.getString("ackDeadlineSeconds", DEFAULT_ACK_DEADLINE_SECONDS));
+ this.subscribtionTimeoutInSeconds =
+ Long.parseLong(
+ fromGerritConfig.getString(
+ "subscribtionTimeoutInSeconds", DEFAULT_SUBSCTIPRION_TIMEOUT));
+ this.shutdownTimeoutInSeconds =
+ Long.parseLong(
+ fromGerritConfig.getString("shutdownTimeoutInSeconds", DEFAULT_SHUTDOWN_TIMEOUT));
+ }
+
+ public Boolean isSendAsync() {
+ return sendAsync;
+ }
+
+ public String getGCloudProject() {
+ return gcloudProject;
+ }
+
+ public Integer getNumberOfSubscribers() {
+ return numberOfSubscribers;
+ }
+
+ public String getPrivateKeyLocation() {
+ return privateKeyLocation;
+ }
+
+ public String getSubscriptionId() {
+ return subscriptionId;
+ }
+
+ public Integer getAckDeadlineSeconds() {
+ return ackDeadlineSeconds;
+ }
+
+ public Long getSubscribtionTimeoutInSeconds() {
+ return subscribtionTimeoutInSeconds;
+ }
+
+ public Long getShutdownTimeoutInSeconds() {
+ return shutdownTimeoutInSeconds;
+ }
+
+ public String getStreamEventsTopic() {
+ return streamEventsTopic;
+ }
+
+ private String getMandatoryString(String name) throws IllegalStateException {
+ return getMandatoryString(name, null);
+ }
+
+ private String getMandatoryString(String name, String defaultValue) throws IllegalStateException {
+ String value = fromGerritConfig.getString(name, defaultValue);
+ if (value == null) {
+ throw new IllegalStateException(
+ String.format("Invalid configuration: parameter '%s' is mandatory", name));
+ }
+ return value;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventListener.java
new file mode 100644
index 0000000..29ec263
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventListener.java
@@ -0,0 +1,46 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventListener;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+public class PubSubEventListener implements EventListener {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ private final PubSubPublisher publisher;
+
+ @Inject
+ public PubSubEventListener(
+ PubSubPublisher.Factory publisherFactory, PubSubConfiguration configuration) {
+ this.publisher = publisherFactory.create(configuration.getStreamEventsTopic());
+ }
+
+ @Override
+ public void onEvent(Event event) {
+ publisher.publish(event);
+ }
+
+ public void disconnect() {
+ try {
+ publisher.close();
+ } catch (InterruptedException e) {
+ logger.atSevere().withCause(e).log("Disconnect failed");
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
new file mode 100644
index 0000000..b8d8c49
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubEventSubscriber.java
@@ -0,0 +1,101 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.common.flogger.FluentLogger;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.google.pubsub.v1.PubsubMessage;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+public class PubSubEventSubscriber {
+
+ public interface Factory {
+ public PubSubEventSubscriber create(String topic, Consumer<EventMessage> messageProcessor);
+ }
+
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ private final Gson gson;
+ private final String topic;
+ private final Consumer<EventMessage> messageProcessor;
+ private final SubscriberProvider subscriberProvider;
+ private final PubSubConfiguration config;
+ private Subscriber subscriber;
+
+ @Inject
+ public PubSubEventSubscriber(
+ Gson gson,
+ SubscriberProvider subscriberProvider,
+ PubSubConfiguration config,
+ @Assisted String topic,
+ @Assisted Consumer<EventMessage> messageProcessor) {
+ this.gson = gson;
+ this.topic = topic;
+ this.messageProcessor = messageProcessor;
+ this.subscriberProvider = subscriberProvider;
+ this.config = config;
+ }
+
+ public void subscribe() {
+ MessageReceiver receiver =
+ (PubsubMessage message, AckReplyConsumer consumer) -> {
+ EventMessage event = gson.fromJson(message.getData().toStringUtf8(), EventMessage.class);
+ messageProcessor.accept(event);
+ consumer.ack();
+ };
+
+ try {
+ subscriber = subscriberProvider.get(topic, receiver);
+ subscriber
+ .startAsync()
+ .awaitRunning(config.getSubscribtionTimeoutInSeconds(), TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ logger.atSevere().withCause(e).log("Timeout during subscribing to the topic %s", topic);
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log("Exception during subscribing to the topic %s", topic);
+ }
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public Consumer<EventMessage> getMessageProcessor() {
+ return messageProcessor;
+ }
+
+ public void replayMessages() {
+ subscriberProvider.replayMessages(subscriber.getSubscriptionNameString());
+ }
+
+ public void shutdown() {
+ try {
+ subscriber
+ .stopAsync()
+ .awaitTerminated(config.getShutdownTimeoutInSeconds(), TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ logger.atSevere().withCause(e).log("Timeout during subscriber shutdown");
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
new file mode 100644
index 0000000..24ef602
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
@@ -0,0 +1,94 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.api.core.ApiFuture;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.Event;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class PubSubPublisher {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ public interface Factory {
+ public PubSubPublisher create(String topic);
+ }
+
+ private final Gson gson;
+ private final Publisher publisher;
+ private final PubSubConfiguration pubSubProperties;
+
+ @Inject
+ public PubSubPublisher(
+ PubSubConfiguration pubSubProperties,
+ PublisherProvider publisherProvider,
+ Gson gson,
+ @Assisted String topic)
+ throws IOException {
+ this.gson = gson;
+ this.publisher = publisherProvider.get(topic);
+ this.pubSubProperties = pubSubProperties;
+ }
+
+ public boolean publish(Event event) {
+ return publish(gson.toJson(event));
+ }
+
+ public boolean publish(EventMessage event) {
+ return publish(gson.toJson(event));
+ }
+
+ private boolean publish(String eventPayload) {
+ ByteString data = ByteString.copyFromUtf8(eventPayload);
+ PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
+ if (pubSubProperties.isSendAsync()) {
+ return publishAsync(pubsubMessage) != null;
+ }
+
+ publishSync(pubsubMessage);
+ return true;
+ }
+
+ private ApiFuture<String> publishAsync(PubsubMessage pubsubMessage) {
+ return publisher.publish(pubsubMessage);
+ }
+
+ private void publishSync(PubsubMessage pubsubMessage) {
+ try {
+ ApiFuture<String> messageIdFuture = publishAsync(pubsubMessage);
+ messageIdFuture.get(1000, TimeUnit.SECONDS);
+
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ logger.atSevere().withCause(e).log("Cannot send the message");
+ }
+ }
+
+ public void close() throws InterruptedException {
+ if (publisher != null) {
+ publisher.shutdown();
+ publisher.awaitTermination(1, TimeUnit.MINUTES);
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PublisherProvider.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PublisherProvider.java
new file mode 100644
index 0000000..a7096d0
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PublisherProvider.java
@@ -0,0 +1,41 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.google.pubsub.v1.TopicName;
+import java.io.IOException;
+
+@Singleton
+public class PublisherProvider {
+
+ protected CredentialsProvider credentials;
+ protected PubSubConfiguration config;
+
+ @Inject
+ public PublisherProvider(CredentialsProvider credentials, PubSubConfiguration config) {
+ this.credentials = credentials;
+ this.config = config;
+ }
+
+ public Publisher get(String topic) throws IOException {
+ return Publisher.newBuilder(TopicName.of(config.getGCloudProject(), topic))
+ .setCredentialsProvider(credentials)
+ .build();
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/ScheduledExecutorServiceProvider.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/ScheduledExecutorServiceProvider.java
new file mode 100644
index 0000000..1e93288
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/ScheduledExecutorServiceProvider.java
@@ -0,0 +1,36 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class ScheduledExecutorServiceProvider implements Provider<ScheduledExecutorService> {
+ private WorkQueue workQueue;
+ private PubSubConfiguration configuration;
+
+ @Inject
+ public ScheduledExecutorServiceProvider(WorkQueue workQueue, PubSubConfiguration configuration) {
+ this.workQueue = workQueue;
+ this.configuration = configuration;
+ }
+
+ @Override
+ public ScheduledExecutorService get() {
+ return workQueue.createQueue(configuration.getNumberOfSubscribers(), "pubsub-subscriber");
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/ServiceAccountCredentialsProvider.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/ServiceAccountCredentialsProvider.java
new file mode 100644
index 0000000..717b2bc
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/ServiceAccountCredentialsProvider.java
@@ -0,0 +1,42 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+public class ServiceAccountCredentialsProvider implements Provider<CredentialsProvider> {
+ private CredentialsProvider credentials;
+
+ @Inject
+ public ServiceAccountCredentialsProvider(PubSubConfiguration pubSubProperties)
+ throws FileNotFoundException, IOException {
+ this.credentials =
+ FixedCredentialsProvider.create(
+ ServiceAccountCredentials.fromStream(
+ new FileInputStream(pubSubProperties.getPrivateKeyLocation())));
+ }
+
+ @Override
+ public CredentialsProvider get() {
+ return credentials;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/SubscriberProvider.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/SubscriberProvider.java
new file mode 100644
index 0000000..31024d8
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/SubscriberProvider.java
@@ -0,0 +1,130 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.FixedExecutorProvider;
+import com.google.api.gax.rpc.NotFoundException;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.common.flogger.FluentLogger;
+import com.google.inject.Inject;
+import com.google.protobuf.Duration;
+import com.google.protobuf.Timestamp;
+import com.google.pubsub.v1.GetSubscriptionRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.SeekRequest;
+import com.google.pubsub.v1.Subscription;
+import com.google.pubsub.v1.TopicName;
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class SubscriberProvider {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ protected CredentialsProvider credentials;
+ protected PubSubConfiguration pubSubProperties;
+ protected ScheduledExecutorService executor;
+
+ @Inject
+ public SubscriberProvider(
+ CredentialsProvider credentials,
+ PubSubConfiguration pubSubProperties,
+ @ConsumerExecutor ScheduledExecutorService executor) {
+ this.credentials = credentials;
+ this.pubSubProperties = pubSubProperties;
+ this.executor = executor;
+ }
+
+ public Subscriber get(String topic, MessageReceiver receiver) throws IOException {
+ return Subscriber.newBuilder(getOrCreateSubscription(topic).getName(), receiver)
+ .setExecutorProvider(FixedExecutorProvider.create(executor))
+ .setCredentialsProvider(credentials)
+ .build();
+ }
+
+ protected SubscriptionAdminSettings createSubscriptionAdminSettings() throws IOException {
+ return SubscriptionAdminSettings.newBuilder().setCredentialsProvider(credentials).build();
+ }
+
+ protected Subscription getOrCreateSubscription(String topicId) throws IOException {
+ try (SubscriptionAdminClient subscriptionAdminClient =
+ SubscriptionAdminClient.create(createSubscriptionAdminSettings())) {
+ String subscriptionName =
+ String.format("%s-%s", pubSubProperties.getSubscriptionId(), topicId);
+ ProjectSubscriptionName projectSubscriptionName =
+ ProjectSubscriptionName.of(pubSubProperties.getGCloudProject(), subscriptionName);
+
+ return getSubscription(subscriptionAdminClient, projectSubscriptionName)
+ .orElseGet(
+ () ->
+ subscriptionAdminClient.createSubscription(
+ createSubscriptionRequest(projectSubscriptionName, topicId)));
+ }
+ }
+
+ protected Subscription createSubscriptionRequest(
+ ProjectSubscriptionName projectSubscriptionName, String topicId) {
+ return Subscription.newBuilder()
+ .setName(projectSubscriptionName.toString())
+ .setTopic(TopicName.of(pubSubProperties.getGCloudProject(), topicId).toString())
+ .setAckDeadlineSeconds(pubSubProperties.getAckDeadlineSeconds())
+ .setRetainAckedMessages(true)
+ .build();
+ }
+
+ protected Optional<Subscription> getSubscription(
+ SubscriptionAdminClient subscriptionAdminClient,
+ ProjectSubscriptionName projectSubscriptionName) {
+ try {
+ // we should use subscriptionAdminClient.listSubscriptions but for local setup this method
+ // throws UNKNOWN_EXCEPTION
+ return Optional.of(subscriptionAdminClient.getSubscription(projectSubscriptionName));
+ } catch (NotFoundException e) {
+ return Optional.empty();
+ }
+ }
+
+ public void replayMessages(String subscriptionName) {
+ try (SubscriptionAdminClient subscriptionAdminClient =
+ SubscriptionAdminClient.create(createSubscriptionAdminSettings())) {
+ Duration messageRetentionDuration =
+ subscriptionAdminClient
+ .getSubscription(
+ GetSubscriptionRequest.newBuilder().setSubscription(subscriptionName).build())
+ .getMessageRetentionDuration();
+ LocalDateTime retentionTime =
+ LocalDateTime.now().minusSeconds(messageRetentionDuration.getSeconds());
+ Timestamp retentionTimeEpoch =
+ Timestamp.newBuilder()
+ .setSeconds(retentionTime.atZone(ZoneOffset.UTC).toEpochSecond())
+ .build();
+
+ SeekRequest request =
+ SeekRequest.newBuilder()
+ .setSubscription(subscriptionName)
+ .setTime(retentionTimeEpoch)
+ .build();
+ subscriptionAdminClient.seek(request);
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log("Cannot replay messages");
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/EnvironmentChecker.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/EnvironmentChecker.java
new file mode 100644
index 0000000..0587188
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/EnvironmentChecker.java
@@ -0,0 +1,39 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub.local;
+
+import com.google.inject.Singleton;
+import java.util.Optional;
+
+@Singleton
+public class EnvironmentChecker {
+ public static final String PUBSUB_EMULATOR_HOST = "PUBSUB_EMULATOR_HOST";
+
+ private Optional<String> hostPort;
+
+ public EnvironmentChecker() {
+ this.hostPort =
+ Optional.ofNullable(System.getenv(PUBSUB_EMULATOR_HOST))
+ .or(() -> Optional.ofNullable(System.getProperty(PUBSUB_EMULATOR_HOST)));
+ }
+
+ public Optional<String> getLocalHostAndPort() {
+ return hostPort;
+ }
+
+ public Boolean isLocalEnvironment() {
+ return getLocalHostAndPort().isPresent();
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalCredentialsProvider.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalCredentialsProvider.java
new file mode 100644
index 0000000..b7c1c94
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalCredentialsProvider.java
@@ -0,0 +1,35 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub.local;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.inject.Provider;
+
+public class LocalCredentialsProvider implements Provider<CredentialsProvider> {
+
+ CredentialsProvider credentials;
+
+ public LocalCredentialsProvider() {
+ this.credentials =
+ FixedCredentialsProvider.create(NoCredentialsProvider.create().getCredentials());
+ }
+
+ @Override
+ public CredentialsProvider get() {
+ return credentials;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalPublisherProvider.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalPublisherProvider.java
new file mode 100644
index 0000000..d5475cb
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalPublisherProvider.java
@@ -0,0 +1,77 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub.local;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.AlreadyExistsException;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.inject.Inject;
+import com.google.pubsub.v1.TopicName;
+import com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration;
+import com.googlesource.gerrit.plugins.pubsub.PublisherProvider;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import java.io.IOException;
+
+public class LocalPublisherProvider extends PublisherProvider {
+ private EnvironmentChecker environmentChecker;
+
+ @Inject
+ public LocalPublisherProvider(
+ CredentialsProvider credentials,
+ PubSubConfiguration pubSubProperties,
+ EnvironmentChecker environmentChecker) {
+ super(credentials, pubSubProperties);
+ this.environmentChecker = environmentChecker;
+ }
+
+ @Override
+ public Publisher get(String topic) throws IOException {
+ ManagedChannel channel =
+ ManagedChannelBuilder.forTarget(environmentChecker.getLocalHostAndPort().get())
+ .usePlaintext()
+ .build();
+ TransportChannelProvider channelProvider =
+ FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
+ createTopic(channel, config.getGCloudProject(), topic);
+ return Publisher.newBuilder(TopicName.of(config.getGCloudProject(), topic))
+ .setChannelProvider(channelProvider)
+ .setCredentialsProvider(credentials)
+ .build();
+ }
+
+ private static void createTopic(ManagedChannel channel, String project, String topicId)
+ throws IOException {
+
+ TopicAdminSettings topicAdminSettings =
+ TopicAdminSettings.newBuilder()
+ .setTransportChannelProvider(
+ FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)))
+ .setCredentialsProvider(NoCredentialsProvider.create())
+ .build();
+ try (TopicAdminClient topicAdminClient = TopicAdminClient.create(topicAdminSettings)) {
+ TopicName topicName = TopicName.of(project, topicId);
+ topicAdminClient.createTopic(topicName);
+ } catch (AlreadyExistsException e) {
+ // topic already exists do nothing
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalSubscriberProvider.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalSubscriberProvider.java
new file mode 100644
index 0000000..a0da1e5
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/local/LocalSubscriberProvider.java
@@ -0,0 +1,97 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub.local;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.FixedExecutorProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.AlreadyExistsException;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.inject.Inject;
+import com.google.pubsub.v1.TopicName;
+import com.googlesource.gerrit.plugins.pubsub.ConsumerExecutor;
+import com.googlesource.gerrit.plugins.pubsub.PubSubConfiguration;
+import com.googlesource.gerrit.plugins.pubsub.SubscriberProvider;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class LocalSubscriberProvider extends SubscriberProvider {
+ private EnvironmentChecker environmentChecker;
+
+ @Inject
+ public LocalSubscriberProvider(
+ PubSubConfiguration pubSubProperties,
+ CredentialsProvider credentials,
+ EnvironmentChecker environmentChecker,
+ @ConsumerExecutor ScheduledExecutorService executor) {
+ super(credentials, pubSubProperties, executor);
+ this.environmentChecker = environmentChecker;
+ }
+
+ @Override
+ public Subscriber get(String topic, MessageReceiver receiver) throws IOException {
+ TransportChannelProvider channelProvider = createChannelProvider();
+ createTopic(channelProvider, pubSubProperties.getGCloudProject(), topic);
+ return Subscriber.newBuilder(getOrCreateSubscription(topic).getName(), receiver)
+ .setChannelProvider(channelProvider)
+ .setExecutorProvider(FixedExecutorProvider.create(executor))
+ .setCredentialsProvider(credentials)
+ .build();
+ }
+
+ @Override
+ protected SubscriptionAdminSettings createSubscriptionAdminSettings() throws IOException {
+ TransportChannelProvider channelProvider = createChannelProvider();
+ return SubscriptionAdminSettings.newBuilder()
+ .setTransportChannelProvider(channelProvider)
+ .setCredentialsProvider(credentials)
+ .build();
+ }
+
+ private TransportChannelProvider createChannelProvider() {
+ ManagedChannel channel =
+ ManagedChannelBuilder.forTarget(environmentChecker.getLocalHostAndPort().get())
+ .usePlaintext()
+ .build();
+ TransportChannelProvider channelProvider =
+ FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
+ return channelProvider;
+ }
+
+ private static void createTopic(
+ TransportChannelProvider channelProvider, String project, String topicId) throws IOException {
+
+ TopicAdminSettings topicAdminSettings =
+ TopicAdminSettings.newBuilder()
+ .setTransportChannelProvider(channelProvider)
+ .setCredentialsProvider(NoCredentialsProvider.create())
+ .build();
+ try (TopicAdminClient topicAdminClient = TopicAdminClient.create(topicAdminSettings)) {
+ TopicName topicName = TopicName.of(project, topicId);
+ topicAdminClient.createTopic(topicName);
+ } catch (AlreadyExistsException e) {
+ // topic already exists do nothing
+ }
+ }
+}
diff --git a/src/main/resources/Documentation/build.md b/src/main/resources/Documentation/build.md
new file mode 100644
index 0000000..73f1a9f
--- /dev/null
+++ b/src/main/resources/Documentation/build.md
@@ -0,0 +1,42 @@
+# Build
+
+The events-gcloud-pubsub plugin can be built as a regular 'in-tree' plugin. That means
+that is required to clone a Gerrit source tree first and then to have the plugin
+source directory into the `/plugins` path.
+
+Additionally, the `plugins/external_plugin_deps.bzl` file needs to be updated to
+match the events-gcloud-pubsub plugin one.
+
+```shell script
+git clone --recursive https://gerrit.googlesource.com/gerrit
+cd gerrit
+git clone "https://gerrit.googlesource.com/plugins/events-gcloud-pubsub" plugins/events-gcloud-pubsub
+ln -sf ../plugins/events-gcloud-pubsub/external_plugin_deps.bzl plugins/.
+bazelisk build plugins/events-gcloud-pubsub
+```
+
+The output is created in
+
+```
+bazel-bin/plugins/events-gcloud-pubsub/events-gcloud-pubsub.jar
+```
+
+This project can be imported into the Eclipse IDE.
+Add the plugin name to the `CUSTOM_PLUGINS` set in
+Gerrit core in `tools/bzl/plugins.bzl`, and execute:
+
+```
+ ./tools/eclipse/project.py
+```
+
+To execute the tests run either one of:
+
+```
+ bazelisk test --test_tag_filters=@PLUGIN@ //...
+ bazelisk test plugins/@PLUGIN@:@PLUGIN@_tests
+```
+Tests prerequisite:
+* Docker
+
+How to build the Gerrit Plugin API is described in the [Gerrit
+documentation](../../../Documentation/dev-bazel.html#_extension_and_plugin_api_jar_files).
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
new file mode 100644
index 0000000..2e93838
--- /dev/null
+++ b/src/main/resources/Documentation/config.md
@@ -0,0 +1,59 @@
+Configuration
+=========================
+
+The events-gcloud-pubsub plugin is configured by adding a plugin stanza in the
+`gerrit.config` file, for example:
+
+```text
+[plugin "events-gcloud-pubsub"]
+ numberOfSubscribers = 6
+ subscriptionId = instance-1
+ gcloudProject = test_project
+ privateKeyLocation = /var/gerrit/etc/secured_key.json
+
+```
+
+`plugin.events-gcloud-pubsub.gcloudProject`
+: GCloud [project name](https://cloud.google.com/docs/overview#projects)
+
+`plugin.events-gcloud-pubsub.subscriptionId`
+: Conditional. This value identifies the subscriber and it must be unique within your
+ Gerrit cluster to allow different Gerrit nodes to consume data from the
+ stream independently. It can be omitted when `gerrit.instanceId` is
+ configured, otherwise it is mandatory.
+ Default: `gerrit.instanceId` value (when defined)
+ See also: [gerrit.instanceId](https://gerrit-review.googlesource.com/Documentation/config-gerrit.html#gerrit.instanceId)
+
+`plugin.events-gcloud-pubsub.privateKeyLocation`
+: Path to the JSON file that contains service account key. The file
+ should be readable only by the daemon process because it contains information
+ that wouldn’t normally be exposed to everyone.
+
+`plugin.events-gcloud-pubsub.numberOfSubscribers`
+: Optional. The number of expected events-gcloud-pubsub subscribers. This will be used
+ to allocate a thread pool able to run all subscribers.
+ Default: 6
+
+`plugin.events-gcloud-pubsub.sendAsync`
+: Optional. Send messages to GCloud PubSub asynchronously, detaching the calling
+ process from the acknowledge of the message being sent.
+ The drawback of the enabling the sendAsync parameter is that the broker would only
+ return the status of the successful invocation of the message send operation and not
+ the actual status received by the broker. This means that when sendAsync is enabled
+ 'broker_msg_publisher_failure_counter' metric is not incremented when message send
+ failure occurs.
+ Default: true
+
+`plugin.events-gcloud-pubsub.ackDeadlineSeconds`
+: Optional. The approximate amount of time (on a best-effort basis) Pub/Sub waits for
+ the subscriber to acknowledge receipt before resending the message.
+ Default: 10
+
+`plugin.events-gcloud-pubsub.subscribtionTimeoutInSeconds`
+: Optional. Maximum time in seconds to wait for the subscriber to connect to GCloud PubSub topic.
+ Default: 10
+
+`plugin.events-gcloud-pubsub.streamEventsTopic`
+: Optional. Name of the GCloud PubSub topic for stream events. events-gcloud-pubsub plugin exposes
+ all stream events under this topic name.
+ Default: gerrit
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
new file mode 100644
index 0000000..08a200b
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApiIT.java
@@ -0,0 +1,262 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.util.stream.Collectors.counting;
+import static java.util.stream.Collectors.groupingBy;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.NoHttpd;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.WaitUtil;
+import com.google.gerrit.acceptance.config.GerritConfig;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.PushConfig;
+import com.google.pubsub.v1.ReceivedMessage;
+import com.google.pubsub.v1.TopicName;
+import com.googlesource.gerrit.plugins.pubsub.local.EnvironmentChecker;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.junit.Test;
+import org.testcontainers.containers.PubSubEmulatorContainer;
+import org.testcontainers.utility.DockerImageName;
+
+@NoHttpd
+@TestPlugin(
+ name = "events-gcloud-pubsub",
+ sysModule = "com.googlesource.gerrit.plugins.pubsub.Module")
+public class PubSubBrokerApiIT extends LightweightPluginDaemonTest {
+ private static final String PROJECT_ID = "test_project";
+ private static final String TOPIC_ID = "test_topic";
+ private static final String SUBSCRIPTION_ID = "test_subscription_id";
+
+ private static final Duration TEST_TIMEOUT = Duration.ofSeconds(5);
+ private static final String PRIVATE_KEY_LOCATION = "not used in test";
+
+ @Inject private Gson gson;
+
+ private TransportChannelProvider channelProvider;
+ private NoCredentialsProvider credentialsProvider;
+ private ManagedChannel channel;
+
+ private BrokerApi objectUnderTest;
+
+ public PubSubEmulatorContainer emulator =
+ new PubSubEmulatorContainer(
+ DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:316.0.0-emulators"));
+
+ @Override
+ public void setUpTestPlugin() throws Exception {
+ emulator.start();
+ String hostport = emulator.getEmulatorEndpoint();
+ System.setProperty(EnvironmentChecker.PUBSUB_EMULATOR_HOST, hostport);
+ channel = ManagedChannelBuilder.forTarget(hostport).usePlaintext().build();
+ channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
+ credentialsProvider = NoCredentialsProvider.create();
+
+ createTopic(TOPIC_ID, channelProvider, credentialsProvider);
+
+ super.setUpTestPlugin();
+
+ objectUnderTest = plugin.getSysInjector().getInstance(BrokerApi.class);
+ }
+
+ @Override
+ public void tearDownTestPlugin() {
+ channel.shutdown();
+ emulator.close();
+ super.tearDownTestPlugin();
+ }
+
+ @Test
+ @GerritConfig(name = "plugin.events-gcloud-pubsub.gcloudProject", value = PROJECT_ID)
+ @GerritConfig(name = "plugin.events-gcloud-pubsub.subscriptionId", value = SUBSCRIPTION_ID)
+ @GerritConfig(
+ name = "plugin.events-gcloud-pubsub.privateKeyLocation",
+ value = PRIVATE_KEY_LOCATION)
+ public void shouldSendEvent() throws IOException {
+ createSubscription(SUBSCRIPTION_ID, TOPIC_ID, channelProvider, credentialsProvider);
+ UUID id = UUID.randomUUID();
+ Event event = new ProjectCreatedEvent();
+ EventMessage eventMessage = new EventMessage(new EventMessage.Header(id, id), event);
+ String expectedMessageJson = gson.toJson(eventMessage);
+
+ objectUnderTest.send(TOPIC_ID, eventMessage);
+
+ readMessageAndValidate(
+ (pullResponse) -> {
+ assertThat(pullResponse.getReceivedMessagesList()).hasSize(1);
+ assertThat(pullResponse.getReceivedMessages(0).getMessage().getData().toStringUtf8())
+ .isEqualTo(expectedMessageJson);
+ });
+ }
+
+ @Test
+ @GerritConfig(name = "plugin.events-gcloud-pubsub.gcloudProject", value = PROJECT_ID)
+ @GerritConfig(name = "plugin.events-gcloud-pubsub.subscriptionId", value = SUBSCRIPTION_ID)
+ @GerritConfig(
+ name = "plugin.events-gcloud-pubsub.privateKeyLocation",
+ value = PRIVATE_KEY_LOCATION)
+ public void shouldProduceStreamEvents() throws Exception {
+ String subscriptionId = "gerrit-subscription-id";
+ String topicId = "gerrit";
+ createSubscription(subscriptionId, topicId, channelProvider, credentialsProvider);
+
+ createChange();
+
+ readMessageAndValidate(
+ (pullResponse) -> {
+ List<ReceivedMessage> messages = pullResponse.getReceivedMessagesList();
+ assertThat(messages).hasSize(4);
+ Map<String, Long> messageTypeCount =
+ messages.stream()
+ .map(m -> gson.fromJson(m.getMessage().getData().toStringUtf8(), Map.class))
+ .map(m -> m.get("type").toString())
+ .collect(groupingBy(t -> t, counting()));
+
+ assertThat(messageTypeCount.get(RefUpdatedEvent.TYPE)).isEqualTo(3);
+ assertThat(messageTypeCount.get("patchset-created")).isEqualTo(1);
+ },
+ PROJECT_ID,
+ subscriptionId);
+ }
+
+ @Test
+ @GerritConfig(name = "plugin.events-gcloud-pubsub.gcloudProject", value = PROJECT_ID)
+ @GerritConfig(name = "plugin.events-gcloud-pubsub.subscriptionId", value = SUBSCRIPTION_ID)
+ @GerritConfig(
+ name = "plugin.events-gcloud-pubsub.privateKeyLocation",
+ value = PRIVATE_KEY_LOCATION)
+ public void shouldConsumeEvent() throws InterruptedException {
+ UUID id = UUID.randomUUID();
+ Event event = new ProjectCreatedEvent();
+ EventMessage eventMessage = new EventMessage(new EventMessage.Header(id, id), event);
+ String expectedMessageJson = gson.toJson(eventMessage);
+ TestConsumer consumer = new TestConsumer();
+
+ objectUnderTest.receiveAsync(TOPIC_ID, consumer);
+
+ objectUnderTest.send(TOPIC_ID, eventMessage);
+
+ WaitUtil.waitUntil(
+ () ->
+ consumer.getMessage() != null
+ && expectedMessageJson.equals(gson.toJson(consumer.getMessage())),
+ TEST_TIMEOUT);
+ }
+
+ private void readMessageAndValidate(Consumer<PullResponse> validate) throws IOException {
+ readMessageAndValidate(validate, PROJECT_ID, SUBSCRIPTION_ID);
+ }
+
+ private void readMessageAndValidate(
+ Consumer<PullResponse> validate, String projectId, String subscriptionId) throws IOException {
+ SubscriberStubSettings subscriberStubSettings =
+ SubscriberStubSettings.newBuilder()
+ .setTransportChannelProvider(channelProvider)
+ .setCredentialsProvider(credentialsProvider)
+ .build();
+ try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
+ PullRequest pullRequest =
+ PullRequest.newBuilder()
+ .setMaxMessages(Integer.MAX_VALUE) // make sure that we read all messages
+ .setSubscription(ProjectSubscriptionName.format(projectId, subscriptionId))
+ .build();
+ PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
+
+ validate.accept(pullResponse);
+ }
+ }
+
+ private void createTopic(
+ String topicId,
+ TransportChannelProvider channelProvider,
+ NoCredentialsProvider credentialsProvider)
+ throws IOException {
+ TopicAdminSettings topicAdminSettings =
+ TopicAdminSettings.newBuilder()
+ .setTransportChannelProvider(channelProvider)
+ .setCredentialsProvider(credentialsProvider)
+ .build();
+ try (TopicAdminClient topicAdminClient = TopicAdminClient.create(topicAdminSettings)) {
+ TopicName topicName = TopicName.of(PROJECT_ID, topicId);
+ topicAdminClient.createTopic(topicName);
+ }
+ }
+
+ private void createSubscription(
+ String subscriptionId,
+ String topicId,
+ TransportChannelProvider channelProvider,
+ NoCredentialsProvider credentialsProvider)
+ throws IOException {
+ SubscriptionAdminSettings subscriptionAdminSettings =
+ SubscriptionAdminSettings.newBuilder()
+ .setTransportChannelProvider(channelProvider)
+ .setCredentialsProvider(credentialsProvider)
+ .build();
+ SubscriptionAdminClient subscriptionAdminClient =
+ SubscriptionAdminClient.create(subscriptionAdminSettings);
+ ProjectSubscriptionName subscriptionName =
+ ProjectSubscriptionName.of(PROJECT_ID, subscriptionId);
+ subscriptionAdminClient
+ .createSubscription(
+ subscriptionName,
+ TopicName.of(PROJECT_ID, topicId),
+ PushConfig.getDefaultInstance(),
+ 10)
+ .getName();
+ }
+
+ private class TestConsumer implements Consumer<EventMessage> {
+ private EventMessage msg;
+
+ @Override
+ public void accept(EventMessage msg) {
+ this.msg = msg;
+ }
+
+ public EventMessage getMessage() {
+ return msg;
+ }
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfigurationTest.java
new file mode 100644
index 0000000..a1834ad
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfigurationTest.java
@@ -0,0 +1,146 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static org.mockito.Mockito.when;
+
+import com.google.gerrit.server.config.PluginConfig;
+import com.google.gerrit.server.config.PluginConfigFactory;
+import org.eclipse.jgit.lib.Config;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PubSubConfigurationTest {
+ private static final String PLUGIN_NAME = "events-gcloud-pubsub";
+ private static final String subscriptionId = "some-subscription-id";
+ private static final String gerritInstanceId = "some-gerrit-id";
+ private static final String gCloudProject = "gcloud-test-project";
+ private static final String privateKeyLocation = "/some/path";
+
+ private PluginConfig.Update pluginConfig;
+ @Mock private PluginConfigFactory pluginConfigFactoryMock;
+
+ @Before
+ public void setup() {
+ pluginConfig = PluginConfig.Update.forTest(PLUGIN_NAME, new Config());
+ pluginConfig.setString("subscriptionId", subscriptionId);
+ pluginConfig.setString("gcloudProject", gCloudProject);
+ pluginConfig.setString("privateKeyLocation", privateKeyLocation);
+ }
+
+ @Test
+ public void shouldUseSubscriptionIdWhenConfiguredEvenIfGerritInstanceIdIsNull() {
+ when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+ .thenReturn(pluginConfig.asPluginConfig());
+
+ PubSubConfiguration configuration =
+ new PubSubConfiguration(pluginConfigFactoryMock, PLUGIN_NAME, null);
+
+ assertThat(configuration.getSubscriptionId()).isEqualTo(subscriptionId);
+ }
+
+ @Test
+ public void shouldUseSubscriptionIdWhenConfiguredEvenIfGerritInstanceIdIsDefined() {
+ when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+ .thenReturn(pluginConfig.asPluginConfig());
+
+ PubSubConfiguration configuration =
+ new PubSubConfiguration(pluginConfigFactoryMock, PLUGIN_NAME, gerritInstanceId);
+
+ assertThat(configuration.getSubscriptionId()).isEqualTo(subscriptionId);
+ }
+
+ @Test
+ public void shouldUseGerritInstanceIdWhenSubscriptionIdIsEmpty() {
+ pluginConfig.setString("subscriptionId", "");
+ when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+ .thenReturn(pluginConfig.asPluginConfig());
+
+ PubSubConfiguration configuration =
+ new PubSubConfiguration(pluginConfigFactoryMock, PLUGIN_NAME, gerritInstanceId);
+
+ assertThat(configuration.getSubscriptionId()).isEqualTo(gerritInstanceId);
+ }
+
+ @Test
+ public void shouldThrowExceptionWhenSubscriptionIdIsNotDefinedAndGerritInstanceIdIsNull() {
+ pluginConfig.setString("subscriptionId", "");
+ when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+ .thenReturn(pluginConfig.asPluginConfig());
+
+ IllegalStateException thrown =
+ assertThrows(
+ IllegalStateException.class,
+ () -> new PubSubConfiguration(pluginConfigFactoryMock, PLUGIN_NAME, null));
+
+ assertThat(thrown).hasMessageThat().contains("parameter 'subscriptionId' is mandatory");
+ }
+
+ @Test
+ public void shouldReadGCloudProjectWhenConfigured() {
+ when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+ .thenReturn(pluginConfig.asPluginConfig());
+
+ PubSubConfiguration configuration =
+ new PubSubConfiguration(pluginConfigFactoryMock, PLUGIN_NAME, gerritInstanceId);
+
+ assertThat(configuration.getGCloudProject()).isEqualTo(gCloudProject);
+ }
+
+ @Test
+ public void shouldThrowExceptionWhenGCloudProjectIsNotDefined() {
+ pluginConfig.setString("gcloudProject", "");
+ when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+ .thenReturn(pluginConfig.asPluginConfig());
+
+ IllegalStateException thrown =
+ assertThrows(
+ IllegalStateException.class,
+ () -> new PubSubConfiguration(pluginConfigFactoryMock, PLUGIN_NAME, gerritInstanceId));
+
+ assertThat(thrown).hasMessageThat().contains("parameter 'gcloudProject' is mandatory");
+ }
+
+ @Test
+ public void shouldReadPrivateKeyLocationWhenConfigured() {
+ when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+ .thenReturn(pluginConfig.asPluginConfig());
+
+ PubSubConfiguration configuration =
+ new PubSubConfiguration(pluginConfigFactoryMock, PLUGIN_NAME, gerritInstanceId);
+
+ assertThat(configuration.getPrivateKeyLocation()).isEqualTo(privateKeyLocation);
+ }
+
+ @Test
+ public void shouldThrowExceptionWhenPrivateKeyLocationIsNotDefined() {
+ pluginConfig.setString("privateKeyLocation", "");
+ when(pluginConfigFactoryMock.getFromGerritConfig(PLUGIN_NAME))
+ .thenReturn(pluginConfig.asPluginConfig());
+
+ IllegalStateException thrown =
+ assertThrows(
+ IllegalStateException.class,
+ () -> new PubSubConfiguration(pluginConfigFactoryMock, PLUGIN_NAME, gerritInstanceId));
+
+ assertThat(thrown).hasMessageThat().contains("parameter 'privateKeyLocation' is mandatory");
+ }
+}