Receive stream events from events-broker directly

Gerrit EventBus was not designed to manage the receiver
failures, making the execution of replication unreliable
without the implementation of custom-retry logic based on
replication tasks.

Receive the pull replication events directly from the events-broker
which is capable of controlling the acking of messages and their
redeliver in case of failures.

This requires the additional parameter replication.streamEventsTopic
which points to the topic name to use for receiving events.

Pull-replication will be able to work:
- alongside multi-site, by setting "replication.consumeStreamEvents"
  to true
- standalone by setting the topic in "replication.eventBrokerTopic"

Note that if replication.consumeStreamEvents is set to true
replication.eventBrokerTopic will be ignored.

Issue: Bug 16575
Change-Id: Ibffed26eda4ca852e2599ffe664e8d39e35e9dd3
diff --git a/BUILD b/BUILD
index 5c5f240..e1f435d 100644
--- a/BUILD
+++ b/BUILD
@@ -10,12 +10,13 @@
         "Gerrit-PluginName: pull-replication",
         "Gerrit-Module: com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
         "Gerrit-SshModule: com.googlesource.gerrit.plugins.replication.pull.SshModule",
-        "Gerrit-HttpModule: com.googlesource.gerrit.plugins.replication.pull.api.HttpModule"
+        "Gerrit-HttpModule: com.googlesource.gerrit.plugins.replication.pull.api.HttpModule",
     ],
     resources = glob(["src/main/resources/**/*"]),
     deps = [
         "//lib/commons:io",
-        "//plugins/replication:replication",
+        "//plugins/replication",
+        "@events-broker//jar:neverlink",
     ],
 )
 
@@ -30,7 +31,8 @@
     deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
         ":pull-replication__plugin",
         ":pull_replication_util",
-        "//plugins/replication:replication",
+        "//plugins/replication",
+        "@events-broker//jar",
     ],
 )
 
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
new file mode 100644
index 0000000..f1f162e
--- /dev/null
+++ b/external_plugin_deps.bzl
@@ -0,0 +1,8 @@
+load("//tools/bzl:maven_jar.bzl", "maven_jar")
+
+def external_plugin_deps():
+    maven_jar(
+        name = "events-broker",
+        artifact = "com.gerritforge:events-broker:3.4.0.4",
+        sha1 = "8d361d863382290e33828116e65698190118d0f1",
+    )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index 1671410..cd19be4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -49,6 +49,7 @@
 import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient;
 import com.googlesource.gerrit.plugins.replication.pull.client.HttpClient;
 import com.googlesource.gerrit.plugins.replication.pull.client.SourceHttpClient;
+import com.googlesource.gerrit.plugins.replication.pull.event.EventsBrokerConsumerModule;
 import com.googlesource.gerrit.plugins.replication.pull.event.FetchRefReplicatedEventModule;
 import com.googlesource.gerrit.plugins.replication.pull.event.StreamEventModule;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObject;
@@ -139,8 +140,11 @@
       bind(ReplicationConfig.class).to(getReplicationConfigClass()).in(Scopes.SINGLETON);
     }
 
+    String eventBrokerTopic = replicationConfig.getString("replication", null, "eventBrokerTopic");
     if (replicationConfig.getBoolean("replication", "consumeStreamEvents", false)) {
       install(new StreamEventModule());
+    } else if (eventBrokerTopic != null) {
+      install(new EventsBrokerConsumerModule(eventBrokerTopic));
     }
 
     DynamicSet.setOf(binder(), ReplicationStateListener.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventRejectedException.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventRejectedException.java
new file mode 100644
index 0000000..9c2d737
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventRejectedException.java
@@ -0,0 +1,26 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import com.google.gerrit.server.events.Event;
+
+public class EventRejectedException extends IllegalStateException {
+
+  private static final long serialVersionUID = 3404537560735602192L;
+
+  public EventRejectedException(Event event, Throwable cause) {
+    super(String.format("Unable to accept event %s", event), cause);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java
new file mode 100644
index 0000000..ce1a076
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java
@@ -0,0 +1,37 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Scopes;
+import com.google.inject.name.Names;
+
+public class EventsBrokerConsumerModule extends LifecycleModule {
+  public static final String STREAM_EVENTS_TOPIC_NAME = "stream_events_topic_name";
+
+  private final String topicName;
+
+  public EventsBrokerConsumerModule(String topicName) {
+    this.topicName = topicName;
+  }
+
+  @Override
+  protected void configure() {
+    bind(EventsBrokerMessageConsumer.class).in(Scopes.SINGLETON);
+    bind(String.class).annotatedWith(Names.named(STREAM_EVENTS_TOPIC_NAME)).toInstance(topicName);
+
+    listener().to(EventsBrokerMessageConsumer.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
new file mode 100644
index 0000000..6fda9d3
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
@@ -0,0 +1,64 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import static com.googlesource.gerrit.plugins.replication.pull.event.EventsBrokerConsumerModule.STREAM_EVENTS_TOPIC_NAME;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.extensions.restapi.AuthException;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import java.util.function.Consumer;
+
+public class EventsBrokerMessageConsumer implements Consumer<Event>, LifecycleListener {
+
+  private final DynamicItem<BrokerApi> eventsBroker;
+  private final StreamEventListener eventListener;
+  private final String eventsTopicName;
+
+  @Inject
+  public EventsBrokerMessageConsumer(
+      DynamicItem<BrokerApi> eventsBroker,
+      StreamEventListener eventListener,
+      @Named(STREAM_EVENTS_TOPIC_NAME) String eventsTopicName) {
+
+    this.eventsBroker = eventsBroker;
+    this.eventListener = eventListener;
+    this.eventsTopicName = eventsTopicName;
+  }
+
+  @Override
+  public void accept(Event event) {
+    try {
+      eventListener.fetchRefsForEvent(event);
+    } catch (AuthException | PermissionBackendException e) {
+      throw new EventRejectedException(event, e);
+    }
+  }
+
+  @Override
+  public void start() {
+    eventsBroker.get().receiveAsync(eventsTopicName, this);
+  }
+
+  @Override
+  public void stop() {
+    eventsBroker.get().disconnect();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
index 0f092e5..c1ffa44 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
@@ -68,6 +68,17 @@
 
   @Override
   public void onEvent(Event event) {
+    try {
+      fetchRefsForEvent(event);
+    } catch (AuthException | PermissionBackendException e) {
+      logger.atSevere().withCause(e).log(
+          "This is the event handler of Gerrit's event-bus. It isn't"
+              + "supposed to throw any exception, otherwise the other handlers "
+              + "won't be executed");
+    }
+  }
+
+  public void fetchRefsForEvent(Event event) throws AuthException, PermissionBackendException {
     if (!instanceId.equals(event.instanceId)) {
       PullReplicationApiRequestMetrics metrics = metricsProvider.get();
       metrics.start(event);
@@ -93,6 +104,7 @@
         } catch (AuthException | PermissionBackendException e) {
           logger.atSevere().withCause(e).log(
               "Cannot initialise project:%s", projectCreatedEvent.projectName);
+          throw e;
         }
       }
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java
index 2389678..756e39a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java
@@ -22,6 +22,7 @@
 
   @Override
   protected void configure() {
+    bind(StreamEventListener.class);
     DynamicSet.bind(binder(), EventListener.class).to(StreamEventListener.class);
   }
 }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index d3a2859..60994ba 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -136,11 +136,34 @@
 	and multi-site to provide backfill mechanism when a node has to
 	catch up with the events after being unreachable.
 
-	NOTE: When `consumeStreamEvents` is enabled gerrit.instanceId
+	When `consumeStreamEvents` is enabled gerrit.instanceId
 	instead of [replication.instanceLabel](https://gerrit.googlesource.com/plugins/pull-replication/+/refs/heads/stable-3.4/src/main/resources/Documentation/config.md#replication.instanceLabel) must be used.
 
+	Using pull-replication standalone with a broker is also possible.
+	Check the replication.eventBrokerTopic parameter.
+
 	Default: false
 
+replication.eventBrokerTopic
+:	Topic to consumer stream events from.
+
+	Pull-replication can receive stream events and use it as
+	notfication mechanism as alternative to REST API notifications.
+	It can work in standalone, not necessarely with the multi-site plugin
+	(check replication.consumerStreamEvents if you need to use it with
+	multi-site). This parameter is used to define the topic to consumer
+	stream events messages from when using the pull-replication in
+	standalone and a broker.
+
+	When `eventBrokerTopic` is enabled gerrit.instanceId
+	instead of [replication.instanceLabel](https://gerrit.googlesource.com/plugins/pull-replication/+/refs/heads/stable-3.4/src/main/resources/Documentation/config.md#replication.instanceLabel)
+	must be used.
+
+	Bear in mind that if consumeStreamEvents is set to true this
+	parameter will be ignored.
+
+	Default: unset
+
 replication.maxConnectionsPerRoute
 :	Maximum number of HTTP connections per one HTTP route.
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
new file mode 100644
index 0000000..394a8a6
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
@@ -0,0 +1,66 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.extensions.restapi.AuthException;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class EventsBrokerMessageConsumerTest {
+
+  @Mock private StreamEventListener eventListener;
+  @Mock DynamicItem<BrokerApi> eventsBroker;
+
+  EventsBrokerMessageConsumer objectUnderTest;
+
+  @Before
+  public void setup() {
+    objectUnderTest = new EventsBrokerMessageConsumer(eventsBroker, eventListener, "topicName");
+  }
+
+  @Test
+  public void shouldRethrowExceptionWhenFetchThrowsAuthException()
+      throws AuthException, PermissionBackendException {
+    doThrow(PermissionBackendException.class).when(eventListener).fetchRefsForEvent(any());
+    assertThrows(EventRejectedException.class, () -> objectUnderTest.accept(new RefUpdatedEvent()));
+  }
+
+  @Test
+  public void shouldRethrowExceptionWhenFetchThrowsPermissionBackendException()
+      throws AuthException, PermissionBackendException {
+    doThrow(PermissionBackendException.class).when(eventListener).fetchRefsForEvent(any());
+    assertThrows(EventRejectedException.class, () -> objectUnderTest.accept(new RefUpdatedEvent()));
+  }
+
+  @Test
+  public void shouldNotThrowExceptionWhenFetchSucceed()
+      throws AuthException, PermissionBackendException {
+    doNothing().when(eventListener).fetchRefsForEvent(any());
+    objectUnderTest.accept(new RefUpdatedEvent());
+  }
+}