blob: 606d96a68734a293bace38a1126a3ccdae0aad5c [file] [log] [blame]
// Copyright (C) 2019 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.googlesource.gerrit.plugins.replication;
import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerrit;
import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerritHttp;
import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH;
import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName;
import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
import static java.util.stream.Collectors.toList;
import com.google.common.base.Strings;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.entities.Project;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.replication.Destination.Factory;
import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.storage.file.FileBasedConfig;
import org.eclipse.jgit.transport.RefSpec;
import org.eclipse.jgit.transport.RemoteConfig;
import org.eclipse.jgit.transport.URIish;
@Singleton
public class DestinationsCollection implements ReplicationDestinations, ReplicationConfigValidator {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final Factory destinationFactory;
private final Provider<ReplicationQueue> replicationQueue;
private volatile List<Destination> destinations;
private boolean shuttingDown;
public static class EventQueueNotEmptyException extends Exception {
private static final long serialVersionUID = 1L;
public EventQueueNotEmptyException(String errorMessage) {
super(errorMessage);
}
}
@Inject
public DestinationsCollection(
Destination.Factory destinationFactory,
Provider<ReplicationQueue> replicationQueue,
ReplicationFileBasedConfig replicationConfig,
EventBus eventBus)
throws ConfigInvalidException {
this.destinationFactory = destinationFactory;
this.replicationQueue = replicationQueue;
this.destinations = allDestinations(destinationFactory, validateConfig(replicationConfig));
eventBus.register(this);
}
@Override
public Multimap<Destination, URIish> getURIs(
Optional<String> remoteName, Project.NameKey projectName, FilterType filterType) {
if (getAll(filterType).isEmpty()) {
return ImmutableMultimap.of();
}
SetMultimap<Destination, URIish> uris = HashMultimap.create();
for (Destination config : getAll(filterType)) {
if (!config.wouldPushProject(projectName)) {
continue;
}
if (remoteName.isPresent() && !config.getRemoteConfigName().equals(remoteName.get())) {
continue;
}
boolean adminURLUsed = false;
for (String url : config.getAdminUrls()) {
if (Strings.isNullOrEmpty(url)) {
continue;
}
URIish uri;
try {
uri = new URIish(url);
} catch (URISyntaxException e) {
repLog.warn("adminURL '{}' is invalid: {}", url, e.getMessage());
continue;
}
if (!isGerrit(uri) && !isGerritHttp(uri)) {
String path =
replaceName(uri.getPath(), projectName.get(), config.isSingleProjectMatch());
if (path == null) {
repLog.warn("adminURL {} does not contain ${name}", uri);
continue;
}
uri = uri.setPath(path);
if (!isSSH(uri)) {
repLog.warn("adminURL '{}' is invalid: only SSH and HTTP are supported", uri);
continue;
}
}
uris.put(config, uri);
adminURLUsed = true;
}
if (!adminURLUsed) {
for (URIish uri : config.getURIs(projectName, "*")) {
uris.put(config, uri);
}
}
}
return uris;
}
@Override
public synchronized List<Destination> getAll(FilterType filterType) {
Predicate<? super Destination> filter;
switch (filterType) {
case PROJECT_CREATION:
filter = dest -> dest.isCreateMissingRepos();
break;
case PROJECT_DELETION:
filter = dest -> dest.isReplicateProjectDeletions();
break;
case ALL:
default:
filter = dest -> true;
break;
}
return destinations.stream().filter(Objects::nonNull).filter(filter).collect(toList());
}
@Override
public synchronized boolean isEmpty() {
return destinations.isEmpty();
}
@Override
public synchronized void startup(WorkQueue workQueue) {
shuttingDown = false;
for (Destination cfg : destinations) {
cfg.start(workQueue);
}
}
/* shutdown() cannot be set as a synchronized method because
* it may need to wait for pending events to complete;
* e.g. when enabling the drain of replication events before
* shutdown.
*
* As a rule of thumb for synchronized methods, because they
* implicitly define a critical section and associated lock,
* they should never hold waiting for another resource, otherwise
* the risk of deadlock is very high.
*
* See more background about deadlocks, what they are and how to
* prevent them at: https://en.wikipedia.org/wiki/Deadlock
*/
@Override
public int shutdown() {
synchronized (this) {
shuttingDown = true;
}
int discarded = 0;
for (Destination cfg : destinations) {
try {
drainReplicationEvents(cfg);
} catch (EventQueueNotEmptyException e) {
logger.atWarning().log("Event queue not empty: %s", e.getMessage());
} finally {
discarded += cfg.shutdown();
}
}
return discarded;
}
void drainReplicationEvents(Destination destination) throws EventQueueNotEmptyException {
int drainQueueAttempts = destination.getDrainQueueAttempts();
if (drainQueueAttempts == 0) {
return;
}
int pending = destination.getQueueInfo().pending.size();
int inFlight = destination.getQueueInfo().inFlight.size();
while ((inFlight > 0 || pending > 0) && drainQueueAttempts > 0) {
try {
logger.atInfo().log(
"Draining replication events, postpone shutdown. Events left: inFlight %d, pending %d",
inFlight, pending);
Thread.sleep(destination.getReplicationDelaySeconds());
} catch (InterruptedException ie) {
logger.atWarning().withCause(ie).log(
"Wait for replication events to drain has been interrupted");
}
pending = destination.getQueueInfo().pending.size();
inFlight = destination.getQueueInfo().inFlight.size();
drainQueueAttempts--;
}
if (pending > 0 || inFlight > 0) {
throw new EventQueueNotEmptyException(
String.format("Pending: %d - InFlight: %d", pending, inFlight));
}
}
@Subscribe
public synchronized void onReload(List<RemoteConfiguration> remoteConfigurations) {
if (shuttingDown) {
logger.atWarning().log("Shutting down: configuration reload ignored");
return;
}
try {
replicationQueue.get().stop();
destinations = allDestinations(destinationFactory, remoteConfigurations);
logger.atInfo().log("Configuration reloaded: %d destinations", getAll(FilterType.ALL).size());
} finally {
replicationQueue.get().start();
}
}
@Override
public List<RemoteConfiguration> validateConfig(ReplicationFileBasedConfig replicationConfig)
throws ConfigInvalidException {
if (!replicationConfig.getConfig().getFile().exists()) {
logger.atWarning().log(
"Config file %s does not exist; not replicating",
replicationConfig.getConfig().getFile());
return Collections.emptyList();
}
if (replicationConfig.getConfig().getFile().length() == 0) {
logger.atInfo().log(
"Config file %s is empty; not replicating", replicationConfig.getConfig().getFile());
return Collections.emptyList();
}
try {
replicationConfig.getConfig().load();
} catch (ConfigInvalidException e) {
throw new ConfigInvalidException(
String.format(
"Config file %s is invalid: %s",
replicationConfig.getConfig().getFile(), e.getMessage()),
e);
} catch (IOException e) {
throw new ConfigInvalidException(
String.format(
"Cannot read %s: %s", replicationConfig.getConfig().getFile(), e.getMessage()),
e);
}
boolean defaultForceUpdate =
replicationConfig.getConfig().getBoolean("gerrit", "defaultForceUpdate", false);
ImmutableList.Builder<RemoteConfiguration> confs = ImmutableList.builder();
for (RemoteConfig c : allRemotes(replicationConfig.getConfig())) {
if (c.getURIs().isEmpty()) {
continue;
}
if (!c.getFetchRefSpecs().isEmpty()) {
repLog.info("Ignore '{}' endpoint: not a 'push' target", c.getName());
continue;
}
// If destination for push is not set assume equal to source.
for (RefSpec ref : c.getPushRefSpecs()) {
if (ref.getDestination() == null) {
ref.setDestination(ref.getSource());
}
}
if (c.getPushRefSpecs().isEmpty()) {
c.addPushRefSpec(
new RefSpec()
.setSourceDestination("refs/*", "refs/*")
.setForceUpdate(defaultForceUpdate));
}
DestinationConfiguration destinationConfiguration =
new DestinationConfiguration(c, replicationConfig.getConfig());
if (!destinationConfiguration.isSingleProjectMatch()) {
for (URIish u : c.getURIs()) {
if (u.getPath() == null || !u.getPath().contains("${name}")) {
throw new ConfigInvalidException(
String.format(
"remote.%s.url \"%s\" lacks ${name} placeholder in %s",
c.getName(), u, replicationConfig.getConfig().getFile()));
}
}
}
confs.add(destinationConfiguration);
}
return confs.build();
}
private List<Destination> allDestinations(
Destination.Factory destinationFactory, List<RemoteConfiguration> remoteConfigurations) {
ImmutableList.Builder<Destination> dest = ImmutableList.builder();
for (RemoteConfiguration c : remoteConfigurations) {
if (c instanceof DestinationConfiguration) {
dest.add(destinationFactory.create((DestinationConfiguration) c));
}
}
return dest.build();
}
private static List<RemoteConfig> allRemotes(FileBasedConfig cfg) throws ConfigInvalidException {
Set<String> names = cfg.getSubsections("remote");
List<RemoteConfig> result = Lists.newArrayListWithCapacity(names.size());
for (String name : names) {
try {
result.add(new RemoteConfig(cfg, name));
} catch (URISyntaxException e) {
throw new ConfigInvalidException(
String.format("remote %s has invalid URL in %s", name, cfg.getFile()));
}
}
return result;
}
}