Merge branch 'stable-3.8'

* stable-3.8:
  Allow subscribing to the topic with custom group id
  Consume events-broker from source
  Clone also events-broker for build validation
  Fix issue with using cgit client to fetch without authentication

Change-Id: Ic83c6888291174fe2fa966945adaafdeff45e789
diff --git a/BUILD b/BUILD
index c139c5a..53b55dc 100644
--- a/BUILD
+++ b/BUILD
@@ -16,11 +16,11 @@
     ],
     resources = glob(["src/main/resources/**/*"]),
     deps = [
+        ":events-broker-neverlink",
         "//lib/commons:io",
         "//plugins/delete-project",
         "//plugins/replication",
         "@commons-lang3//jar",
-        "@events-broker//jar:neverlink",
     ],
 )
 
@@ -37,7 +37,7 @@
         ":pull_replication_util",
         "//plugins/delete-project",
         "//plugins/replication",
-        "@events-broker//jar",
+        "//plugins/events-broker",
     ],
 )
 
@@ -76,6 +76,12 @@
     visibility = ["//visibility:public"],
     exports = PLUGIN_DEPS + PLUGIN_TEST_DEPS + [
         ":pull-replication__plugin",
-        "@events-broker//jar",
+        "//plugins/events-broker",
     ],
 )
+
+java_library(
+    name = "events-broker-neverlink",
+    neverlink = 1,
+    exports = ["//plugins/events-broker"],
+)
diff --git a/Jenkinsfile b/Jenkinsfile
index 448692b..a4fff37 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -1,2 +1,3 @@
 pluginPipeline(formatCheckId: 'gerritforge:pull-replication-format-3852e64366bb37d13b8baf8af9b15cfd38eb9227',
-               buildCheckId: 'gerritforge:pull-replication-3852e64366bb37d13b8baf8af9b15cfd38eb9227')
\ No newline at end of file
+               buildCheckId: 'gerritforge:pull-replication-3852e64366bb37d13b8baf8af9b15cfd38eb9227',
+               extraModules: ['events-broker'])
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
deleted file mode 100644
index 3bf728d..0000000
--- a/external_plugin_deps.bzl
+++ /dev/null
@@ -1,8 +0,0 @@
-load("//tools/bzl:maven_jar.bzl", "maven_jar")
-
-def external_plugin_deps():
-    maven_jar(
-        name = "events-broker",
-        artifact = "com.gerritforge:events-broker:3.6.0-rc3",
-        sha1 = "cb398afa4f76367be5c62b99a7ffce74ae1d3d8b",
-    )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index 93dfeb8..cbdfe1b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -153,7 +153,7 @@
     if (replicationConfig.getBoolean("replication", "consumeStreamEvents", false)) {
       install(new StreamEventModule());
     } else if (eventBrokerTopic != null) {
-      install(new EventsBrokerConsumerModule(eventBrokerTopic));
+      install(new EventsBrokerConsumerModule(eventBrokerTopic, replicationConfig));
     }
 
     DynamicSet.setOf(binder(), ReplicationStateListener.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java
index ce1a076..a8d5cf4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java
@@ -17,21 +17,28 @@
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.inject.Scopes;
 import com.google.inject.name.Names;
+import com.google.inject.util.Providers;
+import org.eclipse.jgit.lib.Config;
 
 public class EventsBrokerConsumerModule extends LifecycleModule {
   public static final String STREAM_EVENTS_TOPIC_NAME = "stream_events_topic_name";
+  public static final String STREAM_EVENTS_GROUP_ID = "stream_events_group_id";
 
   private final String topicName;
+  private final Config config;
 
-  public EventsBrokerConsumerModule(String topicName) {
+  public EventsBrokerConsumerModule(String topicName, Config config) {
     this.topicName = topicName;
+    this.config = config;
   }
 
   @Override
   protected void configure() {
     bind(EventsBrokerMessageConsumer.class).in(Scopes.SINGLETON);
     bind(String.class).annotatedWith(Names.named(STREAM_EVENTS_TOPIC_NAME)).toInstance(topicName);
-
+    bind(String.class)
+        .annotatedWith(Names.named(STREAM_EVENTS_GROUP_ID))
+        .toProvider(Providers.of(config.getString("replication", null, "eventBrokerGroupId")));
     listener().to(EventsBrokerMessageConsumer.class);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
index d06681b..13b038e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
@@ -14,9 +14,13 @@
 
 package com.googlesource.gerrit.plugins.replication.pull.event;
 
+import static com.googlesource.gerrit.plugins.replication.pull.event.EventsBrokerConsumerModule.STREAM_EVENTS_GROUP_ID;
 import static com.googlesource.gerrit.plugins.replication.pull.event.EventsBrokerConsumerModule.STREAM_EVENTS_TOPIC_NAME;
 
 import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.common.Nullable;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.restapi.AuthException;
@@ -29,22 +33,26 @@
 
 public class EventsBrokerMessageConsumer implements Consumer<Event>, LifecycleListener {
 
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
   private final DynamicItem<BrokerApi> eventsBroker;
   private final StreamEventListener eventListener;
   private final ShutdownState shutdownState;
   private final String eventsTopicName;
+  private final String groupId;
 
   @Inject
   public EventsBrokerMessageConsumer(
       DynamicItem<BrokerApi> eventsBroker,
       StreamEventListener eventListener,
       ShutdownState shutdownState,
-      @Named(STREAM_EVENTS_TOPIC_NAME) String eventsTopicName) {
+      @Named(STREAM_EVENTS_TOPIC_NAME) String eventsTopicName,
+      @Nullable @Named(STREAM_EVENTS_GROUP_ID) String groupId) {
 
     this.eventsBroker = eventsBroker;
     this.eventListener = eventListener;
     this.shutdownState = shutdownState;
     this.eventsTopicName = eventsTopicName;
+    this.groupId = groupId;
   }
 
   @Override
@@ -59,7 +67,21 @@
 
   @Override
   public void start() {
-    eventsBroker.get().receiveAsync(eventsTopicName, this);
+    BrokerApi brokerApi = eventsBroker.get();
+    if (groupId == null) {
+      brokerApi.receiveAsync(eventsTopicName, this);
+      return;
+    }
+    // Test the resolved broker: the DynamicItem holder itself is not an ExtendedBrokerApi.
+    if (!(brokerApi instanceof ExtendedBrokerApi)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Failed to load the pull-replication plugin: %s does not support the custom group-id '%s'.\n"
+                  + "Remove replication.eventBrokerGroupId from replication.config or install a different event-broker plugin.",
+              brokerApi.getClass(), groupId));
+    }
+
+    ((ExtendedBrokerApi) brokerApi).receiveAsync(eventsTopicName, groupId, this);
   }
 
   @Override
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 128179c..0c31995 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -222,6 +222,16 @@
 
 	Default: unset
 
+replication.eventBrokerGroupId
+:	Optional group id for consumers. Used for keeping the offset of the
+	last acked message on the topic with a specific group id; supported
+	only if the event-broker plugin supports the ExtendedBrokerApi
+	interface.
+	When not set, the messages received are acked using a global group id
+	shared by all subscribers in Gerrit.
+
+	Default: unset
+
 replication.maxConnectionsPerRoute
 :	Maximum number of HTTP connections per one HTTP route.
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
index 9640b82..8739d4b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
@@ -49,7 +49,7 @@
     shutdownState = new ShutdownState();
     objectUnderTest =
         new EventsBrokerMessageConsumer(
-            eventsBrokerDynamicItem, eventListener, shutdownState, "topicName");
+            eventsBrokerDynamicItem, eventListener, shutdownState, "topicName", null);
   }
 
   @Test