Merge branch 'stable-3.6' into stable-3.7
* stable-3.6:
Fix typos in docs
Receive stream events from events-broker directly
Change-Id: I1fa238877af95985356c39c9deadf0c450d8cb64
diff --git a/BUILD b/BUILD
index 02ac689..e1f435d 100644
--- a/BUILD
+++ b/BUILD
@@ -16,6 +16,7 @@
deps = [
"//lib/commons:io",
"//plugins/replication",
+ "@events-broker//jar:neverlink",
],
)
@@ -30,7 +31,8 @@
deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
":pull-replication__plugin",
":pull_replication_util",
- "//plugins/replication:replication",
+ "//plugins/replication",
+ "@events-broker//jar",
],
)
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
new file mode 100644
index 0000000..f1f162e
--- /dev/null
+++ b/external_plugin_deps.bzl
@@ -0,0 +1,8 @@
+load("//tools/bzl:maven_jar.bzl", "maven_jar")
+
+def external_plugin_deps():  # Declares the external Maven artifacts this plugin depends on.
+ maven_jar(
+ name = "events-broker",  # referenced from BUILD as @events-broker//jar and @events-broker//jar:neverlink
+ artifact = "com.gerritforge:events-broker:3.4.0.4",
+ sha1 = "8d361d863382290e33828116e65698190118d0f1",
+ )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index 1671410..cd19be4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -49,6 +49,7 @@
import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient;
import com.googlesource.gerrit.plugins.replication.pull.client.HttpClient;
import com.googlesource.gerrit.plugins.replication.pull.client.SourceHttpClient;
+import com.googlesource.gerrit.plugins.replication.pull.event.EventsBrokerConsumerModule;
import com.googlesource.gerrit.plugins.replication.pull.event.FetchRefReplicatedEventModule;
import com.googlesource.gerrit.plugins.replication.pull.event.StreamEventModule;
import com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObject;
@@ -139,8 +140,11 @@
bind(ReplicationConfig.class).to(getReplicationConfigClass()).in(Scopes.SINGLETON);
}
+ String eventBrokerTopic = replicationConfig.getString("replication", null, "eventBrokerTopic");
if (replicationConfig.getBoolean("replication", "consumeStreamEvents", false)) {
install(new StreamEventModule());
+ } else if (eventBrokerTopic != null) {
+ install(new EventsBrokerConsumerModule(eventBrokerTopic));
}
DynamicSet.setOf(binder(), ReplicationStateListener.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventRejectedException.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventRejectedException.java
new file mode 100644
index 0000000..9c2d737
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventRejectedException.java
@@ -0,0 +1,26 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import com.google.gerrit.server.events.Event;
+
+public class EventRejectedException extends IllegalStateException {
+ // Unchecked exception reporting that an event could not be accepted.
+ private static final long serialVersionUID = 3404537560735602192L;
+ // Builds the message from the rejected event and keeps the original failure as the cause.
+ public EventRejectedException(Event event, Throwable cause) {
+ super(String.format("Unable to accept event %s", event), cause);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java
new file mode 100644
index 0000000..ce1a076
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java
@@ -0,0 +1,37 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Scopes;
+import com.google.inject.name.Names;
+
+public class EventsBrokerConsumerModule extends LifecycleModule { // wires up the events-broker consumer
+ public static final String STREAM_EVENTS_TOPIC_NAME = "stream_events_topic_name"; // name of the @Named String binding
+ // Topic name supplied by whoever installs this module; exposed below as a named String binding.
+ private final String topicName;
+ // Stores the topic name so configure() can bind it.
+ public EventsBrokerConsumerModule(String topicName) {
+ this.topicName = topicName;
+ }
+ // Binds the consumer as a singleton, publishes the topic name, and registers the consumer as a lifecycle listener.
+ @Override
+ protected void configure() {
+ bind(EventsBrokerMessageConsumer.class).in(Scopes.SINGLETON);
+ bind(String.class).annotatedWith(Names.named(STREAM_EVENTS_TOPIC_NAME)).toInstance(topicName);
+ // The same consumer class is the lifecycle listener, so its start()/stop() are driven by the plugin lifecycle.
+ listener().to(EventsBrokerMessageConsumer.class);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
new file mode 100644
index 0000000..6fda9d3
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
@@ -0,0 +1,64 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import static com.googlesource.gerrit.plugins.replication.pull.event.EventsBrokerConsumerModule.STREAM_EVENTS_TOPIC_NAME;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.extensions.restapi.AuthException;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import java.util.function.Consumer;
+
+public class EventsBrokerMessageConsumer implements Consumer<Event>, LifecycleListener {
+ // Hands every event received from the broker topic to StreamEventListener.fetchRefsForEvent.
+ private final DynamicItem<BrokerApi> eventsBroker;
+ private final StreamEventListener eventListener;
+ private final String eventsTopicName;
+ // The topic name is injected through the String binding named STREAM_EVENTS_TOPIC_NAME.
+ @Inject
+ public EventsBrokerMessageConsumer(
+ DynamicItem<BrokerApi> eventsBroker,
+ StreamEventListener eventListener,
+ @Named(STREAM_EVENTS_TOPIC_NAME) String eventsTopicName) {
+
+ this.eventsBroker = eventsBroker;
+ this.eventListener = eventListener;
+ this.eventsTopicName = eventsTopicName;
+ }
+ // Delegates to the listener; its checked failures are rethrown as the unchecked EventRejectedException.
+ @Override
+ public void accept(Event event) {
+ try {
+ eventListener.fetchRefsForEvent(event);
+ } catch (AuthException | PermissionBackendException e) {
+ throw new EventRejectedException(event, e);
+ }
+ }
+ // Subscribes this consumer to the topic. NOTE(review): eventsBroker.get() is used unchecked - confirm a BrokerApi is always bound.
+ @Override
+ public void start() {
+ eventsBroker.get().receiveAsync(eventsTopicName, this);
+ }
+ // NOTE(review): disconnect() takes no topic, so it presumably drops every consumer of this broker - confirm against BrokerApi.
+ @Override
+ public void stop() {
+ eventsBroker.get().disconnect();
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
index 0f092e5..c1ffa44 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
@@ -68,6 +68,17 @@
@Override
public void onEvent(Event event) {
+ try { // checked failures are logged here instead of being propagated to the caller
+ fetchRefsForEvent(event);
+ } catch (AuthException | PermissionBackendException e) {
+ logger.atSevere().withCause(e).log(
+ "This is the event handler of Gerrit's event-bus. It isn't "
+ + "supposed to throw any exception, otherwise the other handlers "
+ + "won't be executed");
+ }
+ }
+
+ public void fetchRefsForEvent(Event event) throws AuthException, PermissionBackendException {
if (!instanceId.equals(event.instanceId)) {
PullReplicationApiRequestMetrics metrics = metricsProvider.get();
metrics.start(event);
@@ -93,6 +104,7 @@
} catch (AuthException | PermissionBackendException e) {
logger.atSevere().withCause(e).log(
"Cannot initialise project:%s", projectCreatedEvent.projectName);
+ throw e;
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java
index 2389678..756e39a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java
@@ -22,6 +22,7 @@
@Override
protected void configure() {
+ bind(StreamEventListener.class); // explicit binding of the listener type itself, alongside its EventListener registration below
DynamicSet.bind(binder(), EventListener.class).to(StreamEventListener.class);
}
}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index d3a2859..51b10d8 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -136,11 +136,34 @@
and multi-site to provide backfill mechanism when a node has to
catch up with the events after being unreachable.
- NOTE: When `consumeStreamEvents` is enabled gerrit.instanceId
+ When `consumeStreamEvents` is enabled gerrit.instanceId
instead of [replication.instanceLabel](https://gerrit.googlesource.com/plugins/pull-replication/+/refs/heads/stable-3.4/src/main/resources/Documentation/config.md#replication.instanceLabel) must be used.
+ Using pull-replication standalone with a broker is also possible.
+ Check the replication.eventBrokerTopic parameter.
+
Default: false
+replication.eventBrokerTopic
+: Topic to consume stream events from.
+
+ Pull-replication can receive stream events and use them as a
+ notification mechanism, as an alternative to REST API notifications.
+ It can work standalone, not necessarily with the multi-site plugin
+ (check replication.consumeStreamEvents if you need to use it with
+ multi-site). This parameter defines the topic to consume
+ stream events messages from when using pull-replication
+ standalone with a broker.
+
+ When `eventBrokerTopic` is set, gerrit.instanceId
+ instead of [replication.instanceLabel](https://gerrit.googlesource.com/plugins/pull-replication/+/refs/heads/stable-3.4/src/main/resources/Documentation/config.md#replication.instanceLabel)
+ must be used.
+
+ Bear in mind that if consumeStreamEvents is set to true this
+ parameter will be ignored.
+
+ Default: unset
+
replication.maxConnectionsPerRoute
: Maximum number of HTTP connections per one HTTP route.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
new file mode 100644
index 0000000..394a8a6
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
@@ -0,0 +1,66 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.extensions.restapi.AuthException;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class EventsBrokerMessageConsumerTest {
+ // Verifies how EventsBrokerMessageConsumer.accept() reacts to the outcome of StreamEventListener.fetchRefsForEvent().
+ @Mock private StreamEventListener eventListener;
+ @Mock DynamicItem<BrokerApi> eventsBroker;
+
+ EventsBrokerMessageConsumer objectUnderTest;
+
+ @Before
+ public void setup() {
+ objectUnderTest = new EventsBrokerMessageConsumer(eventsBroker, eventListener, "topicName");
+ }
+ // An AuthException from the listener must surface as EventRejectedException.
+ @Test
+ public void shouldRethrowExceptionWhenFetchThrowsAuthException()
+ throws AuthException, PermissionBackendException {
+ doThrow(AuthException.class).when(eventListener).fetchRefsForEvent(any());
+ assertThrows(EventRejectedException.class, () -> objectUnderTest.accept(new RefUpdatedEvent()));
+ }
+ // A PermissionBackendException from the listener must surface as EventRejectedException.
+ @Test
+ public void shouldRethrowExceptionWhenFetchThrowsPermissionBackendException()
+ throws AuthException, PermissionBackendException {
+ doThrow(PermissionBackendException.class).when(eventListener).fetchRefsForEvent(any());
+ assertThrows(EventRejectedException.class, () -> objectUnderTest.accept(new RefUpdatedEvent()));
+ }
+ // When the listener succeeds, accept() completes without throwing.
+ @Test
+ public void shouldNotThrowExceptionWhenFetchSucceed()
+ throws AuthException, PermissionBackendException {
+ doNothing().when(eventListener).fetchRefsForEvent(any());
+ objectUnderTest.accept(new RefUpdatedEvent());
+ }
+}