// blob: 3376b495e3f1cb7c293cc0312d656012a2d0a243 [file] [log] [blame]
// Copyright (C) 2019 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.googlesource.gerrit.plugins.multisite;
import static com.google.common.base.Suppliers.memoize;
import static com.google.common.base.Suppliers.ofInstance;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.googlesource.gerrit.plugins.multisite.Configuration.Kafka;
import com.googlesource.gerrit.plugins.multisite.Configuration.KafkaPublisher;
import com.googlesource.gerrit.plugins.multisite.Configuration.KafkaSubscriber;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.storage.file.FileBasedConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Lazily-built view of the {@code kafka} section of a JGit {@link Config}.
 *
 * <p>The {@link Kafka}, {@link KafkaPublisher} and {@link KafkaSubscriber} views are each created
 * on first access through a memoizing supplier. When the backing configuration is a {@link
 * FileBasedConfig}, loading it from disk is deferred in the same way.
 */
public class KafkaConfiguration {
  private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);

  /** Prefix identifying configuration keys that are forwarded as Kafka client properties. */
  static final String KAFKA_PROPERTY_PREFIX = "KafkaProp-";

  /** Name of the configuration section read by this class. */
  static final String KAFKA_SECTION = "kafka";

  /** Configuration key name; not referenced within this class. */
  static final String ENABLE_KEY = "enabled";

  /** Value used for {@code bootstrap.servers} when {@code kafka.bootstrapServers} is unset. */
  static final String DEFAULT_KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";

  /** Default returned by {@link #eventsEnabled} for event families without an explicit flag. */
  static final boolean DEFAULT_ENABLE_PROCESSING = true;

  /** Default polling interval in milliseconds; not referenced within this class. */
  static final int DEFAULT_POLLING_INTERVAL_MS = 1000;

  private final Supplier<KafkaSubscriber> subscriber;
  private final Supplier<Kafka> kafka;
  private final Supplier<KafkaPublisher> publisher;

  /**
   * Creates the configuration on top of the given JGit config.
   *
   * @param kafkaConfig configuration holding the {@code kafka} section; a {@link FileBasedConfig}
   *     is loaded from its file the first time any of the views is requested
   */
  @VisibleForTesting
  public KafkaConfiguration(Config kafkaConfig) {
    Supplier<Config> lazyCfg = lazyLoad(kafkaConfig);
    kafka = memoize(() -> new Kafka(lazyCfg));
    publisher = memoize(() -> new KafkaPublisher(lazyCfg));
    subscriber = memoize(() -> new KafkaSubscriber(lazyCfg));
  }

  /** Returns the memoized {@link Kafka} view, creating it on first call. */
  public Kafka getKafka() {
    return kafka.get();
  }

  /** Returns the memoized {@link KafkaSubscriber} view, creating it on first call. */
  public KafkaSubscriber kafkaSubscriber() {
    return subscriber.get();
  }

  /** Returns the memoized {@link KafkaPublisher} view, creating it on first call. */
  public KafkaPublisher kafkaPublisher() {
    return publisher.get();
  }

  /**
   * Copies the Kafka client properties of one {@code kafka} subsection into {@code target}.
   *
   * <p>Every key of {@code kafka.<subsectionName>} starting with {@link #KAFKA_PROPERTY_PREFIX}
   * has the prefix removed and the remainder converted from lowerCamel to dot-separated
   * lower-case, e.g. {@code KafkaProp-maxPollRecords} becomes {@code max.poll.records}. Keys
   * that resolve to no value are skipped with a warning, because {@link Properties} rejects
   * {@code null} values.
   *
   * <p>{@code bootstrap.servers} is always set, from {@code kafka.bootstrapServers} or, when that
   * is missing or empty, from {@link #DEFAULT_KAFKA_BOOTSTRAP_SERVERS}.
   *
   * @param configSupplier supplier of the configuration to read
   * @param subsectionName subsection of {@code kafka} whose properties are copied; when no such
   *     subsection exists only {@code bootstrap.servers} is set
   * @param target properties object that receives the entries
   */
  static void applyKafkaConfig(
      Supplier<Config> configSupplier, String subsectionName, Properties target) {
    Config config = configSupplier.get();

    // The null check mirrors the original equals()-based scan, which never matched null.
    if (subsectionName != null
        && config.getSubsections(KAFKA_SECTION).contains(subsectionName)) {
      for (String name : config.getNames(KAFKA_SECTION, subsectionName, true)) {
        if (!name.startsWith(KAFKA_PROPERTY_PREFIX)) {
          continue;
        }

        String value = config.getString(KAFKA_SECTION, subsectionName, name);
        if (value == null) {
          // Properties is a Hashtable: put(key, null) would throw NullPointerException.
          log.warn("[{}] Ignoring kafka property {} because it has no value", subsectionName, name);
          continue;
        }

        // Plain substring/char replacement: neither the prefix nor '-' is meant as a regex.
        String configProperty = name.substring(KAFKA_PROPERTY_PREFIX.length());
        String propName =
            CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_HYPHEN, configProperty).replace('-', '.');

        // NOTE(review): the value is logged verbatim; Kafka properties may carry credentials
        // (e.g. JAAS settings) - consider masking before this reaches production logs.
        log.info("[{}] Setting kafka property: {} = {}", subsectionName, propName, value);
        target.put(propName, value);
      }
    }

    target.put(
        "bootstrap.servers",
        getString(
            configSupplier,
            KAFKA_SECTION,
            null,
            "bootstrapServers",
            DEFAULT_KAFKA_BOOTSTRAP_SERVERS));
  }

  /**
   * Reads a string setting, falling back to a default when it is missing or empty.
   *
   * @param cfg supplier of the configuration to read
   * @param section configuration section
   * @param subsection configuration subsection, or {@code null} for none
   * @param name key name
   * @param defaultValue value returned when the key yields {@code null} or an empty string
   * @return the configured value or {@code defaultValue}
   */
  private static String getString(
      Supplier<Config> cfg, String section, String subsection, String name, String defaultValue) {
    String value = cfg.get().getString(section, subsection, name);
    return Strings.isNullOrEmpty(value) ? defaultValue : value;
  }

  /**
   * Resolves, for every {@link EventFamily}, whether it is enabled in the given subsection.
   *
   * <p>The key looked up for each family is {@code lowerCamelName() + "Enabled"} inside {@code
   * kafka.<subsection>}; families without an explicit setting get {@link
   * #DEFAULT_ENABLE_PROCESSING}.
   *
   * @param config supplier of the configuration to read
   * @param subsection subsection of {@code kafka} holding the flags
   * @return a new mutable map with one entry per event family
   */
  static Map<EventFamily, Boolean> eventsEnabled(Supplier<Config> config, String subsection) {
    Config cfg = config.get();
    Map<EventFamily, Boolean> eventsEnabled = new HashMap<>();
    for (EventFamily eventFamily : EventFamily.values()) {
      String enabledConfigKey = eventFamily.lowerCamelName() + "Enabled";
      eventsEnabled.put(
          eventFamily,
          cfg.getBoolean(KAFKA_SECTION, subsection, enabledConfigKey, DEFAULT_ENABLE_PROCESSING));
    }
    return eventsEnabled;
  }

  /**
   * Wraps {@code config} in a supplier, deferring the disk load of file-based configurations.
   *
   * <p>A load failure is logged and the (unloaded) configuration is still returned, so callers
   * fall back to default values instead of failing.
   *
   * @param config configuration to wrap
   * @return a memoizing supplier that loads a {@link FileBasedConfig} on first use, or a constant
   *     supplier for any other {@link Config}
   */
  private static Supplier<Config> lazyLoad(Config config) {
    if (!(config instanceof FileBasedConfig)) {
      return ofInstance(config);
    }

    FileBasedConfig fileConfig = (FileBasedConfig) config;
    return memoize(
        () -> {
          String fileConfigFileName = fileConfig.getFile().getPath();
          try {
            log.info("Loading configuration from {}", fileConfigFileName);
            fileConfig.load();
          } catch (IOException | ConfigInvalidException e) {
            // Trailing throwable is logged with its stack trace by SLF4J.
            log.error("Unable to load configuration from {}", fileConfigFileName, e);
          }
          return fileConfig;
        });
  }
}