Merge branch 'stable-3.5' into stable-3.6 * stable-3.5: Fix typos in docs Receive stream events from events-broker directly Change-Id: Ic370bfffda8ea15d8875b9dd4d32649d9ccb171b
diff --git a/BUILD b/BUILD index 02ac689..e1f435d 100644 --- a/BUILD +++ b/BUILD
@@ -16,6 +16,7 @@ deps = [ "//lib/commons:io", "//plugins/replication", + "@events-broker//jar:neverlink", ], ) @@ -30,7 +31,8 @@ deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [ ":pull-replication__plugin", ":pull_replication_util", - "//plugins/replication:replication", + "//plugins/replication", + "@events-broker//jar", ], )
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl new file mode 100644 index 0000000..f1f162e --- /dev/null +++ b/external_plugin_deps.bzl
@@ -0,0 +1,8 @@ +load("//tools/bzl:maven_jar.bzl", "maven_jar") + +def external_plugin_deps(): + maven_jar( + name = "events-broker", + artifact = "com.gerritforge:events-broker:3.4.0.4", + sha1 = "8d361d863382290e33828116e65698190118d0f1", + )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java index 1671410..cd19be4 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -49,6 +49,7 @@ import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient; import com.googlesource.gerrit.plugins.replication.pull.client.HttpClient; import com.googlesource.gerrit.plugins.replication.pull.client.SourceHttpClient; +import com.googlesource.gerrit.plugins.replication.pull.event.EventsBrokerConsumerModule; import com.googlesource.gerrit.plugins.replication.pull.event.FetchRefReplicatedEventModule; import com.googlesource.gerrit.plugins.replication.pull.event.StreamEventModule; import com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObject; @@ -139,8 +140,11 @@ bind(ReplicationConfig.class).to(getReplicationConfigClass()).in(Scopes.SINGLETON); } + String eventBrokerTopic = replicationConfig.getString("replication", null, "eventBrokerTopic"); if (replicationConfig.getBoolean("replication", "consumeStreamEvents", false)) { install(new StreamEventModule()); + } else if (eventBrokerTopic != null) { + install(new EventsBrokerConsumerModule(eventBrokerTopic)); } DynamicSet.setOf(binder(), ReplicationStateListener.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventRejectedException.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventRejectedException.java new file mode 100644 index 0000000..9c2d737 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventRejectedException.java
@@ -0,0 +1,26 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.pull.event; + +import com.google.gerrit.server.events.Event; + +public class EventRejectedException extends IllegalStateException { + + private static final long serialVersionUID = 3404537560735602192L; + + public EventRejectedException(Event event, Throwable cause) { + super(String.format("Unable to accept event %s", event), cause); + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java new file mode 100644 index 0000000..ce1a076 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java
@@ -0,0 +1,37 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.pull.event; + +import com.google.gerrit.lifecycle.LifecycleModule; +import com.google.inject.Scopes; +import com.google.inject.name.Names; + +public class EventsBrokerConsumerModule extends LifecycleModule { + public static final String STREAM_EVENTS_TOPIC_NAME = "stream_events_topic_name"; + + private final String topicName; + + public EventsBrokerConsumerModule(String topicName) { + this.topicName = topicName; + } + + @Override + protected void configure() { + bind(EventsBrokerMessageConsumer.class).in(Scopes.SINGLETON); + bind(String.class).annotatedWith(Names.named(STREAM_EVENTS_TOPIC_NAME)).toInstance(topicName); + + listener().to(EventsBrokerMessageConsumer.class); + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java new file mode 100644 index 0000000..6fda9d3 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
@@ -0,0 +1,64 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.pull.event; + +import static com.googlesource.gerrit.plugins.replication.pull.event.EventsBrokerConsumerModule.STREAM_EVENTS_TOPIC_NAME; + +import com.gerritforge.gerrit.eventbroker.BrokerApi; +import com.google.gerrit.extensions.events.LifecycleListener; +import com.google.gerrit.extensions.registration.DynamicItem; +import com.google.gerrit.extensions.restapi.AuthException; +import com.google.gerrit.server.events.Event; +import com.google.gerrit.server.permissions.PermissionBackendException; +import com.google.inject.Inject; +import com.google.inject.name.Named; +import java.util.function.Consumer; + +public class EventsBrokerMessageConsumer implements Consumer<Event>, LifecycleListener { + + private final DynamicItem<BrokerApi> eventsBroker; + private final StreamEventListener eventListener; + private final String eventsTopicName; + + @Inject + public EventsBrokerMessageConsumer( + DynamicItem<BrokerApi> eventsBroker, + StreamEventListener eventListener, + @Named(STREAM_EVENTS_TOPIC_NAME) String eventsTopicName) { + + this.eventsBroker = eventsBroker; + this.eventListener = eventListener; + this.eventsTopicName = eventsTopicName; + } + + @Override + public void accept(Event event) { + try { + eventListener.fetchRefsForEvent(event); + } catch (AuthException | PermissionBackendException e) { + throw new EventRejectedException(event, e); + } + } + + @Override + public void start() { + eventsBroker.get().receiveAsync(eventsTopicName, this); + } + + @Override + public void stop() { + eventsBroker.get().disconnect(); + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java index 0f092e5..c1ffa44 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
@@ -68,6 +68,17 @@ @Override public void onEvent(Event event) { + try { + fetchRefsForEvent(event); + } catch (AuthException | PermissionBackendException e) { + logger.atSevere().withCause(e).log( + "This is the event handler of Gerrit's event-bus. It isn't" + + "supposed to throw any exception, otherwise the other handlers " + + "won't be executed"); + } + } + + public void fetchRefsForEvent(Event event) throws AuthException, PermissionBackendException { if (!instanceId.equals(event.instanceId)) { PullReplicationApiRequestMetrics metrics = metricsProvider.get(); metrics.start(event); @@ -93,6 +104,7 @@ } catch (AuthException | PermissionBackendException e) { logger.atSevere().withCause(e).log( "Cannot initialise project:%s", projectCreatedEvent.projectName); + throw e; } } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java index 2389678..756e39a 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java
@@ -22,6 +22,7 @@ @Override protected void configure() { + bind(StreamEventListener.class); DynamicSet.bind(binder(), EventListener.class).to(StreamEventListener.class); } }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md index d3a2859..51b10d8 100644 --- a/src/main/resources/Documentation/config.md +++ b/src/main/resources/Documentation/config.md
@@ -136,11 +136,34 @@ and multi-site to provide backfill mechanism when a node has to catch up with the events after being unreachable. - NOTE: When `consumeStreamEvents` is enabled gerrit.instanceId + When `consumeStreamEvents` is enabled gerrit.instanceId instead of [replication.instanceLabel](https://gerrit.googlesource.com/plugins/pull-replication/+/refs/heads/stable-3.4/src/main/resources/Documentation/config.md#replication.instanceLabel) must be used. + Using pull-replication standalone with a broker is also possible. + Check the replication.eventBrokerTopic parameter. + Default: false +replication.eventBrokerTopic +: Topic to consumer stream events from. + + Pull-replication can receive stream events and use it as + notification mechanism as alternative to REST API notifications. + It can work in standalone, not necessarily with the multi-site plugin + (check replication.consumerStreamEvents if you need to use it with + multi-site). This parameter is used to define the topic to consume + stream events messages from when using the pull-replication in + standalone with a broker. + + When `eventBrokerTopic` is enabled gerrit.instanceId + instead of [replication.instanceLabel](https://gerrit.googlesource.com/plugins/pull-replication/+/refs/heads/stable-3.4/src/main/resources/Documentation/config.md#replication.instanceLabel) + must be used. + + Bear in mind that if consumeStreamEvents is set to true this + parameter will be ignored. + + Default: unset + replication.maxConnectionsPerRoute : Maximum number of HTTP connections per one HTTP route.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java new file mode 100644 index 0000000..394a8a6 --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
@@ -0,0 +1,66 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication.pull.event; + +import static com.google.gerrit.testing.GerritJUnit.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; + +import com.gerritforge.gerrit.eventbroker.BrokerApi; +import com.google.gerrit.extensions.registration.DynamicItem; +import com.google.gerrit.extensions.restapi.AuthException; +import com.google.gerrit.server.events.RefUpdatedEvent; +import com.google.gerrit.server.permissions.PermissionBackendException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class EventsBrokerMessageConsumerTest { + + @Mock private StreamEventListener eventListener; + @Mock DynamicItem<BrokerApi> eventsBroker; + + EventsBrokerMessageConsumer objectUnderTest; + + @Before + public void setup() { + objectUnderTest = new EventsBrokerMessageConsumer(eventsBroker, eventListener, "topicName"); + } + + @Test + public void shouldRethrowExceptionWhenFetchThrowsAuthException() + throws AuthException, PermissionBackendException { + doThrow(PermissionBackendException.class).when(eventListener).fetchRefsForEvent(any()); + assertThrows(EventRejectedException.class, () -> objectUnderTest.accept(new RefUpdatedEvent())); + } + + @Test + public void shouldRethrowExceptionWhenFetchThrowsPermissionBackendException() + throws AuthException, PermissionBackendException { + doThrow(PermissionBackendException.class).when(eventListener).fetchRefsForEvent(any()); + assertThrows(EventRejectedException.class, () -> objectUnderTest.accept(new RefUpdatedEvent())); + } + + @Test + public void shouldNotThrowExceptionWhenFetchSucceed() + throws AuthException, PermissionBackendException { + doNothing().when(eventListener).fetchRefsForEvent(any()); + objectUnderTest.accept(new RefUpdatedEvent()); + } +}