Merge branch 'stable-3.0'
* stable-3.0:
Remove Kafka-related code from multi-site
Fix NPE when removing refs
Leverage BrokerApi interface
Change-Id: Icd5ba630ac6b2fc2f4fe83dcf13510c83415860e
diff --git a/BUILD b/BUILD
index 6736634..eb2eeba 100644
--- a/BUILD
+++ b/BUILD
@@ -17,8 +17,8 @@
],
resources = glob(["src/main/resources/**/*"]),
deps = [
- "@kafka-client//jar",
"@global-refdb//jar",
+ "@events-broker//jar",
"//plugins/replication",
],
)
@@ -43,9 +43,7 @@
exports = PLUGIN_DEPS + PLUGIN_TEST_DEPS + [
":multi-site__plugin",
"@wiremock//jar",
- "@kafka-client//jar",
- "@testcontainers-kafka//jar",
- "//lib/testcontainers",
"@global-refdb//jar",
+ "@events-broker//jar",
],
)
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index ca03c9d..7e8fee4 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -8,19 +8,13 @@
)
maven_jar(
- name = "kafka-client",
- artifact = "org.apache.kafka:kafka-clients:2.1.0",
- sha1 = "34d9983705c953b97abb01e1cd04647f47272fe5",
- )
-
- maven_jar(
- name = "testcontainers-kafka",
- artifact = "org.testcontainers:kafka:1.11.3",
- sha1 = "932d1baa2541f218b1b44a0546ae83d530011468",
- )
-
- maven_jar(
name = "global-refdb",
artifact = "com.gerritforge:global-refdb:3.1.0-rc1",
sha1 = "61fc8defaed9c364e6bfa101563e434fcc70038f",
)
+
+ maven_jar(
+ name = "events-broker",
+ artifact = "com.gerritforge:events-broker:3.1.2",
+ sha1 = "b4ed20d7be8a7023111c511ca5dc00ec18e9313a",
+ )
diff --git a/setup_local_env/configs/gerrit.config b/setup_local_env/configs/gerrit.config
index f0f56dc..3cdf940 100644
--- a/setup_local_env/configs/gerrit.config
+++ b/setup_local_env/configs/gerrit.config
@@ -36,4 +36,13 @@
[plugins]
allowRemoteAdmin = true
[plugin "websession-flatfile"]
- directory = $FAKE_NFS
\ No newline at end of file
+ directory = $FAKE_NFS
+[plugin "kafka-events"]
+ bootstrapServers = localhost:$KAFKA_PORT
+ groupId = $KAFKA_GROUP_ID
+ numberOfSubscribers = 4
+ securityProtocol = PLAINTEXT
+ pollingIntervalMs = 1000
+ enableAutoCommit = true
+ autoCommitIntervalMs = 1000
+ autoOffsetReset = latest
diff --git a/setup_local_env/configs/multi-site.config b/setup_local_env/configs/multi-site.config
index 5287f61..442c37e 100644
--- a/setup_local_env/configs/multi-site.config
+++ b/setup_local_env/configs/multi-site.config
@@ -1,17 +1,9 @@
[index]
maxTries = 50
retryInterval = 30000
-[kafka]
- bootstrapServers = localhost:$KAFKA_PORT
- securityProtocol = PLAINTEXT
- indexEventTopic = gerrit_index
- streamEventTopic = gerrit_stream
- projectListEventTopic = gerrit_list_project
- cacheEventTopic = gerrit_cache_eviction
-[kafka "subscriber"]
- pollingIntervalMs = 1000
- KafkaProp-enableAutoCommit = true
- KafkaProp-autoCommitIntervalMs = 1000
- KafkaProp-autoOffsetReset = latest
-[ref-database "zookeeper"]
- connectString = localhost:$ZK_PORT
+
+[broker]
+ indexEventTopic = gerrit_index
+ streamEventTopic = gerrit_stream
+ projectListEventTopic = gerrit_list_project
+ cacheEventTopic = gerrit_cache_eviction
diff --git a/setup_local_env/setup.sh b/setup_local_env/setup.sh
index faa6548..bcbaf9c 100755
--- a/setup_local_env/setup.sh
+++ b/setup_local_env/setup.sh
@@ -62,6 +62,7 @@
export GERRIT_HOSTNAME=$7
export REPLICATION_HOSTNAME=$8
export REMOTE_DEBUG_PORT=$9
+ export KAFKA_GROUP_ID=${10}
export REPLICATION_URL=$(get_replication_url $REPLICATION_LOCATION_TEST_SITE $REPLICATION_HOSTNAME)
echo "Replacing variables for file $file and copying to $CONFIG_TEST_SITE/$file_name"
@@ -105,19 +106,21 @@
GERRIT_SITE1_SSHD_PORT=$3
CONFIG_TEST_SITE_1=$LOCATION_TEST_SITE_1/etc
GERRIT_SITE1_REMOTE_DEBUG_PORT="5005"
+ GERRIT_SITE1_KAFKA_GROUP_ID="instance-1"
# SITE 2
GERRIT_SITE2_HOSTNAME=$4
GERRIT_SITE2_HTTPD_PORT=$5
GERRIT_SITE2_SSHD_PORT=$6
CONFIG_TEST_SITE_2=$LOCATION_TEST_SITE_2/etc
GERRIT_SITE2_REMOTE_DEBUG_PORT="5006"
+ GERRIT_SITE2_KAFKA_GROUP_ID="instance-2"
# Set config SITE1
- copy_config_files $CONFIG_TEST_SITE_1 $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_SSHD_PORT $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE1_REMOTE_DEBUG_PORT
+ copy_config_files $CONFIG_TEST_SITE_1 $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_SSHD_PORT $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE1_REMOTE_DEBUG_PORT $GERRIT_SITE1_KAFKA_GROUP_ID
# Set config SITE2
- copy_config_files $CONFIG_TEST_SITE_2 $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE2_SSHD_PORT $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE2_REMOTE_DEBUG_PORT
+ copy_config_files $CONFIG_TEST_SITE_2 $GERRIT_SITE2_HTTPD_PORT $LOCATION_TEST_SITE_2 $GERRIT_SITE2_SSHD_PORT $GERRIT_SITE1_HTTPD_PORT $LOCATION_TEST_SITE_1 $GERRIT_SITE1_HOSTNAME $GERRIT_SITE2_HOSTNAME $GERRIT_SITE2_REMOTE_DEBUG_PORT $GERRIT_SITE2_KAFKA_GROUP_ID
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index 14c6be6..ebe28d4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -18,6 +18,7 @@
import static com.google.common.base.Suppliers.ofInstance;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
@@ -62,6 +63,7 @@
private final Supplier<Index> index;
private final Supplier<SharedRefDatabase> sharedRefDb;
private final Supplier<Collection<Message>> replicationConfigValidation;
+ private final Supplier<Broker> broker;
private final Config multiSiteConfig;
@Inject
@@ -78,6 +80,7 @@
event = memoize(() -> new Event(lazyMultiSiteCfg));
index = memoize(() -> new Index(lazyMultiSiteCfg));
sharedRefDb = memoize(() -> new SharedRefDatabase(lazyMultiSiteCfg));
+ broker = memoize(() -> new Broker(lazyMultiSiteCfg));
}
public Config getMultiSiteConfig() {
@@ -100,6 +103,10 @@
return index.get();
}
+ public Broker broker() {
+ return broker.get();
+ }
+
public Collection<Message> validate() {
return replicationConfigValidation.get();
}
@@ -278,6 +285,19 @@
}
}
+ public static class Broker {
+ static final String BROKER_SECTION = "broker";
+ private final Config cfg;
+
+ Broker(Supplier<Config> cfgSupplier) {
+ cfg = cfgSupplier.get();
+ }
+
+ public String getTopic(String topicKey, String defValue) {
+ return MoreObjects.firstNonNull(cfg.getString(BROKER_SECTION, null, topicKey), defValue);
+ }
+ }
+
static boolean getBoolean(
Supplier<Config> cfg, String section, String subsection, String name, boolean defaultValue) {
try {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
index db7dd63..7c88655 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
@@ -14,12 +14,14 @@
package com.googlesource.gerrit.plugins.multisite;
+import com.gerritforge.gerrit.eventbroker.EventGsonProvider;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.google.gerrit.extensions.systemstatus.ServerInformation;
import com.google.gerrit.server.util.PluginLogFile;
import com.google.gerrit.server.util.SystemLog;
+import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
import org.apache.log4j.PatternLayout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,15 +30,18 @@
public class Log4jMessageLogger extends PluginLogFile implements MessageLogger {
private static final String LOG_NAME = "message_log";
private final Logger msgLog;
+ private final Gson gson;
@Inject
- public Log4jMessageLogger(SystemLog systemLog, ServerInformation serverInfo) {
+ public Log4jMessageLogger(
+ SystemLog systemLog, ServerInformation serverInfo, EventGsonProvider gsonProvider) {
super(systemLog, serverInfo, LOG_NAME, new PatternLayout("[%d{ISO8601}] [%t] %-5p : %m%n"));
- msgLog = LoggerFactory.getLogger(LOG_NAME);
+ this.msgLog = LoggerFactory.getLogger(LOG_NAME);
+ this.gson = gsonProvider.get();
}
@Override
- public void log(Direction direction, SourceAwareEventWrapper event) {
- msgLog.info("{} Header[{}] Body[{}]", direction, event.getHeader(), event.getBody());
+ public void log(Direction direction, String topic, EventMessage event) {
+ msgLog.info("{} {} {}", direction, topic, gson.toJson(event));
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jSharedRefLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jSharedRefLogger.java
index ae5d96c..6a73d19 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jSharedRefLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jSharedRefLogger.java
@@ -63,17 +63,19 @@
RevWalk walk = new RevWalk(repository)) {
GitPerson committer = null;
String commitMessage = null;
- int objectType = walk.parseAny(newRefValue).getType();
- switch (objectType) {
- case OBJ_COMMIT:
- RevCommit commit = walk.parseCommit(newRefValue);
- committer = CommonConverters.toGitPerson(commit.getCommitterIdent());
- commitMessage = commit.getShortMessage();
- break;
- case OBJ_BLOB:
- break;
- default:
- throw new IncorrectObjectTypeException(newRefValue, Constants.typeString(objectType));
+ if (newRefValue != null) {
+ int objectType = walk.parseAny(newRefValue).getType();
+ switch (objectType) {
+ case OBJ_COMMIT:
+ RevCommit commit = walk.parseCommit(newRefValue);
+ committer = CommonConverters.toGitPerson(commit.getCommitterIdent());
+ commitMessage = commit.getShortMessage();
+ break;
+ case OBJ_BLOB:
+ break;
+ default:
+ throw new IncorrectObjectTypeException(newRefValue, Constants.typeString(objectType));
+ }
}
sharedRefDBLog.info(
gson.toJson(
@@ -81,7 +83,7 @@
project,
currRef.getName(),
currRef.getObjectId().getName(),
- newRefValue.getName(),
+ newRefValue == null ? ObjectId.zeroId().name() : newRefValue.getName(),
committer,
commitMessage)));
} catch (IOException e) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
index 8b07115..b1f3e79 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
@@ -14,7 +14,7 @@
package com.googlesource.gerrit.plugins.multisite;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
public interface MessageLogger {
@@ -23,5 +23,5 @@
CONSUME;
}
- public void log(Direction direction, SourceAwareEventWrapper event);
+ public void log(Direction direction, String topic, EventMessage event);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index b796f42..920cf41 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -15,9 +15,7 @@
package com.googlesource.gerrit.plugins.multisite;
import com.gerritforge.gerrit.globalrefdb.GlobalRefDatabase;
-import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gerrit.server.config.SitePaths;
import com.google.inject.CreationException;
@@ -30,9 +28,9 @@
import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
import com.googlesource.gerrit.plugins.multisite.event.EventModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderModule;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerForwarderModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.RouterModule;
import com.googlesource.gerrit.plugins.multisite.index.IndexModule;
-import com.googlesource.gerrit.plugins.multisite.validation.ProjectDeletedSharedDbCleanup;
import com.googlesource.gerrit.plugins.multisite.validation.dfsrefdb.NoopSharedRefDatabase;
import java.io.BufferedReader;
import java.io.BufferedWriter;
@@ -73,7 +71,10 @@
listener().to(Log4jMessageLogger.class);
bind(MessageLogger.class).to(Log4jMessageLogger.class);
+ install(brokerModule);
+
install(new ForwarderModule());
+ install(new BrokerForwarderModule());
if (config.cache().synchronize()) {
install(new CacheModule());
@@ -85,14 +86,7 @@
install(new IndexModule());
}
- install(brokerModule);
-
install(new RouterModule());
-
- if (config.getSharedRefDb().isEnabled()) {
- DynamicSet.bind(binder(), ProjectDeletedListener.class)
- .to(ProjectDeletedSharedDbCleanup.class);
- }
}
@Provides
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
index f0ad970..bd90be4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
@@ -15,24 +15,17 @@
package com.googlesource.gerrit.plugins.multisite;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
-import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.inject.Inject;
-import com.google.inject.Scopes;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerApi;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
import com.googlesource.gerrit.plugins.multisite.validation.ProjectDeletedSharedDbCleanup;
public class PluginModule extends LifecycleModule {
private Configuration config;
- private KafkaBrokerModule kafkaBrokerModule;
@Inject
- public PluginModule(Configuration config, KafkaBrokerModule kafkaBrokerModule) {
+ public PluginModule(Configuration config) {
this.config = config;
- this.kafkaBrokerModule = kafkaBrokerModule;
}
@Override
@@ -42,8 +35,5 @@
DynamicSet.bind(binder(), ProjectDeletedListener.class)
.to(ProjectDeletedSharedDbCleanup.class);
}
- DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
- listener().to(KafkaBrokerApi.class);
- install(kafkaBrokerModule);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java
deleted file mode 100644
index 35350e9..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java
+++ /dev/null
@@ -1,40 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.broker;
-
-import com.google.gerrit.server.events.Event;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import java.util.function.Consumer;
-
-/** API for sending/receiving events through a message Broker. */
-public interface BrokerApi {
-
- /**
- * Send an event to a topic.
- *
- * @param topic
- * @param event
- * @return true if the event was successfully sent. False otherwise.
- */
- boolean send(String topic, Event event);
-
- /**
- * Receive asynchronously events from a topic.
- *
- * @param topic
- * @param eventConsumer
- */
- void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer);
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
index 19cf0f7..b38ac6b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
@@ -14,17 +14,39 @@
package com.googlesource.gerrit.plugins.multisite.broker;
-import com.google.gerrit.server.events.Event;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import java.util.Set;
import java.util.function.Consumer;
public class BrokerApiNoOp implements BrokerApi {
+ private final Set<TopicSubscriber> topicSubscribers;
+
+ @Inject
+ public BrokerApiNoOp() {
+ topicSubscribers = Sets.newHashSet();
+ }
@Override
- public boolean send(String topic, Event event) {
+ public boolean send(String topic, EventMessage event) {
return true;
}
@Override
- public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {}
+ public void receiveAsync(String topic, Consumer<EventMessage> eventConsumer) {
+ topicSubscribers.add(TopicSubscriber.topicSubscriber(topic, eventConsumer));
+ }
+
+ @Override
+ public void disconnect() {
+ topicSubscribers.clear();
+ }
+
+ @Override
+ public Set<TopicSubscriber> topicSubscribers() {
+ return topicSubscribers;
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
index e83fe53..b266eb0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
@@ -14,29 +14,53 @@
package com.googlesource.gerrit.plugins.multisite.broker;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
+import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
+import java.util.Set;
+import java.util.UUID;
import java.util.function.Consumer;
public class BrokerApiWrapper implements BrokerApi {
private final DynamicItem<BrokerApi> apiDelegate;
private final BrokerMetrics metrics;
+ private final MessageLogger msgLog;
+ private final UUID instanceId;
@Inject
- public BrokerApiWrapper(DynamicItem<BrokerApi> apiDelegate, BrokerMetrics metrics) {
+ public BrokerApiWrapper(
+ DynamicItem<BrokerApi> apiDelegate,
+ BrokerMetrics metrics,
+ MessageLogger msgLog,
+ @InstanceId UUID instanceId) {
this.apiDelegate = apiDelegate;
this.metrics = metrics;
+ this.msgLog = msgLog;
+ this.instanceId = instanceId;
+ }
+
+ public boolean send(String topic, Event event) {
+ return send(topic, apiDelegate.get().newMessage(instanceId, event));
}
@Override
- public boolean send(String topic, Event event) {
+ public boolean send(String topic, EventMessage message) {
+ if (Context.isForwardedEvent()) {
+ return true;
+ }
boolean succeeded = false;
try {
- succeeded = apiDelegate.get().send(topic, event);
+ succeeded = apiDelegate.get().send(topic, message);
} finally {
if (succeeded) {
+ msgLog.log(Direction.PUBLISH, topic, message);
metrics.incrementBrokerPublishedMessage();
} else {
metrics.incrementBrokerFailedToPublishMessage();
@@ -46,7 +70,17 @@
}
@Override
- public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {
- apiDelegate.get().receiveAync(topic, eventConsumer);
+ public void receiveAsync(String topic, Consumer<EventMessage> messageConsumer) {
+ apiDelegate.get().receiveAsync(topic, messageConsumer);
+ }
+
+ @Override
+ public void disconnect() {
+ apiDelegate.get().disconnect();
+ }
+
+ @Override
+ public Set<TopicSubscriber> topicSubscribers() {
+ return apiDelegate.get().topicSubscribers();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
index 6983984..093b920 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.multisite.broker;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.inject.AbstractModule;
import com.google.inject.Scopes;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerSession.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerSession.java
deleted file mode 100644
index b04fbff..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerSession.java
+++ /dev/null
@@ -1,26 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.broker;
-
-public interface BrokerSession {
-
- boolean isOpen();
-
- void connect();
-
- void disconnect();
-
- boolean publish(String topic, String payload);
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java
deleted file mode 100644
index 743d323..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java
+++ /dev/null
@@ -1,99 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.broker.kafka;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.gerrit.extensions.events.LifecycleListener;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.EventGson;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
-import java.util.UUID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Singleton
-public class BrokerPublisher implements LifecycleListener {
- protected final Logger log = LoggerFactory.getLogger(getClass());
-
- private final BrokerSession session;
- private final Gson gson;
- private final UUID instanceId;
- private final MessageLogger msgLog;
-
- @Inject
- public BrokerPublisher(
- BrokerSession session,
- @EventGson Gson gson,
- @InstanceId UUID instanceId,
- MessageLogger msgLog) {
- this.session = session;
- this.gson = gson;
- this.instanceId = instanceId;
- this.msgLog = msgLog;
- }
-
- @Override
- public void start() {
- if (!session.isOpen()) {
- session.connect();
- }
- }
-
- @Override
- public void stop() {
- if (session.isOpen()) {
- session.disconnect();
- }
- }
-
- public boolean publish(String topic, Event event) {
- if (Context.isForwardedEvent()) {
- return true;
- }
-
- SourceAwareEventWrapper brokerEvent = toBrokerEvent(event);
- Boolean eventPublished = session.publish(topic, getPayload(brokerEvent));
- if (eventPublished) {
- msgLog.log(Direction.PUBLISH, brokerEvent);
- }
- return eventPublished;
- }
-
- private String getPayload(SourceAwareEventWrapper event) {
- return gson.toJson(event);
- }
-
- private SourceAwareEventWrapper toBrokerEvent(Event event) {
- JsonObject body = eventToJson(event);
- return new SourceAwareEventWrapper(
- new SourceAwareEventWrapper.EventHeader(
- UUID.randomUUID(), event.getType(), instanceId, event.eventCreatedOn),
- body);
- }
-
- @VisibleForTesting
- public JsonObject eventToJson(Event event) {
- return gson.toJsonTree(event).getAsJsonObject();
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
deleted file mode 100644
index 6594544..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
+++ /dev/null
@@ -1,100 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.broker.kafka;
-
-import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaSession implements BrokerSession {
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSession.class);
- private KafkaConfiguration properties;
- private final UUID instanceId;
- private volatile Producer<String, String> producer;
-
- @Inject
- public KafkaSession(KafkaConfiguration kafkaConfig, @InstanceId UUID instanceId) {
- this.properties = kafkaConfig;
- this.instanceId = instanceId;
- }
-
- @Override
- public boolean isOpen() {
- if (producer != null) {
- return true;
- }
- return false;
- }
-
- @Override
- public void connect() {
- if (isOpen()) {
- LOGGER.debug("Already connected.");
- return;
- }
-
- LOGGER.info("Connect to {}...", properties.getKafka().getBootstrapServers());
- /* Need to make sure that the thread of the running connection uses
- * the correct class loader otherwize you can endup with hard to debug
- * ClassNotFoundExceptions
- */
- setConnectionClassLoader();
- producer = new KafkaProducer<>(properties.kafkaPublisher());
- LOGGER.info("Connection established.");
- }
-
- private void setConnectionClassLoader() {
- Thread.currentThread().setContextClassLoader(KafkaSession.class.getClassLoader());
- }
-
- @Override
- public void disconnect() {
- LOGGER.info("Disconnecting...");
- if (producer != null) {
- LOGGER.info("Closing Producer {}...", producer);
- producer.close();
- }
- producer = null;
- }
-
- @Override
- public boolean publish(String topic, String payload) {
- return publishToTopic(properties.getKafka().getTopicAlias(EventTopic.of(topic)), payload);
- }
-
- private boolean publishToTopic(String topic, String payload) {
- Future<RecordMetadata> future =
- producer.send(new ProducerRecord<>(topic, instanceId.toString(), payload));
- try {
- RecordMetadata metadata = future.get();
- LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
- return true;
- } catch (InterruptedException | ExecutionException e) {
- LOGGER.error("Cannot send the message", e);
- return false;
- }
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index 3d1046f..ec6072c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -14,76 +14,73 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.events.EventGson;
import com.google.gerrit.server.permissions.PermissionBackendException;
-import com.google.gson.Gson;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
import java.io.IOException;
import java.util.UUID;
+import java.util.function.Consumer;
-public abstract class AbstractSubcriber implements Runnable {
+public abstract class AbstractSubcriber {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
- private final BrokerApi brokerApi;
private final ForwardedEventRouter eventRouter;
private final DynamicSet<DroppedEventListener> droppedEventListeners;
- private final Gson gson;
private final UUID instanceId;
private final MessageLogger msgLog;
private SubscriberMetrics subscriberMetrics;
+ private final Configuration cfg;
+ private final String topic;
public AbstractSubcriber(
- BrokerApiWrapper brokerApi,
ForwardedEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
- @EventGson Gson gson,
@InstanceId UUID instanceId,
MessageLogger msgLog,
- SubscriberMetrics subscriberMetrics) {
+ SubscriberMetrics subscriberMetrics,
+ Configuration cfg) {
this.eventRouter = eventRouter;
this.droppedEventListeners = droppedEventListeners;
- this.gson = gson;
this.instanceId = instanceId;
this.msgLog = msgLog;
this.subscriberMetrics = subscriberMetrics;
- this.brokerApi = brokerApi;
- }
-
- @Override
- public void run() {
- brokerApi.receiveAync(getTopic().topic(), this::processRecord);
+ this.cfg = cfg;
+ this.topic = getTopic().topic(cfg);
}
protected abstract EventTopic getTopic();
- private void processRecord(SourceAwareEventWrapper event) {
+ public Consumer<EventMessage> getConsumer() {
+ return this::processRecord;
+ }
- if (event.getHeader().getSourceInstanceId().equals(instanceId)) {
+ private void processRecord(EventMessage event) {
+
+ if (event.getHeader().sourceInstanceId.equals(instanceId)) {
logger.atFiner().log(
"Dropping event %s produced by our instanceId %s",
event.toString(), instanceId.toString());
droppedEventListeners.forEach(l -> l.onEventDropped(event));
} else {
try {
- msgLog.log(Direction.CONSUME, event);
- eventRouter.route(event.getEventBody(gson));
+ msgLog.log(Direction.CONSUME, topic, event);
+ eventRouter.route(event.getEvent());
subscriberMetrics.incrementSubscriberConsumedMessage();
} catch (IOException e) {
logger.atSevere().withCause(e).log(
- "Malformed event '%s': [Exception: %s]", event.getHeader().getEventType());
+ "Malformed event '%s': [Exception: %s]", event.getHeader());
subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
} catch (PermissionBackendException | CacheNotFoundException e) {
logger.atSevere().withCause(e).log(
- "Cannot handle message %s: [Exception: %s]", event.getHeader().getEventType());
+ "Cannot handle message %s: [Exception: %s]", event.getHeader());
subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
index 53aae99..6c67c46 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
@@ -15,13 +15,11 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.events.EventGson;
-import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
import java.util.UUID;
@@ -30,21 +28,14 @@
public class CacheEvictionEventSubscriber extends AbstractSubcriber {
@Inject
public CacheEvictionEventSubscriber(
- BrokerApiWrapper brokerApi,
StreamEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
- @EventGson Gson gsonProvider,
@InstanceId UUID instanceId,
MessageLogger msgLog,
- SubscriberMetrics subscriberMetrics) {
- super(
- brokerApi,
- eventRouter,
- droppedEventListeners,
- gsonProvider,
- instanceId,
- msgLog,
- subscriberMetrics);
+ SubscriberMetrics subscriberMetrics,
+ Configuration cfg) {
+
+ super(eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ConsumerExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ConsumerExecutor.java
deleted file mode 100644
index 936d07a..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ConsumerExecutor.java
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.consumer;
-
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-import com.google.inject.BindingAnnotation;
-import java.lang.annotation.Retention;
-
-@Retention(RUNTIME)
-@BindingAnnotation
-public @interface ConsumerExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
index 680e8ed..6f4680c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
@@ -14,11 +14,13 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
+import com.gerritforge.gerrit.eventbroker.EventMessage;
+
public interface DroppedEventListener {
/**
* Invoked when any event is dropped.
*
* @param event information about the event.
*/
- void onEventDropped(SourceAwareEventWrapper event);
+ void onEventDropped(EventMessage event);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
index eacccbf..696cf03 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
@@ -15,13 +15,11 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.events.EventGson;
-import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
import java.util.UUID;
@@ -30,21 +28,14 @@
public class IndexEventSubscriber extends AbstractSubcriber {
@Inject
public IndexEventSubscriber(
- BrokerApiWrapper brokerApi,
IndexEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
- @EventGson Gson gsonProvider,
@InstanceId UUID instanceId,
MessageLogger msgLog,
- SubscriberMetrics subscriberMetrics) {
+ SubscriberMetrics subscriberMetrics,
+ Configuration cfg) {
super(
- brokerApi,
- eventRouter,
- droppedEventListeners,
- gsonProvider,
- instanceId,
- msgLog,
- subscriberMetrics);
+ eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/MultiSiteConsumerRunner.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/MultiSiteConsumerRunner.java
index 1778961..f14dde5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/MultiSiteConsumerRunner.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/MultiSiteConsumerRunner.java
@@ -14,36 +14,42 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.google.common.collect.Lists;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import java.util.concurrent.ExecutorService;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
@Singleton
public class MultiSiteConsumerRunner implements LifecycleListener {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final DynamicSet<AbstractSubcriber> consumers;
- private final ExecutorService executor;
+ private DynamicItem<BrokerApi> brokerApi;
+ private Configuration cfg;
@Inject
public MultiSiteConsumerRunner(
- @ConsumerExecutor ExecutorService executor, DynamicSet<AbstractSubcriber> consumers) {
+ DynamicItem<BrokerApi> brokerApi,
+ DynamicSet<AbstractSubcriber> consumers,
+ Configuration cfg) {
this.consumers = consumers;
- this.executor = executor;
+ this.brokerApi = brokerApi;
+ this.cfg = cfg;
}
@Override
public void start() {
logger.atInfo().log("starting consumers");
- consumers.forEach(c -> executor.execute(c));
+ consumers.forEach(
+ consumer ->
+ brokerApi.get().receiveAsync(consumer.getTopic().topic(cfg), consumer.getConsumer()));
}
@Override
- public void stop() {
- logger.atInfo().log("shutting down consumers");
- executor.shutdown();
- }
+ public void stop() {}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
index 4fa7f64..bf8b33d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
@@ -15,13 +15,11 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.events.EventGson;
-import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
import java.util.UUID;
@@ -30,15 +28,14 @@
public class ProjectUpdateEventSubscriber extends AbstractSubcriber {
@Inject
public ProjectUpdateEventSubscriber(
- BrokerApiWrapper brokerApi,
ProjectListUpdateRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
- @EventGson Gson gson,
@InstanceId UUID instanceId,
MessageLogger msgLog,
- SubscriberMetrics subscriberMetrics) {
+ SubscriberMetrics subscriberMetrics,
+ Configuration cfg) {
super(
- brokerApi, eventRouter, droppedEventListeners, gson, instanceId, msgLog, subscriberMetrics);
+ eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SourceAwareEventWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SourceAwareEventWrapper.java
deleted file mode 100644
index b8bd0d8..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SourceAwareEventWrapper.java
+++ /dev/null
@@ -1,102 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.consumer;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.gerrit.server.events.Event;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import java.util.UUID;
-
-public class SourceAwareEventWrapper {
-
- private final EventHeader header;
- private final JsonObject body;
-
- public EventHeader getHeader() {
- return header;
- }
-
- public JsonObject getBody() {
- return body;
- }
-
- public Event getEventBody(Gson gson) {
- return gson.fromJson(this.body, Event.class);
- }
-
- public static class EventHeader {
- private final UUID eventId;
- private final String eventType;
- private final UUID sourceInstanceId;
- private final Long eventCreatedOn;
-
- public EventHeader(UUID eventId, String eventType, UUID sourceInstanceId, Long eventCreatedOn) {
- this.eventId = eventId;
- this.eventType = eventType;
- this.sourceInstanceId = sourceInstanceId;
- this.eventCreatedOn = eventCreatedOn;
- }
-
- public UUID getEventId() {
- return eventId;
- }
-
- public String getEventType() {
- return eventType;
- }
-
- public UUID getSourceInstanceId() {
- return sourceInstanceId;
- }
-
- public Long getEventCreatedOn() {
- return eventCreatedOn;
- }
-
- public void validate() {
- requireNonNull(eventId, "EventId cannot be null");
- requireNonNull(eventType, "EventType cannot be null");
- requireNonNull(sourceInstanceId, "Source Instance ID cannot be null");
- }
-
- @Override
- public String toString() {
- return "{"
- + "eventId="
- + eventId
- + ", eventType='"
- + eventType
- + '\''
- + ", sourceInstanceId="
- + sourceInstanceId
- + ", eventCreatedOn="
- + eventCreatedOn
- + '}';
- }
- }
-
- public SourceAwareEventWrapper(EventHeader header, JsonObject body) {
- this.header = header;
- this.body = body;
- }
-
- public void validate() {
- requireNonNull(header, "Header cannot be null");
- requireNonNull(body, "Body cannot be null");
- header.validate();
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
index 2918657..39ace0e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
@@ -15,13 +15,11 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.server.events.EventGson;
-import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
import java.util.UUID;
@@ -30,15 +28,14 @@
public class StreamEventSubscriber extends AbstractSubcriber {
@Inject
public StreamEventSubscriber(
- BrokerApiWrapper brokerApi,
StreamEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
- @EventGson Gson gson,
@InstanceId UUID instanceId,
MessageLogger msgLog,
- SubscriberMetrics subscriberMetrics) {
+ SubscriberMetrics subscriberMetrics,
+ Configuration cfg) {
super(
- brokerApi, eventRouter, droppedEventListeners, gson, instanceId, msgLog, subscriberMetrics);
+ eventRouter, droppedEventListeners, instanceId, msgLog, subscriberMetrics, cfg);
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
index ef10151..36f618e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberMetrics.java
@@ -26,12 +26,9 @@
private static final String SUBSCRIBER_SUCCESS_COUNTER = "subscriber_msg_consumer_counter";
private static final String SUBSCRIBER_FAILURE_COUNTER =
"subscriber_msg_consumer_failure_counter";
- private static final String SUBSCRIBER_POLL_FAILURE_COUNTER =
- "subscriber_msg_consumer_poll_failure_counter";
private final Counter1<String> subscriberSuccessCounter;
private final Counter1<String> subscriberFailureCounter;
- private final Counter1<String> subscriberPollFailureCounter;
@Inject
public SubscriberMetrics(MetricMaker metricMaker) {
@@ -50,15 +47,6 @@
.setRate()
.setUnit("errors"),
stringField(SUBSCRIBER_FAILURE_COUNTER, "Subscriber failed to consume messages count"));
-
- this.subscriberPollFailureCounter =
- metricMaker.newCounter(
- "multi_site/subscriber/subscriber_message_consumer_poll_failure_counter",
- new Description("Number of failed attempts to poll messages by the subscriber")
- .setRate()
- .setUnit("errors"),
- stringField(
- SUBSCRIBER_POLL_FAILURE_COUNTER, "Subscriber failed to poll messages count"));
}
public void incrementSubscriberConsumedMessage() {
@@ -68,8 +56,4 @@
public void incrementSubscriberFailedToConsumeMessage() {
subscriberFailureCounter.increment(SUBSCRIBER_FAILURE_COUNTER);
}
-
- public void incrementSubscriberFailedToPollMessages() {
- subscriberPollFailureCounter.increment(SUBSCRIBER_POLL_FAILURE_COUNTER);
- }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
index 0a0c350..ee1430c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SubscriberModule.java
@@ -16,22 +16,21 @@
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.MultiSiteEvent;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
public class SubscriberModule extends LifecycleModule {
@Override
protected void configure() {
MultiSiteEvent.registerEventTypes();
- bind(ExecutorService.class)
- .annotatedWith(ConsumerExecutor.class)
- .toInstance(Executors.newFixedThreadPool(EventTopic.values().length));
- listener().to(MultiSiteConsumerRunner.class);
DynamicSet.setOf(binder(), AbstractSubcriber.class);
DynamicSet.setOf(binder(), DroppedEventListener.class);
+ listener().to(MultiSiteConsumerRunner.class);
+
+ DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
+ DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
+ DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
+ DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
index dff21c1..b32e5ae 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
@@ -16,7 +16,7 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
@@ -24,15 +24,17 @@
@Singleton
public class BrokerCacheEvictionForwarder implements CacheEvictionForwarder {
- private final BrokerApi broker;
+ private final BrokerApiWrapper broker;
+ private final Configuration cfg;
@Inject
- BrokerCacheEvictionForwarder(BrokerApiWrapper broker) {
+ BrokerCacheEvictionForwarder(BrokerApiWrapper broker, Configuration cfg) {
this.broker = broker;
+ this.cfg = cfg;
}
@Override
public boolean evict(CacheEvictionEvent event) {
- return broker.send(EventTopic.CACHE_TOPIC.topic(), event);
+ return broker.send(EventTopic.CACHE_TOPIC.topic(cfg), event);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
new file mode 100644
index 0000000..6bd6437
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
@@ -0,0 +1,33 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
+
+public class BrokerForwarderModule extends LifecycleModule {
+ @Override
+ protected void configure() {
+ DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
+ DynamicSet.bind(binder(), CacheEvictionForwarder.class).to(BrokerCacheEvictionForwarder.class);
+ DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
+ .to(BrokerProjectListUpdateForwarder.class);
+ DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
index c2cc3dc..0b4252d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
@@ -15,22 +15,24 @@
package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
public class BrokerIndexEventForwarder implements IndexEventForwarder {
- private final BrokerApi broker;
+ private final BrokerApiWrapper broker;
+ private final Configuration cfg;
@Inject
- BrokerIndexEventForwarder(BrokerApiWrapper broker) {
+ BrokerIndexEventForwarder(BrokerApiWrapper broker, Configuration cfg) {
this.broker = broker;
+ this.cfg = cfg;
}
@Override
public boolean index(IndexEvent event) {
- return broker.send(EventTopic.INDEX_TOPIC.topic(), event);
+ return broker.send(EventTopic.INDEX_TOPIC.topic(cfg), event);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
index 1a8b652..34e0300 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
@@ -18,22 +18,24 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
@Singleton
public class BrokerProjectListUpdateForwarder implements ProjectListUpdateForwarder {
- private final BrokerApi broker;
+ private final BrokerApiWrapper broker;
+ private final Configuration cfg;
@Inject
- BrokerProjectListUpdateForwarder(BrokerApiWrapper broker) {
+ BrokerProjectListUpdateForwarder(BrokerApiWrapper broker, Configuration cfg) {
this.broker = broker;
+ this.cfg = cfg;
}
@Override
public boolean updateProjectList(ProjectListUpdateEvent event) {
- return broker.send(PROJECT_LIST_TOPIC.topic(), event);
+ return broker.send(PROJECT_LIST_TOPIC.topic(cfg), event);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
index ed3a717..9ff4688 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
@@ -17,22 +17,24 @@
import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
@Singleton
public class BrokerStreamEventForwarder implements StreamEventForwarder {
- private final BrokerApi broker;
+ private final BrokerApiWrapper broker;
+ private final Configuration cfg;
@Inject
- BrokerStreamEventForwarder(BrokerApiWrapper broker) {
+ BrokerStreamEventForwarder(BrokerApiWrapper broker, Configuration cfg) {
this.broker = broker;
+ this.cfg = cfg;
}
@Override
public boolean send(Event event) {
- return broker.send(EventTopic.STREAM_EVENT_TOPIC.topic(), event);
+ return broker.send(EventTopic.STREAM_EVENT_TOPIC.topic(cfg), event);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
index 371abae..eaa2df9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventTopic.java
@@ -14,6 +14,8 @@
package com.googlesource.gerrit.plugins.multisite.forwarder.events;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
+
public enum EventTopic {
INDEX_TOPIC("GERRIT.EVENT.INDEX", "indexEvent"),
CACHE_TOPIC("GERRIT.EVENT.CACHE", "cacheEvent"),
@@ -28,8 +30,8 @@
this.aliasKey = aliasKey;
}
- public String topic() {
- return topic;
+ public String topic(Configuration config) {
+ return config.broker().getTopic(topicAliasKey(), topic);
}
public String topicAliasKey() {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java
deleted file mode 100644
index ff23b78..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java
+++ /dev/null
@@ -1,67 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka;
-
-import com.google.gerrit.extensions.events.LifecycleListener;
-import com.google.gerrit.server.events.Event;
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.BrokerPublisher;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaEventSubscriber;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.function.Consumer;
-
-public class KafkaBrokerApi implements BrokerApi, LifecycleListener {
-
- private final BrokerPublisher publisher;
- private final Provider<KafkaEventSubscriber> subscriberProvider;
- private List<KafkaEventSubscriber> subscribers;
-
- @Inject
- public KafkaBrokerApi(
- BrokerPublisher publisher, Provider<KafkaEventSubscriber> subscriberProvider) {
- this.publisher = publisher;
- this.subscriberProvider = subscriberProvider;
- subscribers = new ArrayList<>();
- }
-
- @Override
- public boolean send(String topic, Event event) {
- return publisher.publish(topic, event);
- }
-
- @Override
- public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {
- KafkaEventSubscriber subscriber = subscriberProvider.get();
- synchronized (subscribers) {
- subscribers.add(subscriber);
- }
- subscriber.subscribe(EventTopic.of(topic), eventConsumer);
- }
-
- @Override
- public void start() {}
-
- @Override
- public void stop() {
- for (KafkaEventSubscriber subscriber : subscribers) {
- subscriber.shutdown();
- }
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
deleted file mode 100644
index 665ad49..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
+++ /dev/null
@@ -1,63 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka;
-
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.inject.TypeLiteral;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.BrokerPublisher;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaSession;
-import com.googlesource.gerrit.plugins.multisite.consumer.AbstractSubcriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.CacheEvictionEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.IndexEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.ProjectUpdateEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import com.googlesource.gerrit.plugins.multisite.consumer.StreamEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerCacheEvictionForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerIndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerProjectListUpdateForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerStreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaEventDeserializer;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.Deserializer;
-
-public class KafkaBrokerModule extends LifecycleModule {
-
- @Override
- protected void configure() {
- bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer());
- bind(new TypeLiteral<Deserializer<SourceAwareEventWrapper>>() {})
- .to(KafkaEventDeserializer.class);
-
- DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
- DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
- DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
- DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
-
- listener().to(BrokerPublisher.class);
- bind(BrokerSession.class).to(KafkaSession.class);
-
- DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
- DynamicSet.bind(binder(), CacheEvictionForwarder.class).to(BrokerCacheEvictionForwarder.class);
- DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
- .to(BrokerProjectListUpdateForwarder.class);
- DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
deleted file mode 100644
index 0857334..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
+++ /dev/null
@@ -1,202 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka;
-
-import static com.google.common.base.Suppliers.memoize;
-
-import com.google.common.base.CaseFormat;
-import com.google.common.base.Strings;
-import com.google.common.base.Supplier;
-import com.google.gerrit.extensions.annotations.PluginName;
-import com.google.gerrit.server.config.PluginConfigFactory;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.eclipse.jgit.lib.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Singleton
-public class KafkaConfiguration {
-
- private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);
- static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
- static final String KAFKA_SECTION = "kafka";
- private static final String DEFAULT_KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
- private static final int DEFAULT_POLLING_INTERVAL_MS = 1000;
-
- private final Supplier<KafkaSubscriber> subscriber;
- private final Supplier<Kafka> kafka;
- private final Supplier<KafkaPublisher> publisher;
-
- @Inject
- public KafkaConfiguration(PluginConfigFactory configFactory, @PluginName String pluginName) {
- Config cfg = configFactory.getGlobalPluginConfig(pluginName);
- kafka = memoize(() -> new Kafka(cfg));
- publisher = memoize(() -> new KafkaPublisher(cfg));
- subscriber = memoize(() -> new KafkaSubscriber(cfg));
- }
-
- public Kafka getKafka() {
- return kafka.get();
- }
-
- public KafkaSubscriber kafkaSubscriber() {
- return subscriber.get();
- }
-
- private static void applyKafkaConfig(Config config, String subsectionName, Properties target) {
- for (String section : config.getSubsections(KAFKA_SECTION)) {
- if (section.equals(subsectionName)) {
- for (String name : config.getNames(KAFKA_SECTION, section, true)) {
- if (name.startsWith(KAFKA_PROPERTY_PREFIX)) {
- Object value = config.getString(KAFKA_SECTION, subsectionName, name);
- String configProperty = name.replaceFirst(KAFKA_PROPERTY_PREFIX, "");
- String propName =
- CaseFormat.LOWER_CAMEL
- .to(CaseFormat.LOWER_HYPHEN, configProperty)
- .replaceAll("-", ".");
- log.info("[{}] Setting kafka property: {} = {}", subsectionName, propName, value);
- target.put(propName, value);
- }
- }
- }
- }
- target.put(
- "bootstrap.servers",
- getString(
- config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS));
- }
-
- private static String getString(
- Config cfg, String section, String subsection, String name, String defaultValue) {
- String value = cfg.getString(section, subsection, name);
- if (!Strings.isNullOrEmpty(value)) {
- return value;
- }
- return defaultValue;
- }
-
- public KafkaPublisher kafkaPublisher() {
- return publisher.get();
- }
-
- public static class Kafka {
- private final Map<EventTopic, String> eventTopics;
- private final String bootstrapServers;
-
- Kafka(Config config) {
- this.bootstrapServers =
- getString(
- config, KAFKA_SECTION, null, "bootstrapServers", DEFAULT_KAFKA_BOOTSTRAP_SERVERS);
-
- this.eventTopics = new HashMap<>();
- for (EventTopic eventTopic : EventTopic.values()) {
- eventTopics.put(
- eventTopic,
- getString(config, KAFKA_SECTION, null, eventTopic.topicAliasKey(), eventTopic.topic()));
- }
- }
-
- public String getTopicAlias(EventTopic topic) {
- return eventTopics.get(topic);
- }
-
- public String getBootstrapServers() {
- return bootstrapServers;
- }
-
- private static String getString(
- Config cfg, String section, String subsection, String name, String defaultValue) {
- String value = cfg.getString(section, subsection, name);
- if (!Strings.isNullOrEmpty(value)) {
- return value;
- }
- return defaultValue;
- }
- }
-
- public static class KafkaPublisher extends Properties {
- private static final long serialVersionUID = 0L;
-
- public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
- public static final String KAFKA_PUBLISHER_SUBSECTION = "publisher";
-
- private KafkaPublisher(Config kafkaConfig) {
- setDefaults();
- applyKafkaConfig(kafkaConfig, KAFKA_PUBLISHER_SUBSECTION, this);
- }
-
- private void setDefaults() {
- put("acks", "all");
- put("retries", 10);
- put("batch.size", 16384);
- put("linger.ms", 1);
- put("buffer.memory", 33554432);
- put("key.serializer", KAFKA_STRING_SERIALIZER);
- put("value.serializer", KAFKA_STRING_SERIALIZER);
- put("reconnect.backoff.ms", 5000L);
- }
- }
-
- public static class KafkaSubscriber extends Properties {
- private static final long serialVersionUID = 1L;
-
- static final String KAFKA_SUBSCRIBER_SUBSECTION = "subscriber";
-
- private final Integer pollingInterval;
- private final Config cfg;
-
- public KafkaSubscriber(Config kafkaCfg) {
- this.cfg = kafkaCfg;
-
- this.pollingInterval =
- cfg.getInt(
- KAFKA_SECTION,
- KAFKA_SUBSCRIBER_SUBSECTION,
- "pollingIntervalMs",
- DEFAULT_POLLING_INTERVAL_MS);
-
- applyKafkaConfig(kafkaCfg, KAFKA_SUBSCRIBER_SUBSECTION, this);
- }
-
- public Properties initPropsWith(UUID instanceId) {
- String groupId =
- getString(
- cfg, KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, "groupId", instanceId.toString());
- this.put("group.id", groupId);
-
- return this;
- }
-
- public Integer getPollingInterval() {
- return pollingInterval;
- }
-
- private String getString(
- Config cfg, String section, String subsection, String name, String defaultValue) {
- String value = cfg.getString(section, subsection, name);
- if (!Strings.isNullOrEmpty(value)) {
- return value;
- }
- return defaultValue;
- }
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerFactory.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerFactory.java
deleted file mode 100644
index 9a5e19f..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import java.util.UUID;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.Deserializer;
-
-@Singleton
-public class KafkaConsumerFactory {
- private KafkaConfiguration config;
-
- @Inject
- public KafkaConsumerFactory(KafkaConfiguration configuration) {
- this.config = configuration;
- }
-
- public Consumer<byte[], byte[]> create(Deserializer<byte[]> keyDeserializer, UUID instanceId) {
- return new KafkaConsumer<>(
- config.kafkaSubscriber().initPropsWith(instanceId),
- keyDeserializer,
- new ByteArrayDeserializer());
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java
deleted file mode 100644
index afd2656..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
-
-import com.google.gerrit.server.events.EventGson;
-import com.google.gson.Gson;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import java.util.Map;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-
-@Singleton
-public class KafkaEventDeserializer implements Deserializer<SourceAwareEventWrapper> {
-
- private final StringDeserializer stringDeserializer = new StringDeserializer();
- private Gson gson;
-
- // To be used when providing this deserializer with class name (then need to add a configuration
- // entry to set the gson.provider
- public KafkaEventDeserializer() {}
-
- @Inject
- public KafkaEventDeserializer(@EventGson Gson gson) {
- this.gson = gson;
- }
-
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {}
-
- @Override
- public SourceAwareEventWrapper deserialize(String topic, byte[] data) {
- final SourceAwareEventWrapper result =
- gson.fromJson(stringDeserializer.deserialize(topic, data), SourceAwareEventWrapper.class);
-
- result.validate();
-
- return result;
- }
-
- @Override
- public void close() {}
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java
deleted file mode 100644
index c72fa0d..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java
+++ /dev/null
@@ -1,110 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.server.util.ManualRequestContext;
-import com.google.gerrit.server.util.OneOffRequestContext;
-import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.Deserializer;
-
-public class KafkaEventSubscriber {
- private static final FluentLogger logger = FluentLogger.forEnclosingClass();
-
- private final Consumer<byte[], byte[]> consumer;
- private final OneOffRequestContext oneOffCtx;
- private final AtomicBoolean closed = new AtomicBoolean(false);
-
- private final Deserializer<SourceAwareEventWrapper> valueDeserializer;
- private final KafkaConfiguration configuration;
- private final SubscriberMetrics subscriberMetrics;
-
- @Inject
- public KafkaEventSubscriber(
- KafkaConfiguration configuration,
- KafkaConsumerFactory consumerFactory,
- Deserializer<byte[]> keyDeserializer,
- Deserializer<SourceAwareEventWrapper> valueDeserializer,
- @InstanceId UUID instanceId,
- OneOffRequestContext oneOffCtx,
- SubscriberMetrics subscriberMetrics) {
-
- this.configuration = configuration;
- this.oneOffCtx = oneOffCtx;
- this.subscriberMetrics = subscriberMetrics;
-
- final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
- try {
- Thread.currentThread().setContextClassLoader(KafkaEventSubscriber.class.getClassLoader());
- this.consumer = consumerFactory.create(keyDeserializer, instanceId);
- } finally {
- Thread.currentThread().setContextClassLoader(previousClassLoader);
- }
- this.valueDeserializer = valueDeserializer;
- }
-
- public void subscribe(
- EventTopic evenTopic, java.util.function.Consumer<SourceAwareEventWrapper> messageProcessor) {
- try {
- final String topic = configuration.getKafka().getTopicAlias(evenTopic);
- logger.atInfo().log(
- "Kafka consumer subscribing to topic alias [%s] for event topic [%s]", topic, evenTopic);
- consumer.subscribe(Collections.singleton(topic));
- while (!closed.get()) {
- ConsumerRecords<byte[], byte[]> consumerRecords =
- consumer.poll(Duration.ofMillis(configuration.kafkaSubscriber().getPollingInterval()));
- consumerRecords.forEach(
- consumerRecord -> {
- try (ManualRequestContext ctx = oneOffCtx.open()) {
- SourceAwareEventWrapper event =
- valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
- messageProcessor.accept(event);
- } catch (Exception e) {
- logger.atSevere().withCause(e).log(
- "Malformed event '%s': [Exception: %s]",
- new String(consumerRecord.value(), UTF_8));
- subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
- }
- });
- }
- } catch (WakeupException e) {
- // Ignore exception if closing
- if (!closed.get()) throw e;
- } catch (Exception e) {
- subscriberMetrics.incrementSubscriberFailedToPollMessages();
- throw e;
- } finally {
- consumer.close();
- }
- }
-
- public void shutdown() {
- closed.set(true);
- consumer.wakeup();
- }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
index 2abc7b0..92fa101 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
@@ -5,9 +5,11 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.server.events.Event;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import java.util.UUID;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -19,26 +21,30 @@
@Mock private BrokerMetrics brokerMetrics;
@Mock private BrokerApi brokerApi;
@Mock Event event;
+ @Mock MessageLogger msgLog;
+ private UUID instanceId = UUID.randomUUID();
+ private String topic = "index";
private BrokerApiWrapper objectUnderTest;
@Before
public void setUp() {
objectUnderTest =
- new BrokerApiWrapper(DynamicItem.itemOf(BrokerApi.class, brokerApi), brokerMetrics);
+ new BrokerApiWrapper(
+ DynamicItem.itemOf(BrokerApi.class, brokerApi), brokerMetrics, msgLog, instanceId);
}
@Test
public void shouldIncrementBrokerMetricCounterWhenMessagePublished() {
when(brokerApi.send(any(), any())).thenReturn(true);
- objectUnderTest.send(EventTopic.INDEX_TOPIC.topic(), event);
+ objectUnderTest.send(topic, event);
verify(brokerMetrics, only()).incrementBrokerPublishedMessage();
}
@Test
public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailed() {
when(brokerApi.send(any(), any())).thenReturn(false);
- objectUnderTest.send(EventTopic.INDEX_TOPIC.topic(), event);
+ objectUnderTest.send(topic, event);
verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
}
@@ -47,7 +53,7 @@
when(brokerApi.send(any(), any()))
.thenThrow(new RuntimeException("Unexpected runtime exception"));
try {
- objectUnderTest.send(EventTopic.INDEX_TOPIC.topic(), event);
+ objectUnderTest.send(topic, event);
} catch (RuntimeException e) {
// expected
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
deleted file mode 100644
index a6d9193..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
+++ /dev/null
@@ -1,176 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.broker.kafka;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.only;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.gerrit.entities.Account;
-import com.google.gerrit.entities.BranchNameKey;
-import com.google.gerrit.entities.Change;
-import com.google.gerrit.extensions.client.ChangeKind;
-import com.google.gerrit.server.data.AccountAttribute;
-import com.google.gerrit.server.data.ApprovalAttribute;
-import com.google.gerrit.server.events.CommentAddedEvent;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.EventGsonProvider;
-import com.google.gerrit.server.util.time.TimeUtil;
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.googlesource.gerrit.plugins.multisite.MessageLogger;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerMetrics;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import java.util.UUID;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class BrokerPublisherTest {
-
- @Mock private BrokerMetrics brokerMetrics;
- @Mock private BrokerSession session;
- @Mock private MessageLogger msgLog;
- private BrokerPublisher publisher;
-
- private Gson gson = new EventGsonProvider().get();
-
- private String accountName = "Foo Bar";
- private String accountEmail = "foo@bar.com";
- private String accountUsername = "foobar";
- private String approvalType = ChangeKind.REWORK.toString();
-
- private String approvalDescription = "ApprovalDescription";
- private String approvalValue = "+2";
- private String oldApprovalValue = "+1";
- private Long approvalGrantedOn = 123L;
- private String commentDescription = "Patch Set 1: Code-Review+2";
- private String projectName = "project";
- private String refName = "refs/heads/master";
- private String changeId = "Iabcd1234abcd1234abcd1234abcd1234abcd1234";
- private Long eventCreatedOn = 123L;
-
- @Before
- public void setUp() {
- publisher = new BrokerPublisher(session, gson, UUID.randomUUID(), msgLog);
- }
-
- @Test
- public void shouldSerializeCommentAddedEvent() {
-
- Event event = createSampleEvent();
-
- String expectedSerializedCommentEvent =
- "{\"author\": {\"name\": \""
- + accountName
- + "\",\"email\": \""
- + accountEmail
- + "\",\"username\": \""
- + accountUsername
- + "\"},\"approvals\": [{\"type\": \""
- + approvalType
- + "\",\"description\": \""
- + approvalDescription
- + "\",\"value\": \""
- + approvalValue
- + "\",\"oldValue\": \""
- + oldApprovalValue
- + "\",\"grantedOn\": "
- + approvalGrantedOn
- + ",\"by\": {\"name\": \""
- + accountName
- + "\",\"email\": \""
- + accountEmail
- + "\",\"username\": \""
- + accountUsername
- + "\"}}],\"comment\": \""
- + commentDescription
- + "\",\"project\": \""
- + projectName
- + "\",\"refName\": \""
- + refName
- + "\",\"changeKey\": {\"id\": \""
- + changeId
- + "\""
- + " },\"type\": \"comment-added\",\"eventCreatedOn\": "
- + eventCreatedOn
- + "}";
-
- JsonObject expectedCommentEventJsonObject =
- gson.fromJson(expectedSerializedCommentEvent, JsonElement.class).getAsJsonObject();
-
- assertThat(publisher.eventToJson(event)).isEqualTo(expectedCommentEventJsonObject);
- }
-
- @Test
- public void shouldLogEventPublishedMessageWhenPublishingSucceed() {
- Event event = createSampleEvent();
- when(session.publish(any(), any())).thenReturn(true);
- publisher.publish(EventTopic.INDEX_TOPIC.topic(), event);
- verify(msgLog, only()).log(any(), any());
- }
-
- @Test
- public void shouldSkipEventPublishedLoggingWhenMessagePublishigFailed() {
- Event event = createSampleEvent();
- when(session.publish(any(), any())).thenReturn(false);
-
- publisher.publish(EventTopic.INDEX_TOPIC.topic(), event);
- verify(msgLog, never()).log(any(), any());
- }
-
- private Event createSampleEvent() {
- final Change change =
- new Change(
- Change.key(changeId),
- Change.id(1),
- Account.id(1),
- BranchNameKey.create(projectName, refName),
- TimeUtil.nowTs());
-
- CommentAddedEvent event = new CommentAddedEvent(change);
- AccountAttribute accountAttribute = new AccountAttribute();
- accountAttribute.email = accountEmail;
- accountAttribute.name = accountName;
- accountAttribute.username = accountUsername;
-
- event.eventCreatedOn = eventCreatedOn;
- event.approvals =
- () -> {
- ApprovalAttribute approvalAttribute = new ApprovalAttribute();
- approvalAttribute.value = approvalValue;
- approvalAttribute.oldValue = oldApprovalValue;
- approvalAttribute.description = approvalDescription;
- approvalAttribute.by = accountAttribute;
- approvalAttribute.type = ChangeKind.REWORK.toString();
- approvalAttribute.grantedOn = approvalGrantedOn;
-
- return new ApprovalAttribute[] {approvalAttribute};
- };
-
- event.author = () -> accountAttribute;
- event.comment = commentDescription;
-
- return event;
- }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java
similarity index 96%
rename from src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventRouterTest.java
rename to src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java
index a19134d..c919df8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/CacheEvictionEventRouterTest.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.event;
import static org.mockito.Mockito.verify;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
similarity index 98%
rename from src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java
rename to src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
index f1084c9..923abb2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/IndexEventRouterTest.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.event;
import static com.google.gerrit.testing.GerritJUnit.assertThrows;
import static org.mockito.Mockito.verify;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectListUpdateRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java
similarity index 95%
rename from src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectListUpdateRouterTest.java
rename to src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java
index 00a239b..93daf92 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectListUpdateRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/ProjectListUpdateRouterTest.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.event;
import static org.mockito.Mockito.verify;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java
similarity index 96%
rename from src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java
rename to src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java
index 11560da..605eb45 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/event/StreamEventRouterTest.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.event;
import static org.mockito.Mockito.verify;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
deleted file mode 100644
index 2d266c2..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka;
-
-import static com.google.common.truth.Truth.assertThat;
-import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KAFKA_SECTION;
-import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KafkaPublisher.KAFKA_PUBLISHER_SUBSECTION;
-import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
-import static org.mockito.Mockito.when;
-
-import com.google.gerrit.server.config.PluginConfigFactory;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
-import org.eclipse.jgit.lib.Config;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class KafkaConfigurationTest {
-
- private Config kafkaConfig;
- @Mock PluginConfigFactory configFactory;
-
- @Before
- public void setup() {
- kafkaConfig = new Config();
- when(configFactory.getGlobalPluginConfig("multi-site")).thenReturn(kafkaConfig);
- }
-
- private KafkaConfiguration getConfiguration() {
- return new KafkaConfiguration(configFactory, "multi-site");
- }
-
- @Test
- public void kafkaSubscriberPropertiesAreIgnoredWhenPrefixIsNotSet() {
- final String kafkaPropertyName = "fooBarBaz";
- final String kafkaPropertyValue = "aValue";
- kafkaConfig.setString(
- KAFKA_SECTION, KAFKA_SUBSCRIBER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
- final String property = getConfiguration().kafkaSubscriber().getProperty("foo.bar.baz");
-
- assertThat(property).isNull();
- }
-
- @Test
- public void kafkaPublisherPropertiesAreIgnoredWhenPrefixIsNotSet() {
- final String kafkaPropertyName = "fooBarBaz";
- final String kafkaPropertyValue = "aValue";
- kafkaConfig.setString(
- KAFKA_SECTION, KAFKA_PUBLISHER_SUBSECTION, kafkaPropertyName, kafkaPropertyValue);
-
- final String property = getConfiguration().kafkaPublisher().getProperty("foo.bar.baz");
-
- assertThat(property).isNull();
- }
-
- @Test
- public void shouldReturnKafkaTopicAliasForIndexTopic() {
- setKafkaTopicAlias("indexEventTopic", "gerrit_index");
- final String property = getConfiguration().getKafka().getTopicAlias(EventTopic.INDEX_TOPIC);
-
- assertThat(property).isEqualTo("gerrit_index");
- }
-
- @Test
- public void shouldReturnKafkaTopicAliasForStreamEventTopic() {
- setKafkaTopicAlias("streamEventTopic", "gerrit_stream_events");
- final String property =
- getConfiguration().getKafka().getTopicAlias(EventTopic.STREAM_EVENT_TOPIC);
-
- assertThat(property).isEqualTo("gerrit_stream_events");
- }
-
- @Test
- public void shouldReturnKafkaTopicAliasForProjectListEventTopic() {
- setKafkaTopicAlias("projectListEventTopic", "gerrit_project_list");
- final String property =
- getConfiguration().getKafka().getTopicAlias(EventTopic.PROJECT_LIST_TOPIC);
-
- assertThat(property).isEqualTo("gerrit_project_list");
- }
-
- @Test
- public void shouldReturnKafkaTopicAliasForCacheEventTopic() {
- setKafkaTopicAlias("cacheEventTopic", "gerrit_cache");
- final String property = getConfiguration().getKafka().getTopicAlias(EventTopic.CACHE_TOPIC);
-
- assertThat(property).isEqualTo("gerrit_cache");
- }
-
- private void setKafkaTopicAlias(String topicKey, String topic) {
- kafkaConfig.setString(KAFKA_SECTION, null, topicKey, topic);
- }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
deleted file mode 100644
index 9751269..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ /dev/null
@@ -1,300 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
-
-import static com.google.common.truth.Truth.assertThat;
-import static java.util.stream.Collectors.toSet;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import com.google.gerrit.acceptance.AbstractDaemonTest;
-import com.google.gerrit.acceptance.GerritConfig;
-import com.google.gerrit.acceptance.LogThreshold;
-import com.google.gerrit.acceptance.NoHttpd;
-import com.google.gerrit.acceptance.UseLocalDisk;
-import com.google.gerrit.extensions.api.changes.ReviewInput;
-import com.google.gerrit.extensions.events.LifecycleListener;
-import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.gerrit.server.config.PluginConfigFactory;
-import com.google.gerrit.server.config.SitePaths;
-import com.google.gerrit.server.data.PatchSetAttribute;
-import com.google.gerrit.server.events.CommentAddedEvent;
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.EventGson;
-import com.google.gerrit.server.events.PatchSetCreatedEvent;
-import com.google.gerrit.server.events.RefUpdatedEvent;
-import com.google.gerrit.server.query.change.ChangeData;
-import com.google.gson.Gson;
-import com.google.inject.Inject;
-import com.google.inject.Key;
-import com.google.inject.Scopes;
-import com.google.inject.TypeLiteral;
-import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.GitModule;
-import com.googlesource.gerrit.plugins.multisite.Module;
-import com.googlesource.gerrit.plugins.multisite.PluginModule;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
-import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
-import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import org.eclipse.jgit.lib.Config;
-import org.eclipse.jgit.lib.Repository;
-import org.eclipse.jgit.revwalk.RevCommit;
-import org.eclipse.jgit.revwalk.RevWalk;
-import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.util.FS;
-import org.junit.Test;
-import org.testcontainers.containers.KafkaContainer;
-
-@NoHttpd
-@LogThreshold(level = "INFO")
-@UseLocalDisk
-public class EventConsumerIT extends AbstractDaemonTest {
- public static final String GERRIT_CONFIG_KEY = "gerrit.installModule";
- public static final String GERRIT_CONFIG_VALUE =
- "com.googlesource.gerrit.plugins.multisite.kafka.consumer.EventConsumerIT$KafkaTestContainerModule";
- private static final int QUEUE_POLL_TIMEOUT_MSECS = 10000;
-
- public static class KafkaTestContainerModule extends LifecycleModule {
-
- public static class KafkaStopAtShutdown implements LifecycleListener {
- private final KafkaContainer kafka;
-
- public KafkaStopAtShutdown(KafkaContainer kafka) {
- this.kafka = kafka;
- }
-
- @Override
- public void stop() {
- kafka.stop();
- }
-
- @Override
- public void start() {
- // Do nothing
- }
- }
-
- public static class TestBrokerModule extends BrokerModule {
- @Override
- protected void configure() {
- DynamicItem.itemOf(binder(), BrokerApi.class);
- bind(BrokerApiWrapper.class).in(Scopes.SINGLETON);
-
- install(new SubscriberModule());
- }
- }
-
- private final FileBasedConfig config;
- private final Module multiSiteModule;
- private final PluginModule pluginModule;
- private final GitModule gitModule;
-
- @Inject
- public KafkaTestContainerModule(SitePaths sitePaths) throws IOException {
- this.config =
- new FileBasedConfig(
- sitePaths.etc_dir.resolve(Configuration.MULTI_SITE_CONFIG).toFile(), FS.DETECTED);
- config.setBoolean("ref-database", null, "enabled", false);
- config.save();
-
- Configuration multiSiteConfig = new Configuration(config, new Config());
-
- PluginConfigFactory cfgFactory = mock(PluginConfigFactory.class);
- when(cfgFactory.getGlobalPluginConfig("multi-site")).thenReturn(config);
-
- this.multiSiteModule = new Module(multiSiteConfig, new TestBrokerModule());
- this.pluginModule = new PluginModule(multiSiteConfig, new KafkaBrokerModule());
- this.gitModule = new GitModule(multiSiteConfig);
- }
-
- @Override
- protected void configure() {
- try {
- final KafkaContainer kafka = startAndConfigureKafkaConnection();
-
- listener().toInstance(new KafkaStopAtShutdown(kafka));
-
- install(multiSiteModule);
- install(pluginModule);
- install(gitModule);
-
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- }
-
- private KafkaContainer startAndConfigureKafkaConnection() throws IOException {
- KafkaContainer kafkaContainer = new KafkaContainer();
- kafkaContainer.start();
-
- Config kafkaConfig = new Config();
- kafkaConfig.setString(
- "kafka", null, "bootstrapServers", kafkaContainer.getBootstrapServers());
-
- PluginConfigFactory configFactory = mock(PluginConfigFactory.class);
- when(configFactory.getGlobalPluginConfig("multi-site")).thenReturn(kafkaConfig);
- KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(configFactory, "multi-site");
- bind(KafkaConfiguration.class).toInstance(kafkaConfiguration);
-
- listener().toInstance(new KafkaStopAtShutdown(kafkaContainer));
-
- return kafkaContainer;
- }
- }
-
- @Test
- @GerritConfig(name = GERRIT_CONFIG_KEY, value = GERRIT_CONFIG_VALUE)
- public void createChangeShouldPropagateChangeIndexAndRefUpdateStreamEvent() throws Exception {
- LinkedBlockingQueue<SourceAwareEventWrapper> droppedEventsQueue = captureDroppedEvents();
- drainQueue(droppedEventsQueue);
-
- ChangeData change = createChange().getChange();
- String project = change.project().get();
- int changeNum = change.getId().get();
- String changeNotesRef = change.notes().getRefName();
- int patchsetNum = change.currentPatchSet().id().get();
- String patchsetRevision = change.currentPatchSet().commitId().name();
- String patchsetRef = change.currentPatchSet().refName();
-
- Map<String, List<Event>> eventsByType = receiveEventsByType(droppedEventsQueue);
- assertThat(eventsByType).isNotEmpty();
-
- assertThat(eventsByType.get("change-index"))
- .containsExactly(createChangeIndexEvent(project, changeNum, getParentCommit(change)));
-
- assertThat(
- eventsByType.get("ref-updated").stream()
- .map(e -> ((RefUpdatedEvent) e).getRefName())
- .collect(toSet()))
- .containsAtLeast(changeNotesRef, patchsetRef); // 'refs/sequences/changes'
- // not always updated thus
- // not checked
-
- List<Event> patchSetCreatedEvents = eventsByType.get("patchset-created");
- assertThat(patchSetCreatedEvents).hasSize(1);
- assertPatchSetAttributes(
- (PatchSetCreatedEvent) patchSetCreatedEvents.get(0),
- patchsetNum,
- patchsetRevision,
- patchsetRef);
- }
-
- private void assertPatchSetAttributes(
- PatchSetCreatedEvent patchSetCreated,
- int patchsetNum,
- String patchsetRevision,
- String patchsetRef) {
- PatchSetAttribute patchSetAttribute = patchSetCreated.patchSet.get();
- assertThat(patchSetAttribute.number).isEqualTo(patchsetNum);
- assertThat(patchSetAttribute.revision).isEqualTo(patchsetRevision);
- assertThat(patchSetAttribute.ref).isEqualTo(patchsetRef);
- }
-
- @Test
- @GerritConfig(name = GERRIT_CONFIG_KEY, value = GERRIT_CONFIG_VALUE)
- public void reviewChangeShouldPropagateChangeIndexAndCommentAdded() throws Exception {
- LinkedBlockingQueue<SourceAwareEventWrapper> droppedEventsQueue = captureDroppedEvents();
- ChangeData change = createChange().getChange();
- String project = change.project().get();
- int changeNum = change.getId().get();
- drainQueue(droppedEventsQueue);
-
- ReviewInput in = ReviewInput.recommend();
- in.message = "LGTM";
- gApi.changes().id(changeNum).revision("current").review(in);
-
- Map<String, List<Event>> eventsByType = receiveEventsByType(droppedEventsQueue);
-
- assertThat(eventsByType).isNotEmpty();
-
- assertThat(eventsByType.get("change-index"))
- .containsExactly(createChangeIndexEvent(project, changeNum, getParentCommit(change)));
-
- List<Event> commentAddedEvents = eventsByType.get("comment-added");
- assertThat(commentAddedEvents).hasSize(1);
- assertThat(((CommentAddedEvent) commentAddedEvents.get(0)).comment)
- .isEqualTo("Patch Set 1: Code-Review+1\n\n" + in.message);
- }
-
- private String getParentCommit(ChangeData change) throws Exception {
- RevCommit parent;
- try (Repository repo = repoManager.openRepository(change.project());
- RevWalk walk = new RevWalk(repo)) {
- RevCommit commit = walk.parseCommit(change.currentPatchSet().commitId());
- parent = commit.getParent(0);
- }
- return parent.getId().name();
- }
-
- private ChangeIndexEvent createChangeIndexEvent(
- String projectName, int changeId, String targetSha1) {
- ChangeIndexEvent event = new ChangeIndexEvent(projectName, changeId, false);
- event.targetSha = targetSha1;
- return event;
- }
-
- private LinkedBlockingQueue<SourceAwareEventWrapper> captureDroppedEvents() throws Exception {
- LinkedBlockingQueue<SourceAwareEventWrapper> droppedEvents = new LinkedBlockingQueue<>();
-
- TypeLiteral<DynamicSet<DroppedEventListener>> type =
- new TypeLiteral<DynamicSet<DroppedEventListener>>() {};
- server
- .getTestInjector()
- .getInstance(Key.get(type))
- .add(
- "multi-site",
- new DroppedEventListener() {
- @Override
- public void onEventDropped(SourceAwareEventWrapper event) {
- droppedEvents.offer(event);
- }
- });
- return droppedEvents;
- }
-
- private Map<String, List<Event>> receiveEventsByType(
- LinkedBlockingQueue<SourceAwareEventWrapper> queue) throws InterruptedException {
- return drainQueue(queue).stream()
- .sorted(Comparator.comparing(e -> e.type))
- .collect(Collectors.groupingBy(e -> e.type));
- }
-
- private List<Event> drainQueue(LinkedBlockingQueue<SourceAwareEventWrapper> queue)
- throws InterruptedException {
- Gson gson = server.getTestInjector().getInstance(Key.get(Gson.class, EventGson.class));
- SourceAwareEventWrapper event;
- List<Event> eventsList = new ArrayList<>();
- while ((event = queue.poll(QUEUE_POLL_TIMEOUT_MSECS, TimeUnit.MILLISECONDS)) != null) {
- eventsList.add(event.getEventBody(gson));
- }
- return eventsList;
- }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java
deleted file mode 100644
index 76ad452..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
-
-import static com.google.common.truth.Truth.assertThat;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import com.google.gerrit.server.events.EventGsonProvider;
-import com.google.gson.Gson;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import java.util.UUID;
-import org.junit.Before;
-import org.junit.Test;
-
-public class KafkaEventDeserializerTest {
- private KafkaEventDeserializer deserializer;
-
- @Before
- public void setUp() {
- final Gson gson = new EventGsonProvider().get();
- deserializer = new KafkaEventDeserializer(gson);
- }
-
- @Test
- public void kafkaEventDeserializerShouldParseAKafkaEvent() {
- final UUID eventId = UUID.randomUUID();
- final String eventType = "event-type";
- final UUID sourceInstanceId = UUID.randomUUID();
- final long eventCreatedOn = 10L;
- final String eventJson =
- String.format(
- "{ "
- + "\"header\": { \"eventId\": \"%s\", \"eventType\": \"%s\", \"sourceInstanceId\": \"%s\", \"eventCreatedOn\": %d },"
- + "\"body\": {}"
- + "}",
- eventId, eventType, sourceInstanceId, eventCreatedOn);
- final SourceAwareEventWrapper event =
- deserializer.deserialize("ignored", eventJson.getBytes(UTF_8));
-
- assertThat(event.getBody().entrySet()).isEmpty();
- assertThat(event.getHeader().getEventId()).isEqualTo(eventId);
- assertThat(event.getHeader().getEventType()).isEqualTo(eventType);
- assertThat(event.getHeader().getSourceInstanceId()).isEqualTo(sourceInstanceId);
- assertThat(event.getHeader().getEventCreatedOn()).isEqualTo(eventCreatedOn);
- }
-
- @Test(expected = RuntimeException.class)
- public void kafkaEventDeserializerShouldFailForInvalidJson() {
- deserializer.deserialize("ignored", "this is not a JSON string".getBytes(UTF_8));
- }
-
- @Test(expected = RuntimeException.class)
- public void kafkaEventDeserializerShouldFailForInvalidObjectButValidJSON() {
- deserializer.deserialize("ignored", "{}".getBytes(UTF_8));
- }
-}