Merge branch 'stable-2.15' into stable-2.16
* stable-2.15:
Fix issue with dropping events on start
Change-Id: I8d1b52bcf8f8c288892492b4375178756c784ec3
diff --git a/BUILD b/BUILD
index 0a54c3e..50615d8 100644
--- a/BUILD
+++ b/BUILD
@@ -21,7 +21,10 @@
junit_tests(
name = "replication_tests",
- srcs = glob(["src/test/java/**/*Test.java"]),
+ srcs = glob([
+ "src/test/java/**/*Test.java",
+ "src/test/java/**/*IT.java",
+ ]),
tags = ["replication"],
visibility = ["//visibility:public"],
deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
@@ -32,7 +35,7 @@
java_library(
name = "replication_util",
- testonly = 1,
+ testonly = True,
srcs = glob(
["src/test/java/**/*.java"],
exclude = ["src/test/java/**/*Test.java"],
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
new file mode 100644
index 0000000..acbf763
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
@@ -0,0 +1,25 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gerrit.reviewdb.client.Project;
+
+public interface AdminApi {
+ public boolean createProject(Project.NameKey project, String head);
+
+ public boolean deleteProject(Project.NameKey project);
+
+ public boolean updateHead(Project.NameKey project, String newHead);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApiFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApiFactory.java
new file mode 100644
index 0000000..de6e91e
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApiFactory.java
@@ -0,0 +1,73 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.util.Optional;
+import org.eclipse.jgit.transport.URIish;
+
+/** Factory for creating an {@link AdminApi} instance for a remote URI. */
+public interface AdminApiFactory {
+ /**
+ * Create an {@link AdminApi} for the given remote URI.
+ *
+ * @param uri the remote URI.
+ * @return An API for the given remote URI, or {@code Optional.empty} if there is no appropriate
+ * API for the URI.
+ */
+ Optional<AdminApi> create(URIish uri);
+
+ @Singleton
+ static class DefaultAdminApiFactory implements AdminApiFactory {
+ protected final SshHelper sshHelper;
+
+ @Inject
+ public DefaultAdminApiFactory(SshHelper sshHelper) {
+ this.sshHelper = sshHelper;
+ }
+
+ @Override
+ public Optional<AdminApi> create(URIish uri) {
+ if (isGerrit(uri)) {
+ return Optional.of(new GerritSshApi(sshHelper, uri));
+ } else if (!uri.isRemote()) {
+ return Optional.of(new LocalFS(uri));
+ } else if (isSSH(uri)) {
+ return Optional.of(new RemoteSsh(sshHelper, uri));
+ }
+ return Optional.empty();
+ }
+ }
+
+ /** Returns true when the URI scheme is {@code gerrit+ssh}, compared case-insensitively. */
+ static boolean isGerrit(URIish uri) {
+ return "gerrit+ssh".equalsIgnoreCase(uri.getScheme());
+ }
+
+ static boolean isSSH(URIish uri) {
+ if (!uri.isRemote()) {
+ return false;
+ }
+ String scheme = uri.getScheme();
+ if (scheme != null && scheme.toLowerCase().contains("ssh")) {
+ return true;
+ }
+ if (scheme == null && uri.getHost() != null && uri.getPath() != null) {
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index 5d6e409..02daa6d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -13,36 +13,50 @@
// limitations under the License.
package com.googlesource.gerrit.plugins.replication;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.flogger.FluentLogger;
import com.google.gerrit.common.FileUtil;
+import com.google.gerrit.extensions.annotations.PluginData;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
+import com.google.inject.Provider;
import com.google.inject.Singleton;
import java.io.IOException;
+import java.nio.file.Path;
import java.util.List;
import org.eclipse.jgit.errors.ConfigInvalidException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@Singleton
public class AutoReloadConfigDecorator implements ReplicationConfig {
- private static final Logger log = LoggerFactory.getLogger(AutoReloadConfigDecorator.class);
- private ReplicationFileBasedConfig currentConfig;
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ private volatile ReplicationFileBasedConfig currentConfig;
private long currentConfigTs;
+ private long lastFailedConfigTs;
private final SitePaths site;
- private final WorkQueue workQueue;
- private final DestinationFactory destinationFactory;
+ private final Destination.Factory destinationFactory;
+ private final Path pluginDataDir;
+ // Use Provider<> instead of injecting the ReplicationQueue because of circular dependency with
+ // ReplicationConfig
+ private final Provider<ReplicationQueue> replicationQueue;
+
+ private volatile boolean shuttingDown;
@Inject
public AutoReloadConfigDecorator(
- SitePaths site, WorkQueue workQueue, DestinationFactory destinationFactory)
+ SitePaths site,
+ Destination.Factory destinationFactory,
+ Provider<ReplicationQueue> replicationQueue,
+ @PluginData Path pluginDataDir)
throws ConfigInvalidException, IOException {
this.site = site;
this.destinationFactory = destinationFactory;
+ this.pluginDataDir = pluginDataDir;
this.currentConfig = loadConfig();
this.currentConfigTs = getLastModified(currentConfig);
- this.workQueue = workQueue;
+ this.replicationQueue = replicationQueue;
}
private static long getLastModified(ReplicationFileBasedConfig cfg) {
@@ -50,7 +64,7 @@
}
private ReplicationFileBasedConfig loadConfig() throws ConfigInvalidException, IOException {
- return new ReplicationFileBasedConfig(site, destinationFactory);
+ return new ReplicationFileBasedConfig(site, destinationFactory, pluginDataDir);
}
private synchronized boolean isAutoReload() {
@@ -64,25 +78,42 @@
}
private void reloadIfNeeded() {
- try {
- if (isAutoReload()) {
- long lastModified = getLastModified(currentConfig);
- if (lastModified > currentConfigTs) {
- ReplicationFileBasedConfig newConfig = loadConfig();
- newConfig.startup(workQueue);
- int discarded = currentConfig.shutdown();
+ reload(false);
+ }
- this.currentConfig = newConfig;
- this.currentConfigTs = lastModified;
- log.info(
- "Configuration reloaded: {} destinations, {} replication events discarded",
- currentConfig.getDestinations(FilterType.ALL).size(),
- discarded);
+ @VisibleForTesting
+ public void forceReload() {
+ reload(true);
+ }
+
+ private void reload(boolean force) {
+ if (force || isAutoReload()) {
+ ReplicationQueue queue = replicationQueue.get();
+
+ long lastModified = getLastModified(currentConfig);
+ try {
+ if (force
+ || (!shuttingDown
+ && lastModified > currentConfigTs
+ && lastModified > lastFailedConfigTs
+ && queue.isRunning()
+ && !queue.isReplaying())) {
+ queue.stop();
+ currentConfig = loadConfig();
+ currentConfigTs = lastModified;
+ lastFailedConfigTs = 0;
+ logger.atInfo().log(
+ "Configuration reloaded: %d destinations",
+ currentConfig.getDestinations(FilterType.ALL).size());
}
+ } catch (Exception e) {
+ logger.atSevere().withCause(e).log(
+ "Cannot reload replication configuration: keeping existing settings");
+ lastFailedConfigTs = lastModified;
+ return;
+ } finally {
+ queue.start();
}
- } catch (Exception e) {
- log.error("Cannot reload replication configuration: keeping existing settings", e);
- return;
}
}
@@ -102,12 +133,32 @@
}
@Override
- public synchronized int shutdown() {
+ public Path getEventsDirectory() {
+ return currentConfig.getEventsDirectory();
+ }
+
+ /* shutdown() cannot be set as a synchronized method because
+ * it may need to wait for pending events to complete;
+ * e.g. when enabling the drain of replication events before
+ * shutdown.
+ *
+ * As a rule of thumb for synchronized methods, because they
+ * implicitly define a critical section and associated lock,
+ * they should never hold waiting for another resource, otherwise
+ * the risk of deadlock is very high.
+ *
+ * See more background about deadlocks, what they are and how to
+ * prevent them at: https://en.wikipedia.org/wiki/Deadlock
+ */
+ @Override
+ public int shutdown() {
+ this.shuttingDown = true;
return currentConfig.shutdown();
}
@Override
public synchronized void startup(WorkQueue workQueue) {
+ shuttingDown = false;
currentConfig.startup(workQueue);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java
index f8737b6..29a7ee6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java
@@ -16,18 +16,16 @@
import static com.google.gerrit.common.FileUtil.lastModified;
+import com.google.common.flogger.FluentLogger;
import com.google.gerrit.server.config.SitePaths;
import com.google.inject.Inject;
import java.io.IOException;
import java.nio.file.Files;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jgit.errors.ConfigInvalidException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class AutoReloadSecureCredentialsFactoryDecorator implements CredentialsFactory {
- private static final Logger log =
- LoggerFactory.getLogger(AutoReloadSecureCredentialsFactoryDecorator.class);
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final AtomicReference<SecureCredentialsFactory> secureCredentialsFactory;
private volatile long secureCredentialsFactoryLoadTs;
@@ -58,13 +56,12 @@
secureCredentialsFactory.compareAndSet(
secureCredentialsFactory.get(), new SecureCredentialsFactory(site));
secureCredentialsFactoryLoadTs = getSecureConfigLastEditTs();
- log.info("secure.config reloaded as it was updated on the file system");
+ logger.atInfo().log("secure.config reloaded as it was updated on the file system");
}
} catch (Exception e) {
- log.error(
+ logger.atSevere().withCause(e).log(
"Unexpected error while trying to reload "
- + "secure.config: keeping existing credentials",
- e);
+ + "secure.config: keeping existing credentials");
}
return secureCredentialsFactory.get().create(remoteName);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 0a06093..36960a1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -24,9 +24,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.collect.Lists;
-import com.google.gerrit.common.EventDispatcher;
import com.google.gerrit.common.data.GroupReference;
-import com.google.gerrit.extensions.client.ProjectState;
import com.google.gerrit.extensions.config.FactoryModule;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.restapi.AuthException;
@@ -42,6 +40,7 @@
import com.google.gerrit.server.account.GroupIncludeCache;
import com.google.gerrit.server.account.ListGroupMembership;
import com.google.gerrit.server.config.RequestScopedReviewDbProvider;
+import com.google.gerrit.server.events.EventDispatcher;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.PerThreadRequestScope;
import com.google.gerrit.server.git.WorkQueue;
@@ -50,15 +49,18 @@
import com.google.gerrit.server.permissions.ProjectPermission;
import com.google.gerrit.server.permissions.RefPermission;
import com.google.gerrit.server.project.NoSuchProjectException;
-import com.google.gerrit.server.project.PerRequestProjectControlCache;
-import com.google.gerrit.server.project.ProjectControl;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.gerrit.server.project.ProjectState;
import com.google.gerrit.server.util.RequestContext;
+import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Provider;
import com.google.inject.Provides;
+import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.servlet.RequestScoped;
import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
@@ -83,18 +85,25 @@
public class Destination {
private static final Logger repLog = ReplicationQueue.repLog;
+
+ public interface Factory {
+ Destination create(DestinationConfiguration config);
+ }
+
private final ReplicationStateListener stateLog;
private final Object stateLock = new Object();
private final Map<URIish, PushOne> pending = new HashMap<>();
private final Map<URIish, PushOne> inFlight = new HashMap<>();
private final PushOne.Factory opFactory;
- private final ProjectControl.Factory projectControlFactory;
private final GitRepositoryManager gitManager;
private final PermissionBackend permissionBackend;
+ private final Provider<CurrentUser> userProvider;
+ private final ProjectCache projectCache;
private volatile ScheduledExecutorService pool;
private final PerThreadRequestScope.Scoper threadScoper;
private final DestinationConfiguration config;
private final DynamicItem<EventDispatcher> eventDispatcher;
+ private final Provider<ReplicationTasksStorage> replicationTasksStorage;
protected enum RetryReason {
TRANSPORT_ERROR,
@@ -112,23 +121,28 @@
}
}
+ @Inject
protected Destination(
Injector injector,
- DestinationConfiguration cfg,
- RemoteSiteUser.Factory replicationUserFactory,
PluginUser pluginUser,
GitRepositoryManager gitRepositoryManager,
PermissionBackend permissionBackend,
+ Provider<CurrentUser> userProvider,
+ ProjectCache projectCache,
GroupBackend groupBackend,
- ReplicationStateListener stateLog,
+ ReplicationStateListeners stateLog,
GroupIncludeCache groupIncludeCache,
- DynamicItem<EventDispatcher> eventDispatcher) {
- config = cfg;
+ DynamicItem<EventDispatcher> eventDispatcher,
+ Provider<ReplicationTasksStorage> rts,
+ @Assisted DestinationConfiguration cfg) {
this.eventDispatcher = eventDispatcher;
gitManager = gitRepositoryManager;
this.permissionBackend = permissionBackend;
+ this.userProvider = userProvider;
+ this.projectCache = projectCache;
this.stateLog = stateLog;
-
+ this.replicationTasksStorage = rts;
+ config = cfg;
CurrentUser remoteUser;
if (!cfg.getAuthGroupNames().isEmpty()) {
ImmutableSet.Builder<AccountGroup.UUID> builder = ImmutableSet.builder();
@@ -141,7 +155,7 @@
repLog.warn("Group \"{}\" not recognized, removing from authGroup", name);
}
}
- remoteUser = replicationUserFactory.create(new ListGroupMembership(builder.build()));
+ remoteUser = new RemoteSiteUser(new ListGroupMembership(builder.build()));
} else {
remoteUser = pluginUser;
}
@@ -153,7 +167,6 @@
protected void configure() {
bindScope(RequestScoped.class, PerThreadRequestScope.REQUEST);
bind(PerThreadRequestScope.Propagator.class);
- bind(PerRequestProjectControlCache.class).in(RequestScoped.class);
bind(Destination.class).toInstance(Destination.this);
bind(RemoteConfig.class).toInstance(config.getRemoteConfig());
@@ -185,7 +198,6 @@
}
});
- projectControlFactory = child.getInstance(ProjectControl.Factory.class);
opFactory = child.getInstance(PushOne.Factory.class);
threadScoper = child.getInstance(PerThreadRequestScope.Scoper.class);
}
@@ -245,15 +257,21 @@
}
}
- private boolean shouldReplicate(ProjectControl ctl) throws PermissionBackendException {
- if (!config.replicateHiddenProjects() && ctl.getProject().getState() == ProjectState.HIDDEN) {
+ private boolean shouldReplicate(ProjectState state, CurrentUser user)
+ throws PermissionBackendException {
+ if (!config.replicateHiddenProjects()
+ && state.getProject().getState()
+ == com.google.gerrit.extensions.client.ProjectState.HIDDEN) {
return false;
}
+
+ // Hidden projects(permitsRead = false) should only be accessible by the project owners.
+ // READ_CONFIG is checked here because it's only allowed to project owners(ACCESS may also
+ // be allowed for other users).
+ ProjectPermission permissionToCheck =
+ state.statePermitsRead() ? ProjectPermission.ACCESS : ProjectPermission.READ_CONFIG;
try {
- permissionBackend
- .user(ctl.getUser())
- .project(ctl.getProject().getNameKey())
- .check(ProjectPermission.ACCESS);
+ permissionBackend.user(user).project(state.getNameKey()).check(permissionToCheck);
return true;
} catch (AuthException e) {
return false;
@@ -268,8 +286,19 @@
new Callable<Boolean>() {
@Override
public Boolean call() throws NoSuchProjectException, PermissionBackendException {
- ProjectControl projectControl = controlFor(project);
- if (!shouldReplicate(projectControl)) {
+ ProjectState projectState;
+ try {
+ projectState = projectCache.checkedGet(project);
+ } catch (IOException e) {
+ return false;
+ }
+ if (projectState == null) {
+ throw new NoSuchProjectException(project);
+ }
+ if (!projectState.statePermitsRead()) {
+ return false;
+ }
+ if (!shouldReplicate(projectState, userProvider.get())) {
return false;
}
if (PushOne.ALL_REFS.equals(ref)) {
@@ -277,7 +306,7 @@
}
try {
permissionBackend
- .user(projectControl.getUser())
+ .user(userProvider.get())
.project(project)
.ref(ref)
.check(RefPermission.READ);
@@ -304,7 +333,16 @@
new Callable<Boolean>() {
@Override
public Boolean call() throws NoSuchProjectException, PermissionBackendException {
- return shouldReplicate(controlFor(project));
+ ProjectState projectState;
+ try {
+ projectState = projectCache.checkedGet(project);
+ } catch (IOException e) {
+ return false;
+ }
+ if (projectState == null) {
+ throw new NoSuchProjectException(project);
+ }
+ return shouldReplicate(projectState, userProvider.get());
}
})
.call();
@@ -354,21 +392,21 @@
}
synchronized (stateLock) {
- PushOne e = getPendingPush(uri);
- if (e == null) {
- e = opFactory.create(project, uri);
- addRef(e, ref);
- e.addState(ref, state);
+ PushOne task = getPendingPush(uri);
+ if (task == null) {
+ task = opFactory.create(project, uri);
+ addRef(task, ref);
+ task.addState(ref, state);
@SuppressWarnings("unused")
ScheduledFuture<?> ignored =
- pool.schedule(e, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
- pending.put(uri, e);
- } else if (!e.getRefs().contains(ref)) {
- addRef(e, ref);
- e.addState(ref, state);
+ pool.schedule(task, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
+ pending.put(uri, task);
+ } else if (!task.getRefs().contains(ref)) {
+ addRef(task, ref);
+ task.addState(ref, state);
}
state.increasePushTaskCount(project.get(), ref);
- repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, e, config.getDelay());
+ repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, task, config.getDelay());
}
}
@@ -492,10 +530,6 @@
}
}
- ProjectControl controlFor(Project.NameKey project) throws NoSuchProjectException {
- return projectControlFactory.controlFor(project);
- }
-
RunwayStatus requestRunway(PushOne op) {
synchronized (stateLock) {
if (op.wasCanceled()) {
@@ -511,12 +545,31 @@
return RunwayStatus.allowed();
}
- void notifyFinished(PushOne op) {
+ void notifyFinished(PushOne task) {
synchronized (stateLock) {
- inFlight.remove(op.getURI());
+ inFlight.remove(task.getURI());
+ if (!task.wasCanceled()) {
+ for (String ref : task.getRefs()) {
+ if (!refHasPendingPush(task.getURI(), ref)) {
+ replicationTasksStorage
+ .get()
+ .delete(
+ new ReplicateRefUpdate(
+ task.getProjectNameKey().get(), ref, task.getURI(), getRemoteConfigName()));
+ }
+ }
+ }
}
}
+ private boolean refHasPendingPush(URIish opUri, String ref) {
+ return pushContainsRef(pending.get(opUri), ref) || pushContainsRef(inFlight.get(opUri), ref);
+ }
+
+ private boolean pushContainsRef(PushOne op, String ref) {
+ return op != null && op.getRefs().contains(ref);
+ }
+
boolean wouldPushProject(Project.NameKey project) {
if (!shouldReplicate(project)) {
return false;
@@ -649,6 +702,14 @@
return config.getMaxRetries();
}
+ public int getDrainQueueAttempts() {
+ return config.getDrainQueueAttempts();
+ }
+
+ public int getReplicationDelaySeconds() {
+ return config.getDelay() * 1000;
+ }
+
private static boolean matches(URIish uri, String urlMatch) {
if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
index b2d0de2..f688cfc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -22,10 +22,12 @@
public class DestinationConfiguration {
static final int DEFAULT_REPLICATION_DELAY = 15;
static final int DEFAULT_RESCHEDULE_DELAY = 3;
+ static final int DEFAULT_DRAIN_QUEUE_ATTEMPTS = 0;
private final int delay;
private final int rescheduleDelay;
private final int retryDelay;
+ private final int drainQueueAttempts;
private final int lockErrorMaxRetries;
private final ImmutableList<String> adminUrls;
private final int poolThreads;
@@ -50,6 +52,8 @@
projects = ImmutableList.copyOf(cfg.getStringList("remote", name, "projects"));
adminUrls = ImmutableList.copyOf(cfg.getStringList("remote", name, "adminUrl"));
retryDelay = Math.max(0, getInt(remoteConfig, cfg, "replicationretry", 1));
+ drainQueueAttempts =
+ Math.max(0, getInt(remoteConfig, cfg, "drainQueueAttempts", DEFAULT_DRAIN_QUEUE_ATTEMPTS));
poolThreads = Math.max(0, getInt(remoteConfig, cfg, "threads", 1));
authGroupNames = ImmutableList.copyOf(cfg.getStringList("remote", name, "authGroup"));
lockErrorMaxRetries = cfg.getInt("replication", "lockErrorMaxRetries", 0);
@@ -77,6 +81,10 @@
return retryDelay;
}
+ public int getDrainQueueAttempts() {
+ return drainQueueAttempts;
+ }
+
public int getPoolThreads() {
return poolThreads;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationFactory.java
deleted file mode 100644
index 83eab86..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationFactory.java
+++ /dev/null
@@ -1,75 +0,0 @@
-// Copyright (C) 2016 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication;
-
-import com.google.gerrit.common.EventDispatcher;
-import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.server.PluginUser;
-import com.google.gerrit.server.account.GroupBackend;
-import com.google.gerrit.server.account.GroupIncludeCache;
-import com.google.gerrit.server.git.GitRepositoryManager;
-import com.google.gerrit.server.permissions.PermissionBackend;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.Singleton;
-
-@Singleton
-public class DestinationFactory {
- private final Injector injector;
- private final RemoteSiteUser.Factory replicationUserFactory;
- private final PluginUser pluginUser;
- private final GitRepositoryManager gitRepositoryManager;
- private final PermissionBackend permissionBackend;
- private final GroupBackend groupBackend;
- private final ReplicationStateListener stateLog;
- private final GroupIncludeCache groupIncludeCache;
- private final DynamicItem<EventDispatcher> eventDispatcher;
-
- @Inject
- public DestinationFactory(
- Injector injector,
- RemoteSiteUser.Factory replicationUserFactory,
- PluginUser pluginUser,
- GitRepositoryManager gitRepositoryManager,
- PermissionBackend permissionBackend,
- GroupBackend groupBackend,
- ReplicationStateListener stateLog,
- GroupIncludeCache groupIncludeCache,
- DynamicItem<EventDispatcher> eventDispatcher) {
- this.injector = injector;
- this.replicationUserFactory = replicationUserFactory;
- this.pluginUser = pluginUser;
- this.gitRepositoryManager = gitRepositoryManager;
- this.permissionBackend = permissionBackend;
- this.groupBackend = groupBackend;
- this.stateLog = stateLog;
- this.groupIncludeCache = groupIncludeCache;
- this.eventDispatcher = eventDispatcher;
- }
-
- Destination create(DestinationConfiguration config) {
- return new Destination(
- injector,
- config,
- replicationUserFactory,
- pluginUser,
- gitRepositoryManager,
- permissionBackend,
- groupBackend,
- stateLog,
- groupIncludeCache,
- eventDispatcher);
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
index b46a0d9..46c8892 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
@@ -14,33 +14,34 @@
package com.googlesource.gerrit.plugins.replication;
+import com.google.common.flogger.FluentLogger;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.server.ssh.SshAddressesModule;
-import com.google.inject.Inject;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.Set;
import org.eclipse.jgit.transport.URIish;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class GerritSshApi {
+public class GerritSshApi implements AdminApi {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
static int SSH_COMMAND_FAILED = -1;
- private static final Logger log = LoggerFactory.getLogger(GerritSshApi.class);
private static String GERRIT_ADMIN_PROTOCOL_PREFIX = "gerrit+";
- private final SshHelper sshHelper;
+ protected final SshHelper sshHelper;
+ protected final URIish uri;
private final Set<URIish> withoutDeleteProjectPlugin = new HashSet<>();
- @Inject
- protected GerritSshApi(SshHelper sshHelper) {
+ protected GerritSshApi(SshHelper sshHelper, URIish uri) {
this.sshHelper = sshHelper;
+ this.uri = uri;
}
- protected boolean createProject(URIish uri, Project.NameKey projectName, String head) {
+ @Override
+ public boolean createProject(Project.NameKey projectName, String head) {
OutputStream errStream = sshHelper.newErrorBufferStream();
String cmd = "gerrit create-project --branch " + head + " " + projectName.get();
try {
@@ -52,7 +53,8 @@
return true;
}
- protected boolean deleteProject(URIish uri, Project.NameKey projectName) {
+ @Override
+ public boolean deleteProject(Project.NameKey projectName) {
if (!withoutDeleteProjectPlugin.contains(uri)) {
OutputStream errStream = sshHelper.newErrorBufferStream();
String cmd = "deleteproject delete --yes-really-delete --force " + projectName.get();
@@ -64,30 +66,23 @@
return false;
}
if (exitCode == 1) {
- log.info(
- "DeleteProject plugin is not installed on {}; will not try to forward this operation to that host");
+ logger.atInfo().log(
+ "DeleteProject plugin is not installed on %s;"
+ + " will not try to forward this operation to that host", uri);
withoutDeleteProjectPlugin.add(uri);
- return true;
}
}
return true;
}
- protected boolean updateHead(URIish uri, Project.NameKey projectName, String newHead) {
+ @Override
+ public boolean updateHead(Project.NameKey projectName, String newHead) {
OutputStream errStream = sshHelper.newErrorBufferStream();
String cmd = "gerrit set-head " + projectName.get() + " --new-head " + newHead;
try {
execute(uri, cmd, errStream);
} catch (IOException e) {
- log.error(
- "Error updating HEAD of remote repository at {} to {}:\n"
- + " Exception: {}\n Command: {}\n Output: {}",
- uri,
- newHead,
- e,
- cmd,
- errStream,
- e);
+ logError("updating HEAD of", uri, errStream, cmd, e);
return false;
}
return true;
@@ -114,19 +109,14 @@
URIish sshUri = toSshUri(uri);
return sshHelper.executeRemoteSsh(sshUri, cmd, errStream);
} catch (URISyntaxException e) {
- log.error("Cannot convert {} to SSH uri", uri, e);
+ logger.atSevere().withCause(e).log("Cannot convert %s to SSH uri", uri);
}
return SSH_COMMAND_FAILED;
}
public void logError(String msg, URIish uri, OutputStream errStream, String cmd, IOException e) {
- log.error(
- "Error {} remote repository at {}:\n Exception: {}\n Command: {}\n Output: {}",
- msg,
- uri,
- e,
- cmd,
- errStream,
- e);
+ logger.atSevere().withCause(e).log(
+ "Error %s remote repository at %s:\n Exception: %s\n Command: %s\n Output: %s",
+ msg, uri, e, cmd, errStream);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
new file mode 100644
index 0000000..aa6e16c
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
@@ -0,0 +1,97 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+
+import com.google.gerrit.reviewdb.client.Project;
+import java.io.File;
+import java.io.IOException;
+import org.eclipse.jgit.internal.storage.file.FileRepository;
+import org.eclipse.jgit.lib.Constants;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.URIish;
+
+/**
+ * {@link AdminApi} implementation that manages a repository through the local file system.
+ *
+ * <p>Every operation acts on the path component of the {@link URIish} given at construction; the
+ * {@code project} arguments are not consulted.
+ */
+public class LocalFS implements AdminApi {
+
+  private final URIish uri;
+
+  public LocalFS(URIish uri) {
+    this.uri = uri;
+  }
+
+  /**
+   * Creates a bare repository at the configured path.
+   *
+   * @param project not used by this implementation
+   * @param head ref HEAD should be linked to; ignored when {@code null} or when it does not start
+   *     with {@link Constants#R_REFS}
+   * @return {@code true} on success, {@code false} if an {@link IOException} was raised
+   */
+  @Override
+  public boolean createProject(Project.NameKey project, String head) {
+    try (Repository repo = new FileRepository(uri.getPath())) {
+      repo.create(true /* bare */);
+
+      if (head != null && head.startsWith(Constants.R_REFS)) {
+        RefUpdate u = repo.updateRef(Constants.HEAD);
+        u.disableRefLog();
+        u.link(head);
+      }
+      repLog.info("Created local repository: {}", uri);
+    } catch (IOException e) {
+      repLog.error("Error creating local repository {}", uri.getPath(), e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Deletes the directory at the configured path together with everything below it.
+   *
+   * @param project not used by this implementation
+   * @return {@code true} on success, {@code false} if any file or directory could not be deleted
+   */
+  @Override
+  public boolean deleteProject(Project.NameKey project) {
+    try {
+      recursivelyDelete(new File(uri.getPath()));
+      repLog.info("Deleted local repository: {}", uri);
+    } catch (IOException e) {
+      repLog.error("Error deleting local repository {}:\n", uri.getPath(), e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Links HEAD of the repository at the configured path to {@code newHead}.
+   *
+   * @param project not used by this implementation
+   * @param newHead ref HEAD should be linked to; when {@code null} nothing is changed and the call
+   *     still reports success
+   * @return {@code true} on success, {@code false} if an {@link IOException} was raised
+   */
+  @Override
+  public boolean updateHead(Project.NameKey project, String newHead) {
+    try (Repository repo = new FileRepository(uri.getPath())) {
+      if (newHead != null) {
+        RefUpdate u = repo.updateRef(Constants.HEAD);
+        u.link(newHead);
+      }
+    } catch (IOException e) {
+      repLog.error("Failed to update HEAD of repository {} to {}", uri.getPath(), newHead, e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Deletes {@code dir} and its contents, depth first.
+   *
+   * <p>Symbolic links found below {@code dir} are removed as links and never descended into, so
+   * nothing outside the tree rooted at {@code dir} is deleted through them.
+   *
+   * @throws IOException naming the first entry that could not be deleted
+   */
+  private static void recursivelyDelete(File dir) throws IOException {
+    File[] contents = dir.listFiles();
+    if (contents != null) {
+      for (File d : contents) {
+        // File.isDirectory() resolves symbolic links, so links must be ruled out explicitly.
+        if (d.isDirectory() && !java.nio.file.Files.isSymbolicLink(d.toPath())) {
+          recursivelyDelete(d);
+        } else {
+          if (!d.delete()) {
+            throw new IOException("Failed to delete: " + d.getAbsolutePath());
+          }
+        }
+      }
+    }
+    if (!dir.delete()) {
+      throw new IOException("Failed to delete: " + dir.getAbsolutePath());
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
index 227804d..8b0aa3d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
@@ -15,10 +15,10 @@
package com.googlesource.gerrit.plugins.replication;
import com.google.common.util.concurrent.Atomics;
-import com.google.gerrit.common.EventDispatcher;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.systemstatus.ServerInformation;
+import com.google.gerrit.server.events.EventDispatcher;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
import java.util.concurrent.Future;
@@ -29,7 +29,6 @@
private final AtomicReference<Future<?>> pushAllFuture;
private final ServerInformation srvInfo;
private final PushAll.Factory pushAll;
- private final ReplicationQueue queue;
private final ReplicationConfig config;
private final DynamicItem<EventDispatcher> eventDispatcher;
@@ -37,12 +36,10 @@
protected OnStartStop(
ServerInformation srvInfo,
PushAll.Factory pushAll,
- ReplicationQueue queue,
ReplicationConfig config,
DynamicItem<EventDispatcher> eventDispatcher) {
this.srvInfo = srvInfo;
this.pushAll = pushAll;
- this.queue = queue;
this.config = config;
this.eventDispatcher = eventDispatcher;
this.pushAllFuture = Atomics.newReference();
@@ -50,8 +47,6 @@
@Override
public void start() {
- queue.start();
-
if (srvInfo.getState() == ServerInformation.State.STARTUP
&& config.isReplicateAllOnPluginStart()) {
ReplicationState state = new ReplicationState(new GitUpdateProcessing(eventDispatcher.get()));
@@ -68,6 +63,5 @@
if (f != null) {
f.cancel(true);
}
- queue.stop();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
index db067e2..833b02b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
@@ -43,7 +43,7 @@
WorkQueue wq,
ProjectCache projectCache,
ReplicationQueue rq,
- ReplicationStateListener stateLog,
+ ReplicationStateListeners stateLog,
@Assisted @Nullable String urlMatch,
@Assisted ReplicationFilter filter,
@Assisted ReplicationState state,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 5f0c066..5794f6e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -22,20 +22,23 @@
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Sets;
import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.restapi.AuthException;
+import com.google.gerrit.extensions.restapi.ResourceConflictException;
import com.google.gerrit.metrics.Timer1;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.reviewdb.client.RefNames;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.PerThreadRequestScope;
import com.google.gerrit.server.git.ProjectRunnable;
-import com.google.gerrit.server.git.VisibleRefFilter;
import com.google.gerrit.server.git.WorkQueue.CanceledWhileRunning;
+import com.google.gerrit.server.ioutil.HexFormat;
import com.google.gerrit.server.permissions.PermissionBackend;
+import com.google.gerrit.server.permissions.PermissionBackend.RefFilterOptions;
import com.google.gerrit.server.permissions.PermissionBackendException;
import com.google.gerrit.server.permissions.ProjectPermission;
-import com.google.gerrit.server.project.NoSuchProjectException;
-import com.google.gerrit.server.project.ProjectControl;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.gerrit.server.project.ProjectState;
import com.google.gerrit.server.util.IdGenerator;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
@@ -92,7 +95,6 @@
private final RemoteConfig config;
private final CredentialsProvider credentialsProvider;
private final PerThreadRequestScope.Scoper threadScoper;
- private final VisibleRefFilter.Factory refFilterFactory;
private final ReplicationQueue replicationQueue;
private final Project.NameKey projectName;
@@ -110,7 +112,10 @@
private final int id;
private final long createdAt;
private final ReplicationMetrics metrics;
+ private final ProjectCache projectCache;
private final AtomicBoolean canceledWhileRunning;
+ private final TransportFactory transportFactory;
+ private DynamicItem<ReplicationPushFilter> replicationPushFilter;
@Inject
PushOne(
@@ -118,20 +123,20 @@
PermissionBackend permissionBackend,
Destination p,
RemoteConfig c,
- VisibleRefFilter.Factory rff,
CredentialsFactory cpFactory,
PerThreadRequestScope.Scoper ts,
ReplicationQueue rq,
IdGenerator ig,
- ReplicationStateListener sl,
+ ReplicationStateListeners sl,
ReplicationMetrics m,
+ ProjectCache pc,
+ TransportFactory tf,
@Assisted Project.NameKey d,
@Assisted URIish u) {
gitManager = grm;
this.permissionBackend = permissionBackend;
pool = p;
config = c;
- refFilterFactory = rff;
credentialsProvider = cpFactory.create(c.getName());
threadScoper = ts;
replicationQueue = rq;
@@ -143,13 +148,20 @@
stateLog = sl;
createdAt = System.nanoTime();
metrics = m;
+ projectCache = pc;
canceledWhileRunning = new AtomicBoolean(false);
maxRetries = p.getMaxRetries();
+ transportFactory = tf;
+ }
+
+ /**
+ * Optional injection point for the push filter.
+ *
+ * <p>Because the injection is declared optional, the field stays {@code null} when this setter
+ * is never invoked; {@code generateUpdates} checks for that before applying the filter.
+ */
+ @Inject(optional = true)
+ public void setReplicationPushFilter(DynamicItem<ReplicationPushFilter> replicationPushFilter) {
+ this.replicationPushFilter = replicationPushFilter;
}
+ /**
+ * Cancels this push: logs the cancellation, calls {@code canceledByReplication()} and reports
+ * the cancellation to the destination through {@code pool.pushWasCanceled(this)}.
+ */
@Override
public void cancel() {
- repLog.info("Replication [{}] to {} was canceled", IdGenerator.format(id), getURI());
+ repLog.info("Replication [{}] to {} was canceled", HexFormat.fromInt(id), getURI());
canceledByReplication();
pool.pushWasCanceled(this);
}
@@ -158,7 +170,7 @@
public void setCanceledWhileRunning() {
repLog.info(
"Replication [{}] to {} was canceled while being executed",
- IdGenerator.format(id),
+ HexFormat.fromInt(id),
getURI());
+ // Only the flag is raised here; presumably the running push checks it elsewhere (TODO confirm).
canceledWhileRunning.set(true);
}
@@ -180,7 +192,7 @@
@Override
public String toString() {
- String print = "[" + IdGenerator.format(id) + "] push " + uri;
+ String print = "[" + HexFormat.fromInt(id) + "] push " + uri;
if (retryCount > 0) {
print = "(retry " + retryCount + ") " + print;
@@ -301,7 +313,7 @@
// we start replication (instead a new instance, with the same URI, is
// created and scheduled for a future point in time.)
//
- MDC.put(ID_MDC_KEY, IdGenerator.format(id));
+ MDC.put(ID_MDC_KEY, HexFormat.fromInt(id));
RunwayStatus status = pool.requestRunway(this);
if (!status.isAllowed()) {
if (status.isCanceled()) {
@@ -310,7 +322,7 @@
repLog.info(
"Rescheduling replication to {} to avoid collision with the in-flight push [{}].",
uri,
- IdGenerator.format(status.getInFlightPushId()));
+ HexFormat.fromInt(status.getInFlightPushId()));
pool.reschedule(this, Destination.RetryReason.COLLISION);
}
return;
@@ -407,15 +419,12 @@
if (pool.isCreateMissingRepos()) {
try {
Ref head = git.exactRef(Constants.HEAD);
- if (replicationQueue.createProject(projectName, head != null ? getName(head) : null)) {
+ if (replicationQueue.createProject(
+ config.getName(), projectName, head != null ? getName(head) : null)) {
repLog.warn("Missing repository created; retry replication to {}", uri);
pool.reschedule(this, Destination.RetryReason.REPOSITORY_MISSING);
} else {
- repLog.warn(
- "Missing repository could not be created when replicating {}. "
- + "You can only create missing repositories locally, over SSH or when "
- + "using adminUrl in replication.config. See documentation for more information.",
- uri);
+ repLog.warn("Missing repository could not be created when replicating {}", uri);
}
} catch (IOException ioe) {
stateLog.error(
@@ -438,7 +447,7 @@
private void runImpl() throws IOException, PermissionBackendException {
PushResult res;
- try (Transport tn = Transport.open(git, uri)) {
+ try (Transport tn = transportFactory.open(git, uri)) {
res = pushVia(tn);
}
updateStates(res.getRemoteUpdates());
@@ -465,19 +474,19 @@
private List<RemoteRefUpdate> generateUpdates(Transport tn)
throws IOException, PermissionBackendException {
- ProjectControl pc;
- try {
- pc = pool.controlFor(projectName);
- } catch (NoSuchProjectException e) {
+ ProjectState projectState = projectCache.checkedGet(projectName);
+ if (projectState == null) {
return Collections.emptyList();
}
Map<String, Ref> local = git.getAllRefs();
boolean filter;
+ PermissionBackend.ForProject forProject = permissionBackend.currentUser().project(projectName);
try {
- permissionBackend.user(pc.getUser()).project(projectName).check(ProjectPermission.READ);
+ projectState.checkStatePermitsRead();
+ forProject.check(ProjectPermission.READ);
filter = false;
- } catch (AuthException e) {
+ } catch (AuthException | ResourceConflictException e) {
filter = true;
}
if (filter) {
@@ -494,10 +503,15 @@
}
local = n;
}
- local = refFilterFactory.create(pc.getProjectState(), git).filter(local, true);
+ local = forProject.filter(local, git, RefFilterOptions.builder().setFilterMeta(true).build());
}
- return pushAllRefs ? doPushAll(tn, local) : doPushDelta(local);
+ List<RemoteRefUpdate> remoteUpdatesList =
+ pushAllRefs ? doPushAll(tn, local) : doPushDelta(local);
+
+ return replicationPushFilter == null || replicationPushFilter.get() == null
+ ? remoteUpdatesList
+ : replicationPushFilter.get().filter(projectName.get(), remoteUpdatesList);
}
private List<RemoteRefUpdate> doPushAll(Transport tn, Map<String, Ref> local)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
index c71a792..ae0662d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
@@ -14,7 +14,8 @@
package com.googlesource.gerrit.plugins.replication;
-import com.google.gerrit.common.EventDispatcher;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.EventDispatcher;
import com.google.gerrit.server.events.RefEvent;
import com.google.gerrit.server.permissions.PermissionBackendException;
import com.google.gwtorm.server.OrmException;
@@ -23,8 +24,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jgit.transport.RemoteRefUpdate;
import org.eclipse.jgit.transport.URIish;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public abstract class PushResultProcessing {
@@ -159,7 +158,7 @@
}
public static class GitUpdateProcessing extends PushResultProcessing {
- private static final Logger log = LoggerFactory.getLogger(GitUpdateProcessing.class);
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final EventDispatcher dispatcher;
@@ -189,7 +188,7 @@
try {
dispatcher.postEvent(event);
} catch (OrmException | PermissionBackendException e) {
- log.error("Cannot post event", e);
+ logger.atSevere().withCause(e).log("Cannot post event");
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java
index 91fce7f..c3556af 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java
@@ -16,18 +16,11 @@
import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.account.GroupMembership;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
public class RemoteSiteUser extends CurrentUser {
- public interface Factory {
- RemoteSiteUser create(@Assisted GroupMembership authGroups);
- }
-
private final GroupMembership effectiveGroups;
- @Inject
- RemoteSiteUser(@Assisted GroupMembership authGroups) {
+ public RemoteSiteUser(GroupMembership authGroups) {
effectiveGroups = authGroups;
}
@@ -35,4 +28,10 @@
public GroupMembership getEffectiveGroups() {
return effectiveGroups;
}
+
+ /**
+ * Returns a new object on every call, so no two cache keys obtained for a remote site user are
+ * ever equal.
+ */
+ @Override
+ public Object getCacheKey() {
+ // Never cache a remote user
+ return new Object();
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
new file mode 100644
index 0000000..ee9d4c0
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
@@ -0,0 +1,110 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+
+import com.google.gerrit.reviewdb.client.Project;
+import java.io.IOException;
+import java.io.OutputStream;
+import org.eclipse.jgit.transport.URIish;
+import org.eclipse.jgit.util.QuotedString;
+
+/**
+ * {@link AdminApi} implementation that manages a remote repository by running shell commands
+ * through {@link SshHelper}.
+ *
+ * <p>Every operation acts on the path component of the {@link URIish} given at construction; the
+ * {@code project} arguments are not consulted.
+ *
+ * <p>NOTE(review): the value returned by {@code executeRemoteSsh} is not inspected by any method
+ * here, so only an {@link IOException} is treated as a failure. Confirm this is intended.
+ */
+public class RemoteSsh implements AdminApi {
+
+  private final SshHelper sshHelper;
+  private final URIish uri;
+
+  RemoteSsh(SshHelper sshHelper, URIish uri) {
+    this.sshHelper = sshHelper;
+    this.uri = uri;
+  }
+
+  /**
+   * Runs {@code mkdir -p}, {@code git init --bare} and, if {@code head} is given, {@code git
+   * symbolic-ref HEAD} on the remote host.
+   *
+   * @param project not used by this implementation
+   * @param head ref HEAD should point at, or {@code null} to leave HEAD as initialized
+   * @return {@code true} unless an {@link IOException} was raised
+   */
+  @Override
+  public boolean createProject(Project.NameKey project, String head) {
+    // Path and ref are quoted for a Bourne shell before being spliced into the command line.
+    String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
+    String cmd = "mkdir -p " + quotedPath + " && cd " + quotedPath + " && git init --bare";
+    if (head != null) {
+      cmd = cmd + " && git symbolic-ref HEAD " + QuotedString.BOURNE.quote(head);
+    }
+    OutputStream errStream = sshHelper.newErrorBufferStream();
+    try {
+      sshHelper.executeRemoteSsh(uri, cmd, errStream);
+      repLog.info("Created remote repository: {}", uri);
+    } catch (IOException e) {
+      repLog.error(
+          "Error creating remote repository at {}:\n"
+              + " Exception: {}\n"
+              + " Command: {}\n"
+              + " Output: {}",
+          uri,
+          e,
+          cmd,
+          errStream,
+          e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Runs {@code rm -rf} on the repository path on the remote host.
+   *
+   * @param project not used by this implementation
+   * @return {@code true} unless an {@link IOException} was raised
+   */
+  @Override
+  public boolean deleteProject(Project.NameKey project) {
+    String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
+    String cmd = "rm -rf " + quotedPath;
+    OutputStream errStream = sshHelper.newErrorBufferStream();
+    try {
+      sshHelper.executeRemoteSsh(uri, cmd, errStream);
+      repLog.info("Deleted remote repository: {}", uri);
+    } catch (IOException e) {
+      repLog.error(
+          "Error deleting remote repository at {}:\n"
+              + " Exception: {}\n"
+              + " Command: {}\n"
+              + " Output: {}",
+          uri,
+          e,
+          cmd,
+          errStream,
+          e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Runs {@code git symbolic-ref HEAD} inside the repository path on the remote host.
+   *
+   * @param project not used by this implementation
+   * @param newHead ref HEAD should point at
+   * @return {@code true} unless an {@link IOException} was raised
+   */
+  @Override
+  public boolean updateHead(Project.NameKey project, String newHead) {
+    String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
+    String cmd =
+        "cd " + quotedPath + " && git symbolic-ref HEAD " + QuotedString.BOURNE.quote(newHead);
+    OutputStream errStream = sshHelper.newErrorBufferStream();
+    try {
+      sshHelper.executeRemoteSsh(uri, cmd, errStream);
+    } catch (IOException e) {
+      repLog.error(
+          "Error updating HEAD of remote repository at {} to {}:\n"
+              + " Exception: {}\n"
+              + " Command: {}\n"
+              + " Output: {}",
+          uri,
+          newHead,
+          e,
+          cmd,
+          errStream,
+          e);
+      return false;
+    }
+    return true;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index 869a49b..c9531e3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.replication;
import com.google.gerrit.server.git.WorkQueue;
+import java.nio.file.Path;
import java.util.List;
public interface ReplicationConfig {
@@ -32,6 +33,8 @@
boolean isEmpty();
+ Path getEventsDirectory();
+
int shutdown();
void startup(WorkQueue workQueue);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationExtensionPointModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationExtensionPointModule.java
new file mode 100644
index 0000000..b92a54a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationExtensionPointModule.java
@@ -0,0 +1,32 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.inject.AbstractModule;
+
+/**
+ * Gerrit libModule for applying a ref-filter for outgoing replications.
+ *
+ * <p>It should be used only when an actual filter is defined, otherwise the default replication
+ * plugin behaviour will be pushing all refs without any filtering.
+ *
+ * <p>The only thing {@link #configure()} does is declare a {@link DynamicItem} slot for {@link
+ * ReplicationPushFilter}; no implementation is bound here.
+ */
+public class ReplicationExtensionPointModule extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ DynamicItem.itemOf(binder(), ReplicationPushFilter.class);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index db9f35d..9004968 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -17,8 +17,11 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.extensions.annotations.PluginData;
import com.google.gerrit.server.config.ConfigUtil;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.git.WorkQueue;
@@ -38,28 +41,31 @@
import org.eclipse.jgit.transport.RemoteConfig;
import org.eclipse.jgit.transport.URIish;
import org.eclipse.jgit.util.FS;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@Singleton
public class ReplicationFileBasedConfig implements ReplicationConfig {
- private static final Logger log = LoggerFactory.getLogger(ReplicationFileBasedConfig.class);
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final int DEFAULT_SSH_CONNECTION_TIMEOUT_MS = 2 * 60 * 1000; // 2 minutes
private List<Destination> destinations;
+ private final SitePaths site;
private Path cfgPath;
private boolean replicateAllOnPluginStart;
private boolean defaultForceUpdate;
private int sshCommandTimeout;
private int sshConnectionTimeout = DEFAULT_SSH_CONNECTION_TIMEOUT_MS;
private final FileBasedConfig config;
+ private final Path pluginDataDir;
+ /**
+ * Points the configuration at {@code replication.config} in the site's {@code etc} directory
+ * and builds the destination list from it.
+ *
+ * @param site site paths; used to locate the config file and kept for later path resolution
+ * @param destinationFactory factory handed to {@code allDestinations}
+ * @param pluginDataDir plugin data directory, returned by {@code getEventsDirectory()} when no
+ * events directory is configured
+ * @throws ConfigInvalidException propagated from {@code allDestinations}
+ * @throws IOException propagated from {@code allDestinations}
+ */
@Inject
- public ReplicationFileBasedConfig(SitePaths site, DestinationFactory destinationFactory)
+ public ReplicationFileBasedConfig(
+ SitePaths site, Destination.Factory destinationFactory, @PluginData Path pluginDataDir)
throws ConfigInvalidException, IOException {
+ this.site = site;
this.cfgPath = site.etc_dir.resolve("replication.config");
this.config = new FileBasedConfig(cfgPath.toFile(), FS.DETECTED);
+ // allDestinations() reads this.config, so it has to run after the assignment above.
this.destinations = allDestinations(destinationFactory);
+ this.pluginDataDir = pluginDataDir;
}
/*
@@ -86,14 +92,14 @@
return destinations.stream().filter(Objects::nonNull).filter(filter).collect(toList());
}
- private List<Destination> allDestinations(DestinationFactory destinationFactory)
+ private List<Destination> allDestinations(Destination.Factory destinationFactory)
throws ConfigInvalidException, IOException {
if (!config.getFile().exists()) {
- log.warn("Config file {} does not exist; not replicating", config.getFile());
+ logger.atWarning().log("Config file %s does not exist; not replicating", config.getFile());
return Collections.emptyList();
}
if (config.getFile().length() == 0) {
- log.info("Config file {} is empty; not replicating", config.getFile());
+ logger.atInfo().log("Config file %s is empty; not replicating", config.getFile());
return Collections.emptyList();
}
@@ -107,7 +113,7 @@
String.format("Cannot read %s: %s", config.getFile(), e.getMessage()), e);
}
- replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", true);
+ replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false);
defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false);
@@ -199,6 +205,15 @@
return destinations.isEmpty();
}
+  /**
+   * Returns the events directory.
+   *
+   * <p>A non-empty {@code replication.eventsDirectory} setting is resolved against the site path;
+   * without one the plugin data directory is returned.
+   */
+  @Override
+  public Path getEventsDirectory() {
+    String configured = config.getString("replication", null, "eventsDirectory");
+    return Strings.isNullOrEmpty(configured) ? pluginDataDir : site.resolve(configured);
+  }
+
Path getCfgPath() {
return cfgPath;
}
@@ -207,11 +222,54 @@
public int shutdown() {
int discarded = 0;
for (Destination cfg : destinations) {
- discarded += cfg.shutdown();
+ // Let queued events drain first; the destination is shut down in the finally block
+ // whether or not its queue emptied, and its result is added to the running total.
+ try {
+ drainReplicationEvents(cfg);
+ } catch (EventQueueNotEmptyException e) {
+ logger.atWarning().log("Event queue not empty: %s", e.getMessage());
+ } finally {
+ discarded += cfg.shutdown();
+ }
}
return discarded;
}
+  /**
+   * Waits for a destination's replication queue to empty.
+   *
+   * <p>The pending and in-flight counts are polled up to {@code getDrainQueueAttempts()} times
+   * with a sleep between polls. Nothing is waited for when the attempt count is zero. An
+   * interrupt ends the wait early and leaves the thread's interrupt flag set.
+   *
+   * @param destination destination whose queue is drained
+   * @throws EventQueueNotEmptyException if events are still pending or in flight when waiting
+   *     stops
+   */
+  void drainReplicationEvents(Destination destination) throws EventQueueNotEmptyException {
+    int drainQueueAttempts = destination.getDrainQueueAttempts();
+    if (drainQueueAttempts == 0) {
+      return;
+    }
+    int pending = destination.getQueueInfo().pending.size();
+    int inFlight = destination.getQueueInfo().inFlight.size();
+    boolean interrupted = false;
+
+    while ((inFlight > 0 || pending > 0) && drainQueueAttempts > 0 && !interrupted) {
+      try {
+        logger.atInfo().log(
+            "Draining replication events, postpone shutdown. Events left: inFlight %d, pending %d",
+            inFlight, pending);
+        // NOTE(review): Thread.sleep takes milliseconds while the getter is named in seconds;
+        // confirm the unit getReplicationDelaySeconds() actually returns.
+        Thread.sleep(destination.getReplicationDelaySeconds());
+      } catch (InterruptedException ie) {
+        logger.atWarning().withCause(ie).log(
+            "Wait for replication events to drain has been interrupted");
+        // Restore the flag for the caller and stop waiting; with the flag set every further
+        // sleep would throw immediately anyway.
+        Thread.currentThread().interrupt();
+        interrupted = true;
+      }
+      pending = destination.getQueueInfo().pending.size();
+      inFlight = destination.getQueueInfo().inFlight.size();
+      drainQueueAttempts--;
+    }
+
+    if (pending > 0 || inFlight > 0) {
+      throw new EventQueueNotEmptyException(
+          String.format("Pending: %d - InFlight: %d", pending, inFlight));
+    }
+  }
+
+ /**
+ * Thrown by {@code drainReplicationEvents} when a destination still has pending or in-flight
+ * events once waiting has stopped; the message carries both counts.
+ */
+ public static class EventQueueNotEmptyException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public EventQueueNotEmptyException(String errorMessage) {
+ super(errorMessage);
+ }
+ }
+
FileBasedConfig getConfig() {
return config;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
index 7b3486b..05bbb03 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
@@ -15,7 +15,7 @@
package com.googlesource.gerrit.plugins.replication;
import com.google.gerrit.common.data.AccessSection;
-import com.google.gerrit.reviewdb.client.Project.NameKey;
+import com.google.gerrit.reviewdb.client.Project;
import java.util.Collections;
import java.util.List;
@@ -46,7 +46,7 @@
projectPatterns = patterns;
}
- public boolean matches(NameKey name) {
+ public boolean matches(Project.NameKey name) {
if (projectPatterns.isEmpty()) {
return true;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
index f30e13d..5fdb375 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -21,8 +21,8 @@
import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
import com.google.gerrit.extensions.events.HeadUpdatedListener;
import com.google.gerrit.extensions.events.LifecycleListener;
-import com.google.gerrit.extensions.events.NewProjectCreatedListener;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.events.EventTypes;
import com.google.inject.AbstractModule;
@@ -34,11 +34,13 @@
class ReplicationModule extends AbstractModule {
@Override
protected void configure() {
- bind(DestinationFactory.class).in(Scopes.SINGLETON);
+ install(new FactoryModuleBuilder().build(Destination.Factory.class));
bind(ReplicationQueue.class).in(Scopes.SINGLETON);
+ bind(LifecycleListener.class)
+ .annotatedWith(UniqueAnnotations.create())
+ .to(ReplicationQueue.class);
DynamicSet.bind(binder(), GitReferenceUpdatedListener.class).to(ReplicationQueue.class);
- DynamicSet.bind(binder(), NewProjectCreatedListener.class).to(ReplicationQueue.class);
DynamicSet.bind(binder(), ProjectDeletedListener.class).to(ReplicationQueue.class);
DynamicSet.bind(binder(), HeadUpdatedListener.class).to(ReplicationQueue.class);
@@ -55,14 +57,20 @@
.to(StartReplicationCapability.class);
install(new FactoryModuleBuilder().build(PushAll.Factory.class));
- install(new FactoryModuleBuilder().build(RemoteSiteUser.Factory.class));
bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class);
- bind(ReplicationStateListener.class).to(ReplicationStateLogger.class);
+ DynamicSet.setOf(binder(), ReplicationStateListener.class);
+ DynamicSet.bind(binder(), ReplicationStateListener.class).to(ReplicationStateLogger.class);
EventTypes.register(RefReplicatedEvent.TYPE, RefReplicatedEvent.class);
EventTypes.register(RefReplicationDoneEvent.TYPE, RefReplicationDoneEvent.class);
EventTypes.register(ReplicationScheduledEvent.TYPE, ReplicationScheduledEvent.class);
bind(SshSessionFactory.class).toProvider(ReplicationSshSessionFactoryProvider.class);
+
+ DynamicItem.itemOf(binder(), AdminApiFactory.class);
+ DynamicItem.bind(binder(), AdminApiFactory.class)
+ .to(AdminApiFactory.DefaultAdminApiFactory.class);
+
+ bind(TransportFactory.class).to(TransportFactoryImpl.class).in(Scopes.SINGLETON);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationPushFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationPushFilter.java
new file mode 100644
index 0000000..eb6ba90
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationPushFilter.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gerrit.extensions.annotations.ExtensionPoint;
+import java.util.List;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+
+/**
+ * Extension point consulted just before the computed remote ref updates are pushed to the remote
+ * instance.
+ *
+ * <p>An implementation may drop updates that should not be replicated.
+ */
+@ExtensionPoint
+public interface ReplicationPushFilter {
+
+  /**
+   * Selects the ref updates that are to be pushed.
+   *
+   * @param projectName name of the project being replicated
+   * @param remoteUpdatesList ref updates computed for the push
+   * @return the ref updates to push
+   */
+  List<RemoteRefUpdate> filter(String projectName, List<RemoteRefUpdate> remoteUpdatesList);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 4c7bdfc..d73ab7b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -14,34 +14,33 @@
package com.googlesource.gerrit.plugins.replication;
+import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerrit;
+import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
import com.google.common.base.Strings;
import com.google.common.collect.Queues;
-import com.google.gerrit.common.EventDispatcher;
+import com.google.gerrit.common.Nullable;
import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
import com.google.gerrit.extensions.events.HeadUpdatedListener;
import com.google.gerrit.extensions.events.LifecycleListener;
-import com.google.gerrit.extensions.events.NewProjectCreatedListener;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.events.EventDispatcher;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Optional;
import java.util.Queue;
import java.util.Set;
-import org.eclipse.jgit.internal.storage.file.FileRepository;
-import org.eclipse.jgit.lib.Constants;
-import org.eclipse.jgit.lib.RefUpdate;
-import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.transport.URIish;
-import org.eclipse.jgit.util.QuotedString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +48,6 @@
public class ReplicationQueue
implements LifecycleListener,
GitReferenceUpdatedListener,
- NewProjectCreatedListener,
ProjectDeletedListener,
HeadUpdatedListener {
static final String REPLICATION_LOG_NAME = "replication_log";
@@ -70,35 +68,39 @@
}
private final WorkQueue workQueue;
- private final SshHelper sshHelper;
private final DynamicItem<EventDispatcher> dispatcher;
private final ReplicationConfig config;
- private final GerritSshApi gerritAdmin;
+ private final DynamicItem<AdminApiFactory> adminApiFactory;
+ private final ReplicationTasksStorage replicationTasksStorage;
private volatile boolean running;
- private final Queue<GitReferenceUpdatedListener.Event> beforeStartupEventsQueue;
+ private volatile boolean replaying;
+ private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
@Inject
ReplicationQueue(
WorkQueue wq,
- SshHelper sh,
- GerritSshApi ga,
+ DynamicItem<AdminApiFactory> aaf,
ReplicationConfig rc,
DynamicItem<EventDispatcher> dis,
- ReplicationStateListener sl) {
+ ReplicationStateListeners sl,
+ ReplicationTasksStorage rts) {
workQueue = wq;
- sshHelper = sh;
dispatcher = dis;
config = rc;
stateLog = sl;
- gerritAdmin = ga;
+ adminApiFactory = aaf;
+ replicationTasksStorage = rts;
beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue();
}
@Override
public void start() {
- config.startup(workQueue);
- running = true;
- fireBeforeStartupEvents();
+ if (!running) {
+ config.startup(workQueue);
+ running = true;
+ firePendingEvents();
+ fireBeforeStartupEvents();
+ }
}
@Override
@@ -110,11 +112,20 @@
}
}
+ public boolean isRunning() {
+ return running;
+ }
+
+ public boolean isReplaying() {
+ return replaying;
+ }
+
void scheduleFullSync(Project.NameKey project, String urlMatch, ReplicationState state) {
scheduleFullSync(project, urlMatch, state, false);
}
- void scheduleFullSync(
+ @VisibleForTesting
+ public void scheduleFullSync(
Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
if (!running) {
stateLog.warn("Replication plugin did not finish startup before event", state);
@@ -125,6 +136,9 @@
if (cfg.wouldPushProject(project)) {
for (URIish uri : cfg.getURIs(project, urlMatch)) {
cfg.schedule(project, PushOne.ALL_REFS, uri, state, now);
+ replicationTasksStorage.persist(
+ new ReplicateRefUpdate(
+ project.get(), PushOne.ALL_REFS, uri, cfg.getRemoteConfigName()));
}
}
}
@@ -132,38 +146,53 @@
@Override
public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
+ onGitReferenceUpdated(event.getProjectName(), event.getRefName());
+ }
+
+ private void onGitReferenceUpdated(String projectName, String refName) {
ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
if (!running) {
stateLog.warn(
"Replication plugin did not finish startup before event, event replication is postponed",
state);
- beforeStartupEventsQueue.add(event);
+ beforeStartupEventsQueue.add(new ReferenceUpdatedEvent(projectName, refName));
return;
}
- Project.NameKey project = new Project.NameKey(event.getProjectName());
+ Project.NameKey project = new Project.NameKey(projectName);
for (Destination cfg : config.getDestinations(FilterType.ALL)) {
- if (cfg.wouldPushProject(project) && cfg.wouldPushRef(event.getRefName())) {
+ if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
for (URIish uri : cfg.getURIs(project, null)) {
- cfg.schedule(project, event.getRefName(), uri, state);
+ replicationTasksStorage.persist(
+ new ReplicateRefUpdate(projectName, refName, uri, cfg.getRemoteConfigName()));
+ cfg.schedule(project, refName, uri, state);
}
}
}
state.markAllPushTasksScheduled();
}
- @Override
- public void onNewProjectCreated(NewProjectCreatedListener.Event event) {
- Project.NameKey projectName = new Project.NameKey(event.getProjectName());
- for (URIish uri : getURIs(projectName, FilterType.PROJECT_CREATION)) {
- createProject(uri, projectName, event.getHeadName());
+ private void firePendingEvents() {
+ try {
+ Set<String> eventsReplayed = new HashSet<>();
+ replaying = true;
+ for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.list()) {
+ String eventKey = String.format("%s:%s", t.project, t.ref);
+ if (!eventsReplayed.contains(eventKey)) {
+ repLog.info("Firing pending task {}", eventKey);
+ onGitReferenceUpdated(t.project, t.ref);
+ eventsReplayed.add(eventKey);
+ }
+ }
+ } finally {
+ replaying = false;
}
}
@Override
public void onProjectDeleted(ProjectDeletedListener.Event event) {
Project.NameKey projectName = new Project.NameKey(event.getProjectName());
- for (URIish uri : getURIs(projectName, FilterType.PROJECT_DELETION)) {
+ for (URIish uri : getURIs(null, projectName, FilterType.PROJECT_DELETION)) {
deleteProject(uri, projectName);
}
}
@@ -171,24 +200,25 @@
@Override
public void onHeadUpdated(HeadUpdatedListener.Event event) {
Project.NameKey project = new Project.NameKey(event.getProjectName());
- for (URIish uri : getURIs(project, FilterType.ALL)) {
+ for (URIish uri : getURIs(null, project, FilterType.ALL)) {
updateHead(uri, project, event.getNewHeadName());
}
}
private void fireBeforeStartupEvents() {
Set<String> eventsReplayed = new HashSet<>();
- for (GitReferenceUpdatedListener.Event event : beforeStartupEventsQueue) {
+ for (ReferenceUpdatedEvent event : beforeStartupEventsQueue) {
String eventKey = String.format("%s:%s", event.getProjectName(), event.getRefName());
- if (!eventsReplayed.contains(eventKey)) {
- repLog.info("Firing pending task {}", event);
- onGitReferenceUpdated(event);
- eventsReplayed.add(eventKey);
+ if (eventsReplayed.add(eventKey)) {
+ // Log the key: ReferenceUpdatedEvent does not override toString().
+ repLog.info("Firing pending task {}", eventKey);
+ onGitReferenceUpdated(event.getProjectName(), event.getRefName());
}
}
}
- private Set<URIish> getURIs(Project.NameKey projectName, FilterType filterType) {
+ private Set<URIish> getURIs(
+ @Nullable String remoteName, Project.NameKey projectName, FilterType filterType) {
if (config.getDestinations(filterType).isEmpty()) {
return Collections.emptySet();
}
@@ -203,6 +233,10 @@
continue;
}
+ if (remoteName != null && !config.getRemoteConfigName().equals(remoteName)) {
+ continue;
+ }
+
boolean adminURLUsed = false;
for (String url : config.getAdminUrls()) {
@@ -245,201 +279,75 @@
return uris;
}
- public boolean createProject(Project.NameKey project, String head) {
+ public boolean createProject(String remoteName, Project.NameKey project, String head) {
boolean success = true;
- for (URIish uri : getURIs(project, FilterType.PROJECT_CREATION)) {
+ for (URIish uri : getURIs(remoteName, project, FilterType.PROJECT_CREATION)) {
success &= createProject(uri, project, head);
}
return success;
}
private boolean createProject(URIish replicateURI, Project.NameKey projectName, String head) {
- if (isGerrit(replicateURI)) {
- gerritAdmin.createProject(replicateURI, projectName, head);
- } else if (!replicateURI.isRemote()) {
- createLocally(replicateURI, head);
- } else if (isSSH(replicateURI)) {
- createRemoteSsh(replicateURI, head);
- } else {
- repLog.warn(
- "Cannot create new project on remote site {}."
- + " Only local paths and SSH URLs are supported"
- + " for remote repository creation",
- replicateURI);
- return false;
- }
- return true;
- }
-
- private static void createLocally(URIish uri, String head) {
- try (Repository repo = new FileRepository(uri.getPath())) {
- repo.create(true /* bare */);
-
- if (head != null && head.startsWith(Constants.R_REFS)) {
- RefUpdate u = repo.updateRef(Constants.HEAD);
- u.disableRefLog();
- u.link(head);
- }
- repLog.info("Created local repository: {}", uri);
- } catch (IOException e) {
- repLog.error("Error creating local repository {}:\n", uri.getPath(), e);
- }
- }
-
- private void createRemoteSsh(URIish uri, String head) {
- String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
- String cmd = "mkdir -p " + quotedPath + " && cd " + quotedPath + " && git init --bare";
- if (head != null) {
- cmd = cmd + " && git symbolic-ref HEAD " + QuotedString.BOURNE.quote(head);
- }
- OutputStream errStream = sshHelper.newErrorBufferStream();
- try {
- sshHelper.executeRemoteSsh(uri, cmd, errStream);
- repLog.info("Created remote repository: {}", uri);
- } catch (IOException e) {
- repLog.error(
- "Error creating remote repository at {}:\n"
- + " Exception: {}\n"
- + " Command: {}\n"
- + " Output: {}",
- uri,
- e,
- cmd,
- errStream,
- e);
- }
- }
-
- private void deleteProject(URIish replicateURI, Project.NameKey projectName) {
- if (isGerrit(replicateURI)) {
- gerritAdmin.deleteProject(replicateURI, projectName);
- repLog.info("Deleted remote repository: " + replicateURI);
- } else if (!replicateURI.isRemote()) {
- deleteLocally(replicateURI);
- } else if (isSSH(replicateURI)) {
- deleteRemoteSsh(replicateURI);
- } else {
- repLog.warn(
- "Cannot delete project on remote site {}. "
- + "Only local paths and SSH URLs are supported"
- + " for remote repository deletion",
- replicateURI);
- }
- }
-
- private static void deleteLocally(URIish uri) {
- try {
- recursivelyDelete(new File(uri.getPath()));
- repLog.info("Deleted local repository: {}", uri);
- } catch (IOException e) {
- repLog.error("Error deleting local repository {}:\n", uri.getPath(), e);
- }
- }
-
- private static void recursivelyDelete(File dir) throws IOException {
- File[] contents = dir.listFiles();
- if (contents != null) {
- for (File d : contents) {
- if (d.isDirectory()) {
- recursivelyDelete(d);
- } else {
- if (!d.delete()) {
- throw new IOException("Failed to delete: " + d.getAbsolutePath());
- }
- }
- }
- }
- if (!dir.delete()) {
- throw new IOException("Failed to delete: " + dir.getAbsolutePath());
- }
- }
-
- private void deleteRemoteSsh(URIish uri) {
- String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
- String cmd = "rm -rf " + quotedPath;
- OutputStream errStream = sshHelper.newErrorBufferStream();
- try {
- sshHelper.executeRemoteSsh(uri, cmd, errStream);
- repLog.info("Deleted remote repository: {}", uri);
- } catch (IOException e) {
- repLog.error(
- "Error deleting remote repository at {}:\n"
- + " Exception: {}\n"
- + " Command: {}\n"
- + " Output: {}",
- uri,
- e,
- cmd,
- errStream,
- e);
- }
- }
-
- private void updateHead(URIish replicateURI, Project.NameKey projectName, String newHead) {
- if (isGerrit(replicateURI)) {
- gerritAdmin.updateHead(replicateURI, projectName, newHead);
- } else if (!replicateURI.isRemote()) {
- updateHeadLocally(replicateURI, newHead);
- } else if (isSSH(replicateURI)) {
- updateHeadRemoteSsh(replicateURI, newHead);
- } else {
- repLog.warn(
- "Cannot update HEAD of project on remote site {}."
- + " Only local paths and SSH URLs are supported"
- + " for remote HEAD update.",
- replicateURI);
- }
- }
-
- private void updateHeadRemoteSsh(URIish uri, String newHead) {
- String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
- String cmd =
- "cd " + quotedPath + " && git symbolic-ref HEAD " + QuotedString.BOURNE.quote(newHead);
- OutputStream errStream = sshHelper.newErrorBufferStream();
- try {
- sshHelper.executeRemoteSsh(uri, cmd, errStream);
- } catch (IOException e) {
- repLog.error(
- "Error updating HEAD of remote repository at {} to {}:\n"
- + " Exception: {}\n"
- + " Command: {}\n"
- + " Output: {}",
- uri,
- newHead,
- e,
- cmd,
- errStream,
- e);
- }
- }
-
- private static void updateHeadLocally(URIish uri, String newHead) {
- try (Repository repo = new FileRepository(uri.getPath())) {
- if (newHead != null) {
- RefUpdate u = repo.updateRef(Constants.HEAD);
- u.link(newHead);
- }
- } catch (IOException e) {
- repLog.error("Failed to update HEAD of repository {} to {}", uri.getPath(), newHead, e);
- }
- }
-
- private static boolean isSSH(URIish uri) {
- String scheme = uri.getScheme();
- if (!uri.isRemote()) {
- return false;
- }
- if (scheme != null && scheme.toLowerCase().contains("ssh")) {
+ Optional<AdminApi> adminApi = adminApiFactory.get().create(replicateURI);
+ if (adminApi.isPresent() && adminApi.get().createProject(projectName, head)) {
return true;
}
- if (scheme == null && uri.getHost() != null && uri.getPath() != null) {
- return true;
- }
+
+ warnCannotPerform("create new project", replicateURI);
return false;
}
- private static boolean isGerrit(URIish uri) {
- String scheme = uri.getScheme();
- return scheme != null && scheme.toLowerCase().equals("gerrit+ssh");
+ private void deleteProject(URIish replicateURI, Project.NameKey projectName) {
+ Optional<AdminApi> adminApi = adminApiFactory.get().create(replicateURI);
+ if (adminApi.isPresent()) {
+ adminApi.get().deleteProject(projectName);
+ return;
+ }
+
+ warnCannotPerform("delete project", replicateURI);
+ }
+
+ private void updateHead(URIish replicateURI, Project.NameKey projectName, String newHead) {
+ Optional<AdminApi> adminApi = adminApiFactory.get().create(replicateURI);
+ if (adminApi.isPresent()) {
+ adminApi.get().updateHead(projectName, newHead);
+ return;
+ }
+
+ warnCannotPerform("update HEAD of project", replicateURI);
+ }
+
+ private void warnCannotPerform(String op, URIish uri) {
+ repLog.warn("Cannot {} on remote site {}.", op, uri);
+ }
+
+ private static class ReferenceUpdatedEvent {
+ private String projectName;
+ private String refName;
+
+ public ReferenceUpdatedEvent(String projectName, String refName) {
+ this.projectName = projectName;
+ this.refName = refName;
+ }
+
+ public String getProjectName() {
+ return projectName;
+ }
+
+ public String getRefName() {
+ return refName;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(projectName, refName);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return (obj instanceof ReferenceUpdatedEvent)
+ && Objects.equal(projectName, ((ReferenceUpdatedEvent) obj).projectName)
+ && Objects.equal(refName, ((ReferenceUpdatedEvent) obj).refName);
+ }
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
index cd7a3cf..aa965fe 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
@@ -15,7 +15,6 @@
package com.googlesource.gerrit.plugins.replication;
import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.reviewdb.client.Project.NameKey;
import com.google.gerrit.server.events.RefEvent;
public class ReplicationScheduledEvent extends RefEvent {
@@ -38,7 +37,7 @@
}
@Override
- public NameKey getProjectNameKey() {
+ public Project.NameKey getProjectNameKey() {
return new Project.NameKey(project);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
index 86557e2..df8f3f4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
@@ -23,6 +23,7 @@
import org.eclipse.jgit.transport.URIish;
public class ReplicationState {
+
private boolean allScheduled;
private final PushResultProcessing pushResultProcessing;
@@ -49,7 +50,7 @@
private int totalPushTasksCount;
private int finishedPushTasksCount;
- public ReplicationState(PushResultProcessing processing) {
+ ReplicationState(PushResultProcessing processing) {
pushResultProcessing = processing;
statusByProjectRef = HashBasedTable.create();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateListeners.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateListeners.java
new file mode 100644
index 0000000..d7bf227
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateListeners.java
@@ -0,0 +1,48 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.inject.Inject;
+
+/**
+ * Composite {@link ReplicationStateListener} backed by a {@link DynamicSet}.
+ *
+ * <p>Every {@code warn}/{@code error} call is forwarded unchanged to each listener currently in
+ * the set.
+ */
+public class ReplicationStateListeners implements ReplicationStateListener {
+  private final DynamicSet<ReplicationStateListener> listeners;
+
+  @Inject
+  ReplicationStateListeners(DynamicSet<ReplicationStateListener> stateListeners) {
+    listeners = stateListeners;
+  }
+
+  @Override
+  public void warn(String msg, ReplicationState... states) {
+    listeners.forEach(delegate -> delegate.warn(msg, states));
+  }
+
+  @Override
+  public void error(String msg, ReplicationState... states) {
+    listeners.forEach(delegate -> delegate.error(msg, states));
+  }
+
+  @Override
+  public void error(String msg, Throwable t, ReplicationState... states) {
+    listeners.forEach(delegate -> delegate.error(msg, t, states));
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
new file mode 100644
index 0000000..64397f9
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -0,0 +1,187 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.flogger.FluentLogger;
+import com.google.common.hash.Hashing;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import com.google.inject.Inject;
+import com.google.inject.ProvisionException;
+import com.google.inject.Singleton;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.transport.URIish;
+
+/**
+ * File-based store of replication tasks that have been scheduled but not yet completed.
+ *
+ * <p>Each task is written as one JSON file under the {@code ref-updates} sub-directory of the
+ * configured events directory. The file name is the SHA-1 of the JSON content, so persisting the
+ * same task twice yields a single file.
+ */
+@Singleton
+public class ReplicationTasksStorage {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final Gson GSON = new Gson();
+
+  // volatile: the flag is set through a public setter and read by whichever thread calls delete().
+  private volatile boolean disableDeleteForTesting;
+
+  /** Immutable description of one pending push: a ref of a project to a URI of a remote. */
+  public static class ReplicateRefUpdate {
+    public final String project;
+    public final String ref;
+    public final String uri;
+    public final String remote;
+
+    public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
+      this.project = project;
+      this.ref = ref;
+      this.uri = uri.toASCIIString();
+      this.remote = remote;
+    }
+
+    @Override
+    public String toString() {
+      return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote;
+    }
+  }
+
+  private final Path refUpdates;
+
+  @Inject
+  ReplicationTasksStorage(ReplicationConfig config) {
+    refUpdates = config.getEventsDirectory().resolve("ref-updates");
+  }
+
+  /**
+   * Writes the task to disk unless a file for it already exists.
+   *
+   * <p>An I/O failure while writing is logged and otherwise ignored.
+   *
+   * @param r the task to persist
+   * @return the key (SHA-1 name of the serialized task) under which the task is stored
+   */
+  public String persist(ReplicateRefUpdate r) {
+    String json = GSON.toJson(r) + "\n";
+    String eventKey = sha1(json).name();
+    Path file = refUpdates().resolve(eventKey);
+
+    if (Files.exists(file)) {
+      return eventKey;
+    }
+
+    try {
+      logger.atFine().log("CREATE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
+      Files.write(file, json.getBytes(UTF_8));
+    } catch (IOException e) {
+      logger.atWarning().withCause(e).log("Couldn't persist event %s", json);
+    }
+    return eventKey;
+  }
+
+  /** Turns {@link #delete} into a logged no-op while {@code deleteDisabled} is true. */
+  @VisibleForTesting
+  public void disableDeleteForTesting(boolean deleteDisabled) {
+    this.disableDeleteForTesting = deleteDisabled;
+  }
+
+  /**
+   * Removes the file of a persisted task.
+   *
+   * <p>An I/O failure (including a missing file) is logged and otherwise ignored.
+   *
+   * @param r the task to remove; it must serialize to the same JSON it was persisted with
+   */
+  public void delete(ReplicateRefUpdate r) {
+    String taskJson = GSON.toJson(r) + "\n";
+    String taskKey = sha1(taskJson).name();
+    Path file = refUpdates().resolve(taskKey);
+
+    if (disableDeleteForTesting) {
+      logger.atFine().log("DELETE %s (%s:%s => %s) DISABLED", file, r.project, r.ref, r.uri);
+      return;
+    }
+
+    try {
+      logger.atFine().log("DELETE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
+      Files.delete(file);
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Error while deleting event %s", taskKey);
+    }
+  }
+
+  /**
+   * Reads back all persisted tasks.
+   *
+   * <p>A file that cannot be read or parsed (for example one left empty or truncated by a crash
+   * during {@link #persist}) is logged and skipped, so a single bad file cannot prevent the
+   * remaining tasks from being returned.
+   *
+   * @return the tasks found on disk, possibly empty, never containing {@code null}
+   */
+  public List<ReplicateRefUpdate> list() {
+    List<ReplicateRefUpdate> result = new ArrayList<>();
+    try (DirectoryStream<Path> events = Files.newDirectoryStream(refUpdates())) {
+      for (Path e : events) {
+        if (Files.isRegularFile(e)) {
+          readTask(e, result);
+        }
+      }
+    } catch (IOException e) {
+      logger.atSevere().withCause(e).log("Error when firing pending events");
+    }
+    return result;
+  }
+
+  /** Parses one task file and appends it to {@code result}; logs and skips unusable files. */
+  private static void readTask(Path file, List<ReplicateRefUpdate> result) {
+    try {
+      String json = new String(Files.readAllBytes(file), UTF_8);
+      ReplicateRefUpdate task = GSON.fromJson(json, ReplicateRefUpdate.class);
+      if (task == null) {
+        // Gson returns null for empty input; callers dereference every returned element.
+        logger.atWarning().log("Skipping empty replication event %s", file);
+        return;
+      }
+      result.add(task);
+    } catch (IOException | JsonSyntaxException e) {
+      logger.atWarning().withCause(e).log("Skipping unreadable replication event %s", file);
+    }
+  }
+
+  @SuppressWarnings("deprecation")
+  private ObjectId sha1(String s) {
+    return ObjectId.fromRaw(Hashing.sha1().hashString(s, UTF_8).asBytes());
+  }
+
+  /** Returns the storage directory, creating it and any missing parents if needed. */
+  private Path refUpdates() {
+    try {
+      return Files.createDirectories(refUpdates);
+    } catch (IOException e) {
+      throw new ProvisionException(String.format("Couldn't create %s", refUpdates), e);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java
index 2b0c16b..c518091 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java
@@ -17,6 +17,7 @@
import com.google.gerrit.server.config.SitePaths;
import com.google.inject.Inject;
import java.io.IOException;
+import java.util.Objects;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.storage.file.FileBasedConfig;
@@ -49,8 +50,8 @@
@Override
public SecureCredentialsProvider create(String remoteName) {
- String user = config.getString("remote", remoteName, "username");
- String pass = config.getString("remote", remoteName, "password");
+ String user = Objects.toString(config.getString("remote", remoteName, "username"), "");
+ String pass = Objects.toString(config.getString("remote", remoteName, "password"), "");
return new SecureCredentialsProvider(user, pass);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsProvider.java b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsProvider.java
index c4294a9..62b4036 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsProvider.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsProvider.java
@@ -20,7 +20,7 @@
import org.eclipse.jgit.transport.URIish;
/** Looks up a remote's password in secure.config. */
-class SecureCredentialsProvider extends CredentialsProvider {
+public class SecureCredentialsProvider extends CredentialsProvider {
private final String cfgUser;
private final String cfgPass;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactory.java
new file mode 100644
index 0000000..ba14299
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactory.java
@@ -0,0 +1,26 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import org.eclipse.jgit.errors.NotSupportedException;
+import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.Transport;
+import org.eclipse.jgit.transport.URIish;
+
+public interface TransportFactory {
+ /** Opens a JGit {@link Transport} between the {@code local} repository and {@code uri}. */
+ Transport open(Repository local, URIish uri) throws NotSupportedException, TransportException;
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactoryImpl.java b/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactoryImpl.java
new file mode 100644
index 0000000..58c1214
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactoryImpl.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import org.eclipse.jgit.errors.NotSupportedException;
+import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.Transport;
+import org.eclipse.jgit.transport.URIish;
+
+public class TransportFactoryImpl implements TransportFactory {
+ /** Delegates directly to the static {@link Transport#open(Repository, URIish)}. */
+ @Override
+ public Transport open(Repository git, URIish uri)
+ throws NotSupportedException, TransportException {
+ return Transport.open(git, uri);
+ }
+}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index e70094c..c9356b0 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -13,6 +13,11 @@
sudo su -c 'ssh mirror1.us.some.org echo' gerrit2
```
+*NOTE:* Make sure the local user's SSH keys are in PEM format. Here is how to generate them:
+```
+ ssh-keygen -m PEM -t rsa -C "your_email@example.com"
+```
+
<a name="example_file">
Next, create `$site_path/etc/replication.config` as a Git-style config
file, for example to replicate in parallel to four different hosts:</a>
@@ -65,7 +70,7 @@
gerrit.replicateOnStartup
: If true, replicates to all remotes on startup to ensure they
- are in-sync with this server. By default, true.
+ are in-sync with this server. By default, false.
gerrit.autoReload
: If true, automatically reloads replication destinations and settings
@@ -124,6 +129,17 @@
By default, pushes are retried indefinitely.
+replication.eventsDirectory
+: Directory where replication events are persisted
+
+ When scheduling a replication, the replication event is persisted
+ under this directory. When the replication is done, the event is deleted.
+ If the plugin is stopped before all scheduled replications are done, the
+ persisted events will not be deleted. When the plugin is started again,
+ it will trigger all replications found under this directory.
+
+ When not set, defaults to the plugin's data directory.
+
remote.NAME.url
: Address of the remote server to push to. Multiple URLs may be
specified within a single remote block, listing different
@@ -272,6 +288,20 @@
By default, use replication.maxRetries.
+remote.NAME.drainQueueAttempts
+: Maximum number of attempts to drain the replication event queue before
+ stopping the plugin.
+
+ When stopping the plugin, the shutdown will be delayed trying to drain
+ the event queue.
+
+ The maximum delay is "drainQueueAttempts" * "replicationDelay" seconds.
+
+ When not set or set to 0, the queue is not drained and the pending
+ replication events are cancelled.
+
+ By default, do not drain replication events.
+
remote.NAME.threads
: Number of worker threads to dedicate to pushing to the
repositories described by this remote. Each thread can push
@@ -298,7 +328,7 @@
If the remote site was not available at the moment when a new
project was created, it will be created if during the replication
of a ref it is found to be missing.
-
+
If false, repositories are never created automatically on this
remote.
@@ -398,7 +428,7 @@
File `~/.ssh/config`
--------------------
-If present, Gerrit reads and caches `~/.ssh/config` at startup, and
+Gerrit reads and caches the `~/.ssh/config` at startup, and
supports most SSH configuration options. For example:
```
@@ -412,6 +442,15 @@
PreferredAuthentications publickey
```
+*IdentityFile* and *PreferredAuthentications* must be defined for all the hosts.
+Here is an example of the minimal `~/.ssh/config` needed:
+
+```
+ Host *
+ IdentityFile ~/.ssh/id_rsa
+ PreferredAuthentications publickey
+```
+
Supported options:
* Host
diff --git a/src/main/resources/Documentation/extension-point.md b/src/main/resources/Documentation/extension-point.md
new file mode 100644
index 0000000..076aded
--- /dev/null
+++ b/src/main/resources/Documentation/extension-point.md
@@ -0,0 +1,53 @@
+@PLUGIN@ extension points
+==============
+
+The replication plugin exposes an extension point to allow influencing its behaviour from another plugin or a script.
+Extension points are defined by the replication plugin only when it is loaded as a [libModule](/config-gerrit.html#gerrit.installModule); they are
+implemented by another plugin that declares a `provided` dependency on the replication plugin.
+
+### Install extension libModule
+
+The replication plugin's extension points are defined in the `c.g.g.p.r.ReplicationExtensionPointModule`
+that needs to be configured as libModule.
+
+Create a symbolic link from `$GERRIT_SITE/plugins/replication.jar` into `$GERRIT_SITE/lib`
+and then add the replication extension module to the `gerrit.config`.
+
+Example:
+
+```
+[gerrit]
+ installModule = com.googlesource.gerrit.plugins.replication.ReplicationExtensionPointModule
+```
+
+> **NOTE**: Using and configuring the replication plugin as a library module requires a Gerrit server restart and does not support hot plugin install or upgrade.
+
+
+### Extension points
+
+* `com.googlesource.gerrit.plugins.replication.ReplicationPushFilter`
+
+ Filter out the ref updates pushed to a remote instance.
+ Only one filter at a time is supported. The filter implementation needs to be bound as a `DynamicItem`.
+
+ Default: no filtering
+
+ Example:
+
+ ```
+ DynamicItem.bind(binder(), ReplicationPushFilter.class).to(ReplicationPushFilterImpl.class);
+ ```
+
+* `com.googlesource.gerrit.plugins.replication.AdminApiFactory`
+
+ Create an instance of `AdminApi` for a given remote URL. The default implementation
+ provides API instances for local FS, remote SSH, and remote Gerrit.
+
+ Only one factory at a time is supported. The implementation needs to be bound as a
+ `DynamicItem`.
+
+ Example:
+
+ ```
+ DynamicItem.bind(binder(), AdminApiFactory.class).to(AdminApiFactoryImpl.class);
+ ```
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
index 337bd1d..5fa7b98 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
@@ -22,8 +22,8 @@
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
-import com.google.gerrit.common.EventDispatcher;
import com.google.gerrit.reviewdb.server.ReviewDb;
+import com.google.gerrit.server.events.EventDispatcher;
import com.google.gerrit.server.permissions.PermissionBackendException;
import com.google.gwtorm.client.KeyUtil;
import com.google.gwtorm.server.OrmException;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
new file mode 100644
index 0000000..af065b3
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -0,0 +1,375 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.googlesource.gerrit.plugins.replication.RemoteRefUpdateCollectionMatcher.eqRemoteRef;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.getCurrentArguments;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.eclipse.jgit.lib.Ref.Storage.NEW;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.metrics.Timer1;
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.gerrit.server.git.PerThreadRequestScope;
+import com.google.gerrit.server.permissions.PermissionBackend;
+import com.google.gerrit.server.permissions.PermissionBackend.ForProject;
+import com.google.gerrit.server.permissions.PermissionBackend.WithUser;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.gerrit.server.project.ProjectState;
+import com.google.gerrit.server.util.IdGenerator;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import junit.framework.AssertionFailedError;
+import org.easymock.IAnswer;
+import org.eclipse.jgit.errors.NotSupportedException;
+import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectIdRef;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.transport.FetchConnection;
+import org.eclipse.jgit.transport.PushConnection;
+import org.eclipse.jgit.transport.PushResult;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+import org.eclipse.jgit.transport.Transport;
+import org.eclipse.jgit.transport.URIish;
+import org.eclipse.jgit.util.FS;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PushOneTest {
+ private static final int TEST_PUSH_TIMEOUT_SECS = 10;
+
+ private GitRepositoryManager gitRepositoryManagerMock;
+ private Repository repositoryMock;
+ private PermissionBackend permissionBackendMock;
+ private PermissionBackend.WithUser withUserMock;
+ private PermissionBackend.ForProject forProjectMock;
+
+ private Destination destinationMock;
+ private RemoteConfig remoteConfigMock;
+ private RefSpec refSpecMock;
+ private CredentialsFactory credentialsFactory;
+ private PerThreadRequestScope.Scoper threadRequestScoperMock;
+ private ReplicationQueue replicationQueueMock;
+ private IdGenerator idGeneratorMock;
+ private ReplicationStateListeners replicationStateListenersMock;
+ private ReplicationMetrics replicationMetricsMock;
+ private Timer1.Context timerContextMock;
+ private ProjectCache projectCacheMock;
+ private TransportFactory transportFactoryMock;
+ private Transport transportMock;
+ private FetchConnection fetchConnection;
+ private PushConnection pushConnection;
+ private ProjectState projectStateMock;
+ private RefUpdate refUpdateMock;
+
+ private Project.NameKey projectNameKey;
+ private URIish urish;
+ private Map<String, Ref> localRefs;
+
+ private Map<String, Ref> remoteRefs;
+ private CountDownLatch isCallFinished;
+ private Ref newLocalRef;
+
+ @Before
+ public void setup() throws Exception {
+ projectNameKey = new Project.NameKey("fooProject");
+ urish = new URIish("http://foo.com/fooProject.git");
+
+ newLocalRef =
+ new ObjectIdRef.Unpeeled(
+ NEW, "foo", ObjectId.fromString("0000000000000000000000000000000000000001"));
+
+ localRefs = new HashMap<>();
+ localRefs.put("fooProject", newLocalRef);
+
+ Ref remoteRef = new ObjectIdRef.Unpeeled(NEW, "foo", ObjectId.zeroId());
+ remoteRefs = new HashMap<>();
+ remoteRefs.put("fooProject", remoteRef);
+
+ isCallFinished = new CountDownLatch(1);
+
+ setupMocks();
+ }
+
+ private void setupMocks() throws Exception {
+ FileBasedConfig config = new FileBasedConfig(new Config(), new File("/foo"), FS.DETECTED);
+ config.setString("remote", "Replication", "push", "foo");
+
+ setupRefUpdateMock();
+ setupRepositoryMock(config);
+ setupGitRepoManagerMock();
+
+ projectStateMock = createNiceMock(ProjectState.class);
+ forProjectMock = createNiceMock(ForProject.class);
+ setupWithUserMock();
+ setupPermissionBackedMock();
+
+ setupDestinationMock();
+
+ setupRefSpecMock();
+ setupRemoteConfigMock();
+
+ credentialsFactory = createNiceMock(CredentialsFactory.class);
+
+ setupFetchConnectionMock();
+ setupPushConnectionMock();
+ setupRequestScopeMock();
+ replicationQueueMock = createNiceMock(ReplicationQueue.class);
+ idGeneratorMock = createNiceMock(IdGenerator.class);
+ replicationStateListenersMock = createNiceMock(ReplicationStateListeners.class);
+
+ timerContextMock = createNiceMock(Timer1.Context.class);
+ setupReplicationMetricsMock();
+
+ setupTransportMock();
+
+ setupProjectCacheMock();
+
+ replay(
+ gitRepositoryManagerMock,
+ refUpdateMock,
+ repositoryMock,
+ permissionBackendMock,
+ destinationMock,
+ remoteConfigMock,
+ credentialsFactory,
+ threadRequestScoperMock,
+ replicationQueueMock,
+ idGeneratorMock,
+ replicationStateListenersMock,
+ replicationMetricsMock,
+ projectCacheMock,
+ timerContextMock,
+ transportFactoryMock,
+ projectStateMock,
+ withUserMock,
+ forProjectMock,
+ fetchConnection,
+ pushConnection,
+ refSpecMock);
+ }
+
+ @Test
+ public void shouldPushAllRefsWhenNoFilters() throws InterruptedException, IOException {
+ shouldPushAllRefsWithDynamicItemFilter(DynamicItem.itemOf(ReplicationPushFilter.class, null));
+ }
+
+ @Test
+ public void shouldPushAllRefsWhenNoFiltersSetup() throws InterruptedException, IOException {
+ shouldPushAllRefsWithDynamicItemFilter(null);
+ }
+
+ /**
+  * Pushes {@link PushOne#ALL_REFS} through a {@link PushOne} configured with the given filter
+  * holder and verifies that the transport receives exactly one push containing the single
+  * update built from {@code newLocalRef}.
+  *
+  * @param replicationPushFilter filter holder handed to the {@link PushOne}; the callers in this
+  *     class pass either {@code null} or an item holding no filter
+  */
+ private void shouldPushAllRefsWithDynamicItemFilter(
+     DynamicItem<ReplicationPushFilter> replicationPushFilter)
+     throws IOException, NotSupportedException, TransportException, InterruptedException {
+   RemoteRefUpdate expectedUpdate =
+       new RemoteRefUpdate(
+           repositoryMock,
+           newLocalRef.getName(),
+           newLocalRef.getObjectId(),
+           "fooProject",
+           false,
+           "fooProject",
+           null);
+   List<RemoteRefUpdate> expectedUpdates = Collections.singletonList(expectedUpdate);
+
+   // Exactly one push carrying the expected update must reach the transport.
+   expect(transportMock.push(anyObject(), eqRemoteRef(expectedUpdates)))
+       .andReturn(new PushResult())
+       .once();
+   replay(transportMock);
+
+   PushOne push = createPushOne(replicationPushFilter);
+   push.addRef(PushOne.ALL_REFS);
+   push.run();
+
+   // The latch is released by the Callable wrapper installed in setupRequestScopeMock().
+   isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);
+   verify(transportMock);
+ }
+
+ /**
+  * Verifies that nothing is pushed when the configured {@link ReplicationPushFilter} filters out
+  * every remote ref update.
+  */
+ @Test
+ public void shouldBlockReplicationUsingPushFilter() throws InterruptedException, IOException {
+   // Filter that rejects everything: whatever updates are offered, none are kept.
+   DynamicItem<ReplicationPushFilter> replicationPushFilter =
+       DynamicItem.itemOf(
+           ReplicationPushFilter.class,
+           new ReplicationPushFilter() {
+
+             @Override
+             public List<RemoteRefUpdate> filter(
+                 String projectName, List<RemoteRefUpdate> remoteUpdatesList) {
+               return Collections.emptyList();
+             }
+           });
+
+   // easymock way to check if method was never called: any call to push() throws.
+   // NOTE(review): this only fails the test if the error propagates out of PushOne.run();
+   // confirm that PushOne does not catch and log it.
+   expect(transportMock.push(anyObject(), anyObject()))
+       .andThrow(new AssertionFailedError())
+       .anyTimes();
+   replay(transportMock);
+
+   PushOne pushOne = createPushOne(replicationPushFilter);
+
+   pushOne.addRef(PushOne.ALL_REFS);
+   pushOne.run();
+
+   // Use the shared timeout constant, as the other push test does, instead of a literal.
+   isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);
+
+   verify(transportMock);
+ }
+
+ private PushOne createPushOne(DynamicItem<ReplicationPushFilter> replicationPushFilter) {
+ PushOne push =
+ new PushOne(
+ gitRepositoryManagerMock,
+ permissionBackendMock,
+ destinationMock,
+ remoteConfigMock,
+ credentialsFactory,
+ threadRequestScoperMock,
+ replicationQueueMock,
+ idGeneratorMock,
+ replicationStateListenersMock,
+ replicationMetricsMock,
+ projectCacheMock,
+ transportFactoryMock,
+ projectNameKey,
+ urish);
+
+ push.setReplicationPushFilter(replicationPushFilter);
+ return push;
+ }
+
+ private void setupProjectCacheMock() throws IOException {
+ projectCacheMock = createNiceMock(ProjectCache.class);
+ expect(projectCacheMock.checkedGet(projectNameKey)).andReturn(projectStateMock);
+ }
+
+ private void setupTransportMock() throws NotSupportedException, TransportException {
+ transportMock = createNiceMock(Transport.class);
+ expect(transportMock.openFetch()).andReturn(fetchConnection);
+ transportFactoryMock = createNiceMock(TransportFactory.class);
+ expect(transportFactoryMock.open(repositoryMock, urish)).andReturn(transportMock).anyTimes();
+ }
+
+ private void setupReplicationMetricsMock() {
+ replicationMetricsMock = createNiceMock(ReplicationMetrics.class);
+ expect(replicationMetricsMock.start(anyObject())).andReturn(timerContextMock);
+ }
+
+ // Creates the request-scoper mock. Every scope(...) call returns a wrapper around the
+ // Callable that was passed in; the wrapper runs the original Callable and then counts down
+ // isCallFinished, which the tests await after pushOne.run().
+ private void setupRequestScopeMock() {
+ threadRequestScoperMock = createNiceMock(PerThreadRequestScope.Scoper.class);
+ expect(threadRequestScoperMock.scope(anyObject()))
+ .andAnswer(
+ new IAnswer<Callable<Object>>() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public Callable<Object> answer() throws Throwable {
+ // The first argument of the intercepted scope(...) call is the Callable to wrap.
+ Callable<Object> originalCall = (Callable<Object>) getCurrentArguments()[0];
+ return new Callable<Object>() {
+
+ @Override
+ public Object call() throws Exception {
+ Object result = originalCall.call();
+ // Reached only when the wrapped call returns normally; an exception skips it.
+ isCallFinished.countDown();
+ return result;
+ }
+ };
+ }
+ })
+ .anyTimes();
+ }
+
+ private void setupPushConnectionMock() {
+ pushConnection = createNiceMock(PushConnection.class);
+ expect(pushConnection.getRefsMap()).andReturn(remoteRefs);
+ }
+
+ private void setupFetchConnectionMock() {
+ fetchConnection = createNiceMock(FetchConnection.class);
+ expect(fetchConnection.getRefsMap()).andReturn(remoteRefs);
+ }
+
+ private void setupRemoteConfigMock() {
+ remoteConfigMock = createNiceMock(RemoteConfig.class);
+ expect(remoteConfigMock.getPushRefSpecs()).andReturn(ImmutableList.of(refSpecMock));
+ }
+
+ private void setupRefSpecMock() {
+ refSpecMock = createNiceMock(RefSpec.class);
+ expect(refSpecMock.matchSource(anyObject(String.class))).andReturn(true);
+ expect(refSpecMock.expandFromSource(anyObject(String.class))).andReturn(refSpecMock);
+ expect(refSpecMock.getDestination()).andReturn("fooProject").anyTimes();
+ expect(refSpecMock.isForceUpdate()).andReturn(false).anyTimes();
+ }
+
+ private void setupDestinationMock() {
+ destinationMock = createNiceMock(Destination.class);
+ expect(destinationMock.requestRunway(anyObject())).andReturn(RunwayStatus.allowed());
+ }
+
+ private void setupPermissionBackedMock() {
+ permissionBackendMock = createNiceMock(PermissionBackend.class);
+ expect(permissionBackendMock.currentUser()).andReturn(withUserMock);
+ }
+
+ private void setupWithUserMock() {
+ withUserMock = createNiceMock(WithUser.class);
+ expect(withUserMock.project(projectNameKey)).andReturn(forProjectMock);
+ }
+
+ private void setupGitRepoManagerMock() throws IOException {
+ gitRepositoryManagerMock = createNiceMock(GitRepositoryManager.class);
+ expect(gitRepositoryManagerMock.openRepository(projectNameKey)).andReturn(repositoryMock);
+ }
+
+ @SuppressWarnings("deprecation")
+ private void setupRepositoryMock(FileBasedConfig config) throws IOException {
+ repositoryMock = createNiceMock(Repository.class);
+ expect(repositoryMock.getConfig()).andReturn(config).anyTimes();
+ expect(repositoryMock.getAllRefs()).andReturn(localRefs);
+ expect(repositoryMock.updateRef("fooProject")).andReturn(refUpdateMock);
+ }
+
+ private void setupRefUpdateMock() {
+ refUpdateMock = createNiceMock(RefUpdate.class);
+ expect(refUpdateMock.getOldObjectId())
+ .andReturn(ObjectId.fromString("0000000000000000000000000000000000000001"))
+ .anyTimes();
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java
new file mode 100644
index 0000000..111a792
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java
@@ -0,0 +1,64 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import java.util.Collection;
+import java.util.Objects;
+import org.easymock.EasyMock;
+import org.easymock.IArgumentMatcher;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+
+/**
+ * EasyMock argument matcher that accepts a collection of {@link RemoteRefUpdate}s when it holds
+ * the same updates as the expected collection, ignoring order.
+ *
+ * <p>Two updates are considered the same when all the properties checked by {@link
+ * #compare(RemoteRefUpdate, RemoteRefUpdate)} are equal. The comparison is multiset-based, so
+ * duplicates on one side cannot stand in for distinct updates on the other.
+ */
+public class RemoteRefUpdateCollectionMatcher implements IArgumentMatcher {
+  private final Collection<RemoteRefUpdate> expectedRemoteRefs;
+
+  /**
+   * Registers a matcher for the given expected updates with EasyMock.
+   *
+   * @param expectedRemoteRefs the updates the actual argument must contain
+   * @return always {@code null}; the value is a placeholder for the mocked call's argument
+   */
+  public static Collection<RemoteRefUpdate> eqRemoteRef(
+      Collection<RemoteRefUpdate> expectedRemoteRefs) {
+    EasyMock.reportMatcher(new RemoteRefUpdateCollectionMatcher(expectedRemoteRefs));
+    return null;
+  }
+
+  public RemoteRefUpdateCollectionMatcher(Collection<RemoteRefUpdate> expectedRemoteRefs) {
+    this.expectedRemoteRefs = expectedRemoteRefs;
+  }
+
+  /**
+   * Returns {@code true} when {@code argument} is a collection holding the same updates as the
+   * expected collection, each the same number of times.
+   */
+  @Override
+  public boolean matches(Object argument) {
+    if (!(argument instanceof Collection)) {
+      return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    Collection<RemoteRefUpdate> refs = (Collection<RemoteRefUpdate>) argument;
+
+    if (expectedRemoteRefs.size() != refs.size()) {
+      return false;
+    }
+    // With equal sizes, the collections are equal as multisets exactly when every actual update
+    // occurs as often among the actual updates as among the expected ones. A plain "each actual
+    // matches some expected" check would wrongly accept [A, A] for an expected [A, B].
+    return refs.stream()
+        .allMatch(ref -> countMatching(refs, ref) == countMatching(expectedRemoteRefs, ref));
+  }
+
+  @Override
+  public void appendTo(StringBuffer buffer) {
+    buffer.append("expected:").append(expectedRemoteRefs);
+  }
+
+  /** Counts the entries of {@code candidates} that compare equal to {@code ref}. */
+  private long countMatching(Collection<RemoteRefUpdate> candidates, RemoteRefUpdate ref) {
+    return candidates.stream().filter(candidate -> compare(candidate, ref)).count();
+  }
+
+  /** Property-by-property equality of two updates; symmetric in its arguments. */
+  private boolean compare(RemoteRefUpdate ref, RemoteRefUpdate expectedRef) {
+    return Objects.equals(ref.getRemoteName(), expectedRef.getRemoteName())
+        && Objects.equals(ref.getStatus(), expectedRef.getStatus())
+        && Objects.equals(ref.getExpectedOldObjectId(), expectedRef.getExpectedOldObjectId())
+        && Objects.equals(ref.getNewObjectId(), expectedRef.getNewObjectId())
+        && ref.isFastForward() == expectedRef.isFastForward()
+        && Objects.equals(ref.getSrcRef(), expectedRef.getSrcRef())
+        && ref.isForceUpdate() == expectedRef.isForceUpdate()
+        && Objects.equals(ref.getMessage(), expectedRef.getMessage());
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
new file mode 100644
index 0000000..61a53f3
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -0,0 +1,377 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.gerrit.extensions.api.projects.BranchInput;
+import com.google.gerrit.extensions.common.ProjectInfo;
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import com.google.inject.Key;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+import org.eclipse.jgit.lib.Constants;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+import org.eclipse.jgit.transport.URIish;
+import org.eclipse.jgit.util.FS;
+import org.junit.Test;
+
+@UseLocalDisk
+@TestPlugin(
+ name = "replication",
+ sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationIT extends LightweightPluginDaemonTest {
+ private static final Optional<String> ALL_PROJECTS = Optional.empty();
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+ private static final int TEST_REPLICATION_DELAY = 1;
+ private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2);
+
+ @Inject private SitePaths sitePaths;
+ private Path pluginDataDir;
+ private Path gitPath;
+ private Path storagePath;
+ private FileBasedConfig config;
+ private ReplicationTasksStorage tasksStorage;
+
+ @Override
+ public void setUpTestPlugin() throws Exception {
+ gitPath = sitePaths.site_path.resolve("git");
+
+ config =
+ new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+ setReplicationDestination(
+ "remote1",
+ "suffix1",
+ Optional.of("not-used-project")); // Simulates a full replication.config initialization
+ config.save();
+
+ super.setUpTestPlugin();
+
+ pluginDataDir = plugin.getSysInjector().getInstance(Key.get(Path.class, PluginData.class));
+ storagePath = pluginDataDir.resolve("ref-updates");
+ tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
+ cleanupReplicationTasks();
+ tasksStorage.disableDeleteForTesting(true);
+ }
+
+ @Test
+ public void shouldReplicateNewProject() throws Exception {
+ setReplicationDestination("foo", "replica", ALL_PROJECTS);
+ reloadConfig();
+
+ Project.NameKey sourceProject = createTestProject("foo");
+
+ assertThat(listReplicationTasks("refs/meta/config")).hasSize(1);
+
+ waitUntil(() -> projectExists(new Project.NameKey(sourceProject + "replica.git")));
+
+ ProjectInfo replicaProject = gApi.projects().name(sourceProject + "replica").get();
+ assertThat(replicaProject).isNotNull();
+ }
+
+ @Test
+ public void shouldReplicateNewChangeRef() throws Exception {
+ Project.NameKey targetProject = createTestProject("projectreplica");
+
+ setReplicationDestination("foo", "replica", ALL_PROJECTS);
+ reloadConfig();
+
+ Result pushResult = createChange();
+ RevCommit sourceCommit = pushResult.getCommit();
+ String sourceRef = pushResult.getPatchSet().getRefName();
+
+ assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(1);
+
+ try (Repository repo = repoManager.openRepository(targetProject)) {
+ waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+
+ Ref targetBranchRef = getRef(repo, sourceRef);
+ assertThat(targetBranchRef).isNotNull();
+ assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+ }
+ }
+
+ @Test
+ public void shouldReplicateNewBranch() throws Exception {
+ setReplicationDestination("foo", "replica", ALL_PROJECTS);
+ reloadConfig();
+
+ Project.NameKey targetProject = createTestProject("projectreplica");
+ String newBranch = "refs/heads/mybranch";
+ String master = "refs/heads/master";
+ BranchInput input = new BranchInput();
+ input.revision = master;
+ gApi.projects().name(project.get()).branch(newBranch).create(input);
+
+ assertThat(listReplicationTasks("refs/heads/(mybranch|master)")).hasSize(2);
+
+ try (Repository repo = repoManager.openRepository(targetProject);
+ Repository sourceRepo = repoManager.openRepository(project)) {
+ waitUntil(() -> checkedGetRef(repo, newBranch) != null);
+
+ Ref masterRef = getRef(sourceRepo, master);
+ Ref targetBranchRef = getRef(repo, newBranch);
+ assertThat(targetBranchRef).isNotNull();
+ assertThat(targetBranchRef.getObjectId()).isEqualTo(masterRef.getObjectId());
+ }
+ }
+
+ @Test
+ public void shouldReplicateNewBranchToTwoRemotes() throws Exception {
+ Project.NameKey targetProject1 = createTestProject("projectreplica1");
+ Project.NameKey targetProject2 = createTestProject("projectreplica2");
+
+ setReplicationDestination("foo1", "replica1", ALL_PROJECTS);
+ setReplicationDestination("foo2", "replica2", ALL_PROJECTS);
+ reloadConfig();
+
+ Result pushResult = createChange();
+ RevCommit sourceCommit = pushResult.getCommit();
+ String sourceRef = pushResult.getPatchSet().getRefName();
+
+ assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
+
+ try (Repository repo1 = repoManager.openRepository(targetProject1);
+ Repository repo2 = repoManager.openRepository(targetProject2)) {
+ waitUntil(
+ () ->
+ (checkedGetRef(repo1, sourceRef) != null && checkedGetRef(repo2, sourceRef) != null));
+
+ Ref targetBranchRef1 = getRef(repo1, sourceRef);
+ assertThat(targetBranchRef1).isNotNull();
+ assertThat(targetBranchRef1.getObjectId()).isEqualTo(sourceCommit.getId());
+
+ Ref targetBranchRef2 = getRef(repo2, sourceRef);
+ assertThat(targetBranchRef2).isNotNull();
+ assertThat(targetBranchRef2.getObjectId()).isEqualTo(sourceCommit.getId());
+ }
+ }
+
+ // Two remotes with two URLs each: one change is expected to produce one persisted
+ // replication task per (remote, URL) pair, i.e. four tasks for the change ref.
+ @Test
+ public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
+ List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
+ createTestProject("projectreplica1");
+ createTestProject("projectreplica2");
+
+ setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+ setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+ // Override the delay written by setReplicationDestination with one 100 times longer.
+ // NOTE(review): presumably this keeps the tasks pending while they are counted - confirm.
+ config.setInt("remote", "foo1", "replicationDelay", TEST_REPLICATION_DELAY * 100);
+ config.setInt("remote", "foo2", "replicationDelay", TEST_REPLICATION_DELAY * 100);
+ reloadConfig();
+
+ createChange();
+
+ assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
+
+ // Re-applying the destinations resets replicationDelay to TEST_REPLICATION_DELAY and saves
+ // the config file; no reload is forced afterwards.
+ // NOTE(review): looks like a restore of the short delay - confirm it is still needed.
+ setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+ setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+ }
+
+ @Test
+ public void shouldCreateOneReplicationTaskWhenSchedulingRepoFullSync() throws Exception {
+ PushResultProcessing pushResultProcessing =
+ new PushResultProcessing() {
+
+ @Override
+ void onRefReplicatedToOneNode(
+ String project,
+ String ref,
+ URIish uri,
+ ReplicationState.RefPushResult status,
+ RemoteRefUpdate.Status refStatus) {}
+
+ @Override
+ void onRefReplicatedToAllNodes(String project, String ref, int nodesCount) {}
+
+ @Override
+ void onAllRefsReplicatedToAllNodes(int totalPushTasksCount) {}
+ };
+
+ createTestProject("projectreplica");
+
+ setReplicationDestination("foo", "replica", ALL_PROJECTS);
+ reloadConfig();
+
+ plugin
+ .getSysInjector()
+ .getInstance(ReplicationQueue.class)
+ .scheduleFullSync(project, null, new ReplicationState(pushResultProcessing), true);
+
+ assertThat(listReplicationTasks(".*all.*")).hasSize(1);
+ }
+
+ @Test
+ public void shouldReplicateHeadUpdate() throws Exception {
+ setReplicationDestination("foo", "replica", ALL_PROJECTS);
+ reloadConfig();
+
+ Project.NameKey targetProject = createTestProject("projectreplica");
+ String newHead = "refs/heads/newhead";
+ String master = "refs/heads/master";
+ BranchInput input = new BranchInput();
+ input.revision = master;
+ gApi.projects().name(project.get()).branch(newHead).create(input);
+ gApi.projects().name(project.get()).head(newHead);
+
+ try (Repository repo = repoManager.openRepository(targetProject)) {
+ waitUntil(() -> checkedGetRef(repo, newHead) != null);
+
+ Ref targetProjectHead = getRef(repo, Constants.HEAD);
+ assertThat(targetProjectHead).isNotNull();
+ assertThat(targetProjectHead.getTarget().getName()).isEqualTo(newHead);
+ }
+ }
+
+ /**
+  * Without {@code drainQueueAttempts} configured, shutting the configuration down must not
+  * replicate the pending change: waiting for the ref on the target is expected to time out.
+  * {@link WaitUtil#waitUntil} reports that timeout as an {@link InterruptedException}.
+  */
+ @Test
+ public void shouldNotDrainTheQueueWhenReloading() throws Exception {
+   // Setup repo to replicate
+   Project.NameKey targetProject = createTestProject("projectreplica");
+   String remoteName = "doNotDrainQueue";
+   setReplicationDestination(remoteName, "replica", ALL_PROJECTS);
+
+   Result pushResult = createChange();
+   shutdownConfig();
+
+   // Only the ref name is needed here; the commit itself is never compared in this test.
+   String sourceRef = pushResult.getPatchSet().getRefName();
+
+   assertThrows(
+       InterruptedException.class,
+       () -> {
+         try (Repository repo = repoManager.openRepository(targetProject)) {
+           waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+         }
+       });
+ }
+
+ @Test
+ public void shouldDrainTheQueueWhenReloading() throws Exception {
+ // Setup repo to replicate
+ Project.NameKey targetProject = createTestProject("projectreplica");
+ String remoteName = "drainQueue";
+ setReplicationDestination(remoteName, "replica", ALL_PROJECTS);
+
+ config.setInt("remote", remoteName, "drainQueueAttempts", 2);
+ config.save();
+ reloadConfig();
+
+ Result pushResult = createChange();
+ shutdownConfig();
+
+ RevCommit sourceCommit = pushResult.getCommit();
+ String sourceRef = pushResult.getPatchSet().getRefName();
+
+ try (Repository repo = repoManager.openRepository(targetProject)) {
+ waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+ Ref targetBranchRef = getRef(repo, sourceRef);
+ assertThat(targetBranchRef).isNotNull();
+ assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+ }
+ }
+
+ private Project.NameKey createTestProject(String name) throws Exception {
+ return createProject(name);
+ }
+
+ private Ref getRef(Repository repo, String branchName) throws IOException {
+ return repo.getRefDatabase().exactRef(branchName);
+ }
+
+ /**
+  * Variant of {@link #getRef(Repository, String)} that never throws, for use inside wait
+  * conditions: any failure is logged and reported as {@code null}.
+  */
+ private Ref checkedGetRef(Repository repo, String branchName) {
+   Ref found;
+   try {
+     found = getRef(repo, branchName);
+   } catch (Exception e) {
+     logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
+     found = null;
+   }
+   return found;
+ }
+
+ private void setReplicationDestination(
+ String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
+ setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project);
+ }
+
+ private void setReplicationDestination(
+ String remoteName, List<String> replicaSuffixes, Optional<String> project)
+ throws IOException {
+
+ List<String> replicaUrls =
+ replicaSuffixes.stream()
+ .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
+ .collect(toList());
+ config.setStringList("remote", remoteName, "url", replicaUrls);
+ config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+ project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
+ config.save();
+ }
+
+ private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
+ WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
+ }
+
+ private void reloadConfig() {
+ plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).forceReload();
+ }
+
+ private void shutdownConfig() {
+ plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).shutdown();
+ }
+
+ private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
+ Pattern refmaskPattern = Pattern.compile(refRegex);
+ return tasksStorage.list().stream()
+ .filter(task -> refmaskPattern.matcher(task.ref).matches())
+ .collect(toList());
+ }
+
+ // Deletes every entry directly under storagePath. Listing the directory may throw
+ // IOException, but each delete is best-effort: the boolean returned by File.delete()
+ // is ignored, so an entry that cannot be removed is silently left in place.
+ private void cleanupReplicationTasks() throws IOException {
+ try (DirectoryStream<Path> files = Files.newDirectoryStream(storagePath)) {
+ for (Path path : files) {
+ path.toFile().delete();
+ }
+ }
+ }
+
+ private boolean projectExists(Project.NameKey name) {
+ try (Repository r = repoManager.openRepository(name)) {
+ return true;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java
index 881a282..2a395ca 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java
@@ -56,13 +56,13 @@
private FileBasedConfig config;
@Override
- public void setUp() throws Exception {
+ public void setUpTestPlugin() throws Exception {
gitPath = sitePaths.site_path.resolve("git");
config =
new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
setReplicationDestination("foo", "replica");
- super.setUp();
+ super.setUpTestPlugin();
}
@Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtil.java b/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtil.java
new file mode 100644
index 0000000..586b56c
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtil.java
@@ -0,0 +1,34 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import com.google.common.base.Stopwatch;
+import java.time.Duration;
+import java.util.function.Supplier;
+
+ public class WaitUtil {
+ /** Polls waitCondition every 50 ms until it yields true; once more than timeout has elapsed,
+ * throws InterruptedException (type kept for callers) with a message naming the timeout. */
+ public static void waitUntil(Supplier<Boolean> waitCondition, Duration timeout)
+ throws InterruptedException {
+ for (Stopwatch sw = Stopwatch.createStarted(); !waitCondition.get(); MILLISECONDS.sleep(50)) {
+ if (sw.elapsed().compareTo(timeout) > 0) {
+ throw new InterruptedException("Condition not met within " + timeout);
+ }
+ }
+ }
+ }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtilTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtilTest.java
new file mode 100644
index 0000000..0ccb0af
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtilTest.java
@@ -0,0 +1,40 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static com.googlesource.gerrit.plugins.replication.WaitUtil.waitUntil;
+
+import java.time.Duration;
+import org.junit.Test;
+
+ public class WaitUtilTest {
+
+ private static final Duration TIMEOUT = Duration.ofSeconds(1);
+
+ /** A condition that never holds must end in InterruptedException once TIMEOUT is exceeded. */
+ @Test
+ public void shouldFailWhenConditionNotMetWithinTimeout() throws Exception {
+ assertThrows(
+ InterruptedException.class,
+ () -> waitUntil(() -> false, TIMEOUT));
+ }
+
+ /** A condition that already holds must make waitUntil return normally. */
+ @Test
+ public void shouldNotFailWhenConditionIsMetWithinTimeout() throws Exception {
+ waitUntil(() -> true, TIMEOUT);
+ }
+ }