Implements async send interface
Events-broker 3.4.0-rc2 version exposes an async send API.
This allows, for example, to attach a callback to the API return value
and correctly calculate publisher metrics.
Bug: Issue 14396
Change-Id: If26a056b5edfb588d16a6df5f9b3008153f1e5f5
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index fc1b8ad..77e9d52 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -9,6 +9,6 @@
maven_jar(
name = "events-broker",
- artifact = "com.gerritforge:events-broker:3.4.0-rc0",
- sha1 = "8c34c88103d4783eb4c4decde6d93541bc1cf064",
+ artifact = "com.gerritforge:events-broker:3.4.0-rc2",
+ sha1 = "f72b4166e6d785fd1a41c997a4ffb14461dd7d87",
)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index f44f4f7..44c4554 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -22,6 +22,7 @@
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.spi.Message;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
import com.googlesource.gerrit.plugins.multisite.event.EventModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderModule;
@@ -72,6 +73,8 @@
install(new IndexModule());
}
+ install(new BrokerModule());
+
install(new RouterModule());
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
index 71be5e6..cff9899 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
@@ -17,6 +17,10 @@
import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
@@ -26,9 +30,14 @@
import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.Executor;
import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class BrokerApiWrapper implements BrokerApi {
+ private static final Logger log = LoggerFactory.getLogger(BrokerApiWrapper.class);
+ private final Executor executor;
private final DynamicItem<BrokerApi> apiDelegate;
private final BrokerMetrics metrics;
private final MessageLogger msgLog;
@@ -36,37 +45,64 @@
@Inject
public BrokerApiWrapper(
+ @BrokerExecutor Executor executor,
DynamicItem<BrokerApi> apiDelegate,
BrokerMetrics metrics,
MessageLogger msgLog,
@InstanceId UUID instanceId) {
this.apiDelegate = apiDelegate;
+ this.executor = executor;
this.metrics = metrics;
this.msgLog = msgLog;
this.instanceId = instanceId;
}
public boolean send(String topic, Event event) {
- return send(topic, apiDelegate.get().newMessage(instanceId, event));
+ try {
+ return send(topic, apiDelegate.get().newMessage(instanceId, event)).get();
+ } catch (Throwable e) {
+ log.error(
+ "Failed to publish event '{}' to topic '{}' - error: {} - stack trace: {}",
+ event,
+ topic,
+ e.getMessage(),
+ e.getStackTrace());
+ metrics.incrementBrokerFailedToPublishMessage();
+ return false;
+ }
}
@Override
- public boolean send(String topic, EventMessage message) {
+ public ListenableFuture<Boolean> send(String topic, EventMessage message) {
+ SettableFuture<Boolean> resultFuture = SettableFuture.create();
if (Context.isForwardedEvent()) {
- return true;
+ resultFuture.set(true);
+ return resultFuture;
}
- boolean succeeded = false;
- try {
- succeeded = apiDelegate.get().send(topic, message);
- } finally {
- if (succeeded) {
- msgLog.log(Direction.PUBLISH, topic, message);
- metrics.incrementBrokerPublishedMessage();
- } else {
- metrics.incrementBrokerFailedToPublishMessage();
- }
- }
- return succeeded;
+
+ ListenableFuture<Boolean> resfultF = apiDelegate.get().send(topic, message);
+ Futures.addCallback(
+ resfultF,
+ new FutureCallback<Boolean>() {
+ @Override
+ public void onSuccess(Boolean result) {
+ msgLog.log(Direction.PUBLISH, topic, message);
+ metrics.incrementBrokerPublishedMessage();
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ log.error(
+ "Failed to publish message '{}' to topic '{}' - error: {}",
+ message.toString(),
+ topic,
+ throwable.getMessage());
+ metrics.incrementBrokerFailedToPublishMessage();
+ }
+ },
+ executor);
+
+ return resfultF;
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutor.java
new file mode 100644
index 0000000..aa24eb1
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutor.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+
+@Retention(RUNTIME)
+@BindingAnnotation
+@interface BrokerExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutorProvider.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutorProvider.java
new file mode 100644
index 0000000..e843263
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerExecutorProvider.java
@@ -0,0 +1,29 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.ExecutorProvider;
+
+@Singleton
+class BrokerExecutorProvider extends ExecutorProvider {
+
+ @Inject
+ BrokerExecutorProvider(WorkQueue workQueue) {
+ super(workQueue, 1, "Multi-Site-Broker");
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
new file mode 100644
index 0000000..a5dac4a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
@@ -0,0 +1,28 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import com.google.gerrit.lifecycle.LifecycleModule;
+import java.util.concurrent.Executor;
+
+public class BrokerModule extends LifecycleModule {
+
+ @Override
+ protected void configure() {
+ bind(Executor.class)
+ .annotatedWith(BrokerExecutor.class)
+ .toProvider(BrokerExecutorProvider.class);
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
index 92fa101..bb63caf 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
@@ -6,6 +6,8 @@
import static org.mockito.Mockito.when;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.server.events.Event;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
@@ -31,19 +33,27 @@
public void setUp() {
objectUnderTest =
new BrokerApiWrapper(
- DynamicItem.itemOf(BrokerApi.class, brokerApi), brokerMetrics, msgLog, instanceId);
+ MoreExecutors.directExecutor(),
+ DynamicItem.itemOf(BrokerApi.class, brokerApi),
+ brokerMetrics,
+ msgLog,
+ instanceId);
}
@Test
public void shouldIncrementBrokerMetricCounterWhenMessagePublished() {
- when(brokerApi.send(any(), any())).thenReturn(true);
+ SettableFuture<Boolean> resultF = SettableFuture.create();
+ resultF.set(true);
+ when(brokerApi.send(any(), any())).thenReturn(resultF);
objectUnderTest.send(topic, event);
verify(brokerMetrics, only()).incrementBrokerPublishedMessage();
}
@Test
public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailed() {
- when(brokerApi.send(any(), any())).thenReturn(false);
+ SettableFuture<Boolean> resultF = SettableFuture.create();
+ resultF.setException(new Exception("Force Future failure"));
+ when(brokerApi.send(any(), any())).thenReturn(resultF);
objectUnderTest.send(topic, event);
verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
}