Merge branch 'stable-3.6' into stable-3.7

* stable-3.6:
  Fix typos in docs
  Receive stream events from events-broker directly

Change-Id: I1fa238877af95985356c39c9deadf0c450d8cb64
diff --git a/BUILD b/BUILD
index 02ac689..e1f435d 100644
--- a/BUILD
+++ b/BUILD
@@ -16,6 +16,7 @@
     deps = [
         "//lib/commons:io",
         "//plugins/replication",
+        "@events-broker//jar:neverlink",
     ],
 )
 
@@ -30,7 +31,8 @@
     deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
         ":pull-replication__plugin",
         ":pull_replication_util",
-        "//plugins/replication:replication",
+        "//plugins/replication",
+        "@events-broker//jar",
     ],
 )
 
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
new file mode 100644
index 0000000..f1f162e
--- /dev/null
+++ b/external_plugin_deps.bzl
@@ -0,0 +1,8 @@
+load("//tools/bzl:maven_jar.bzl", "maven_jar")
+
+def external_plugin_deps():
+    maven_jar(
+        name = "events-broker",
+        artifact = "com.gerritforge:events-broker:3.4.0.4",
+        sha1 = "8d361d863382290e33828116e65698190118d0f1",
+    )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index 1671410..cd19be4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -49,6 +49,7 @@
 import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient;
 import com.googlesource.gerrit.plugins.replication.pull.client.HttpClient;
 import com.googlesource.gerrit.plugins.replication.pull.client.SourceHttpClient;
+import com.googlesource.gerrit.plugins.replication.pull.event.EventsBrokerConsumerModule;
 import com.googlesource.gerrit.plugins.replication.pull.event.FetchRefReplicatedEventModule;
 import com.googlesource.gerrit.plugins.replication.pull.event.StreamEventModule;
 import com.googlesource.gerrit.plugins.replication.pull.fetch.ApplyObject;
@@ -139,8 +140,11 @@
       bind(ReplicationConfig.class).to(getReplicationConfigClass()).in(Scopes.SINGLETON);
     }
 
+    String eventBrokerTopic = replicationConfig.getString("replication", null, "eventBrokerTopic");
     if (replicationConfig.getBoolean("replication", "consumeStreamEvents", false)) {
       install(new StreamEventModule());
+    } else if (eventBrokerTopic != null) {
+      install(new EventsBrokerConsumerModule(eventBrokerTopic));
     }
 
     DynamicSet.setOf(binder(), ReplicationStateListener.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventRejectedException.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventRejectedException.java
new file mode 100644
index 0000000..9c2d737
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventRejectedException.java
@@ -0,0 +1,26 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import com.google.gerrit.server.events.Event;
+
+public class EventRejectedException extends IllegalStateException {
+
+  private static final long serialVersionUID = 3404537560735602192L;
+
+  public EventRejectedException(Event event, Throwable cause) {
+    super(String.format("Unable to accept event %s", event), cause);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java
new file mode 100644
index 0000000..ce1a076
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java
@@ -0,0 +1,37 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Scopes;
+import com.google.inject.name.Names;
+
+public class EventsBrokerConsumerModule extends LifecycleModule {
+  public static final String STREAM_EVENTS_TOPIC_NAME = "stream_events_topic_name";
+
+  private final String topicName;
+
+  public EventsBrokerConsumerModule(String topicName) {
+    this.topicName = topicName;
+  }
+
+  @Override
+  protected void configure() {
+    bind(EventsBrokerMessageConsumer.class).in(Scopes.SINGLETON);
+    bind(String.class).annotatedWith(Names.named(STREAM_EVENTS_TOPIC_NAME)).toInstance(topicName);
+
+    listener().to(EventsBrokerMessageConsumer.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
new file mode 100644
index 0000000..6fda9d3
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
@@ -0,0 +1,64 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import static com.googlesource.gerrit.plugins.replication.pull.event.EventsBrokerConsumerModule.STREAM_EVENTS_TOPIC_NAME;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.extensions.restapi.AuthException;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import java.util.function.Consumer;
+
+public class EventsBrokerMessageConsumer implements Consumer<Event>, LifecycleListener {
+
+  private final DynamicItem<BrokerApi> eventsBroker;
+  private final StreamEventListener eventListener;
+  private final String eventsTopicName;
+
+  @Inject
+  public EventsBrokerMessageConsumer(
+      DynamicItem<BrokerApi> eventsBroker,
+      StreamEventListener eventListener,
+      @Named(STREAM_EVENTS_TOPIC_NAME) String eventsTopicName) {
+
+    this.eventsBroker = eventsBroker;
+    this.eventListener = eventListener;
+    this.eventsTopicName = eventsTopicName;
+  }
+
+  @Override
+  public void accept(Event event) {
+    try {
+      eventListener.fetchRefsForEvent(event);
+    } catch (AuthException | PermissionBackendException e) {
+      throw new EventRejectedException(event, e);
+    }
+  }
+
+  @Override
+  public void start() {
+    eventsBroker.get().receiveAsync(eventsTopicName, this);
+  }
+
+  @Override
+  public void stop() {
+    eventsBroker.get().disconnect();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
index 0f092e5..c1ffa44 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventListener.java
@@ -68,6 +68,17 @@
 
   @Override
   public void onEvent(Event event) {
+    try {
+      fetchRefsForEvent(event);
+    } catch (AuthException | PermissionBackendException e) {
+      logger.atSevere().withCause(e).log(
+          "This is the event handler of Gerrit's event-bus. It isn't"
+              + "supposed to throw any exception, otherwise the other handlers "
+              + "won't be executed");
+    }
+  }
+
+  public void fetchRefsForEvent(Event event) throws AuthException, PermissionBackendException {
     if (!instanceId.equals(event.instanceId)) {
       PullReplicationApiRequestMetrics metrics = metricsProvider.get();
       metrics.start(event);
@@ -93,6 +104,7 @@
         } catch (AuthException | PermissionBackendException e) {
           logger.atSevere().withCause(e).log(
               "Cannot initialise project:%s", projectCreatedEvent.projectName);
+          throw e;
         }
       }
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java
index 2389678..756e39a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/StreamEventModule.java
@@ -22,6 +22,7 @@
 
   @Override
   protected void configure() {
+    bind(StreamEventListener.class);
     DynamicSet.bind(binder(), EventListener.class).to(StreamEventListener.class);
   }
 }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index d3a2859..51b10d8 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -136,11 +136,34 @@
 	and multi-site to provide backfill mechanism when a node has to
 	catch up with the events after being unreachable.
 
-	NOTE: When `consumeStreamEvents` is enabled gerrit.instanceId
+	When `consumeStreamEvents` is enabled gerrit.instanceId
 	instead of [replication.instanceLabel](https://gerrit.googlesource.com/plugins/pull-replication/+/refs/heads/stable-3.4/src/main/resources/Documentation/config.md#replication.instanceLabel) must be used.
 
+	Using pull-replication standalone with a broker is also possible.
+	Check the replication.eventBrokerTopic parameter.
+
 	Default: false
 
+replication.eventBrokerTopic
+:	Topic to consumer stream events from.
+
+	Pull-replication can receive stream events and use it as
+	notification mechanism as alternative to REST API notifications.
+	It can work in standalone, not necessarily with the multi-site plugin
+	(check replication.consumerStreamEvents if you need to use it with
+	multi-site). This parameter is used to define the topic to consume
+	stream events messages from when using the pull-replication in
+	standalone with a broker.
+
+	When `eventBrokerTopic` is enabled gerrit.instanceId
+	instead of [replication.instanceLabel](https://gerrit.googlesource.com/plugins/pull-replication/+/refs/heads/stable-3.4/src/main/resources/Documentation/config.md#replication.instanceLabel)
+	must be used.
+
+	Bear in mind that if consumeStreamEvents is set to true this
+	parameter will be ignored.
+
+	Default: unset
+
 replication.maxConnectionsPerRoute
 :	Maximum number of HTTP connections per one HTTP route.
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
new file mode 100644
index 0000000..394a8a6
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
@@ -0,0 +1,66 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.event;
+
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.extensions.restapi.AuthException;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class EventsBrokerMessageConsumerTest {
+
+  @Mock private StreamEventListener eventListener;
+  @Mock DynamicItem<BrokerApi> eventsBroker;
+
+  EventsBrokerMessageConsumer objectUnderTest;
+
+  @Before
+  public void setup() {
+    objectUnderTest = new EventsBrokerMessageConsumer(eventsBroker, eventListener, "topicName");
+  }
+
+  @Test
+  public void shouldRethrowExceptionWhenFetchThrowsAuthException()
+      throws AuthException, PermissionBackendException {
+    doThrow(PermissionBackendException.class).when(eventListener).fetchRefsForEvent(any());
+    assertThrows(EventRejectedException.class, () -> objectUnderTest.accept(new RefUpdatedEvent()));
+  }
+
+  @Test
+  public void shouldRethrowExceptionWhenFetchThrowsPermissionBackendException()
+      throws AuthException, PermissionBackendException {
+    doThrow(PermissionBackendException.class).when(eventListener).fetchRefsForEvent(any());
+    assertThrows(EventRejectedException.class, () -> objectUnderTest.accept(new RefUpdatedEvent()));
+  }
+
+  @Test
+  public void shouldNotThrowExceptionWhenFetchSucceed()
+      throws AuthException, PermissionBackendException {
+    doNothing().when(eventListener).fetchRefsForEvent(any());
+    objectUnderTest.accept(new RefUpdatedEvent());
+  }
+}