blob: 2783584f6c10da6f1c3005058ed532840efe9378 [file] [log] [blame]
// Copyright (C) 2009 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.googlesource.gerrit.plugins.replication;
import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerrit;
import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Strings;
import com.google.common.collect.Queues;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
import com.google.gerrit.extensions.events.HeadUpdatedListener;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.server.UsedAt;
import com.google.gerrit.server.events.EventDispatcher;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.util.logging.NamedFluentLogger;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import org.eclipse.jgit.transport.URIish;
/** Manages automatic replication to remote repositories. */
public class ReplicationQueue
implements LifecycleListener,
GitReferenceUpdatedListener,
ProjectDeletedListener,
HeadUpdatedListener {
/** Name under which the replication logger is registered. */
static final String REPLICATION_LOG_NAME = "replication_log";
/** Logger looked up by {@link #REPLICATION_LOG_NAME}; used for all replication log output here. */
static final NamedFluentLogger repLog = NamedFluentLogger.forName(REPLICATION_LOG_NAME);
/** Receives warnings that are associated with a {@link ReplicationState}. */
private final ReplicationStateListener stateLog;
/**
 * Substitutes the first occurrence of the literal placeholder <code>${name}</code> in {@code in}
 * with {@code name}.
 *
 * @param in text that may contain the placeholder
 * @param name replacement text
 * @param keyIsOptional whether {@code in} is acceptable without the placeholder
 * @return the text with the first placeholder replaced; {@code in} unchanged when the placeholder
 *     is absent and {@code keyIsOptional} is true; {@code null} when it is absent and required
 */
static String replaceName(String in, String name, boolean keyIsOptional) {
  final String placeholder = "${name}";
  final int at = in.indexOf(placeholder);
  if (at < 0) {
    return keyIsOptional ? in : null;
  }
  // Only the first occurrence is replaced; any later placeholder is kept verbatim.
  return new StringBuilder()
      .append(in, 0, at)
      .append(name)
      .append(in, at + placeholder.length(), in.length())
      .toString();
}
/** Work queue handed to the replication configuration when this queue starts. */
private final WorkQueue workQueue;
/** Supplies the event dispatcher given to each {@code GitUpdateProcessing}. */
private final DynamicItem<EventDispatcher> dispatcher;
/** Source of the configured destinations; started and shut down together with this queue. */
private final ReplicationConfig config;
/** Creates the {@code AdminApi} used for remote project creation, deletion and HEAD updates. */
private final DynamicItem<AdminApiFactory> adminApiFactory;
/** Store that replication tasks are persisted to and listed from during startup replay. */
private final ReplicationTasksStorage replicationTasksStorage;
/** Set to true by {@code start()} after config startup and back to false by {@code stop()}. */
private volatile boolean running;
/** True only while {@code firePendingEvents()} is replaying stored tasks. */
private volatile boolean replaying;
/** Ref updates received while {@code running} was false; fired from {@code start()}. */
private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
/**
 * Stores the injected collaborators and creates the empty queue that buffers ref updates
 * arriving before the queue is running.
 *
 * @param wq work queue passed on to the replication configuration at startup
 * @param aaf provider of the admin API factory
 * @param rc replication configuration
 * @param dis provider of the event dispatcher
 * @param sl listener used for state-bound warnings
 * @param rts storage for replication tasks
 */
@Inject
ReplicationQueue(
    WorkQueue wq,
    DynamicItem<AdminApiFactory> aaf,
    ReplicationConfig rc,
    DynamicItem<EventDispatcher> dis,
    ReplicationStateListeners sl,
    ReplicationTasksStorage rts) {
  this.workQueue = wq;
  this.adminApiFactory = aaf;
  this.config = rc;
  this.dispatcher = dis;
  this.stateLog = sl;
  this.replicationTasksStorage = rts;
  this.beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue();
}
/**
 * Starts the replication configuration, marks the queue as running and then replays work:
 * first the tasks found in storage, then the ref updates buffered while not running. Does
 * nothing when the queue is already running.
 */
@Override
public void start() {
  if (running) {
    return;
  }
  config.startup(workQueue);
  // The flag must be set before replaying, otherwise replayed events would be buffered again.
  running = true;
  firePendingEvents();
  fireBeforeStartupEvents();
}
/**
 * Marks the queue as not running and shuts the replication configuration down, logging a
 * warning when the shutdown reports a positive number of canceled events.
 */
@Override
public void stop() {
  running = false;
  final int canceled = config.shutdown();
  if (canceled <= 0) {
    return;
  }
  repLog.atWarning().log("Canceled %d replication events during shutdown", canceled);
}
/**
 * Reports whether the queue is currently running.
 *
 * @return the current value of the {@code running} flag
 */
public boolean isRunning() {
return running;
}
/**
 * Reports whether stored tasks are being replayed right now.
 *
 * @return the current value of the {@code replaying} flag
 */
public boolean isReplaying() {
return replaying;
}
/**
 * Schedules replication of all refs of {@code project}, delegating to the four-argument
 * overload with {@code now} set to {@code false}.
 *
 * @param project project to replicate
 * @param urlMatch URL filter forwarded to each destination's URI lookup
 * @param state state object the scheduled pushes are attached to
 */
void scheduleFullSync(Project.NameKey project, String urlMatch, ReplicationState state) {
  final boolean now = false;
  scheduleFullSync(project, urlMatch, state, now);
}
/**
 * Schedules replication of all refs ({@code PushOne.ALL_REFS}) of {@code project} to every
 * destination that would push that project, once per URI the destination returns for
 * {@code urlMatch}.
 *
 * <p>When the queue is not running, a warning is reported against {@code state} and nothing is
 * scheduled or persisted.
 *
 * @param project project to replicate
 * @param urlMatch URL filter forwarded to each destination's URI lookup
 * @param state state object the scheduled pushes are attached to
 * @param now forwarded unchanged to the destination's {@code schedule} call
 */
@VisibleForTesting
public void scheduleFullSync(
    Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
  if (!running) {
    stateLog.warn("Replication plugin did not finish startup before event", state);
    return;
  }
  for (Destination cfg : config.getDestinations(FilterType.ALL)) {
    if (!cfg.wouldPushProject(project)) {
      continue;
    }
    for (URIish uri : cfg.getURIs(project, urlMatch)) {
      // Persist before scheduling, as pushReference does, so the stored record already
      // exists when the destination receives the task.
      replicationTasksStorage.persist(
          new ReplicateRefUpdate(
              project.get(), PushOne.ALL_REFS, uri, cfg.getRemoteConfigName()));
      cfg.schedule(project, PushOne.ALL_REFS, uri, state, now);
    }
  }
}
/**
 * Listener entry point for ref updates: unpacks the project and ref names from {@code event}
 * and hands them to the name-based overload.
 */
@Override
public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
  String projectName = event.getProjectName();
  String refName = event.getRefName();
  onGitReferenceUpdated(projectName, refName);
}
/**
 * Pushes an updated ref to every configured destination, or buffers the update when the queue
 * is not running yet.
 *
 * @param projectName name of the project whose ref changed
 * @param refName name of the changed ref
 */
private void onGitReferenceUpdated(String projectName, String refName) {
  ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
  if (running) {
    Project.NameKey project = new Project.NameKey(projectName);
    for (Destination destination : config.getDestinations(FilterType.ALL)) {
      pushReference(destination, project, refName, state);
    }
    state.markAllPushTasksScheduled();
    return;
  }
  // Not running: report it and keep the update so start() can fire it later.
  stateLog.warn(
      "Replication plugin did not finish startup before event, event replication is postponed",
      state);
  beforeStartupEventsQueue.add(new ReferenceUpdatedEvent(projectName, refName));
}
/**
 * Pushes a single ref to one destination without a caller-supplied state; the delegate creates
 * and completes its own {@code ReplicationState}.
 *
 * @param cfg destination to push to
 * @param project project owning the ref
 * @param refName ref to push
 */
@UsedAt(UsedAt.Project.COLLABNET)
public void pushReference(Destination cfg, Project.NameKey project, String refName) {
  final ReplicationState noState = null;
  pushReference(cfg, project, refName, noState);
}
/**
 * Persists and schedules a push of {@code refName} for every URI of {@code cfg}, provided the
 * destination would push both the project and the ref; otherwise logs that the ref is skipped.
 *
 * @param cfg destination to push to
 * @param project project owning the ref
 * @param refName ref to push
 * @param state state to attach the pushes to, or {@code null} to have this method create one
 *     and mark it as fully scheduled itself
 */
private void pushReference(
    Destination cfg, Project.NameKey project, String refName, ReplicationState state) {
  final boolean ownsState = state == null;
  final ReplicationState pushState =
      ownsState ? new ReplicationState(new GitUpdateProcessing(dispatcher.get())) : state;
  if (!cfg.wouldPushProject(project) || !cfg.wouldPushRef(refName)) {
    repLog.atFine().log("Skipping ref %s on project %s", refName, project.get());
  } else {
    for (URIish uri : cfg.getURIs(project, null)) {
      replicationTasksStorage.persist(
          new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
      cfg.schedule(project, refName, uri, pushState);
    }
  }
  // A caller-supplied state is completed by the caller once all destinations are handled.
  if (ownsState) {
    pushState.markAllPushTasksScheduled();
  }
}
/**
 * Replays the tasks listed by the task storage as ref-update events, firing each distinct
 * {@code project:ref} pair once. The {@code replaying} flag is true for the duration of the
 * replay and is reset even when the replay fails.
 */
private void firePendingEvents() {
  replaying = true;
  try {
    Set<String> firedKeys = new HashSet<>();
    for (ReplicateRefUpdate task : replicationTasksStorage.list()) {
      if (task == null) {
        repLog.atWarning().log("Encountered null replication event in ReplicationTasksStorage");
        continue;
      }
      String key = String.format("%s:%s", task.project, task.ref);
      // add() returns false for a key that was already fired, which skips duplicates.
      if (firedKeys.add(key)) {
        repLog.atInfo().log("Firing pending task %s", key);
        onGitReferenceUpdated(task.project, task.ref);
      }
    }
  } finally {
    replaying = false;
  }
}
/**
 * Listener entry point for project deletion: asks every admin URI resolved for the
 * {@code PROJECT_DELETION} filter to delete the project remotely.
 */
@Override
public void onProjectDeleted(ProjectDeletedListener.Event event) {
  final Project.NameKey deleted = new Project.NameKey(event.getProjectName());
  getURIs(null, deleted, FilterType.PROJECT_DELETION)
      .forEach(uri -> deleteProject(uri, deleted));
}
/**
 * Listener entry point for HEAD changes: forwards the new HEAD name to every admin URI
 * resolved for the project with the {@code ALL} filter.
 */
@Override
public void onHeadUpdated(HeadUpdatedListener.Event event) {
  final Project.NameKey updated = new Project.NameKey(event.getProjectName());
  getURIs(null, updated, FilterType.ALL)
      .forEach(uri -> updateHead(uri, updated, event.getNewHeadName()));
}
/**
 * Fires the ref updates that were buffered while the queue was not running, once per distinct
 * {@code project:ref} pair.
 *
 * <p>NOTE(review): the buffer is iterated, not drained, so its entries remain queued after
 * this method returns.
 */
private void fireBeforeStartupEvents() {
  Set<String> eventsReplayed = new HashSet<>();
  for (ReferenceUpdatedEvent event : beforeStartupEventsQueue) {
    String eventKey = String.format("%s:%s", event.getProjectName(), event.getRefName());
    // add() returns false for a key that was already fired, which skips duplicates.
    if (eventsReplayed.add(eventKey)) {
      // Log the readable key, as firePendingEvents does, rather than the event object.
      repLog.atInfo().log("Firing pending task %s", eventKey);
      onGitReferenceUpdated(event.getProjectName(), event.getRefName());
    }
  }
}
/**
 * Collects the URIs on which administrative operations for {@code projectName} should run.
 *
 * <p>Each destination matching {@code filterType} that would push the project (and, when
 * {@code remoteName} is given, carries that remote name) contributes its usable admin URLs.
 * A destination without any usable admin URL contributes its regular URIs for the pattern
 * {@code "*"} instead.
 *
 * @param remoteName remote config name to restrict to, or {@code null} for all remotes
 * @param projectName project the URIs are resolved for
 * @param filterType which destinations to consider
 * @return the collected URIs; empty when no destination matches the filter or when the queue
 *     is not running
 */
private Set<URIish> getURIs(
    @Nullable String remoteName, Project.NameKey projectName, FilterType filterType) {
  if (config.getDestinations(filterType).isEmpty()) {
    return Collections.emptySet();
  }
  if (!running) {
    repLog.atSevere().log("Replication plugin did not finish startup before event");
    return Collections.emptySet();
  }
  Set<URIish> collected = new HashSet<>();
  for (Destination destination : config.getDestinations(filterType)) {
    if (!destination.wouldPushProject(projectName)
        || (remoteName != null && !destination.getRemoteConfigName().equals(remoteName))) {
      continue;
    }
    boolean usedAdminUrl = false;
    for (String adminUrl : destination.getAdminUrls()) {
      URIish adminUri = toAdminUri(adminUrl, destination, projectName);
      if (adminUri != null) {
        collected.add(adminUri);
        usedAdminUrl = true;
      }
    }
    if (!usedAdminUrl) {
      // No usable admin URL: fall back to the destination's regular URIs.
      for (URIish fallback : destination.getURIs(projectName, "*")) {
        collected.add(fallback);
      }
    }
  }
  return collected;
}

/**
 * Turns one configured admin URL into a usable URI, or returns {@code null} (after logging a
 * warning where applicable) when the URL is blank, unparsable, lacks a required
 * <code>${name}</code> placeholder, or is a non-Gerrit URL that is not SSH.
 */
private URIish toAdminUri(String url, Destination destination, Project.NameKey projectName) {
  if (Strings.isNullOrEmpty(url)) {
    return null;
  }
  URIish parsed;
  try {
    parsed = new URIish(url);
  } catch (URISyntaxException e) {
    repLog.atWarning().log("adminURL '%s' is invalid: %s", url, e.getMessage());
    return null;
  }
  if (isGerrit(parsed)) {
    // Gerrit URIs are used as-is, without placeholder expansion or the SSH check.
    return parsed;
  }
  String path =
      replaceName(parsed.getPath(), projectName.get(), destination.isSingleProjectMatch());
  if (path == null) {
    repLog.atWarning().log("adminURL %s does not contain ${name}", parsed);
    return null;
  }
  URIish expanded = parsed.setPath(path);
  if (!isSSH(expanded)) {
    repLog.atWarning().log("adminURL '%s' is invalid: only SSH is supported", expanded);
    return null;
  }
  return expanded;
}
/**
 * Creates {@code project} on every admin URI resolved for the {@code PROJECT_CREATION} filter.
 * Every URI is attempted even after an earlier one has failed.
 *
 * @param remoteName remote config name to restrict to, or {@code null} for all remotes
 * @param project project to create remotely
 * @param head HEAD value forwarded to the remote creation call
 * @return {@code true} when no attempted creation failed (also when there was no URI at all)
 */
public boolean createProject(String remoteName, Project.NameKey project, String head) {
  boolean allCreated = true;
  for (URIish uri : getURIs(remoteName, project, FilterType.PROJECT_CREATION)) {
    if (!createProject(uri, project, head)) {
      allCreated = false;
    }
  }
  return allCreated;
}
/**
 * Creates {@code projectName} on a single remote through the admin API built for
 * {@code replicateURI}.
 *
 * @return {@code true} when an admin API was available and reported success; {@code false}
 *     (after logging a warning) otherwise
 */
private boolean createProject(URIish replicateURI, Project.NameKey projectName, String head) {
  Optional<AdminApi> api = adminApiFactory.get().create(replicateURI);
  if (!api.isPresent() || !api.get().createProject(projectName, head)) {
    warnCannotPerform("create new project", replicateURI);
    return false;
  }
  return true;
}
/**
 * Deletes {@code projectName} on a single remote through the admin API built for
 * {@code replicateURI}; logs a warning when no admin API is available for that URI.
 */
private void deleteProject(URIish replicateURI, Project.NameKey projectName) {
  Optional<AdminApi> api = adminApiFactory.get().create(replicateURI);
  if (!api.isPresent()) {
    warnCannotPerform("delete project", replicateURI);
    return;
  }
  api.get().deleteProject(projectName);
}
/**
 * Sets HEAD of {@code projectName} to {@code newHead} on a single remote through the admin API
 * built for {@code replicateURI}; logs a warning when no admin API is available for that URI.
 */
private void updateHead(URIish replicateURI, Project.NameKey projectName, String newHead) {
  Optional<AdminApi> api = adminApiFactory.get().create(replicateURI);
  if (!api.isPresent()) {
    warnCannotPerform("update HEAD of project", replicateURI);
    return;
  }
  api.get().updateHead(projectName, newHead);
}
/**
 * Logs a warning that an administrative operation could not be carried out on a remote.
 *
 * @param op short description of the operation, inserted into the message
 * @param uri remote the operation was meant for
 */
private void warnCannotPerform(String op, URIish uri) {
repLog.atWarning().log("Cannot %s on remote site %s.", op, uri);
}
/**
 * Immutable (project name, ref name) pair describing a ref update that arrived while the
 * queue was not running. Two instances are equal when both names are equal.
 */
private static class ReferenceUpdatedEvent {
  // Never reassigned after construction, hence final.
  private final String projectName;
  private final String refName;

  public ReferenceUpdatedEvent(String projectName, String refName) {
    this.projectName = projectName;
    this.refName = refName;
  }

  /** @return name of the project whose ref was updated */
  public String getProjectName() {
    return projectName;
  }

  /** @return name of the updated ref */
  public String getRefName() {
    return refName;
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(projectName, refName);
  }

  @Override
  public boolean equals(Object obj) {
    if (!(obj instanceof ReferenceUpdatedEvent)) {
      return false;
    }
    ReferenceUpdatedEvent other = (ReferenceUpdatedEvent) obj;
    return Objects.equal(projectName, other.projectName)
        && Objects.equal(refName, other.refName);
  }

  /** Renders the event as {@code project:ref} so it is readable when formatted into a log. */
  @Override
  public String toString() {
    return projectName + ":" + refName;
  }
}
}