Merge branch 'stable-3.3'
* stable-3.3:
Deserialize Event and EventMessage
Fix issue with message serialisation
Bump events-broker to v3.3.2
Add message content validation
Change-Id: I86d8e4f559ec4bf979ab31dded198abfbfc6553f
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 5a21ad4..d26ffb9 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -45,8 +45,8 @@
maven_jar(
name = "events-broker",
- artifact = "com.gerritforge:events-broker:3.3.2",
- sha1 = "d8bcb77047cc12dd7c623b5b4de70a25499d3d6c",
+ artifact = "com.gerritforge:events-broker:3.4.0-rc2",
+ sha1 = "f72b4166e6d785fd1a41c997a4ffb14461dd7d87",
)
maven_jar(
@@ -161,4 +161,4 @@
name = "grpc-auth",
artifact = "io.grpc:grpc-auth:1.36.0",
sha1 = "d9722016658f8e649111c8bb93b299ea38dc207e",
- )
\ No newline at end of file
+ )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java
index c989aa4..7d07a5f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubBrokerApi.java
@@ -18,6 +18,7 @@
import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
import com.google.common.flogger.FluentLogger;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import java.util.Collections;
import java.util.Map;
@@ -42,7 +43,7 @@
}
@Override
- public boolean send(String topic, EventMessage message) {
+ public ListenableFuture<Boolean> send(String topic, EventMessage message) {
return publishers.computeIfAbsent(topic, t -> publisherFactory.create(t)).publish(message);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java
index 54efac3..614d0aa 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubConfiguration.java
@@ -33,7 +33,6 @@
private final String gcloudProject;
private final String subscriptionId;
private final Integer numberOfSubscribers;
- private final Boolean sendAsync;
private final String privateKeyLocation;
private final Integer ackDeadlineSeconds;
private final Long subscribtionTimeoutInSeconds;
@@ -47,7 +46,6 @@
@PluginName String pluginName,
@Nullable @GerritInstanceId String instanceId) {
this.fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
- this.sendAsync = fromGerritConfig.getBoolean("sendAsync", true);
this.gcloudProject = getMandatoryString("gcloudProject");
this.subscriptionId = getMandatoryString("subscriptionId", instanceId);
this.privateKeyLocation = getMandatoryString("privateKeyLocation");
@@ -68,10 +66,6 @@
fromGerritConfig.getString("shutdownTimeoutInSeconds", DEFAULT_SHUTDOWN_TIMEOUT));
}
- public Boolean isSendAsync() {
- return sendAsync;
- }
-
public String getGCloudProject() {
return gcloudProject;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
index a802d71..850a96d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
@@ -20,6 +20,9 @@
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.flogger.FluentLogger;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.server.events.Event;
import com.google.gson.Gson;
@@ -28,9 +31,8 @@
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
-import java.util.concurrent.ExecutionException;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
public class PubSubPublisher {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@@ -60,26 +62,21 @@
this.pubSubProperties = pubSubProperties;
}
- public boolean publish(Event event) {
+ public ListenableFuture<Boolean> publish(Event event) {
return publish(gson.toJson(event));
}
- public boolean publish(EventMessage event) {
+ public ListenableFuture<Boolean> publish(EventMessage event) {
return publish(gson.toJson(event));
}
- private boolean publish(String eventPayload) {
+ private ListenableFuture<Boolean> publish(String eventPayload) {
ByteString data = ByteString.copyFromUtf8(eventPayload);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
- if (pubSubProperties.isSendAsync()) {
- return publishAsync(pubsubMessage) != null;
- }
-
- publishSync(pubsubMessage);
- return true;
+ return publishAsync(pubsubMessage);
}
- private ApiFuture<String> publishAsync(PubsubMessage pubsubMessage) {
+ private ListenableFuture<Boolean> publishAsync(PubsubMessage pubsubMessage) {
ApiFuture<String> publish = publisher.publish(pubsubMessage);
ApiFutures.addCallback(
publish,
@@ -102,17 +99,11 @@
}
},
MoreExecutors.directExecutor());
- return publish;
- }
- private void publishSync(PubsubMessage pubsubMessage) {
- try {
- ApiFuture<String> messageIdFuture = publishAsync(pubsubMessage);
- messageIdFuture.get(1000, TimeUnit.SECONDS);
-
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- logger.atSevere().withCause(e).log("Cannot send the message");
- }
+ return Futures.transform(
+ JdkFutureAdapters.listenInPoolThread(publish),
+ Objects::nonNull,
+ MoreExecutors.directExecutor());
}
public void close() throws InterruptedException {
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 2e93838..f3263d3 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -34,16 +34,6 @@
to allocate a thread pool able to run all subscribers.
Default: 6
-`plugin.events-gcloud-pubsub.sendAsync`
-: Optional. Send messages to GCloud PubSub asynchronously, detaching the calling
- process from the acknowledge of the message being sent.
- The drawback of the enabling the sendAsync parameter is that the broker would only
- return the status of the successful invocation of the message send operation and not
- the actual status received by the broker. This means that when sendAsync is enabled
- 'broker_msg_publisher_failure_counter' metric is not incremented when message send
- failure occurs.
- Default: true
-
`plugin.events-gcloud-pubsub.ackDeadlineSeconds`
: Optional. The approximate amount of time (on a best-effort basis) Pub/Sub waits for
the subscriber to acknowledge receipt before resending the message.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java
index c009973..97a2c8d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java
@@ -60,18 +60,6 @@
@Test
public void shouldIncrementFailedToPublishMessageWhenAsyncPublishFails() {
- when(confMock.isSendAsync()).thenReturn(true);
- when(publisherMock.publish(any()))
- .thenReturn(ApiFutures.immediateFailedFuture(new Exception("Something went wrong")));
-
- objectUnderTest.publish(eventMessage);
-
- verify(pubSubPublisherMetricsMock, only()).incrementFailedToPublishMessage();
- }
-
- @Test
- public void shouldIncrementFailedToPublishMessageWhenSyncPublishFails() {
- when(confMock.isSendAsync()).thenReturn(false);
when(publisherMock.publish(any()))
.thenReturn(ApiFutures.immediateFailedFuture(new Exception("Something went wrong")));
@@ -82,17 +70,6 @@
@Test
public void shouldIncrementSuccessToPublishMessageWhenAsyncPublishSucceeds() {
- when(confMock.isSendAsync()).thenReturn(true);
- when(publisherMock.publish(any())).thenReturn(ApiFutures.immediateFuture("some-message-id"));
-
- objectUnderTest.publish(eventMessage);
-
- verify(pubSubPublisherMetricsMock, only()).incrementSucceedToPublishMessage();
- }
-
- @Test
- public void shouldIncrementSuccessToPublishMessageWhenSyncPublishSucceeds() {
- when(confMock.isSendAsync()).thenReturn(false);
when(publisherMock.publish(any())).thenReturn(ApiFutures.immediateFuture("some-message-id"));
objectUnderTest.publish(eventMessage);