Merge branch 'stable-3.8'
* stable-3.8:
Fix update HEAD action detection from BearerAuthenticationFilter
Allow subscribing to the topic with custom group id
Consume events-broker from source
Clone also events-broker for build validation
Fix issue with using cgit client to fetch without authentication
Change-Id: Ide7aeaf593994e81c0ac69370216fa9c078579f6
diff --git a/BUILD b/BUILD
index c139c5a..53b55dc 100644
--- a/BUILD
+++ b/BUILD
@@ -16,11 +16,11 @@
],
resources = glob(["src/main/resources/**/*"]),
deps = [
+ ":events-broker-neverlink",
"//lib/commons:io",
"//plugins/delete-project",
"//plugins/replication",
"@commons-lang3//jar",
- "@events-broker//jar:neverlink",
],
)
@@ -37,7 +37,7 @@
":pull_replication_util",
"//plugins/delete-project",
"//plugins/replication",
- "@events-broker//jar",
+ "//plugins/events-broker",
],
)
@@ -76,6 +76,12 @@
visibility = ["//visibility:public"],
exports = PLUGIN_DEPS + PLUGIN_TEST_DEPS + [
":pull-replication__plugin",
- "@events-broker//jar",
+ "//plugins/events-broker",
],
)
+
+java_library(
+ name = "events-broker-neverlink",
+ neverlink = 1,
+ exports = ["//plugins/events-broker"],
+)
diff --git a/Jenkinsfile b/Jenkinsfile
index 448692b..a4fff37 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -1,2 +1,3 @@
pluginPipeline(formatCheckId: 'gerritforge:pull-replication-format-3852e64366bb37d13b8baf8af9b15cfd38eb9227',
- buildCheckId: 'gerritforge:pull-replication-3852e64366bb37d13b8baf8af9b15cfd38eb9227')
\ No newline at end of file
+ buildCheckId: 'gerritforge:pull-replication-3852e64366bb37d13b8baf8af9b15cfd38eb9227',
+ extraModules: ['events-broker'])
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
deleted file mode 100644
index 3bf728d..0000000
--- a/external_plugin_deps.bzl
+++ /dev/null
@@ -1,8 +0,0 @@
-load("//tools/bzl:maven_jar.bzl", "maven_jar")
-
-def external_plugin_deps():
- maven_jar(
- name = "events-broker",
- artifact = "com.gerritforge:events-broker:3.6.0-rc3",
- sha1 = "cb398afa4f76367be5c62b99a7ffce74ae1d3d8b",
- )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index 93dfeb8..cbdfe1b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -153,7 +153,7 @@
if (replicationConfig.getBoolean("replication", "consumeStreamEvents", false)) {
install(new StreamEventModule());
} else if (eventBrokerTopic != null) {
- install(new EventsBrokerConsumerModule(eventBrokerTopic));
+ install(new EventsBrokerConsumerModule(eventBrokerTopic, replicationConfig));
}
DynamicSet.setOf(binder(), ReplicationStateListener.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java
index 71cbf2e..64a0bd8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilter.java
@@ -85,7 +85,7 @@
if (isBasicAuthenticationRequest(requestURI)) {
filterChain.doFilter(servletRequest, servletResponse);
- } else if (isPullReplicationApiRequest(requestURI)
+ } else if (isPullReplicationApiRequest(httpRequest.getMethod(), requestURI)
|| (isGitUploadPackRequest(httpRequest)
&& isAuthenticationHeaderWithBearerToken(authorizationHeader))) {
if (isBearerTokenAuthenticated(authorizationHeader, bearerToken))
@@ -121,7 +121,7 @@
return requestURI.startsWith("/a/");
}
- private boolean isPullReplicationApiRequest(String requestURI) {
+ private boolean isPullReplicationApiRequest(String requestMethod, String requestURI) {
return (requestURI.contains(pluginName)
&& (requestURI.endsWith(String.format("/%s~apply-object", pluginName))
|| requestURI.endsWith(String.format("/%s~apply-objects", pluginName))
@@ -130,7 +130,8 @@
|| requestURI.endsWith(String.format("/%s~batch-fetch", pluginName))
|| requestURI.endsWith(String.format("/%s~delete-project", pluginName))
|| requestURI.contains(String.format("/%s/init-project/", pluginName))))
- || requestURI.matches(".*/projects/[^/]+/HEAD");
+ || (requestURI.matches(String.format(".*/projects/[^/]+/%s~HEAD", pluginName))
+ && "PUT".equals(requestMethod));
}
private Optional<String> extractBearerToken(String authorizationHeader) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java
index ce1a076..a8d5cf4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerConsumerModule.java
@@ -17,21 +17,28 @@
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.inject.Scopes;
import com.google.inject.name.Names;
+import com.google.inject.util.Providers;
+import org.eclipse.jgit.lib.Config;
public class EventsBrokerConsumerModule extends LifecycleModule {
public static final String STREAM_EVENTS_TOPIC_NAME = "stream_events_topic_name";
+ public static final String STREAM_EVENTS_GROUP_ID = "stream_events_group_id";
private final String topicName;
+ private final Config config;
- public EventsBrokerConsumerModule(String topicName) {
+ public EventsBrokerConsumerModule(String topicName, Config config) {
this.topicName = topicName;
+ this.config = config;
}
@Override
protected void configure() {
bind(EventsBrokerMessageConsumer.class).in(Scopes.SINGLETON);
bind(String.class).annotatedWith(Names.named(STREAM_EVENTS_TOPIC_NAME)).toInstance(topicName);
-
+ bind(String.class)
+ .annotatedWith(Names.named(STREAM_EVENTS_GROUP_ID))
+ .toProvider(Providers.of(config.getString("replication", null, "eventBrokerGroupId")));
listener().to(EventsBrokerMessageConsumer.class);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
index d06681b..13b038e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumer.java
@@ -14,9 +14,13 @@
package com.googlesource.gerrit.plugins.replication.pull.event;
+import static com.googlesource.gerrit.plugins.replication.pull.event.EventsBrokerConsumerModule.STREAM_EVENTS_GROUP_ID;
import static com.googlesource.gerrit.plugins.replication.pull.event.EventsBrokerConsumerModule.STREAM_EVENTS_TOPIC_NAME;
import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.ExtendedBrokerApi;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.common.Nullable;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.restapi.AuthException;
@@ -29,22 +33,26 @@
public class EventsBrokerMessageConsumer implements Consumer<Event>, LifecycleListener {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final DynamicItem<BrokerApi> eventsBroker;
private final StreamEventListener eventListener;
private final ShutdownState shutdownState;
private final String eventsTopicName;
+ private final String groupId;
@Inject
public EventsBrokerMessageConsumer(
DynamicItem<BrokerApi> eventsBroker,
StreamEventListener eventListener,
ShutdownState shutdownState,
- @Named(STREAM_EVENTS_TOPIC_NAME) String eventsTopicName) {
+ @Named(STREAM_EVENTS_TOPIC_NAME) String eventsTopicName,
+ @Nullable @Named(STREAM_EVENTS_GROUP_ID) String groupId) {
this.eventsBroker = eventsBroker;
this.eventListener = eventListener;
this.shutdownState = shutdownState;
this.eventsTopicName = eventsTopicName;
+ this.groupId = groupId;
}
@Override
@@ -59,7 +67,21 @@
@Override
public void start() {
- eventsBroker.get().receiveAsync(eventsTopicName, this);
+ BrokerApi brokerApi = eventsBroker.get();
+ if (groupId == null) { // no custom group id configured: use the plain BrokerApi subscription
+ brokerApi.receiveAsync(eventsTopicName, this);
+ return;
+ }
+
+ if (!(brokerApi instanceof ExtendedBrokerApi)) { // test the resolved broker, not the DynamicItem wrapper
+ throw new IllegalArgumentException(
+ String.format(
+ "Failed to load the pull-replication plugin: %s does not support the custom group-id '%s'.\n"
+ + "Remove replication.eventBrokerGroupId from replication.config or install a different event-broker plugin.",
+ brokerApi.getClass(), groupId));
+ }
+
+ ((ExtendedBrokerApi) brokerApi).receiveAsync(eventsTopicName, groupId, this);
}
@Override
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 128179c..0c31995 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -222,6 +222,16 @@
Default: unset
+replication.eventBrokerGroupId
+: Optional group id for consumers. Used for keeping the offset of the
+ last acked message on the topic with a specific group id; supported
+ only if the event-broker plugin supports the ExtendedBrokerApi
+ interface.
+ When not set, the messages received are acked using a global group id
+ shared by all subscribers in Gerrit.
+
+ Default: unset
+
replication.maxConnectionsPerRoute
: Maximum number of HTTP connections per one HTTP route.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java
index dc6a12f..5babf4d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/BearerAuthenticationFilterTest.java
@@ -52,6 +52,12 @@
@Mock private FilterChain filterChain;
private final String pluginName = "pull-replication";
+ private void authenticateAndFilter(String method, String uri, Optional<String> queryStringMaybe)
+ throws ServletException, IOException {
+ when(httpServletRequest.getMethod()).thenReturn(method);
+ authenticateAndFilter(uri, queryStringMaybe);
+ }
+
private void authenticateAndFilter(String uri, Optional<String> queryStringMaybe)
throws ServletException, IOException {
final String bearerToken = "some-bearer-token";
@@ -107,7 +113,8 @@
@Test
public void shouldAuthenticateWhenUpdateHead() throws ServletException, IOException {
- authenticateAndFilter("any-prefix/projects/my-project/HEAD", NO_QUERY_PARAMETERS);
+ authenticateAndFilter(
+ "PUT", "any-prefix/projects/my-project/pull-replication~HEAD", NO_QUERY_PARAMETERS);
}
@Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
index 9640b82..8739d4b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/event/EventsBrokerMessageConsumerTest.java
@@ -49,7 +49,7 @@
shutdownState = new ShutdownState();
objectUnderTest =
new EventsBrokerMessageConsumer(
- eventsBrokerDynamicItem, eventListener, shutdownState, "topicName");
+ eventsBrokerDynamicItem, eventListener, shutdownState, "topicName", null);
}
@Test