Merge branch 'stable-3.0'
* stable-3.0:
Extra logging when local ref-db is misaligned with shared-db
Kafka module for multi-site
Change-Id: I49790e42bd331f6d5d843a138c8ad3709b721a2d
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
index 7348137..ca01471 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Configuration.java
@@ -45,19 +45,19 @@
public static final String PLUGIN_NAME = "multi-site";
public static final String MULTI_SITE_CONFIG = PLUGIN_NAME + ".config";
- public static final String REPLICATION_CONFIG = "replication.config";
static final String INSTANCE_ID_FILE = "instanceId.data";
-
- // common parameters to cache and index sections
static final String THREAD_POOL_SIZE_KEY = "threadPoolSize";
- static final int DEFAULT_INDEX_MAX_TRIES = 2;
- static final int DEFAULT_INDEX_RETRY_INTERVAL = 30000;
static final int DEFAULT_THREAD_POOL_SIZE = 4;
- static final String NUM_STRIPED_LOCKS = "numStripedLocks";
- static final int DEFAULT_NUM_STRIPED_LOCKS = 10;
static final String ENABLE_KEY = "enabled";
+ private static final String REPLICATION_CONFIG = "replication.config";
+ // common parameters to cache and index sections
+ private static final int DEFAULT_INDEX_MAX_TRIES = 2;
+ private static final int DEFAULT_INDEX_RETRY_INTERVAL = 30000;
+ private static final String NUM_STRIPED_LOCKS = "numStripedLocks";
+ private static final int DEFAULT_NUM_STRIPED_LOCKS = 10;
+
private final Supplier<Cache> cache;
private final Supplier<Event> event;
private final Supplier<Index> index;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index b8256a9..1432c65 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -24,13 +24,12 @@
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.spi.Message;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
import com.googlesource.gerrit.plugins.multisite.event.EventModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderModule;
-import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerForwarderModule;
import com.googlesource.gerrit.plugins.multisite.index.IndexModule;
-import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaConsumerModule;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.ForwardedEventRouterModule;
+import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
import com.googlesource.gerrit.plugins.multisite.validation.ProjectDeletedSharedDbCleanup;
import java.io.BufferedReader;
import java.io.BufferedWriter;
@@ -46,12 +45,16 @@
public class Module extends LifecycleModule {
private static final Logger log = LoggerFactory.getLogger(Module.class);
private Configuration config;
+ private KafkaForwardedEventRouterModule kafkaForwardedEventRouterModule;
+ private KafkaBrokerForwarderModule kafkaBrokerForwarderModule;
private final boolean disableGitRepositoryValidation;
- private KafkaConfiguration kafkaConfig;
@Inject
- public Module(Configuration config, KafkaConfiguration kafkaConfig) {
- this(config, kafkaConfig, false);
+ public Module(
+ Configuration config,
+ KafkaForwardedEventRouterModule forwardedEeventRouterModule,
+ KafkaBrokerForwarderModule brokerForwarderModule) {
+ this(config, forwardedEeventRouterModule, brokerForwarderModule, false);
}
// TODO: It is not possible to properly test the libModules in Gerrit.
@@ -61,15 +64,20 @@
@VisibleForTesting
public Module(
Configuration config,
- KafkaConfiguration kafkaConfig,
+ KafkaForwardedEventRouterModule forwardedEeventRouterModule,
+ KafkaBrokerForwarderModule brokerForwarderModule,
boolean disableGitRepositoryValidation) {
- init(config, kafkaConfig);
+ init(config, forwardedEeventRouterModule, brokerForwarderModule);
this.disableGitRepositoryValidation = disableGitRepositoryValidation;
}
- private void init(Configuration config, KafkaConfiguration kafkaConfig) {
+ private void init(
+ Configuration config,
+ KafkaForwardedEventRouterModule forwardedEeventRouterModule,
+ KafkaBrokerForwarderModule brokerForwarderModule) {
this.config = config;
- this.kafkaConfig = kafkaConfig;
+ this.kafkaForwardedEventRouterModule = forwardedEeventRouterModule;
+ this.kafkaBrokerForwarderModule = brokerForwarderModule;
}
@Override
@@ -95,13 +103,9 @@
install(new IndexModule());
}
- if (kafkaConfig.kafkaSubscriber().enabled()) {
- install(new KafkaConsumerModule(kafkaConfig.kafkaSubscriber()));
- install(new ForwardedEventRouterModule());
- }
- if (kafkaConfig.kafkaPublisher().enabled()) {
- install(new BrokerForwarderModule(kafkaConfig.kafkaPublisher()));
- }
+ install(kafkaForwardedEventRouterModule);
+
+ install(kafkaBrokerForwarderModule);
if (config.getSharedRefDb().isEnabled()) {
DynamicSet.bind(binder(), ProjectDeletedListener.class)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
new file mode 100644
index 0000000..62bdcb0
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaBrokerForwarderModule.java
@@ -0,0 +1,63 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker.kafka;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
+import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerCacheEvictionForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerIndexEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerProjectListUpdateForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.broker.BrokerStreamEventForwarder;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
+
+public class KafkaBrokerForwarderModule extends LifecycleModule {
+ private final KafkaConfiguration config;
+
+ @Inject
+ public KafkaBrokerForwarderModule(KafkaConfiguration config) {
+ this.config = config;
+ }
+
+ @Override
+ protected void configure() {
+ if (config.kafkaPublisher().enabled()) {
+ listener().to(BrokerPublisher.class);
+ bind(BrokerSession.class).to(KafkaSession.class);
+
+ if (config.kafkaPublisher().enabledEvent(EventFamily.INDEX_EVENT)) {
+ DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
+ }
+ if (config.kafkaPublisher().enabledEvent(EventFamily.CACHE_EVENT)) {
+ DynamicSet.bind(binder(), CacheEvictionForwarder.class)
+ .to(BrokerCacheEvictionForwarder.class);
+ }
+ if (config.kafkaPublisher().enabledEvent(EventFamily.PROJECT_LIST_EVENT)) {
+ DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
+ .to(BrokerProjectListUpdateForwarder.class);
+ }
+ if (config.kafkaPublisher().enabledEvent(EventFamily.STREAM_EVENT)) {
+ DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
index 7c83346..aaf329a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
@@ -16,9 +16,9 @@
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
deleted file mode 100644
index b7ee20b..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerForwarderModule.java
+++ /dev/null
@@ -1,56 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
-
-import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.lifecycle.LifecycleModule;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaPublisher;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaSession;
-import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-
-public class BrokerForwarderModule extends LifecycleModule {
- private final KafkaPublisher kafkaPublisher;
-
- public BrokerForwarderModule(KafkaPublisher kafkaPublisher) {
- this.kafkaPublisher = kafkaPublisher;
- }
-
- @Override
- protected void configure() {
- listener().to(BrokerPublisher.class);
- bind(BrokerSession.class).to(KafkaSession.class);
-
- if (kafkaPublisher.enabledEvent(EventFamily.INDEX_EVENT)) {
- DynamicSet.bind(binder(), IndexEventForwarder.class).to(BrokerIndexEventForwarder.class);
- }
- if (kafkaPublisher.enabledEvent(EventFamily.CACHE_EVENT)) {
- DynamicSet.bind(binder(), CacheEvictionForwarder.class)
- .to(BrokerCacheEvictionForwarder.class);
- }
- if (kafkaPublisher.enabledEvent(EventFamily.PROJECT_LIST_EVENT)) {
- DynamicSet.bind(binder(), ProjectListUpdateForwarder.class)
- .to(BrokerProjectListUpdateForwarder.class);
- }
- if (kafkaPublisher.enabledEvent(EventFamily.STREAM_EVENT)) {
- DynamicSet.bind(binder(), StreamEventForwarder.class).to(BrokerStreamEventForwarder.class);
- }
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
index 808245f..cb05a4e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
@@ -23,7 +23,7 @@
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
@Singleton
-class BrokerProjectListUpdateForwarder implements ProjectListUpdateForwarder {
+public class BrokerProjectListUpdateForwarder implements ProjectListUpdateForwarder {
private final BrokerPublisher publisher;
@Inject
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/CacheEvictionEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/CacheEvictionEventRouter.java
similarity index 96%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/CacheEvictionEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/CacheEvictionEventRouter.java
index 53afc40..eaf1ff7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/CacheEvictionEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/CacheEvictionEventRouter.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
import com.google.inject.Inject;
import com.google.inject.Singleton;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedCacheEvictionEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedCacheEvictionEventRouter.java
similarity index 91%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedCacheEvictionEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedCacheEvictionEventRouter.java
index 7f740d5..79ab0b9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedCacheEvictionEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedCacheEvictionEventRouter.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedEventRouter.java
similarity index 93%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedEventRouter.java
index f7de8cc..f9ad0c9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedEventRouter.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
import com.google.gerrit.server.permissions.PermissionBackendException;
import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedIndexEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedIndexEventRouter.java
similarity index 91%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedIndexEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedIndexEventRouter.java
index 59dcaa4..a843fcc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedIndexEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedIndexEventRouter.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedProjectListUpdateRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedProjectListUpdateRouter.java
similarity index 92%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedProjectListUpdateRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedProjectListUpdateRouter.java
index 1df3129..37cc3de 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedProjectListUpdateRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedProjectListUpdateRouter.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedStreamEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedStreamEventRouter.java
similarity index 91%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedStreamEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedStreamEventRouter.java
index c96ab1a..0e50ea9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedStreamEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ForwardedStreamEventRouter.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
import com.google.gerrit.server.events.Event;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/IndexEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
similarity index 98%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/IndexEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
index 958312a..1e21ae2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/IndexEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/IndexEventRouter.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
import static com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexingHandler.Operation.DELETE;
import static com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedIndexingHandler.Operation.INDEX;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ProjectListUpdateRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ProjectListUpdateRouter.java
similarity index 95%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ProjectListUpdateRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ProjectListUpdateRouter.java
index 2cce12c..2248c4c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ProjectListUpdateRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/ProjectListUpdateRouter.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
import com.google.inject.Inject;
import com.google.inject.Singleton;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/StreamEventRouter.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
similarity index 94%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/StreamEventRouter.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
index 39d96bc..a330416 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/StreamEventRouter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/router/StreamEventRouter.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
+package com.googlesource.gerrit.plugins.multisite.forwarder.router;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.permissions.PermissionBackendException;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
similarity index 91%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
index b88699a..ea16bc3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/KafkaConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfiguration.java
@@ -12,19 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite;
+package com.googlesource.gerrit.plugins.multisite.kafka;
import static com.google.common.base.Suppliers.memoize;
import static com.google.common.base.Suppliers.ofInstance;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
-import com.google.gerrit.server.config.SitePaths;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
import java.io.IOException;
import java.util.HashMap;
@@ -35,7 +34,6 @@
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.util.FS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,11 +41,11 @@
public class KafkaConfiguration {
private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);
- public static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
+ static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";
static final String KAFKA_SECTION = "kafka";
static final String ENABLE_KEY = "enabled";
- static final String DEFAULT_KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
- static final boolean DEFAULT_ENABLE_PROCESSING = true;
+ private static final String DEFAULT_KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
+ private static final boolean DEFAULT_ENABLE_PROCESSING = true;
private static final int DEFAULT_POLLING_INTERVAL_MS = 1000;
private final Supplier<KafkaSubscriber> subscriber;
@@ -55,22 +53,13 @@
private final Supplier<KafkaPublisher> publisher;
@Inject
- KafkaConfiguration(SitePaths sitePaths) {
- this(getConfigFile(sitePaths, Configuration.MULTI_SITE_CONFIG));
- }
-
- @VisibleForTesting
- public KafkaConfiguration(Config kafkaConfig) {
- Supplier<Config> lazyCfg = lazyLoad(kafkaConfig);
+ public KafkaConfiguration(Configuration configuration) {
+ Supplier<Config> lazyCfg = lazyLoad(configuration.getMultiSiteConfig());
kafka = memoize(() -> new Kafka(lazyCfg));
publisher = memoize(() -> new KafkaPublisher(lazyCfg));
subscriber = memoize(() -> new KafkaSubscriber(lazyCfg));
}
- private static FileBasedConfig getConfigFile(SitePaths sitePaths, String configFileName) {
- return new FileBasedConfig(sitePaths.etc_dir.resolve(configFileName).toFile(), FS.DETECTED);
- }
-
public Kafka getKafka() {
return kafka.get();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
index e4d3fff..25d1b6d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
@@ -25,11 +25,11 @@
import com.google.gerrit.server.util.OneOffRequestContext;
import com.google.gson.Gson;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
index 589afce..c8b3b55 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
@@ -21,10 +21,10 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.IndexEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import java.util.UUID;
import org.apache.kafka.common.serialization.Deserializer;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
similarity index 87%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
index a5d4c59..bfc0e8d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
@@ -21,17 +21,17 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import java.util.UUID;
import org.apache.kafka.common.serialization.Deserializer;
@Singleton
-public class CacheEvictionEventSubscriber extends AbstractKafkaSubcriber {
+public class KafkaCacheEvictionEventSubscriber extends AbstractKafkaSubcriber {
@Inject
- public CacheEvictionEventSubscriber(
+ public KafkaCacheEvictionEventSubscriber(
KafkaConfiguration configuration,
Deserializer<byte[]> keyDeserializer,
Deserializer<SourceAwareEventWrapper> valueDeserializer,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
index 3e0e5d7..cc8a849 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
@@ -16,10 +16,11 @@
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Inject;
import com.google.inject.TypeLiteral;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaSubscriber;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.MultiSiteEvent;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -27,10 +28,11 @@
public class KafkaConsumerModule extends LifecycleModule {
- private final KafkaSubscriber kafkaSubscriber;
+ private KafkaConfiguration config;
- public KafkaConsumerModule(KafkaSubscriber kafkaSubscriber) {
- this.kafkaSubscriber = kafkaSubscriber;
+ @Inject
+ public KafkaConsumerModule(KafkaConfiguration config) {
+ this.config = config;
}
@Override
@@ -47,17 +49,17 @@
DynamicSet.setOf(binder(), AbstractKafkaSubcriber.class);
- if (kafkaSubscriber.enabledEvent(EventFamily.INDEX_EVENT)) {
+ if (config.kafkaSubscriber().enabledEvent(EventFamily.INDEX_EVENT)) {
DynamicSet.bind(binder(), AbstractKafkaSubcriber.class).to(IndexEventSubscriber.class);
}
- if (kafkaSubscriber.enabledEvent(EventFamily.STREAM_EVENT)) {
+ if (config.kafkaSubscriber().enabledEvent(EventFamily.STREAM_EVENT)) {
DynamicSet.bind(binder(), AbstractKafkaSubcriber.class).to(StreamEventSubscriber.class);
}
- if (kafkaSubscriber.enabledEvent(EventFamily.CACHE_EVENT)) {
+ if (config.kafkaSubscriber().enabledEvent(EventFamily.CACHE_EVENT)) {
DynamicSet.bind(binder(), AbstractKafkaSubcriber.class)
- .to(CacheEvictionEventSubscriber.class);
+ .to(KafkaCacheEvictionEventSubscriber.class);
}
- if (kafkaSubscriber.enabledEvent(EventFamily.PROJECT_LIST_EVENT)) {
+ if (config.kafkaSubscriber().enabledEvent(EventFamily.PROJECT_LIST_EVENT)) {
DynamicSet.bind(binder(), AbstractKafkaSubcriber.class)
.to(ProjectUpdateEventSubscriber.class);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
index 0845595..351d8ab 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
@@ -21,10 +21,10 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.ProjectListUpdateRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import java.util.UUID;
import org.apache.kafka.common.serialization.Deserializer;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
index 2a1e155..a15c7cd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
@@ -21,10 +21,10 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
import java.util.UUID;
import org.apache.kafka.common.serialization.Deserializer;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedEventRouterModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedEventRouterModule.java
deleted file mode 100644
index dd11180..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/ForwardedEventRouterModule.java
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.kafka.router;
-
-import com.google.inject.AbstractModule;
-
-public class ForwardedEventRouterModule extends AbstractModule {
- @Override
- protected void configure() {
- bind(ForwardedIndexEventRouter.class).to(IndexEventRouter.class);
- bind(ForwardedCacheEvictionEventRouter.class).to(CacheEvictionEventRouter.class);
- bind(ForwardedProjectListUpdateRouter.class).to(ProjectListUpdateRouter.class);
- bind(ForwardedStreamEventRouter.class).to(StreamEventRouter.class);
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java
new file mode 100644
index 0000000..a6422ae
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/router/KafkaForwardedEventRouterModule.java
@@ -0,0 +1,52 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.kafka.router;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.CacheEvictionEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedCacheEvictionEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedIndexEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedProjectListUpdateRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedStreamEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
+import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaConsumerModule;
+
+public class KafkaForwardedEventRouterModule extends AbstractModule {
+ private KafkaConfiguration kafkaConfig;
+ private KafkaConsumerModule kafkaConsumerModule;
+
+ @Inject
+ public KafkaForwardedEventRouterModule(
+ KafkaConfiguration config, KafkaConsumerModule kafkaConsumerModule) {
+ this.kafkaConfig = config;
+ this.kafkaConsumerModule = kafkaConsumerModule;
+ }
+
+ @Override
+ protected void configure() {
+ if (kafkaConfig.kafkaSubscriber().enabled()) {
+ bind(ForwardedIndexEventRouter.class).to(IndexEventRouter.class);
+ bind(ForwardedCacheEvictionEventRouter.class).to(CacheEvictionEventRouter.class);
+ bind(ForwardedProjectListUpdateRouter.class).to(ProjectListUpdateRouter.class);
+ bind(ForwardedStreamEventRouter.class).to(StreamEventRouter.class);
+
+ install(kafkaConsumerModule);
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java
index afd3766..efbf390 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/validation/dfsrefdb/zookeeper/ZkSharedRefDatabase.java
@@ -58,10 +58,22 @@
// Assuming this is a delete node NULL_REF
if (valueInZk == null) {
+ logger.atInfo().log(
+ "%s:%s not found in Zookeeper, assumed as delete node NULL_REF",
+ project, ref.getName());
return false;
}
- return readObjectId(valueInZk).equals(ref.getObjectId());
+ ObjectId objectIdInSharedRefDb = readObjectId(valueInZk);
+ Boolean isUpToDate = objectIdInSharedRefDb.equals(ref.getObjectId());
+
+ if (!isUpToDate) {
+ logger.atWarning().log(
+ "%s:%s is out of sync: local=%s zk=%s",
+ project, ref.getName(), ref.getObjectId(), objectIdInSharedRefDb);
+ }
+
+ return isUpToDate;
} catch (Exception e) {
throw new SharedLockException(project, ref.getName(), e);
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
index 556917b..59df7bb 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
@@ -17,6 +17,8 @@
import static com.google.common.truth.Truth.assertThat;
import com.google.gerrit.server.config.SitePaths;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
+import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -36,16 +38,16 @@
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private Configuration configMock;
- @Mock(answer = Answers.RETURNS_DEEP_STUBS)
- private KafkaConfiguration kafkaConfig;
+ @Mock private KafkaForwardedEventRouterModule routerModule;
+ @Mock private KafkaBrokerForwarderModule brokerForwarderModule;
@Rule public TemporaryFolder tempFolder = new TemporaryFolder();
private Module module;
@Before
- public void setUp() {
- module = new Module(configMock, kafkaConfig);
+ public void setup() {
+ module = new Module(configMock, routerModule, brokerForwarderModule);
}
@Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/KafkaConfigurationTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
similarity index 83%
rename from src/test/java/com/googlesource/gerrit/plugins/multisite/KafkaConfigurationTest.java
rename to src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
index ab0d3bc..ba423ff 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/KafkaConfigurationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaConfigurationTest.java
@@ -12,15 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite;
+package com.googlesource.gerrit.plugins.multisite.kafka;
import static com.google.common.truth.Truth.assertThat;
-import static com.googlesource.gerrit.plugins.multisite.Configuration.ENABLE_KEY;
-import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KAFKA_PROPERTY_PREFIX;
-import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KAFKA_SECTION;
-import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaPublisher.KAFKA_PUBLISHER_SUBSECTION;
-import static com.googlesource.gerrit.plugins.multisite.KafkaConfiguration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
+import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.ENABLE_KEY;
+import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KAFKA_PROPERTY_PREFIX;
+import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KAFKA_SECTION;
+import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KafkaPublisher.KAFKA_PUBLISHER_SUBSECTION;
+import static com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration.KafkaSubscriber.KAFKA_SUBSCRIBER_SUBSECTION;
+import com.googlesource.gerrit.plugins.multisite.Configuration;
import org.eclipse.jgit.lib.Config;
import org.junit.Before;
import org.junit.Test;
@@ -31,14 +32,16 @@
public class KafkaConfigurationTest {
private Config globalPluginConfig;
+ private Configuration multiSiteConfig;
@Before
- public void setUp() {
+ public void setup() {
globalPluginConfig = new Config();
+ multiSiteConfig = new Configuration(globalPluginConfig, new Config());
}
private KafkaConfiguration getConfiguration() {
- return new KafkaConfiguration(globalPluginConfig);
+ return new KafkaConfiguration(multiSiteConfig);
}
@Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventRouterTest.java
index ad09323..a19134d 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventRouterTest.java
@@ -21,7 +21,7 @@
import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedCacheEvictionHandler;
import com.googlesource.gerrit.plugins.multisite.forwarder.GsonParser;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.CacheEvictionEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.CacheEvictionEventRouter;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index 60381ae..aa95fed 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -39,9 +39,11 @@
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import com.googlesource.gerrit.plugins.multisite.Configuration;
-import com.googlesource.gerrit.plugins.multisite.KafkaConfiguration;
import com.googlesource.gerrit.plugins.multisite.Module;
+import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
+import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
@@ -92,12 +94,24 @@
private final Module multiSiteModule;
@Inject
- public KafkaTestContainerModule(SitePaths sitePaths) {
+ public KafkaTestContainerModule(SitePaths sitePaths) throws IOException {
this.config =
new FileBasedConfig(
sitePaths.etc_dir.resolve(Configuration.MULTI_SITE_CONFIG).toFile(), FS.DETECTED);
+ config.setBoolean("kafka", "publisher", "enabled", true);
+ config.setBoolean("kafka", "subscriber", "enabled", true);
+ config.setBoolean("ref-database", null, "enabled", false);
+ config.save();
+
+ Configuration multiSiteConfig = new Configuration(config, new Config());
+ KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(multiSiteConfig);
this.multiSiteModule =
- new Module(new Configuration(config, new Config()), new KafkaConfiguration(config), true);
+ new Module(
+ multiSiteConfig,
+ new KafkaForwardedEventRouterModule(
+ kafkaConfiguration, new KafkaConsumerModule(kafkaConfiguration)),
+ new KafkaBrokerForwarderModule(kafkaConfiguration),
+ true);
}
@Override
@@ -119,9 +133,6 @@
kafkaContainer.start();
config.setString("kafka", null, "bootstrapServers", kafkaContainer.getBootstrapServers());
- config.setBoolean("kafka", "publisher", "enabled", true);
- config.setBoolean("kafka", "subscriber", "enabled", true);
- config.setBoolean("ref-database", null, "enabled", false);
config.save();
Configuration multiSiteConfig = new Configuration(config, new Config());
bind(Configuration.class).toInstance(multiSiteConfig);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java
index 750f466..e2cf5ef 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventRouterTest.java
@@ -29,7 +29,7 @@
import com.googlesource.gerrit.plugins.multisite.forwarder.events.GroupIndexEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectIndexEvent;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.IndexEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectListUpdateRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectListUpdateRouterTest.java
index 7b3cc74..00a239b 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectListUpdateRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectListUpdateRouterTest.java
@@ -18,7 +18,7 @@
import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedProjectListUpdateHandler;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.ProjectListUpdateRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java
index 0caccd5..ef6c04f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventRouterTest.java
@@ -22,7 +22,7 @@
import com.google.gerrit.server.events.CommentAddedEvent;
import com.google.gerrit.server.util.time.TimeUtil;
import com.googlesource.gerrit.plugins.multisite.forwarder.ForwardedEventHandler;
-import com.googlesource.gerrit.plugins.multisite.kafka.router.StreamEventRouter;
+import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;