Merge branch 'stable-2.15' into stable-2.16 * stable-2.15: Fix issue with dropping events on start Change-Id: I8d1b52bcf8f8c288892492b4375178756c784ec3
diff --git a/BUILD b/BUILD index 0a54c3e..50615d8 100644 --- a/BUILD +++ b/BUILD
@@ -21,7 +21,10 @@ junit_tests( name = "replication_tests", - srcs = glob(["src/test/java/**/*Test.java"]), + srcs = glob([ + "src/test/java/**/*Test.java", + "src/test/java/**/*IT.java", + ]), tags = ["replication"], visibility = ["//visibility:public"], deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [ @@ -32,7 +35,7 @@ java_library( name = "replication_util", - testonly = 1, + testonly = True, srcs = glob( ["src/test/java/**/*.java"], exclude = ["src/test/java/**/*Test.java"],
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java new file mode 100644 index 0000000..acbf763 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
@@ -0,0 +1,50 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.gerrit.reviewdb.client.Project;
+
+/**
+ * Administrative operations that can be performed on a project hosted at a replication remote.
+ *
+ * <p>Instances are obtained per remote URI from {@link AdminApiFactory}.
+ */
+public interface AdminApi {
+  /**
+   * Creates the project on the remote.
+   *
+   * @param project name of the project to create.
+   * @param head ref the new repository's HEAD should point at.
+   * @return {@code false} if the implementation reported a failure, {@code true} otherwise.
+   */
+  boolean createProject(Project.NameKey project, String head);
+
+  /**
+   * Deletes the project from the remote.
+   *
+   * @param project name of the project to delete.
+   * @return {@code false} if the implementation reported a failure, {@code true} otherwise.
+   */
+  boolean deleteProject(Project.NameKey project);
+
+  /**
+   * Points the remote project's HEAD at a different ref.
+   *
+   * @param project name of the project to update.
+   * @param newHead ref the remote HEAD should point at.
+   * @return {@code false} if the implementation reported a failure, {@code true} otherwise.
+   */
+  boolean updateHead(Project.NameKey project, String newHead);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApiFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApiFactory.java new file mode 100644 index 0000000..de6e91e --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApiFactory.java
@@ -0,0 +1,92 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.util.Locale;
+import java.util.Optional;
+import org.eclipse.jgit.transport.URIish;
+
+/** Factory for creating an {@link AdminApi} instance for a remote URI. */
+public interface AdminApiFactory {
+  /**
+   * Create an {@link AdminApi} for the given remote URI.
+   *
+   * @param uri the remote URI.
+   * @return An API for the given remote URI, or {@code Optional.empty} if there is no appropriate
+   *     API for the URI.
+   */
+  Optional<AdminApi> create(URIish uri);
+
+  /**
+   * Default factory. Picks, in order: {@link GerritSshApi} for {@code gerrit+ssh} URIs, {@link
+   * LocalFS} for non-remote URIs and {@link RemoteSsh} for any other SSH URI.
+   */
+  @Singleton
+  static class DefaultAdminApiFactory implements AdminApiFactory {
+    protected final SshHelper sshHelper;
+
+    @Inject
+    public DefaultAdminApiFactory(SshHelper sshHelper) {
+      this.sshHelper = sshHelper;
+    }
+
+    @Override
+    public Optional<AdminApi> create(URIish uri) {
+      if (isGerrit(uri)) {
+        return Optional.of(new GerritSshApi(sshHelper, uri));
+      } else if (!uri.isRemote()) {
+        return Optional.of(new LocalFS(uri));
+      } else if (isSSH(uri)) {
+        return Optional.of(new RemoteSsh(sshHelper, uri));
+      }
+      return Optional.empty();
+    }
+  }
+
+  /**
+   * Tells whether the URI uses the {@code gerrit+ssh} scheme, ignoring case.
+   *
+   * @param uri the remote URI.
+   * @return {@code true} if the scheme is {@code gerrit+ssh}; {@code false} otherwise, including
+   *     when the URI has no scheme.
+   */
+  static boolean isGerrit(URIish uri) {
+    String scheme = uri.getScheme();
+    // Lower-case with Locale.ROOT: with a Turkish default locale "GERRIT+SSH" would otherwise
+    // lower-case to a dotless i and never match.
+    return scheme != null && scheme.toLowerCase(Locale.ROOT).equals("gerrit+ssh");
+  }
+
+  /**
+   * Tells whether the URI is a remote one reached over SSH.
+   *
+   * @param uri the remote URI.
+   * @return {@code true} if the URI is remote and either its scheme contains {@code ssh}
+   *     (ignoring case) or it has no scheme but has both a host and a path.
+   */
+  static boolean isSSH(URIish uri) {
+    if (!uri.isRemote()) {
+      return false;
+    }
+    String scheme = uri.getScheme();
+    if (scheme != null) {
+      return scheme.toLowerCase(Locale.ROOT).contains("ssh");
+    }
+    // No scheme: treat as SSH only when both a host and a path are present.
+    return uri.getHost() != null && uri.getPath() != null;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java index 5d6e409..02daa6d 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -13,36 +13,50 @@ // limitations under the License. package com.googlesource.gerrit.plugins.replication; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.flogger.FluentLogger; import com.google.gerrit.common.FileUtil; +import com.google.gerrit.extensions.annotations.PluginData; import com.google.gerrit.server.config.SitePaths; import com.google.gerrit.server.git.WorkQueue; import com.google.inject.Inject; +import com.google.inject.Provider; import com.google.inject.Singleton; import java.io.IOException; +import java.nio.file.Path; import java.util.List; import org.eclipse.jgit.errors.ConfigInvalidException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @Singleton public class AutoReloadConfigDecorator implements ReplicationConfig { - private static final Logger log = LoggerFactory.getLogger(AutoReloadConfigDecorator.class); - private ReplicationFileBasedConfig currentConfig; + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + private volatile ReplicationFileBasedConfig currentConfig; private long currentConfigTs; + private long lastFailedConfigTs; private final SitePaths site; - private final WorkQueue workQueue; - private final DestinationFactory destinationFactory; + private final Destination.Factory destinationFactory; + private final Path pluginDataDir; + // Use Provider<> instead of injecting the ReplicationQueue because of circular dependency with + // ReplicationConfig + private final Provider<ReplicationQueue> replicationQueue; + + private volatile boolean shuttingDown; @Inject public AutoReloadConfigDecorator( - SitePaths site, WorkQueue workQueue, DestinationFactory destinationFactory) + SitePaths site, + Destination.Factory destinationFactory, + Provider<ReplicationQueue> replicationQueue, + @PluginData Path pluginDataDir) throws ConfigInvalidException, IOException { this.site = site; this.destinationFactory = destinationFactory; + this.pluginDataDir = pluginDataDir; 
this.currentConfig = loadConfig(); this.currentConfigTs = getLastModified(currentConfig); - this.workQueue = workQueue; + this.replicationQueue = replicationQueue; } private static long getLastModified(ReplicationFileBasedConfig cfg) { @@ -50,7 +64,7 @@ } private ReplicationFileBasedConfig loadConfig() throws ConfigInvalidException, IOException { - return new ReplicationFileBasedConfig(site, destinationFactory); + return new ReplicationFileBasedConfig(site, destinationFactory, pluginDataDir); } private synchronized boolean isAutoReload() { @@ -64,25 +78,42 @@ } private void reloadIfNeeded() { - try { - if (isAutoReload()) { - long lastModified = getLastModified(currentConfig); - if (lastModified > currentConfigTs) { - ReplicationFileBasedConfig newConfig = loadConfig(); - newConfig.startup(workQueue); - int discarded = currentConfig.shutdown(); + reload(false); + } - this.currentConfig = newConfig; - this.currentConfigTs = lastModified; - log.info( - "Configuration reloaded: {} destinations, {} replication events discarded", - currentConfig.getDestinations(FilterType.ALL).size(), - discarded); + @VisibleForTesting + public void forceReload() { + reload(true); + } + + private void reload(boolean force) { + if (force || isAutoReload()) { + ReplicationQueue queue = replicationQueue.get(); + + long lastModified = getLastModified(currentConfig); + try { + if (force + || (!shuttingDown + && lastModified > currentConfigTs + && lastModified > lastFailedConfigTs + && queue.isRunning() + && !queue.isReplaying())) { + queue.stop(); + currentConfig = loadConfig(); + currentConfigTs = lastModified; + lastFailedConfigTs = 0; + logger.atInfo().log( + "Configuration reloaded: %d destinations", + currentConfig.getDestinations(FilterType.ALL).size()); } + } catch (Exception e) { + logger.atSevere().withCause(e).log( + "Cannot reload replication configuration: keeping existing settings"); + lastFailedConfigTs = lastModified; + return; + } finally { + queue.start(); } - } catch 
(Exception e) { - log.error("Cannot reload replication configuration: keeping existing settings", e); - return; } } @@ -102,12 +133,32 @@ } @Override - public synchronized int shutdown() { + public Path getEventsDirectory() { + return currentConfig.getEventsDirectory(); + } + + /* shutdown() cannot be set as a synchronized method because + * it may need to wait for pending events to complete; + * e.g. when enabling the drain of replication events before + * shutdown. + * + * As a rule of thumb for synchronized methods, because they + * implicitly define a critical section and associated lock, + * they should never hold waiting for another resource, otherwise + * the risk of deadlock is very high. + * + * See more background about deadlocks, what they are and how to + * prevent them at: https://en.wikipedia.org/wiki/Deadlock + */ + @Override + public int shutdown() { + this.shuttingDown = true; return currentConfig.shutdown(); } @Override public synchronized void startup(WorkQueue workQueue) { + shuttingDown = false; currentConfig.startup(workQueue); }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java index f8737b6..29a7ee6 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadSecureCredentialsFactoryDecorator.java
@@ -16,18 +16,16 @@ import static com.google.gerrit.common.FileUtil.lastModified; +import com.google.common.flogger.FluentLogger; import com.google.gerrit.server.config.SitePaths; import com.google.inject.Inject; import java.io.IOException; import java.nio.file.Files; import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jgit.errors.ConfigInvalidException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class AutoReloadSecureCredentialsFactoryDecorator implements CredentialsFactory { - private static final Logger log = - LoggerFactory.getLogger(AutoReloadSecureCredentialsFactoryDecorator.class); + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private final AtomicReference<SecureCredentialsFactory> secureCredentialsFactory; private volatile long secureCredentialsFactoryLoadTs; @@ -58,13 +56,12 @@ secureCredentialsFactory.compareAndSet( secureCredentialsFactory.get(), new SecureCredentialsFactory(site)); secureCredentialsFactoryLoadTs = getSecureConfigLastEditTs(); - log.info("secure.config reloaded as it was updated on the file system"); + logger.atInfo().log("secure.config reloaded as it was updated on the file system"); } } catch (Exception e) { - log.error( + logger.atSevere().withCause(e).log( "Unexpected error while trying to reload " - + "secure.config: keeping existing credentials", - e); + + "secure.config: keeping existing credentials"); } return secureCredentialsFactory.get().create(remoteName);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java index 0a06093..36960a1 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -24,9 +24,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet.Builder; import com.google.common.collect.Lists; -import com.google.gerrit.common.EventDispatcher; import com.google.gerrit.common.data.GroupReference; -import com.google.gerrit.extensions.client.ProjectState; import com.google.gerrit.extensions.config.FactoryModule; import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.extensions.restapi.AuthException; @@ -42,6 +40,7 @@ import com.google.gerrit.server.account.GroupIncludeCache; import com.google.gerrit.server.account.ListGroupMembership; import com.google.gerrit.server.config.RequestScopedReviewDbProvider; +import com.google.gerrit.server.events.EventDispatcher; import com.google.gerrit.server.git.GitRepositoryManager; import com.google.gerrit.server.git.PerThreadRequestScope; import com.google.gerrit.server.git.WorkQueue; @@ -50,15 +49,18 @@ import com.google.gerrit.server.permissions.ProjectPermission; import com.google.gerrit.server.permissions.RefPermission; import com.google.gerrit.server.project.NoSuchProjectException; -import com.google.gerrit.server.project.PerRequestProjectControlCache; -import com.google.gerrit.server.project.ProjectControl; +import com.google.gerrit.server.project.ProjectCache; +import com.google.gerrit.server.project.ProjectState; import com.google.gerrit.server.util.RequestContext; +import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Provider; import com.google.inject.Provides; +import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.FactoryModuleBuilder; import com.google.inject.servlet.RequestScoped; import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult; +import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate; import java.io.IOException; import java.io.UnsupportedEncodingException; import 
java.net.URLEncoder; @@ -83,18 +85,25 @@ public class Destination { private static final Logger repLog = ReplicationQueue.repLog; + + public interface Factory { + Destination create(DestinationConfiguration config); + } + private final ReplicationStateListener stateLog; private final Object stateLock = new Object(); private final Map<URIish, PushOne> pending = new HashMap<>(); private final Map<URIish, PushOne> inFlight = new HashMap<>(); private final PushOne.Factory opFactory; - private final ProjectControl.Factory projectControlFactory; private final GitRepositoryManager gitManager; private final PermissionBackend permissionBackend; + private final Provider<CurrentUser> userProvider; + private final ProjectCache projectCache; private volatile ScheduledExecutorService pool; private final PerThreadRequestScope.Scoper threadScoper; private final DestinationConfiguration config; private final DynamicItem<EventDispatcher> eventDispatcher; + private final Provider<ReplicationTasksStorage> replicationTasksStorage; protected enum RetryReason { TRANSPORT_ERROR, @@ -112,23 +121,28 @@ } } + @Inject protected Destination( Injector injector, - DestinationConfiguration cfg, - RemoteSiteUser.Factory replicationUserFactory, PluginUser pluginUser, GitRepositoryManager gitRepositoryManager, PermissionBackend permissionBackend, + Provider<CurrentUser> userProvider, + ProjectCache projectCache, GroupBackend groupBackend, - ReplicationStateListener stateLog, + ReplicationStateListeners stateLog, GroupIncludeCache groupIncludeCache, - DynamicItem<EventDispatcher> eventDispatcher) { - config = cfg; + DynamicItem<EventDispatcher> eventDispatcher, + Provider<ReplicationTasksStorage> rts, + @Assisted DestinationConfiguration cfg) { this.eventDispatcher = eventDispatcher; gitManager = gitRepositoryManager; this.permissionBackend = permissionBackend; + this.userProvider = userProvider; + this.projectCache = projectCache; this.stateLog = stateLog; - + this.replicationTasksStorage = rts; + 
config = cfg; CurrentUser remoteUser; if (!cfg.getAuthGroupNames().isEmpty()) { ImmutableSet.Builder<AccountGroup.UUID> builder = ImmutableSet.builder(); @@ -141,7 +155,7 @@ repLog.warn("Group \"{}\" not recognized, removing from authGroup", name); } } - remoteUser = replicationUserFactory.create(new ListGroupMembership(builder.build())); + remoteUser = new RemoteSiteUser(new ListGroupMembership(builder.build())); } else { remoteUser = pluginUser; } @@ -153,7 +167,6 @@ protected void configure() { bindScope(RequestScoped.class, PerThreadRequestScope.REQUEST); bind(PerThreadRequestScope.Propagator.class); - bind(PerRequestProjectControlCache.class).in(RequestScoped.class); bind(Destination.class).toInstance(Destination.this); bind(RemoteConfig.class).toInstance(config.getRemoteConfig()); @@ -185,7 +198,6 @@ } }); - projectControlFactory = child.getInstance(ProjectControl.Factory.class); opFactory = child.getInstance(PushOne.Factory.class); threadScoper = child.getInstance(PerThreadRequestScope.Scoper.class); } @@ -245,15 +257,21 @@ } } - private boolean shouldReplicate(ProjectControl ctl) throws PermissionBackendException { - if (!config.replicateHiddenProjects() && ctl.getProject().getState() == ProjectState.HIDDEN) { + private boolean shouldReplicate(ProjectState state, CurrentUser user) + throws PermissionBackendException { + if (!config.replicateHiddenProjects() + && state.getProject().getState() + == com.google.gerrit.extensions.client.ProjectState.HIDDEN) { return false; } + + // Hidden projects(permitsRead = false) should only be accessible by the project owners. + // READ_CONFIG is checked here because it's only allowed to project owners(ACCESS may also + // be allowed for other users). + ProjectPermission permissionToCheck = + state.statePermitsRead() ? 
ProjectPermission.ACCESS : ProjectPermission.READ_CONFIG; try { - permissionBackend - .user(ctl.getUser()) - .project(ctl.getProject().getNameKey()) - .check(ProjectPermission.ACCESS); + permissionBackend.user(user).project(state.getNameKey()).check(permissionToCheck); return true; } catch (AuthException e) { return false; @@ -268,8 +286,19 @@ new Callable<Boolean>() { @Override public Boolean call() throws NoSuchProjectException, PermissionBackendException { - ProjectControl projectControl = controlFor(project); - if (!shouldReplicate(projectControl)) { + ProjectState projectState; + try { + projectState = projectCache.checkedGet(project); + } catch (IOException e) { + return false; + } + if (projectState == null) { + throw new NoSuchProjectException(project); + } + if (!projectState.statePermitsRead()) { + return false; + } + if (!shouldReplicate(projectState, userProvider.get())) { return false; } if (PushOne.ALL_REFS.equals(ref)) { @@ -277,7 +306,7 @@ } try { permissionBackend - .user(projectControl.getUser()) + .user(userProvider.get()) .project(project) .ref(ref) .check(RefPermission.READ); @@ -304,7 +333,16 @@ new Callable<Boolean>() { @Override public Boolean call() throws NoSuchProjectException, PermissionBackendException { - return shouldReplicate(controlFor(project)); + ProjectState projectState; + try { + projectState = projectCache.checkedGet(project); + } catch (IOException e) { + return false; + } + if (projectState == null) { + throw new NoSuchProjectException(project); + } + return shouldReplicate(projectState, userProvider.get()); } }) .call(); @@ -354,21 +392,21 @@ } synchronized (stateLock) { - PushOne e = getPendingPush(uri); - if (e == null) { - e = opFactory.create(project, uri); - addRef(e, ref); - e.addState(ref, state); + PushOne task = getPendingPush(uri); + if (task == null) { + task = opFactory.create(project, uri); + addRef(task, ref); + task.addState(ref, state); @SuppressWarnings("unused") ScheduledFuture<?> ignored = - 
pool.schedule(e, now ? 0 : config.getDelay(), TimeUnit.SECONDS); - pending.put(uri, e); - } else if (!e.getRefs().contains(ref)) { - addRef(e, ref); - e.addState(ref, state); + pool.schedule(task, now ? 0 : config.getDelay(), TimeUnit.SECONDS); + pending.put(uri, task); + } else if (!task.getRefs().contains(ref)) { + addRef(task, ref); + task.addState(ref, state); } state.increasePushTaskCount(project.get(), ref); - repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, e, config.getDelay()); + repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, task, config.getDelay()); } } @@ -492,10 +530,6 @@ } } - ProjectControl controlFor(Project.NameKey project) throws NoSuchProjectException { - return projectControlFactory.controlFor(project); - } - RunwayStatus requestRunway(PushOne op) { synchronized (stateLock) { if (op.wasCanceled()) { @@ -511,12 +545,31 @@ return RunwayStatus.allowed(); } - void notifyFinished(PushOne op) { + void notifyFinished(PushOne task) { synchronized (stateLock) { - inFlight.remove(op.getURI()); + inFlight.remove(task.getURI()); + if (!task.wasCanceled()) { + for (String ref : task.getRefs()) { + if (!refHasPendingPush(task.getURI(), ref)) { + replicationTasksStorage + .get() + .delete( + new ReplicateRefUpdate( + task.getProjectNameKey().get(), ref, task.getURI(), getRemoteConfigName())); + } + } + } } } + private boolean refHasPendingPush(URIish opUri, String ref) { + return pushContainsRef(pending.get(opUri), ref) || pushContainsRef(inFlight.get(opUri), ref); + } + + private boolean pushContainsRef(PushOne op, String ref) { + return op != null && op.getRefs().contains(ref); + } + boolean wouldPushProject(Project.NameKey project) { if (!shouldReplicate(project)) { return false; @@ -649,6 +702,14 @@ return config.getMaxRetries(); } + public int getDrainQueueAttempts() { + return config.getDrainQueueAttempts(); + } + + public int getReplicationDelaySeconds() { + return config.getDelay() * 1000; + } + private static 
boolean matches(URIish uri, String urlMatch) { if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) { return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java index b2d0de2..f688cfc 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -22,10 +22,12 @@ public class DestinationConfiguration { static final int DEFAULT_REPLICATION_DELAY = 15; static final int DEFAULT_RESCHEDULE_DELAY = 3; + static final int DEFAULT_DRAIN_QUEUE_ATTEMPTS = 0; private final int delay; private final int rescheduleDelay; private final int retryDelay; + private final int drainQueueAttempts; private final int lockErrorMaxRetries; private final ImmutableList<String> adminUrls; private final int poolThreads; @@ -50,6 +52,8 @@ projects = ImmutableList.copyOf(cfg.getStringList("remote", name, "projects")); adminUrls = ImmutableList.copyOf(cfg.getStringList("remote", name, "adminUrl")); retryDelay = Math.max(0, getInt(remoteConfig, cfg, "replicationretry", 1)); + drainQueueAttempts = + Math.max(0, getInt(remoteConfig, cfg, "drainQueueAttempts", DEFAULT_DRAIN_QUEUE_ATTEMPTS)); poolThreads = Math.max(0, getInt(remoteConfig, cfg, "threads", 1)); authGroupNames = ImmutableList.copyOf(cfg.getStringList("remote", name, "authGroup")); lockErrorMaxRetries = cfg.getInt("replication", "lockErrorMaxRetries", 0); @@ -77,6 +81,10 @@ return retryDelay; } + public int getDrainQueueAttempts() { + return drainQueueAttempts; + } + public int getPoolThreads() { return poolThreads; }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationFactory.java deleted file mode 100644 index 83eab86..0000000 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationFactory.java +++ /dev/null
@@ -1,75 +0,0 @@ -// Copyright (C) 2016 The Android Open Source Project -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.googlesource.gerrit.plugins.replication; - -import com.google.gerrit.common.EventDispatcher; -import com.google.gerrit.extensions.registration.DynamicItem; -import com.google.gerrit.server.PluginUser; -import com.google.gerrit.server.account.GroupBackend; -import com.google.gerrit.server.account.GroupIncludeCache; -import com.google.gerrit.server.git.GitRepositoryManager; -import com.google.gerrit.server.permissions.PermissionBackend; -import com.google.inject.Inject; -import com.google.inject.Injector; -import com.google.inject.Singleton; - -@Singleton -public class DestinationFactory { - private final Injector injector; - private final RemoteSiteUser.Factory replicationUserFactory; - private final PluginUser pluginUser; - private final GitRepositoryManager gitRepositoryManager; - private final PermissionBackend permissionBackend; - private final GroupBackend groupBackend; - private final ReplicationStateListener stateLog; - private final GroupIncludeCache groupIncludeCache; - private final DynamicItem<EventDispatcher> eventDispatcher; - - @Inject - public DestinationFactory( - Injector injector, - RemoteSiteUser.Factory replicationUserFactory, - PluginUser pluginUser, - GitRepositoryManager gitRepositoryManager, - PermissionBackend permissionBackend, - GroupBackend groupBackend, - ReplicationStateListener stateLog, - 
GroupIncludeCache groupIncludeCache, - DynamicItem<EventDispatcher> eventDispatcher) { - this.injector = injector; - this.replicationUserFactory = replicationUserFactory; - this.pluginUser = pluginUser; - this.gitRepositoryManager = gitRepositoryManager; - this.permissionBackend = permissionBackend; - this.groupBackend = groupBackend; - this.stateLog = stateLog; - this.groupIncludeCache = groupIncludeCache; - this.eventDispatcher = eventDispatcher; - } - - Destination create(DestinationConfiguration config) { - return new Destination( - injector, - config, - replicationUserFactory, - pluginUser, - gitRepositoryManager, - permissionBackend, - groupBackend, - stateLog, - groupIncludeCache, - eventDispatcher); - } -}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java index b46a0d9..46c8892 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
@@ -14,33 +14,34 @@ package com.googlesource.gerrit.plugins.replication; +import com.google.common.flogger.FluentLogger; import com.google.gerrit.reviewdb.client.Project; import com.google.gerrit.server.ssh.SshAddressesModule; -import com.google.inject.Inject; import java.io.IOException; import java.io.OutputStream; import java.net.URISyntaxException; import java.util.HashSet; import java.util.Set; import org.eclipse.jgit.transport.URIish; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class GerritSshApi { +public class GerritSshApi implements AdminApi { + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + static int SSH_COMMAND_FAILED = -1; - private static final Logger log = LoggerFactory.getLogger(GerritSshApi.class); private static String GERRIT_ADMIN_PROTOCOL_PREFIX = "gerrit+"; - private final SshHelper sshHelper; + protected final SshHelper sshHelper; + protected final URIish uri; private final Set<URIish> withoutDeleteProjectPlugin = new HashSet<>(); - @Inject - protected GerritSshApi(SshHelper sshHelper) { + protected GerritSshApi(SshHelper sshHelper, URIish uri) { this.sshHelper = sshHelper; + this.uri = uri; } - protected boolean createProject(URIish uri, Project.NameKey projectName, String head) { + @Override + public boolean createProject(Project.NameKey projectName, String head) { OutputStream errStream = sshHelper.newErrorBufferStream(); String cmd = "gerrit create-project --branch " + head + " " + projectName.get(); try { @@ -52,7 +53,8 @@ return true; } - protected boolean deleteProject(URIish uri, Project.NameKey projectName) { + @Override + public boolean deleteProject(Project.NameKey projectName) { if (!withoutDeleteProjectPlugin.contains(uri)) { OutputStream errStream = sshHelper.newErrorBufferStream(); String cmd = "deleteproject delete --yes-really-delete --force " + projectName.get(); @@ -64,30 +66,23 @@ return false; } if (exitCode == 1) { - log.info( - "DeleteProject plugin is not installed 
on {}; will not try to forward this operation to that host"); + logger.atInfo().log( + "DeleteProject plugin is not installed on %s;" + + " will not try to forward this operation to that host", uri); withoutDeleteProjectPlugin.add(uri); - return true; } } return true; } - protected boolean updateHead(URIish uri, Project.NameKey projectName, String newHead) { + @Override + public boolean updateHead(Project.NameKey projectName, String newHead) { OutputStream errStream = sshHelper.newErrorBufferStream(); String cmd = "gerrit set-head " + projectName.get() + " --new-head " + newHead; try { execute(uri, cmd, errStream); } catch (IOException e) { - log.error( - "Error updating HEAD of remote repository at {} to {}:\n" - + " Exception: {}\n Command: {}\n Output: {}", - uri, - newHead, - e, - cmd, - errStream, - e); + logError("updating HEAD of", uri, errStream, cmd, e); return false; } return true; @@ -114,19 +109,14 @@ URIish sshUri = toSshUri(uri); return sshHelper.executeRemoteSsh(sshUri, cmd, errStream); } catch (URISyntaxException e) { - log.error("Cannot convert {} to SSH uri", uri, e); + logger.atSevere().withCause(e).log("Cannot convert %s to SSH uri", uri); } return SSH_COMMAND_FAILED; } public void logError(String msg, URIish uri, OutputStream errStream, String cmd, IOException e) { - log.error( - "Error {} remote repository at {}:\n Exception: {}\n Command: {}\n Output: {}", - msg, - uri, - e, - cmd, - errStream, - e); + logger.atSevere().withCause(e).log( + "Error %s remote repository at %s:\n Exception: %s\n Command: %s\n Output: %s", + msg, uri, e, cmd, errStream); } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java new file mode 100644 index 0000000..aa6e16c --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
@@ -0,0 +1,97 @@ +// Copyright (C) 2018 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog; + +import com.google.gerrit.reviewdb.client.Project; +import java.io.File; +import java.io.IOException; +import org.eclipse.jgit.internal.storage.file.FileRepository; +import org.eclipse.jgit.lib.Constants; +import org.eclipse.jgit.lib.RefUpdate; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.transport.URIish; + +public class LocalFS implements AdminApi { + + private final URIish uri; + + public LocalFS(URIish uri) { + this.uri = uri; + } + + @Override + public boolean createProject(Project.NameKey project, String head) { + try (Repository repo = new FileRepository(uri.getPath())) { + repo.create(true /* bare */); + + if (head != null && head.startsWith(Constants.R_REFS)) { + RefUpdate u = repo.updateRef(Constants.HEAD); + u.disableRefLog(); + u.link(head); + } + repLog.info("Created local repository: {}", uri); + } catch (IOException e) { + repLog.error("Error creating local repository {}", uri.getPath(), e); + return false; + } + return true; + } + + @Override + public boolean deleteProject(Project.NameKey project) { + try { + recursivelyDelete(new File(uri.getPath())); + repLog.info("Deleted local repository: {}", uri); + } catch (IOException e) { + repLog.error("Error deleting 
local repository {}:\n", uri.getPath(), e); + return false; + } + return true; + } + + @Override + public boolean updateHead(Project.NameKey project, String newHead) { + try (Repository repo = new FileRepository(uri.getPath())) { + if (newHead != null) { + RefUpdate u = repo.updateRef(Constants.HEAD); + u.link(newHead); + } + } catch (IOException e) { + repLog.error("Failed to update HEAD of repository {} to {}", uri.getPath(), newHead, e); + return false; + } + return true; + } + + private static void recursivelyDelete(File dir) throws IOException { + File[] contents = dir.listFiles(); + if (contents != null) { + for (File d : contents) { + if (d.isDirectory()) { + recursivelyDelete(d); + } else { + if (!d.delete()) { + throw new IOException("Failed to delete: " + d.getAbsolutePath()); + } + } + } + } + if (!dir.delete()) { + throw new IOException("Failed to delete: " + dir.getAbsolutePath()); + } + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java index 227804d..8b0aa3d 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
@@ -15,10 +15,10 @@ package com.googlesource.gerrit.plugins.replication; import com.google.common.util.concurrent.Atomics; -import com.google.gerrit.common.EventDispatcher; import com.google.gerrit.extensions.events.LifecycleListener; import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.extensions.systemstatus.ServerInformation; +import com.google.gerrit.server.events.EventDispatcher; import com.google.inject.Inject; import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing; import java.util.concurrent.Future; @@ -29,7 +29,6 @@ private final AtomicReference<Future<?>> pushAllFuture; private final ServerInformation srvInfo; private final PushAll.Factory pushAll; - private final ReplicationQueue queue; private final ReplicationConfig config; private final DynamicItem<EventDispatcher> eventDispatcher; @@ -37,12 +36,10 @@ protected OnStartStop( ServerInformation srvInfo, PushAll.Factory pushAll, - ReplicationQueue queue, ReplicationConfig config, DynamicItem<EventDispatcher> eventDispatcher) { this.srvInfo = srvInfo; this.pushAll = pushAll; - this.queue = queue; this.config = config; this.eventDispatcher = eventDispatcher; this.pushAllFuture = Atomics.newReference(); @@ -50,8 +47,6 @@ @Override public void start() { - queue.start(); - if (srvInfo.getState() == ServerInformation.State.STARTUP && config.isReplicateAllOnPluginStart()) { ReplicationState state = new ReplicationState(new GitUpdateProcessing(eventDispatcher.get())); @@ -68,6 +63,5 @@ if (f != null) { f.cancel(true); } - queue.stop(); } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java index db067e2..833b02b 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
@@ -43,7 +43,7 @@ WorkQueue wq, ProjectCache projectCache, ReplicationQueue rq, - ReplicationStateListener stateLog, + ReplicationStateListeners stateLog, @Assisted @Nullable String urlMatch, @Assisted ReplicationFilter filter, @Assisted ReplicationState state,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java index 5f0c066..5794f6e 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -22,20 +22,23 @@ import com.google.common.collect.ListMultimap; import com.google.common.collect.Sets; import com.google.gerrit.extensions.events.GitReferenceUpdatedListener; +import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.extensions.restapi.AuthException; +import com.google.gerrit.extensions.restapi.ResourceConflictException; import com.google.gerrit.metrics.Timer1; import com.google.gerrit.reviewdb.client.Project; import com.google.gerrit.reviewdb.client.RefNames; import com.google.gerrit.server.git.GitRepositoryManager; import com.google.gerrit.server.git.PerThreadRequestScope; import com.google.gerrit.server.git.ProjectRunnable; -import com.google.gerrit.server.git.VisibleRefFilter; import com.google.gerrit.server.git.WorkQueue.CanceledWhileRunning; +import com.google.gerrit.server.ioutil.HexFormat; import com.google.gerrit.server.permissions.PermissionBackend; +import com.google.gerrit.server.permissions.PermissionBackend.RefFilterOptions; import com.google.gerrit.server.permissions.PermissionBackendException; import com.google.gerrit.server.permissions.ProjectPermission; -import com.google.gerrit.server.project.NoSuchProjectException; -import com.google.gerrit.server.project.ProjectControl; +import com.google.gerrit.server.project.ProjectCache; +import com.google.gerrit.server.project.ProjectState; import com.google.gerrit.server.util.IdGenerator; import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; @@ -92,7 +95,6 @@ private final RemoteConfig config; private final CredentialsProvider credentialsProvider; private final PerThreadRequestScope.Scoper threadScoper; - private final VisibleRefFilter.Factory refFilterFactory; private final ReplicationQueue replicationQueue; private final Project.NameKey projectName; @@ -110,7 +112,10 @@ private final int id; private final long createdAt; private final ReplicationMetrics metrics; + private final ProjectCache projectCache; private final 
AtomicBoolean canceledWhileRunning; + private final TransportFactory transportFactory; + private DynamicItem<ReplicationPushFilter> replicationPushFilter; @Inject PushOne( @@ -118,20 +123,20 @@ PermissionBackend permissionBackend, Destination p, RemoteConfig c, - VisibleRefFilter.Factory rff, CredentialsFactory cpFactory, PerThreadRequestScope.Scoper ts, ReplicationQueue rq, IdGenerator ig, - ReplicationStateListener sl, + ReplicationStateListeners sl, ReplicationMetrics m, + ProjectCache pc, + TransportFactory tf, @Assisted Project.NameKey d, @Assisted URIish u) { gitManager = grm; this.permissionBackend = permissionBackend; pool = p; config = c; - refFilterFactory = rff; credentialsProvider = cpFactory.create(c.getName()); threadScoper = ts; replicationQueue = rq; @@ -143,13 +148,20 @@ stateLog = sl; createdAt = System.nanoTime(); metrics = m; + projectCache = pc; canceledWhileRunning = new AtomicBoolean(false); maxRetries = p.getMaxRetries(); + transportFactory = tf; + } + + @Inject(optional = true) + public void setReplicationPushFilter(DynamicItem<ReplicationPushFilter> replicationPushFilter) { + this.replicationPushFilter = replicationPushFilter; } @Override public void cancel() { - repLog.info("Replication [{}] to {} was canceled", IdGenerator.format(id), getURI()); + repLog.info("Replication [{}] to {} was canceled", HexFormat.fromInt(id), getURI()); canceledByReplication(); pool.pushWasCanceled(this); } @@ -158,7 +170,7 @@ public void setCanceledWhileRunning() { repLog.info( "Replication [{}] to {} was canceled while being executed", - IdGenerator.format(id), + HexFormat.fromInt(id), getURI()); canceledWhileRunning.set(true); } @@ -180,7 +192,7 @@ @Override public String toString() { - String print = "[" + IdGenerator.format(id) + "] push " + uri; + String print = "[" + HexFormat.fromInt(id) + "] push " + uri; if (retryCount > 0) { print = "(retry " + retryCount + ") " + print; @@ -301,7 +313,7 @@ // we start replication (instead a new instance, with the 
same URI, is // created and scheduled for a future point in time.) // - MDC.put(ID_MDC_KEY, IdGenerator.format(id)); + MDC.put(ID_MDC_KEY, HexFormat.fromInt(id)); RunwayStatus status = pool.requestRunway(this); if (!status.isAllowed()) { if (status.isCanceled()) { @@ -310,7 +322,7 @@ repLog.info( "Rescheduling replication to {} to avoid collision with the in-flight push [{}].", uri, - IdGenerator.format(status.getInFlightPushId())); + HexFormat.fromInt(status.getInFlightPushId())); pool.reschedule(this, Destination.RetryReason.COLLISION); } return; @@ -407,15 +419,12 @@ if (pool.isCreateMissingRepos()) { try { Ref head = git.exactRef(Constants.HEAD); - if (replicationQueue.createProject(projectName, head != null ? getName(head) : null)) { + if (replicationQueue.createProject( + config.getName(), projectName, head != null ? getName(head) : null)) { repLog.warn("Missing repository created; retry replication to {}", uri); pool.reschedule(this, Destination.RetryReason.REPOSITORY_MISSING); } else { - repLog.warn( - "Missing repository could not be created when replicating {}. " - + "You can only create missing repositories locally, over SSH or when " - + "using adminUrl in replication.config. 
See documentation for more information.", - uri); + repLog.warn("Missing repository could not be created when replicating {}", uri); } } catch (IOException ioe) { stateLog.error( @@ -438,7 +447,7 @@ private void runImpl() throws IOException, PermissionBackendException { PushResult res; - try (Transport tn = Transport.open(git, uri)) { + try (Transport tn = transportFactory.open(git, uri)) { res = pushVia(tn); } updateStates(res.getRemoteUpdates()); @@ -465,19 +474,19 @@ private List<RemoteRefUpdate> generateUpdates(Transport tn) throws IOException, PermissionBackendException { - ProjectControl pc; - try { - pc = pool.controlFor(projectName); - } catch (NoSuchProjectException e) { + ProjectState projectState = projectCache.checkedGet(projectName); + if (projectState == null) { return Collections.emptyList(); } Map<String, Ref> local = git.getAllRefs(); boolean filter; + PermissionBackend.ForProject forProject = permissionBackend.currentUser().project(projectName); try { - permissionBackend.user(pc.getUser()).project(projectName).check(ProjectPermission.READ); + projectState.checkStatePermitsRead(); + forProject.check(ProjectPermission.READ); filter = false; - } catch (AuthException e) { + } catch (AuthException | ResourceConflictException e) { filter = true; } if (filter) { @@ -494,10 +503,15 @@ } local = n; } - local = refFilterFactory.create(pc.getProjectState(), git).filter(local, true); + local = forProject.filter(local, git, RefFilterOptions.builder().setFilterMeta(true).build()); } - return pushAllRefs ? doPushAll(tn, local) : doPushDelta(local); + List<RemoteRefUpdate> remoteUpdatesList = + pushAllRefs ? doPushAll(tn, local) : doPushDelta(local); + + return replicationPushFilter == null || replicationPushFilter.get() == null + ? remoteUpdatesList + : replicationPushFilter.get().filter(projectName.get(), remoteUpdatesList); } private List<RemoteRefUpdate> doPushAll(Transport tn, Map<String, Ref> local)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java index c71a792..ae0662d 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushResultProcessing.java
@@ -14,7 +14,8 @@ package com.googlesource.gerrit.plugins.replication; -import com.google.gerrit.common.EventDispatcher; +import com.google.common.flogger.FluentLogger; +import com.google.gerrit.server.events.EventDispatcher; import com.google.gerrit.server.events.RefEvent; import com.google.gerrit.server.permissions.PermissionBackendException; import com.google.gwtorm.server.OrmException; @@ -23,8 +24,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jgit.transport.RemoteRefUpdate; import org.eclipse.jgit.transport.URIish; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public abstract class PushResultProcessing { @@ -159,7 +158,7 @@ } public static class GitUpdateProcessing extends PushResultProcessing { - private static final Logger log = LoggerFactory.getLogger(GitUpdateProcessing.class); + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private final EventDispatcher dispatcher; @@ -189,7 +188,7 @@ try { dispatcher.postEvent(event); } catch (OrmException | PermissionBackendException e) { - log.error("Cannot post event", e); + logger.atSevere().withCause(e).log("Cannot post event"); } } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java index 91fce7f..c3556af 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSiteUser.java
@@ -16,18 +16,11 @@ import com.google.gerrit.server.CurrentUser; import com.google.gerrit.server.account.GroupMembership; -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; public class RemoteSiteUser extends CurrentUser { - public interface Factory { - RemoteSiteUser create(@Assisted GroupMembership authGroups); - } - private final GroupMembership effectiveGroups; - @Inject - RemoteSiteUser(@Assisted GroupMembership authGroups) { + public RemoteSiteUser(GroupMembership authGroups) { effectiveGroups = authGroups; } @@ -35,4 +28,10 @@ public GroupMembership getEffectiveGroups() { return effectiveGroups; } + + @Override + public Object getCacheKey() { + // Never cache a remote user + return new Object(); + } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java new file mode 100644 index 0000000..ee9d4c0 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
@@ -0,0 +1,110 @@ +// Copyright (C) 2018 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog; + +import com.google.gerrit.reviewdb.client.Project; +import java.io.IOException; +import java.io.OutputStream; +import org.eclipse.jgit.transport.URIish; +import org.eclipse.jgit.util.QuotedString; + +public class RemoteSsh implements AdminApi { + + private final SshHelper sshHelper; + private URIish uri; + + RemoteSsh(SshHelper sshHelper, URIish uri) { + this.sshHelper = sshHelper; + this.uri = uri; + } + + @Override + public boolean createProject(Project.NameKey project, String head) { + String quotedPath = QuotedString.BOURNE.quote(uri.getPath()); + String cmd = "mkdir -p " + quotedPath + " && cd " + quotedPath + " && git init --bare"; + if (head != null) { + cmd = cmd + " && git symbolic-ref HEAD " + QuotedString.BOURNE.quote(head); + } + OutputStream errStream = sshHelper.newErrorBufferStream(); + try { + sshHelper.executeRemoteSsh(uri, cmd, errStream); + repLog.info("Created remote repository: {}", uri); + } catch (IOException e) { + repLog.error( + "Error creating remote repository at {}:\n" + + " Exception: {}\n" + + " Command: {}\n" + + " Output: {}", + uri, + e, + cmd, + errStream, + e); + return false; + } + return true; + } + + @Override + public boolean deleteProject(Project.NameKey 
project) { + String quotedPath = QuotedString.BOURNE.quote(uri.getPath()); + String cmd = "rm -rf " + quotedPath; + OutputStream errStream = sshHelper.newErrorBufferStream(); + try { + sshHelper.executeRemoteSsh(uri, cmd, errStream); + repLog.info("Deleted remote repository: {}", uri); + } catch (IOException e) { + repLog.error( + "Error deleting remote repository at {}:\n" + + " Exception: {}\n" + + " Command: {}\n" + + " Output: {}", + uri, + e, + cmd, + errStream, + e); + return false; + } + return true; + } + + @Override + public boolean updateHead(Project.NameKey project, String newHead) { + String quotedPath = QuotedString.BOURNE.quote(uri.getPath()); + String cmd = + "cd " + quotedPath + " && git symbolic-ref HEAD " + QuotedString.BOURNE.quote(newHead); + OutputStream errStream = sshHelper.newErrorBufferStream(); + try { + sshHelper.executeRemoteSsh(uri, cmd, errStream); + } catch (IOException e) { + repLog.error( + "Error updating HEAD of remote repository at {} to {}:\n" + + " Exception: {}\n" + + " Command: {}\n" + + " Output: {}", + uri, + newHead, + e, + cmd, + errStream, + e); + return false; + } + return true; + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java index 869a49b..c9531e3 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -14,6 +14,7 @@ package com.googlesource.gerrit.plugins.replication; import com.google.gerrit.server.git.WorkQueue; +import java.nio.file.Path; import java.util.List; public interface ReplicationConfig { @@ -32,6 +33,8 @@ boolean isEmpty(); + Path getEventsDirectory(); + int shutdown(); void startup(WorkQueue workQueue);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationExtensionPointModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationExtensionPointModule.java new file mode 100644 index 0000000..b92a54a --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationExtensionPointModule.java
@@ -0,0 +1,32 @@ +// Copyright (C) 2019 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import com.google.gerrit.extensions.registration.DynamicItem; +import com.google.inject.AbstractModule; + +/** + * Gerrit libModule for applying a ref-filter for outgoing replications. + * + * <p>It should be used only when an actual filter is defined, otherwise the default replication + * plugin behaviour will be pushing all refs without any filtering. + */ +public class ReplicationExtensionPointModule extends AbstractModule { + + @Override + protected void configure() { + DynamicItem.itemOf(binder(), ReplicationPushFilter.class); + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java index db9f35d..9004968 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -17,8 +17,11 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.flogger.FluentLogger; +import com.google.gerrit.extensions.annotations.PluginData; import com.google.gerrit.server.config.ConfigUtil; import com.google.gerrit.server.config.SitePaths; import com.google.gerrit.server.git.WorkQueue; @@ -38,28 +41,31 @@ import org.eclipse.jgit.transport.RemoteConfig; import org.eclipse.jgit.transport.URIish; import org.eclipse.jgit.util.FS; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @Singleton public class ReplicationFileBasedConfig implements ReplicationConfig { - private static final Logger log = LoggerFactory.getLogger(ReplicationFileBasedConfig.class); + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private static final int DEFAULT_SSH_CONNECTION_TIMEOUT_MS = 2 * 60 * 1000; // 2 minutes private List<Destination> destinations; + private final SitePaths site; private Path cfgPath; private boolean replicateAllOnPluginStart; private boolean defaultForceUpdate; private int sshCommandTimeout; private int sshConnectionTimeout = DEFAULT_SSH_CONNECTION_TIMEOUT_MS; private final FileBasedConfig config; + private final Path pluginDataDir; @Inject - public ReplicationFileBasedConfig(SitePaths site, DestinationFactory destinationFactory) + public ReplicationFileBasedConfig( + SitePaths site, Destination.Factory destinationFactory, @PluginData Path pluginDataDir) throws ConfigInvalidException, IOException { + this.site = site; this.cfgPath = site.etc_dir.resolve("replication.config"); this.config = new FileBasedConfig(cfgPath.toFile(), FS.DETECTED); this.destinations = allDestinations(destinationFactory); + this.pluginDataDir = pluginDataDir; } /* @@ -86,14 +92,14 @@ return 
destinations.stream().filter(Objects::nonNull).filter(filter).collect(toList()); } - private List<Destination> allDestinations(DestinationFactory destinationFactory) + private List<Destination> allDestinations(Destination.Factory destinationFactory) throws ConfigInvalidException, IOException { if (!config.getFile().exists()) { - log.warn("Config file {} does not exist; not replicating", config.getFile()); + logger.atWarning().log("Config file %s does not exist; not replicating", config.getFile()); return Collections.emptyList(); } if (config.getFile().length() == 0) { - log.info("Config file {} is empty; not replicating", config.getFile()); + logger.atInfo().log("Config file %s is empty; not replicating", config.getFile()); return Collections.emptyList(); } @@ -107,7 +113,7 @@ String.format("Cannot read %s: %s", config.getFile(), e.getMessage()), e); } - replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", true); + replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false); defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false); @@ -199,6 +205,15 @@ return destinations.isEmpty(); } + @Override + public Path getEventsDirectory() { + String eventsDirectory = config.getString("replication", null, "eventsDirectory"); + if (!Strings.isNullOrEmpty(eventsDirectory)) { + return site.resolve(eventsDirectory); + } + return pluginDataDir; + } + Path getCfgPath() { return cfgPath; } @@ -207,11 +222,54 @@ public int shutdown() { int discarded = 0; for (Destination cfg : destinations) { - discarded += cfg.shutdown(); + try { + drainReplicationEvents(cfg); + } catch (EventQueueNotEmptyException e) { + logger.atWarning().log("Event queue not empty: %s", e.getMessage()); + } finally { + discarded += cfg.shutdown(); + } } return discarded; } + void drainReplicationEvents(Destination destination) throws EventQueueNotEmptyException { + int drainQueueAttempts = destination.getDrainQueueAttempts(); + if 
(drainQueueAttempts == 0) { + return; + } + int pending = destination.getQueueInfo().pending.size(); + int inFlight = destination.getQueueInfo().inFlight.size(); + + while ((inFlight > 0 || pending > 0) && drainQueueAttempts > 0) { + try { + logger.atInfo().log( + "Draining replication events, postpone shutdown. Events left: inFlight %d, pending %d", + inFlight, pending); + Thread.sleep(destination.getReplicationDelaySeconds()); + } catch (InterruptedException ie) { + logger.atWarning().withCause(ie).log( + "Wait for replication events to drain has been interrupted"); + } + pending = destination.getQueueInfo().pending.size(); + inFlight = destination.getQueueInfo().inFlight.size(); + drainQueueAttempts--; + } + + if (pending > 0 || inFlight > 0) { + throw new EventQueueNotEmptyException( + String.format("Pending: %d - InFlight: %d", pending, inFlight)); + } + } + + public static class EventQueueNotEmptyException extends Exception { + private static final long serialVersionUID = 1L; + + public EventQueueNotEmptyException(String errorMessage) { + super(errorMessage); + } + } + FileBasedConfig getConfig() { return config; }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java index 7b3486b..05bbb03 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
@@ -15,7 +15,7 @@ package com.googlesource.gerrit.plugins.replication; import com.google.gerrit.common.data.AccessSection; -import com.google.gerrit.reviewdb.client.Project.NameKey; +import com.google.gerrit.reviewdb.client.Project; import java.util.Collections; import java.util.List; @@ -46,7 +46,7 @@ projectPatterns = patterns; } - public boolean matches(NameKey name) { + public boolean matches(Project.NameKey name) { if (projectPatterns.isEmpty()) { return true; }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java index f30e13d..5fdb375 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -21,8 +21,8 @@ import com.google.gerrit.extensions.events.GitReferenceUpdatedListener; import com.google.gerrit.extensions.events.HeadUpdatedListener; import com.google.gerrit.extensions.events.LifecycleListener; -import com.google.gerrit.extensions.events.NewProjectCreatedListener; import com.google.gerrit.extensions.events.ProjectDeletedListener; +import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.extensions.registration.DynamicSet; import com.google.gerrit.server.events.EventTypes; import com.google.inject.AbstractModule; @@ -34,11 +34,13 @@ class ReplicationModule extends AbstractModule { @Override protected void configure() { - bind(DestinationFactory.class).in(Scopes.SINGLETON); + install(new FactoryModuleBuilder().build(Destination.Factory.class)); bind(ReplicationQueue.class).in(Scopes.SINGLETON); + bind(LifecycleListener.class) + .annotatedWith(UniqueAnnotations.create()) + .to(ReplicationQueue.class); DynamicSet.bind(binder(), GitReferenceUpdatedListener.class).to(ReplicationQueue.class); - DynamicSet.bind(binder(), NewProjectCreatedListener.class).to(ReplicationQueue.class); DynamicSet.bind(binder(), ProjectDeletedListener.class).to(ReplicationQueue.class); DynamicSet.bind(binder(), HeadUpdatedListener.class).to(ReplicationQueue.class); @@ -55,14 +57,20 @@ .to(StartReplicationCapability.class); install(new FactoryModuleBuilder().build(PushAll.Factory.class)); - install(new FactoryModuleBuilder().build(RemoteSiteUser.Factory.class)); bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class); - bind(ReplicationStateListener.class).to(ReplicationStateLogger.class); + DynamicSet.setOf(binder(), ReplicationStateListener.class); + DynamicSet.bind(binder(), ReplicationStateListener.class).to(ReplicationStateLogger.class); EventTypes.register(RefReplicatedEvent.TYPE, RefReplicatedEvent.class); EventTypes.register(RefReplicationDoneEvent.TYPE, RefReplicationDoneEvent.class); 
EventTypes.register(ReplicationScheduledEvent.TYPE, ReplicationScheduledEvent.class); bind(SshSessionFactory.class).toProvider(ReplicationSshSessionFactoryProvider.class); + + DynamicItem.itemOf(binder(), AdminApiFactory.class); + DynamicItem.bind(binder(), AdminApiFactory.class) + .to(AdminApiFactory.DefaultAdminApiFactory.class); + + bind(TransportFactory.class).to(TransportFactoryImpl.class).in(Scopes.SINGLETON); } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationPushFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationPushFilter.java new file mode 100644 index 0000000..eb6ba90 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationPushFilter.java
@@ -0,0 +1,30 @@ +// Copyright (C) 2019 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import com.google.gerrit.extensions.annotations.ExtensionPoint; +import java.util.List; +import org.eclipse.jgit.transport.RemoteRefUpdate; + +/** + * Filter that is invoked before list of remote ref updates is pushed to remote instance. + * + * <p>It can be used to filter out unwanted updates. + */ +@ExtensionPoint +public interface ReplicationPushFilter { + + public List<RemoteRefUpdate> filter(String projectName, List<RemoteRefUpdate> remoteUpdatesList); +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java index 4c7bdfc..d73ab7b 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -14,34 +14,33 @@ package com.googlesource.gerrit.plugins.replication; +import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerrit; +import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; import com.google.common.base.Strings; import com.google.common.collect.Queues; -import com.google.gerrit.common.EventDispatcher; +import com.google.gerrit.common.Nullable; import com.google.gerrit.extensions.events.GitReferenceUpdatedListener; import com.google.gerrit.extensions.events.HeadUpdatedListener; import com.google.gerrit.extensions.events.LifecycleListener; -import com.google.gerrit.extensions.events.NewProjectCreatedListener; import com.google.gerrit.extensions.events.ProjectDeletedListener; import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.reviewdb.client.Project; +import com.google.gerrit.server.events.EventDispatcher; import com.google.gerrit.server.git.WorkQueue; import com.google.inject.Inject; import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing; import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType; -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; +import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate; import java.net.URISyntaxException; import java.util.Collections; import java.util.HashSet; +import java.util.Optional; import java.util.Queue; import java.util.Set; -import org.eclipse.jgit.internal.storage.file.FileRepository; -import org.eclipse.jgit.lib.Constants; -import org.eclipse.jgit.lib.RefUpdate; -import org.eclipse.jgit.lib.Repository; import org.eclipse.jgit.transport.URIish; -import org.eclipse.jgit.util.QuotedString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +48,6 @@ public class ReplicationQueue implements 
LifecycleListener, GitReferenceUpdatedListener, - NewProjectCreatedListener, ProjectDeletedListener, HeadUpdatedListener { static final String REPLICATION_LOG_NAME = "replication_log"; @@ -70,35 +68,39 @@ } private final WorkQueue workQueue; - private final SshHelper sshHelper; private final DynamicItem<EventDispatcher> dispatcher; private final ReplicationConfig config; - private final GerritSshApi gerritAdmin; + private final DynamicItem<AdminApiFactory> adminApiFactory; + private final ReplicationTasksStorage replicationTasksStorage; private volatile boolean running; - private final Queue<GitReferenceUpdatedListener.Event> beforeStartupEventsQueue; + private volatile boolean replaying; + private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue; @Inject ReplicationQueue( WorkQueue wq, - SshHelper sh, - GerritSshApi ga, + DynamicItem<AdminApiFactory> aaf, ReplicationConfig rc, DynamicItem<EventDispatcher> dis, - ReplicationStateListener sl) { + ReplicationStateListeners sl, + ReplicationTasksStorage rts) { workQueue = wq; - sshHelper = sh; dispatcher = dis; config = rc; stateLog = sl; - gerritAdmin = ga; + adminApiFactory = aaf; + replicationTasksStorage = rts; beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue(); } @Override public void start() { - config.startup(workQueue); - running = true; - fireBeforeStartupEvents(); + if (!running) { + config.startup(workQueue); + running = true; + firePendingEvents(); + fireBeforeStartupEvents(); + } } @Override @@ -110,11 +112,20 @@ } } + public boolean isRunning() { + return running; + } + + public boolean isReplaying() { + return replaying; + } + void scheduleFullSync(Project.NameKey project, String urlMatch, ReplicationState state) { scheduleFullSync(project, urlMatch, state, false); } - void scheduleFullSync( + @VisibleForTesting + public void scheduleFullSync( Project.NameKey project, String urlMatch, ReplicationState state, boolean now) { if (!running) { stateLog.warn("Replication plugin did not 
finish startup before event", state); @@ -125,6 +136,9 @@ if (cfg.wouldPushProject(project)) { for (URIish uri : cfg.getURIs(project, urlMatch)) { cfg.schedule(project, PushOne.ALL_REFS, uri, state, now); + replicationTasksStorage.persist( + new ReplicateRefUpdate( + project.get(), PushOne.ALL_REFS, uri, cfg.getRemoteConfigName())); } } } @@ -132,38 +146,53 @@ @Override public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) { + onGitReferenceUpdated(event.getProjectName(), event.getRefName()); + } + + private void onGitReferenceUpdated(String projectName, String refName) { ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get())); if (!running) { stateLog.warn( "Replication plugin did not finish startup before event, event replication is postponed", state); - beforeStartupEventsQueue.add(event); + beforeStartupEventsQueue.add(new ReferenceUpdatedEvent(projectName, refName)); return; } - Project.NameKey project = new Project.NameKey(event.getProjectName()); + Project.NameKey project = new Project.NameKey(projectName); for (Destination cfg : config.getDestinations(FilterType.ALL)) { - if (cfg.wouldPushProject(project) && cfg.wouldPushRef(event.getRefName())) { + if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) { for (URIish uri : cfg.getURIs(project, null)) { - cfg.schedule(project, event.getRefName(), uri, state); + replicationTasksStorage.persist( + new ReplicateRefUpdate(projectName, refName, uri, cfg.getRemoteConfigName())); + cfg.schedule(project, refName, uri, state); } } } state.markAllPushTasksScheduled(); } - @Override - public void onNewProjectCreated(NewProjectCreatedListener.Event event) { - Project.NameKey projectName = new Project.NameKey(event.getProjectName()); - for (URIish uri : getURIs(projectName, FilterType.PROJECT_CREATION)) { - createProject(uri, projectName, event.getHeadName()); + private void firePendingEvents() { + try { + Set<String> eventsReplayed = new HashSet<>(); + 
replaying = true; + for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.list()) { + String eventKey = String.format("%s:%s", t.project, t.ref); + if (!eventsReplayed.contains(eventKey)) { + repLog.info("Firing pending task {}", eventKey); + onGitReferenceUpdated(t.project, t.ref); + eventsReplayed.add(eventKey); + } + } + } finally { + replaying = false; } } @Override public void onProjectDeleted(ProjectDeletedListener.Event event) { Project.NameKey projectName = new Project.NameKey(event.getProjectName()); - for (URIish uri : getURIs(projectName, FilterType.PROJECT_DELETION)) { + for (URIish uri : getURIs(null, projectName, FilterType.PROJECT_DELETION)) { deleteProject(uri, projectName); } } @@ -171,24 +200,25 @@ @Override public void onHeadUpdated(HeadUpdatedListener.Event event) { Project.NameKey project = new Project.NameKey(event.getProjectName()); - for (URIish uri : getURIs(project, FilterType.ALL)) { + for (URIish uri : getURIs(null, project, FilterType.ALL)) { updateHead(uri, project, event.getNewHeadName()); } } private void fireBeforeStartupEvents() { Set<String> eventsReplayed = new HashSet<>(); - for (GitReferenceUpdatedListener.Event event : beforeStartupEventsQueue) { + for (ReferenceUpdatedEvent event : beforeStartupEventsQueue) { String eventKey = String.format("%s:%s", event.getProjectName(), event.getRefName()); if (!eventsReplayed.contains(eventKey)) { repLog.info("Firing pending task {}", event); - onGitReferenceUpdated(event); + onGitReferenceUpdated(event.getProjectName(), event.getRefName()); eventsReplayed.add(eventKey); } } } - private Set<URIish> getURIs(Project.NameKey projectName, FilterType filterType) { + private Set<URIish> getURIs( + @Nullable String remoteName, Project.NameKey projectName, FilterType filterType) { if (config.getDestinations(filterType).isEmpty()) { return Collections.emptySet(); } @@ -203,6 +233,10 @@ continue; } + if (remoteName != null && !config.getRemoteConfigName().equals(remoteName)) { 
+ continue; + } + boolean adminURLUsed = false; for (String url : config.getAdminUrls()) { @@ -245,201 +279,75 @@ return uris; } - public boolean createProject(Project.NameKey project, String head) { + public boolean createProject(String remoteName, Project.NameKey project, String head) { boolean success = true; - for (URIish uri : getURIs(project, FilterType.PROJECT_CREATION)) { + for (URIish uri : getURIs(remoteName, project, FilterType.PROJECT_CREATION)) { success &= createProject(uri, project, head); } return success; } private boolean createProject(URIish replicateURI, Project.NameKey projectName, String head) { - if (isGerrit(replicateURI)) { - gerritAdmin.createProject(replicateURI, projectName, head); - } else if (!replicateURI.isRemote()) { - createLocally(replicateURI, head); - } else if (isSSH(replicateURI)) { - createRemoteSsh(replicateURI, head); - } else { - repLog.warn( - "Cannot create new project on remote site {}." - + " Only local paths and SSH URLs are supported" - + " for remote repository creation", - replicateURI); - return false; - } - return true; - } - - private static void createLocally(URIish uri, String head) { - try (Repository repo = new FileRepository(uri.getPath())) { - repo.create(true /* bare */); - - if (head != null && head.startsWith(Constants.R_REFS)) { - RefUpdate u = repo.updateRef(Constants.HEAD); - u.disableRefLog(); - u.link(head); - } - repLog.info("Created local repository: {}", uri); - } catch (IOException e) { - repLog.error("Error creating local repository {}:\n", uri.getPath(), e); - } - } - - private void createRemoteSsh(URIish uri, String head) { - String quotedPath = QuotedString.BOURNE.quote(uri.getPath()); - String cmd = "mkdir -p " + quotedPath + " && cd " + quotedPath + " && git init --bare"; - if (head != null) { - cmd = cmd + " && git symbolic-ref HEAD " + QuotedString.BOURNE.quote(head); - } - OutputStream errStream = sshHelper.newErrorBufferStream(); - try { - sshHelper.executeRemoteSsh(uri, cmd, 
errStream); - repLog.info("Created remote repository: {}", uri); - } catch (IOException e) { - repLog.error( - "Error creating remote repository at {}:\n" - + " Exception: {}\n" - + " Command: {}\n" - + " Output: {}", - uri, - e, - cmd, - errStream, - e); - } - } - - private void deleteProject(URIish replicateURI, Project.NameKey projectName) { - if (isGerrit(replicateURI)) { - gerritAdmin.deleteProject(replicateURI, projectName); - repLog.info("Deleted remote repository: " + replicateURI); - } else if (!replicateURI.isRemote()) { - deleteLocally(replicateURI); - } else if (isSSH(replicateURI)) { - deleteRemoteSsh(replicateURI); - } else { - repLog.warn( - "Cannot delete project on remote site {}. " - + "Only local paths and SSH URLs are supported" - + " for remote repository deletion", - replicateURI); - } - } - - private static void deleteLocally(URIish uri) { - try { - recursivelyDelete(new File(uri.getPath())); - repLog.info("Deleted local repository: {}", uri); - } catch (IOException e) { - repLog.error("Error deleting local repository {}:\n", uri.getPath(), e); - } - } - - private static void recursivelyDelete(File dir) throws IOException { - File[] contents = dir.listFiles(); - if (contents != null) { - for (File d : contents) { - if (d.isDirectory()) { - recursivelyDelete(d); - } else { - if (!d.delete()) { - throw new IOException("Failed to delete: " + d.getAbsolutePath()); - } - } - } - } - if (!dir.delete()) { - throw new IOException("Failed to delete: " + dir.getAbsolutePath()); - } - } - - private void deleteRemoteSsh(URIish uri) { - String quotedPath = QuotedString.BOURNE.quote(uri.getPath()); - String cmd = "rm -rf " + quotedPath; - OutputStream errStream = sshHelper.newErrorBufferStream(); - try { - sshHelper.executeRemoteSsh(uri, cmd, errStream); - repLog.info("Deleted remote repository: {}", uri); - } catch (IOException e) { - repLog.error( - "Error deleting remote repository at {}:\n" - + " Exception: {}\n" - + " Command: {}\n" - + " Output: {}", 
- uri, - e, - cmd, - errStream, - e); - } - } - - private void updateHead(URIish replicateURI, Project.NameKey projectName, String newHead) { - if (isGerrit(replicateURI)) { - gerritAdmin.updateHead(replicateURI, projectName, newHead); - } else if (!replicateURI.isRemote()) { - updateHeadLocally(replicateURI, newHead); - } else if (isSSH(replicateURI)) { - updateHeadRemoteSsh(replicateURI, newHead); - } else { - repLog.warn( - "Cannot update HEAD of project on remote site {}." - + " Only local paths and SSH URLs are supported" - + " for remote HEAD update.", - replicateURI); - } - } - - private void updateHeadRemoteSsh(URIish uri, String newHead) { - String quotedPath = QuotedString.BOURNE.quote(uri.getPath()); - String cmd = - "cd " + quotedPath + " && git symbolic-ref HEAD " + QuotedString.BOURNE.quote(newHead); - OutputStream errStream = sshHelper.newErrorBufferStream(); - try { - sshHelper.executeRemoteSsh(uri, cmd, errStream); - } catch (IOException e) { - repLog.error( - "Error updating HEAD of remote repository at {} to {}:\n" - + " Exception: {}\n" - + " Command: {}\n" - + " Output: {}", - uri, - newHead, - e, - cmd, - errStream, - e); - } - } - - private static void updateHeadLocally(URIish uri, String newHead) { - try (Repository repo = new FileRepository(uri.getPath())) { - if (newHead != null) { - RefUpdate u = repo.updateRef(Constants.HEAD); - u.link(newHead); - } - } catch (IOException e) { - repLog.error("Failed to update HEAD of repository {} to {}", uri.getPath(), newHead, e); - } - } - - private static boolean isSSH(URIish uri) { - String scheme = uri.getScheme(); - if (!uri.isRemote()) { - return false; - } - if (scheme != null && scheme.toLowerCase().contains("ssh")) { + Optional<AdminApi> adminApi = adminApiFactory.get().create(replicateURI); + if (adminApi.isPresent() && adminApi.get().createProject(projectName, head)) { return true; } - if (scheme == null && uri.getHost() != null && uri.getPath() != null) { - return true; - } + + 
warnCannotPerform("create new project", replicateURI); return false; } - private static boolean isGerrit(URIish uri) { - String scheme = uri.getScheme(); - return scheme != null && scheme.toLowerCase().equals("gerrit+ssh"); + private void deleteProject(URIish replicateURI, Project.NameKey projectName) { + Optional<AdminApi> adminApi = adminApiFactory.get().create(replicateURI); + if (adminApi.isPresent()) { + adminApi.get().deleteProject(projectName); + return; + } + + warnCannotPerform("delete project", replicateURI); + } + + private void updateHead(URIish replicateURI, Project.NameKey projectName, String newHead) { + Optional<AdminApi> adminApi = adminApiFactory.get().create(replicateURI); + if (adminApi.isPresent()) { + adminApi.get().updateHead(projectName, newHead); + return; + } + + warnCannotPerform("update HEAD of project", replicateURI); + } + + private void warnCannotPerform(String op, URIish uri) { + repLog.warn("Cannot {} on remote site {}.", op, uri); + } + + private static class ReferenceUpdatedEvent { + private String projectName; + private String refName; + + public ReferenceUpdatedEvent(String projectName, String refName) { + this.projectName = projectName; + this.refName = refName; + } + + public String getProjectName() { + return projectName; + } + + public String getRefName() { + return refName; + } + + @Override + public int hashCode() { + return Objects.hashCode(projectName, refName); + } + + @Override + public boolean equals(Object obj) { + return (obj instanceof ReferenceUpdatedEvent) + && Objects.equal(projectName, ((ReferenceUpdatedEvent) obj).projectName) + && Objects.equal(refName, ((ReferenceUpdatedEvent) obj).refName); + } } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java index cd7a3cf..aa965fe 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
@@ -15,7 +15,6 @@ package com.googlesource.gerrit.plugins.replication; import com.google.gerrit.reviewdb.client.Project; -import com.google.gerrit.reviewdb.client.Project.NameKey; import com.google.gerrit.server.events.RefEvent; public class ReplicationScheduledEvent extends RefEvent { @@ -38,7 +37,7 @@ } @Override - public NameKey getProjectNameKey() { + public Project.NameKey getProjectNameKey() { return new Project.NameKey(project); } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java index 86557e2..df8f3f4 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
@@ -23,6 +23,7 @@ import org.eclipse.jgit.transport.URIish; public class ReplicationState { + private boolean allScheduled; private final PushResultProcessing pushResultProcessing; @@ -49,7 +50,7 @@ private int totalPushTasksCount; private int finishedPushTasksCount; - public ReplicationState(PushResultProcessing processing) { + ReplicationState(PushResultProcessing processing) { pushResultProcessing = processing; statusByProjectRef = HashBasedTable.create(); }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateListeners.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateListeners.java new file mode 100644 index 0000000..d7bf227 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateListeners.java
@@ -0,0 +1,48 @@ +// Copyright (C) 2019 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import com.google.gerrit.extensions.registration.DynamicSet; +import com.google.inject.Inject; + +public class ReplicationStateListeners implements ReplicationStateListener { + private final DynamicSet<ReplicationStateListener> listeners; + + @Inject + ReplicationStateListeners(DynamicSet<ReplicationStateListener> stateListeners) { + this.listeners = stateListeners; + } + + @Override + public void warn(String msg, ReplicationState... states) { + for (ReplicationStateListener listener : listeners) { + listener.warn(msg, states); + } + } + + @Override + public void error(String msg, ReplicationState... states) { + for (ReplicationStateListener listener : listeners) { + listener.error(msg, states); + } + } + + @Override + public void error(String msg, Throwable t, ReplicationState... states) { + for (ReplicationStateListener listener : listeners) { + listener.error(msg, t, states); + } + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java new file mode 100644 index 0000000..64397f9 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -0,0 +1,137 @@ +// Copyright (C) 2018 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.flogger.FluentLogger; +import com.google.common.hash.Hashing; +import com.google.gson.Gson; +import com.google.inject.Inject; +import com.google.inject.ProvisionException; +import com.google.inject.Singleton; +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.transport.URIish; + +@Singleton +public class ReplicationTasksStorage { + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + private boolean disableDeleteForTesting; + + public static class ReplicateRefUpdate { + public final String project; + public final String ref; + public final String uri; + public final String remote; + + public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) { + this.project = project; + this.ref = ref; + this.uri = uri.toASCIIString(); + this.remote = remote; + } + + @Override + public String toString() { + return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote; + } + } + + private static Gson 
GSON = new Gson(); + + private final Path refUpdates; + + @Inject + ReplicationTasksStorage(ReplicationConfig config) { + refUpdates = config.getEventsDirectory().resolve("ref-updates"); + } + + public String persist(ReplicateRefUpdate r) { + String json = GSON.toJson(r) + "\n"; + String eventKey = sha1(json).name(); + Path file = refUpdates().resolve(eventKey); + + if (Files.exists(file)) { + return eventKey; + } + + try { + logger.atFine().log("CREATE %s (%s:%s => %s)", file, r.project, r.ref, r.uri); + Files.write(file, json.getBytes(UTF_8)); + } catch (IOException e) { + logger.atWarning().withCause(e).log("Couldn't persist event %s", json); + } + return eventKey; + } + + @VisibleForTesting + public void disableDeleteForTesting(boolean deleteDisabled) { + this.disableDeleteForTesting = deleteDisabled; + } + + public void delete(ReplicateRefUpdate r) { + String taskJson = GSON.toJson(r) + "\n"; + String taskKey = sha1(taskJson).name(); + Path file = refUpdates().resolve(taskKey); + + if (disableDeleteForTesting) { + logger.atFine().log("DELETE %s (%s:%s => %s) DISABLED", file, r.project, r.ref, r.uri); + return; + } + + try { + logger.atFine().log("DELETE %s (%s:%s => %s)", file, r.project, r.ref, r.uri); + Files.delete(file); + } catch (IOException e) { + logger.atSevere().withCause(e).log("Error while deleting event %s", taskKey); + } + } + + public List<ReplicateRefUpdate> list() { + ArrayList<ReplicateRefUpdate> result = new ArrayList<>(); + try (DirectoryStream<Path> events = Files.newDirectoryStream(refUpdates())) { + for (Path e : events) { + if (Files.isRegularFile(e)) { + String json = new String(Files.readAllBytes(e), UTF_8); + result.add(GSON.fromJson(json, ReplicateRefUpdate.class)); + } + } + } catch (IOException e) { + logger.atSevere().withCause(e).log("Error when firing pending events"); + } + return result; + } + + @SuppressWarnings("deprecation") + private ObjectId sha1(String s) { + return ObjectId.fromRaw(Hashing.sha1().hashString(s, 
UTF_8).asBytes()); + } + + private Path refUpdates() { + try { + return Files.createDirectories(refUpdates); + } catch (IOException e) { + throw new ProvisionException(String.format("Couldn't create %s", refUpdates), e); + } + } +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java index 2b0c16b..c518091 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java
@@ -17,6 +17,7 @@ import com.google.gerrit.server.config.SitePaths; import com.google.inject.Inject; import java.io.IOException; +import java.util.Objects; import org.eclipse.jgit.errors.ConfigInvalidException; import org.eclipse.jgit.lib.Config; import org.eclipse.jgit.storage.file.FileBasedConfig; @@ -49,8 +50,8 @@ @Override public SecureCredentialsProvider create(String remoteName) { - String user = config.getString("remote", remoteName, "username"); - String pass = config.getString("remote", remoteName, "password"); + String user = Objects.toString(config.getString("remote", remoteName, "username"), ""); + String pass = Objects.toString(config.getString("remote", remoteName, "password"), ""); return new SecureCredentialsProvider(user, pass); } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsProvider.java b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsProvider.java index c4294a9..62b4036 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsProvider.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsProvider.java
@@ -20,7 +20,7 @@ import org.eclipse.jgit.transport.URIish; /** Looks up a remote's password in secure.config. */ -class SecureCredentialsProvider extends CredentialsProvider { +public class SecureCredentialsProvider extends CredentialsProvider { private final String cfgUser; private final String cfgPass;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactory.java new file mode 100644 index 0000000..ba14299 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactory.java
@@ -0,0 +1,26 @@ +// Copyright (C) 2019 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import org.eclipse.jgit.errors.NotSupportedException; +import org.eclipse.jgit.errors.TransportException; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.transport.Transport; +import org.eclipse.jgit.transport.URIish; + +public interface TransportFactory { + + Transport open(Repository local, URIish uri) throws NotSupportedException, TransportException; +}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactoryImpl.java b/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactoryImpl.java new file mode 100644 index 0000000..58c1214 --- /dev/null +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/TransportFactoryImpl.java
@@ -0,0 +1,30 @@ +// Copyright (C) 2019 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import org.eclipse.jgit.errors.NotSupportedException; +import org.eclipse.jgit.errors.TransportException; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.transport.Transport; +import org.eclipse.jgit.transport.URIish; + +public class TransportFactoryImpl implements TransportFactory { + + @Override + public Transport open(Repository git, URIish uri) + throws NotSupportedException, TransportException { + return Transport.open(git, uri); + } +}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md index e70094c..c9356b0 100644 --- a/src/main/resources/Documentation/config.md +++ b/src/main/resources/Documentation/config.md
@@ -13,6 +13,11 @@ sudo su -c 'ssh mirror1.us.some.org echo' gerrit2 ``` +*NOTE:* make sure the local user's SSH keys are in PEM format; here is how to generate them: +``` + ssh-keygen -m PEM -t rsa -C "your_email@example.com" +``` + <a name="example_file"> Next, create `$site_path/etc/replication.config` as a Git-style config file, for example to replicate in parallel to four different hosts:</a> @@ -65,7 +70,7 @@ gerrit.replicateOnStartup : If true, replicates to all remotes on startup to ensure they - are in-sync with this server. By default, true. + are in-sync with this server. By default, false. gerrit.autoReload : If true, automatically reloads replication destinations and settings @@ -124,6 +129,17 @@ By default, pushes are retried indefinitely. +replication.eventsDirectory +: Directory where replication events are persisted + + When scheduling a replication, the replication event is persisted + under this directory. When the replication is done, the event is deleted. + If the plugin is stopped before all scheduled replications are done, the + persisted events will not be deleted. When the plugin is started again, + it will trigger all replications found under this directory. + + When not set, defaults to the plugin's data directory. + remote.NAME.url : Address of the remote server to push to. Multiple URLs may be specified within a single remote block, listing different @@ -272,6 +288,20 @@ By default, use replication.maxRetries. +remote.NAME.drainQueueAttempts +: Maximum number of attempts to drain the replication event queue before + stopping the plugin. + + When stopping the plugin, the shutdown will be delayed while trying to drain + the event queue. + + The maximum delay is "drainQueueAttempts" * "replicationDelay" seconds. + + When not set or set to 0, the queue is not drained and the pending + replication events are cancelled. + + By default, do not drain replication events. 
+ remote.NAME.threads : Number of worker threads to dedicate to pushing to the repositories described by this remote. Each thread can push @@ -298,7 +328,7 @@ If the remote site was not available at the moment when a new project was created, it will be created if during the replication of a ref it is found to be missing. - + If false, repositories are never created automatically on this remote. @@ -398,7 +428,7 @@ File `~/.ssh/config` -------------------- -If present, Gerrit reads and caches `~/.ssh/config` at startup, and +Gerrit reads and caches `~/.ssh/config` at startup, and supports most SSH configuration options. For example: ``` @@ -412,6 +442,15 @@ PreferredAuthentications publickey ``` +*IdentityFile* and *PreferredAuthentications* must be defined for all the hosts. +Here is an example of the minimum `~/.ssh/config` needed: + +``` + Host * + IdentityFile ~/.ssh/id_rsa + PreferredAuthentications publickey +``` + Supported options: * Host
diff --git a/src/main/resources/Documentation/extension-point.md b/src/main/resources/Documentation/extension-point.md new file mode 100644 index 0000000..076aded --- /dev/null +++ b/src/main/resources/Documentation/extension-point.md
@@ -0,0 +1,53 @@ +@PLUGIN@ extension points +============== + +The replication plugin exposes an extension point to allow influencing its behaviour from another plugin or a script. +Extension points can be defined from the replication plugin only when it is loaded as [libModule](/config-gerrit.html#gerrit.installModule) and +implemented by another plugin by declaring a `provided` dependency from the replication plugin. + +### Install extension libModule + +The replication plugin's extension points are defined in the `c.g.g.p.r.ReplicationExtensionPointModule` +that needs to be configured as libModule. + +Create a symbolic link from `$GERRIT_SITE/plugins/replication.jar` into `$GERRIT_SITE/lib` +and then add the replication extension module to the `gerrit.config`. + +Example: + +``` +[gerrit] + installModule = com.googlesource.gerrit.plugins.replication.ReplicationExtensionPointModule +``` + +> **NOTE**: Use and configuration of the replication plugin as library module requires a Gerrit server restart and does not support hot plugin install or upgrade. + + +### Extension points + +* `com.googlesource.gerrit.plugins.replication.ReplicationPushFilter` + + Filter out the ref updates pushed to a remote instance. + Only one filter at a time is supported. Filter implementation needs to bind a `DynamicItem`. + + Default: no filtering + + Example: + + ``` + DynamicItem.bind(binder(), ReplicationPushFilter.class).to(ReplicationPushFilterImpl.class); + ``` + +* `com.googlesource.gerrit.plugins.replication.AdminApiFactory` + + Create an instance of `AdminApi` for a given remote URL. The default implementation + provides API instances for local FS, remote SSH, and remote Gerrit. + + Only one factory at a time is supported. The implementation needs to be bound as a + `DynamicItem`. + + Example: + + ``` + DynamicItem.bind(binder(), AdminApiFactory.class).to(AdminApiFactoryImpl.class); + ```
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java index 337bd1d..5fa7b98 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
@@ -22,8 +22,8 @@ import static org.easymock.EasyMock.reset; import static org.easymock.EasyMock.verify; -import com.google.gerrit.common.EventDispatcher; import com.google.gerrit.reviewdb.server.ReviewDb; +import com.google.gerrit.server.events.EventDispatcher; import com.google.gerrit.server.permissions.PermissionBackendException; import com.google.gwtorm.client.KeyUtil; import com.google.gwtorm.server.OrmException;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java new file mode 100644 index 0000000..af065b3 --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -0,0 +1,392 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.googlesource.gerrit.plugins.replication.RemoteRefUpdateCollectionMatcher.eqRemoteRef;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.getCurrentArguments;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.eclipse.jgit.lib.Ref.Storage.NEW;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.metrics.Timer1;
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.gerrit.server.git.PerThreadRequestScope;
+import com.google.gerrit.server.permissions.PermissionBackend;
+import com.google.gerrit.server.permissions.PermissionBackend.ForProject;
+import com.google.gerrit.server.permissions.PermissionBackend.WithUser;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.gerrit.server.project.ProjectState;
+import com.google.gerrit.server.util.IdGenerator;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import junit.framework.AssertionFailedError;
+import org.easymock.IAnswer;
+import org.eclipse.jgit.errors.NotSupportedException;
+import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectIdRef;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.transport.FetchConnection;
+import org.eclipse.jgit.transport.PushConnection;
+import org.eclipse.jgit.transport.PushResult;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+import org.eclipse.jgit.transport.Transport;
+import org.eclipse.jgit.transport.URIish;
+import org.eclipse.jgit.util.FS;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link PushOne}, run against EasyMock doubles of its collaborators.
+ *
+ * <p>The tests check which {@code RemoteRefUpdate}s reach {@code Transport.push} with and without
+ * a {@link ReplicationPushFilter} installed.
+ */
+public class PushOneTest {
+  // Upper bound, in seconds, for waiting on the scoped push call to finish.
+  private static final int TEST_PUSH_TIMEOUT_SECS = 10;
+
+  private GitRepositoryManager gitRepositoryManagerMock;
+  private Repository repositoryMock;
+  private PermissionBackend permissionBackendMock;
+  private PermissionBackend.WithUser withUserMock;
+  private PermissionBackend.ForProject forProjectMock;
+
+  private Destination destinationMock;
+  private RemoteConfig remoteConfigMock;
+  private RefSpec refSpecMock;
+  private CredentialsFactory credentialsFactory;
+  private PerThreadRequestScope.Scoper threadRequestScoperMock;
+  private ReplicationQueue replicationQueueMock;
+  private IdGenerator idGeneratorMock;
+  private ReplicationStateListeners replicationStateListenersMock;
+  private ReplicationMetrics replicationMetricsMock;
+  private Timer1.Context timerContextMock;
+  private ProjectCache projectCacheMock;
+  private TransportFactory transportFactoryMock;
+  private Transport transportMock;
+  private FetchConnection fetchConnection;
+  private PushConnection pushConnection;
+  private ProjectState projectStateMock;
+  private RefUpdate refUpdateMock;
+
+  private Project.NameKey projectNameKey;
+  private URIish urish;
+  private Map<String, Ref> localRefs;
+
+  private Map<String, Ref> remoteRefs;
+  // Released by the scoped callable built in setupRequestScopeMock() once it has completed.
+  private CountDownLatch isCallFinished;
+  private Ref newLocalRef;
+
+  @Before
+  public void setup() throws Exception {
+    projectNameKey = new Project.NameKey("fooProject");
+    urish = new URIish("http://foo.com/fooProject.git");
+
+    newLocalRef =
+        new ObjectIdRef.Unpeeled(
+            NEW, "foo", ObjectId.fromString("0000000000000000000000000000000000000001"));
+
+    localRefs = new HashMap<>();
+    localRefs.put("fooProject", newLocalRef);
+
+    Ref remoteRef = new ObjectIdRef.Unpeeled(NEW, "foo", ObjectId.zeroId());
+    remoteRefs = new HashMap<>();
+    remoteRefs.put("fooProject", remoteRef);
+
+    isCallFinished = new CountDownLatch(1);
+
+    setupMocks();
+  }
+
+  private void setupMocks() throws Exception {
+    FileBasedConfig config = new FileBasedConfig(new Config(), new File("/foo"), FS.DETECTED);
+    config.setString("remote", "Replication", "push", "foo");
+
+    setupRefUpdateMock();
+    setupRepositoryMock(config);
+    setupGitRepoManagerMock();
+
+    projectStateMock = createNiceMock(ProjectState.class);
+    forProjectMock = createNiceMock(ForProject.class);
+    setupWithUserMock();
+    setupPermissionBackendMock();
+
+    setupDestinationMock();
+
+    setupRefSpecMock();
+    setupRemoteConfigMock();
+
+    credentialsFactory = createNiceMock(CredentialsFactory.class);
+
+    setupFetchConnectionMock();
+    setupPushConnectionMock();
+    setupRequestScopeMock();
+    replicationQueueMock = createNiceMock(ReplicationQueue.class);
+    idGeneratorMock = createNiceMock(IdGenerator.class);
+    replicationStateListenersMock = createNiceMock(ReplicationStateListeners.class);
+
+    timerContextMock = createNiceMock(Timer1.Context.class);
+    setupReplicationMetricsMock();
+
+    setupTransportMock();
+
+    setupProjectCacheMock();
+
+    replay(
+        gitRepositoryManagerMock,
+        refUpdateMock,
+        repositoryMock,
+        permissionBackendMock,
+        destinationMock,
+        remoteConfigMock,
+        credentialsFactory,
+        threadRequestScoperMock,
+        replicationQueueMock,
+        idGeneratorMock,
+        replicationStateListenersMock,
+        replicationMetricsMock,
+        projectCacheMock,
+        timerContextMock,
+        transportFactoryMock,
+        projectStateMock,
+        withUserMock,
+        forProjectMock,
+        fetchConnection,
+        pushConnection,
+        refSpecMock);
+  }
+
+  @Test
+  public void shouldPushAllRefsWhenNoFilters() throws InterruptedException, IOException {
+    shouldPushAllRefsWithDynamicItemFilter(DynamicItem.itemOf(ReplicationPushFilter.class, null));
+  }
+
+  @Test
+  public void shouldPushAllRefsWhenNoFiltersSetup() throws InterruptedException, IOException {
+    shouldPushAllRefsWithDynamicItemFilter(null);
+  }
+
+  /**
+   * Runs a push of all refs and verifies that exactly the single local ref reaches the transport.
+   *
+   * @param replicationPushFilter filter holder handed to {@link PushOne}; may be null
+   */
+  private void shouldPushAllRefsWithDynamicItemFilter(
+      DynamicItem<ReplicationPushFilter> replicationPushFilter)
+      throws IOException, NotSupportedException, TransportException, InterruptedException {
+    List<RemoteRefUpdate> expectedUpdates =
+        Arrays.asList(
+            new RemoteRefUpdate(
+                repositoryMock,
+                newLocalRef.getName(),
+                newLocalRef.getObjectId(),
+                "fooProject",
+                false,
+                "fooProject",
+                null));
+
+    PushResult pushResult = new PushResult();
+
+    expect(transportMock.push(anyObject(), eqRemoteRef(expectedUpdates)))
+        .andReturn(pushResult)
+        .once();
+    replay(transportMock);
+
+    PushOne pushOne = createPushOne(replicationPushFilter);
+
+    pushOne.addRef(PushOne.ALL_REFS);
+    pushOne.run();
+
+    isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);
+
+    verify(transportMock);
+  }
+
+  @Test
+  public void shouldBlockReplicationUsingPushFilter() throws InterruptedException, IOException {
+    DynamicItem<ReplicationPushFilter> replicationPushFilter =
+        DynamicItem.itemOf(
+            ReplicationPushFilter.class,
+            new ReplicationPushFilter() {
+
+              @Override
+              public List<RemoteRefUpdate> filter(
+                  String projectName, List<RemoteRefUpdate> remoteUpdatesList) {
+                return Collections.emptyList();
+              }
+            });
+
+    // easymock way to check if method was never called
+    expect(transportMock.push(anyObject(), anyObject()))
+        .andThrow(new AssertionFailedError())
+        .anyTimes();
+    replay(transportMock);
+
+    PushOne pushOne = createPushOne(replicationPushFilter);
+
+    pushOne.addRef(PushOne.ALL_REFS);
+    pushOne.run();
+
+    isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);
+
+    verify(transportMock);
+  }
+
+  private PushOne createPushOne(DynamicItem<ReplicationPushFilter> replicationPushFilter) {
+    PushOne push =
+        new PushOne(
+            gitRepositoryManagerMock,
+            permissionBackendMock,
+            destinationMock,
+            remoteConfigMock,
+            credentialsFactory,
+            threadRequestScoperMock,
+            replicationQueueMock,
+            idGeneratorMock,
+            replicationStateListenersMock,
+            replicationMetricsMock,
+            projectCacheMock,
+            transportFactoryMock,
+            projectNameKey,
+            urish);
+
+    push.setReplicationPushFilter(replicationPushFilter);
+    return push;
+  }
+
+  private void setupProjectCacheMock() throws IOException {
+    projectCacheMock = createNiceMock(ProjectCache.class);
+    expect(projectCacheMock.checkedGet(projectNameKey)).andReturn(projectStateMock);
+  }
+
+  private void setupTransportMock() throws NotSupportedException, TransportException {
+    transportMock = createNiceMock(Transport.class);
+    expect(transportMock.openFetch()).andReturn(fetchConnection);
+    transportFactoryMock = createNiceMock(TransportFactory.class);
+    expect(transportFactoryMock.open(repositoryMock, urish)).andReturn(transportMock).anyTimes();
+  }
+
+  private void setupReplicationMetricsMock() {
+    replicationMetricsMock = createNiceMock(ReplicationMetrics.class);
+    expect(replicationMetricsMock.start(anyObject())).andReturn(timerContextMock);
+  }
+
+  /**
+   * Makes the scoper return a wrapper that runs the original callable and then releases {@code
+   * isCallFinished}, so tests can wait for the scoped call to complete.
+   */
+  private void setupRequestScopeMock() {
+    threadRequestScoperMock = createNiceMock(PerThreadRequestScope.Scoper.class);
+    expect(threadRequestScoperMock.scope(anyObject()))
+        .andAnswer(
+            new IAnswer<Callable<Object>>() {
+              @SuppressWarnings("unchecked")
+              @Override
+              public Callable<Object> answer() throws Throwable {
+                Callable<Object> originalCall = (Callable<Object>) getCurrentArguments()[0];
+                return new Callable<Object>() {
+
+                  @Override
+                  public Object call() throws Exception {
+                    Object result = originalCall.call();
+                    isCallFinished.countDown();
+                    return result;
+                  }
+                };
+              }
+            })
+        .anyTimes();
+  }
+
+  private void setupPushConnectionMock() {
+    pushConnection = createNiceMock(PushConnection.class);
+    expect(pushConnection.getRefsMap()).andReturn(remoteRefs);
+  }
+
+  private void setupFetchConnectionMock() {
+    fetchConnection = createNiceMock(FetchConnection.class);
+    expect(fetchConnection.getRefsMap()).andReturn(remoteRefs);
+  }
+
+  private void setupRemoteConfigMock() {
+    remoteConfigMock = createNiceMock(RemoteConfig.class);
+    expect(remoteConfigMock.getPushRefSpecs()).andReturn(ImmutableList.of(refSpecMock));
+  }
+
+  private void setupRefSpecMock() {
+    refSpecMock = createNiceMock(RefSpec.class);
+    expect(refSpecMock.matchSource(anyObject(String.class))).andReturn(true);
+    expect(refSpecMock.expandFromSource(anyObject(String.class))).andReturn(refSpecMock);
+    expect(refSpecMock.getDestination()).andReturn("fooProject").anyTimes();
+    expect(refSpecMock.isForceUpdate()).andReturn(false).anyTimes();
+  }
+
+  private void setupDestinationMock() {
+    destinationMock = createNiceMock(Destination.class);
+    expect(destinationMock.requestRunway(anyObject())).andReturn(RunwayStatus.allowed());
+  }
+
+  private void setupPermissionBackendMock() {
+    permissionBackendMock = createNiceMock(PermissionBackend.class);
+    expect(permissionBackendMock.currentUser()).andReturn(withUserMock);
+  }
+
+  private void setupWithUserMock() {
+    withUserMock = createNiceMock(WithUser.class);
+    expect(withUserMock.project(projectNameKey)).andReturn(forProjectMock);
+  }
+
+  private void setupGitRepoManagerMock() throws IOException {
+    gitRepositoryManagerMock = createNiceMock(GitRepositoryManager.class);
+    expect(gitRepositoryManagerMock.openRepository(projectNameKey)).andReturn(repositoryMock);
+  }
+
+  @SuppressWarnings("deprecation")
+  private void setupRepositoryMock(FileBasedConfig config) throws IOException {
+    repositoryMock = createNiceMock(Repository.class);
+    expect(repositoryMock.getConfig()).andReturn(config).anyTimes();
+    expect(repositoryMock.getAllRefs()).andReturn(localRefs);
+    expect(repositoryMock.updateRef("fooProject")).andReturn(refUpdateMock);
+  }
+
+  private void setupRefUpdateMock() {
+    refUpdateMock = createNiceMock(RefUpdate.class);
+    expect(refUpdateMock.getOldObjectId())
+        .andReturn(ObjectId.fromString("0000000000000000000000000000000000000001"))
+        .anyTimes();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java new file mode 100644 index 0000000..111a792 --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java
@@ -0,0 +1,90 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import java.util.Collection;
+import java.util.Objects;
+import org.easymock.EasyMock;
+import org.easymock.IArgumentMatcher;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+
+/**
+ * EasyMock argument matcher that accepts a collection of {@link RemoteRefUpdate}s when it has the
+ * same size as the expected collection and every received update equals, field by field, at least
+ * one expected update. Ordering is ignored.
+ */
+public class RemoteRefUpdateCollectionMatcher implements IArgumentMatcher {
+  final Collection<RemoteRefUpdate> expectedRemoteRefs;
+
+  /**
+   * Registers a new matcher with EasyMock for the argument currently being recorded.
+   *
+   * @param expectedRemoteRefs updates the mocked method is expected to receive
+   * @return always {@code null}; the value is only a placeholder for the recorded argument
+   */
+  public static Collection<RemoteRefUpdate> eqRemoteRef(
+      Collection<RemoteRefUpdate> expectedRemoteRefs) {
+    EasyMock.reportMatcher(new RemoteRefUpdateCollectionMatcher(expectedRemoteRefs));
+    return null;
+  }
+
+  /**
+   * Creates a matcher for the given expected updates.
+   *
+   * @param expectedRemoteRefs updates to compare incoming arguments against; must not be null
+   * @throws NullPointerException if {@code expectedRemoteRefs} is null
+   */
+  public RemoteRefUpdateCollectionMatcher(Collection<RemoteRefUpdate> expectedRemoteRefs) {
+    // Fail at construction instead of later, while EasyMock is matching or describing arguments.
+    this.expectedRemoteRefs = Objects.requireNonNull(expectedRemoteRefs, "expectedRemoteRefs");
+  }
+
+  /** Returns true when {@code argument} is a collection equivalent to the expected updates. */
+  @Override
+  public boolean matches(Object argument) {
+    if (!(argument instanceof Collection)) {
+      return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    Collection<RemoteRefUpdate> refs = (Collection<RemoteRefUpdate>) argument;
+
+    if (expectedRemoteRefs.size() != refs.size()) {
+      return false;
+    }
+    // Every received update needs a counterpart among the expected ones.
+    return refs.stream()
+        .allMatch(
+            ref -> expectedRemoteRefs.stream().anyMatch(expectedRef -> compare(ref, expectedRef)));
+  }
+
+  /** Appends the expected updates to the description EasyMock builds for this matcher. */
+  @Override
+  public void appendTo(StringBuffer buffer) {
+    buffer.append("expected:").append(expectedRemoteRefs);
+  }
+
+  /** Compares two updates on each of the properties read through the getters below. */
+  private static boolean compare(RemoteRefUpdate ref, RemoteRefUpdate expectedRef) {
+    return Objects.equals(ref.getRemoteName(), expectedRef.getRemoteName())
+        && Objects.equals(ref.getStatus(), expectedRef.getStatus())
+        && Objects.equals(ref.getExpectedOldObjectId(), expectedRef.getExpectedOldObjectId())
+        && Objects.equals(ref.getNewObjectId(), expectedRef.getNewObjectId())
+        && Objects.equals(ref.isFastForward(), expectedRef.isFastForward())
+        && Objects.equals(ref.getSrcRef(), expectedRef.getSrcRef())
+        && Objects.equals(ref.isForceUpdate(), expectedRef.isForceUpdate())
+        && Objects.equals(ref.getMessage(), expectedRef.getMessage());
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java new file mode 100644 index 0000000..61a53f3 --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -0,0 +1,395 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.gerrit.extensions.api.projects.BranchInput;
+import com.google.gerrit.extensions.common.ProjectInfo;
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import com.google.inject.Key;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+import org.eclipse.jgit.lib.Constants;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.transport.RemoteRefUpdate;
+import org.eclipse.jgit.transport.URIish;
+import org.eclipse.jgit.util.FS;
+import org.junit.Test;
+
+/**
+ * Integration tests that run the replication plugin inside a Gerrit test daemon and replicate to
+ * repositories located under the site's {@code git} directory.
+ */
+@UseLocalDisk
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationIT extends LightweightPluginDaemonTest {
+  private static final Optional<String> ALL_PROJECTS = Optional.empty();
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  // Written to remote.NAME.replicationDelay for every destination configured by these tests.
+  private static final int TEST_REPLICATION_DELAY = 1;
+  private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2);
+
+  @Inject private SitePaths sitePaths;
+  private Path pluginDataDir;
+  private Path gitPath;
+  private Path storagePath;
+  private FileBasedConfig config;
+  private ReplicationTasksStorage tasksStorage;
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    gitPath = sitePaths.site_path.resolve("git");
+
+    config =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+    setReplicationDestination(
+        "remote1",
+        "suffix1",
+        Optional.of("not-used-project")); // Simulates a full replication.config initialization
+    config.save();
+
+    super.setUpTestPlugin();
+
+    pluginDataDir = plugin.getSysInjector().getInstance(Key.get(Path.class, PluginData.class));
+    storagePath = pluginDataDir.resolve("ref-updates");
+    tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class);
+    cleanupReplicationTasks();
+    // NOTE(review): presumably keeps persisted tasks on disk so tests can list them - confirm.
+    tasksStorage.disableDeleteForTesting(true);
+  }
+
+  @Test
+  public void shouldReplicateNewProject() throws Exception {
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Project.NameKey sourceProject = createTestProject("foo");
+
+    assertThat(listReplicationTasks("refs/meta/config")).hasSize(1);
+
+    waitUntil(() -> projectExists(new Project.NameKey(sourceProject + "replica.git")));
+
+    ProjectInfo replicaProject = gApi.projects().name(sourceProject + "replica").get();
+    assertThat(replicaProject).isNotNull();
+  }
+
+  @Test
+  public void shouldReplicateNewChangeRef() throws Exception {
+    Project.NameKey targetProject = createTestProject("projectreplica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().getRefName();
+
+    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(1);
+
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
+  @Test
+  public void shouldReplicateNewBranch() throws Exception {
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Project.NameKey targetProject = createTestProject("projectreplica");
+    String newBranch = "refs/heads/mybranch";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(project.get()).branch(newBranch).create(input);
+
+    assertThat(listReplicationTasks("refs/heads/(mybranch|master)")).hasSize(2);
+
+    try (Repository repo = repoManager.openRepository(targetProject);
+        Repository sourceRepo = repoManager.openRepository(project)) {
+      waitUntil(() -> checkedGetRef(repo, newBranch) != null);
+
+      Ref masterRef = getRef(sourceRepo, master);
+      Ref targetBranchRef = getRef(repo, newBranch);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(masterRef.getObjectId());
+    }
+  }
+
+  @Test
+  public void shouldReplicateNewBranchToTwoRemotes() throws Exception {
+    Project.NameKey targetProject1 = createTestProject("projectreplica1");
+    Project.NameKey targetProject2 = createTestProject("projectreplica2");
+
+    setReplicationDestination("foo1", "replica1", ALL_PROJECTS);
+    setReplicationDestination("foo2", "replica2", ALL_PROJECTS);
+    reloadConfig();
+
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().getRefName();
+
+    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
+
+    try (Repository repo1 = repoManager.openRepository(targetProject1);
+        Repository repo2 = repoManager.openRepository(targetProject2)) {
+      waitUntil(
+          () ->
+              (checkedGetRef(repo1, sourceRef) != null && checkedGetRef(repo2, sourceRef) != null));
+
+      Ref targetBranchRef1 = getRef(repo1, sourceRef);
+      assertThat(targetBranchRef1).isNotNull();
+      assertThat(targetBranchRef1.getObjectId()).isEqualTo(sourceCommit.getId());
+
+      Ref targetBranchRef2 = getRef(repo2, sourceRef);
+      assertThat(targetBranchRef2).isNotNull();
+      assertThat(targetBranchRef2.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
+  @Test
+  public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
+    List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
+    createTestProject("projectreplica1");
+    createTestProject("projectreplica2");
+
+    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+    config.setInt("remote", "foo1", "replicationDelay", TEST_REPLICATION_DELAY * 100);
+    config.setInt("remote", "foo2", "replicationDelay", TEST_REPLICATION_DELAY * 100);
+    reloadConfig();
+
+    createChange();
+
+    assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
+
+    // NOTE(review): the calls below only rewrite replication.config with the default delay;
+    // the configuration is not reloaded afterwards.
+    setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+    setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+  }
+
+  @Test
+  public void shouldCreateOneReplicationTaskWhenSchedulingRepoFullSync() throws Exception {
+    PushResultProcessing pushResultProcessing =
+        new PushResultProcessing() {
+
+          @Override
+          void onRefReplicatedToOneNode(
+              String project,
+              String ref,
+              URIish uri,
+              ReplicationState.RefPushResult status,
+              RemoteRefUpdate.Status refStatus) {}
+
+          @Override
+          void onRefReplicatedToAllNodes(String project, String ref, int nodesCount) {}
+
+          @Override
+          void onAllRefsReplicatedToAllNodes(int totalPushTasksCount) {}
+        };
+
+    createTestProject("projectreplica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    plugin
+        .getSysInjector()
+        .getInstance(ReplicationQueue.class)
+        .scheduleFullSync(project, null, new ReplicationState(pushResultProcessing), true);
+
+    assertThat(listReplicationTasks(".*all.*")).hasSize(1);
+  }
+
+  @Test
+  public void shouldReplicateHeadUpdate() throws Exception {
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    Project.NameKey targetProject = createTestProject("projectreplica");
+    String newHead = "refs/heads/newhead";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(project.get()).branch(newHead).create(input);
+    gApi.projects().name(project.get()).head(newHead);
+
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      waitUntil(() -> checkedGetRef(repo, newHead) != null);
+
+      Ref targetProjectHead = getRef(repo, Constants.HEAD);
+      assertThat(targetProjectHead).isNotNull();
+      assertThat(targetProjectHead.getTarget().getName()).isEqualTo(newHead);
+    }
+  }
+
+  @Test
+  public void shouldNotDrainTheQueueWhenReloading() throws Exception {
+    // Setup repo to replicate
+    Project.NameKey targetProject = createTestProject("projectreplica");
+    String remoteName = "doNotDrainQueue";
+    setReplicationDestination(remoteName, "replica", ALL_PROJECTS);
+    // NOTE(review): unlike shouldDrainTheQueueWhenReloading, reloadConfig() is not called here,
+    // so it is unclear whether this destination is active when the change is created - confirm.
+
+    Result pushResult = createChange();
+    shutdownConfig();
+
+    pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().getRefName();
+
+    assertThrows(
+        InterruptedException.class,
+        () -> {
+          try (Repository repo = repoManager.openRepository(targetProject)) {
+            waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+          }
+        });
+  }
+
+  @Test
+  public void shouldDrainTheQueueWhenReloading() throws Exception {
+    // Setup repo to replicate
+    Project.NameKey targetProject = createTestProject("projectreplica");
+    String remoteName = "drainQueue";
+    setReplicationDestination(remoteName, "replica", ALL_PROJECTS);
+
+    config.setInt("remote", remoteName, "drainQueueAttempts", 2);
+    config.save();
+    reloadConfig();
+
+    Result pushResult = createChange();
+    shutdownConfig();
+
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().getRefName();
+
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
+  private Project.NameKey createTestProject(String name) throws Exception {
+    return createProject(name);
+  }
+
+  private Ref getRef(Repository repo, String branchName) throws IOException {
+    return repo.getRefDatabase().exactRef(branchName);
+  }
+
+  /** Like {@link #getRef}, but logs any failure and returns null so it can be used in lambdas. */
+  private Ref checkedGetRef(Repository repo, String branchName) {
+    try {
+      return repo.getRefDatabase().exactRef(branchName);
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
+      return null;
+    }
+  }
+
+  private void setReplicationDestination(
+      String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
+    setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project);
+  }
+
+  /**
+   * Writes a remote section with one URL per suffix ({@code <gitPath>/${name}<suffix>.git}) and
+   * the test replication delay, optionally restricted to {@code project}, then saves the file.
+   */
+  private void setReplicationDestination(
+      String remoteName, List<String> replicaSuffixes, Optional<String> project)
+      throws IOException {
+
+    List<String> replicaUrls =
+        replicaSuffixes.stream()
+            .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
+            .collect(toList());
+    config.setStringList("remote", remoteName, "url", replicaUrls);
+    config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+    project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
+    config.save();
+  }
+
+  /** Polls {@code waitCondition} via {@link WaitUtil}, giving up after {@link #TEST_TIMEOUT}. */
+  private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
+    WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
+  }
+
+  private void reloadConfig() {
+    plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).forceReload();
+  }
+
+  private void shutdownConfig() {
+    plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).shutdown();
+  }
+
+  /** Returns the stored replication tasks whose ref fully matches {@code refRegex}. */
+  private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
+    Pattern refmaskPattern = Pattern.compile(refRegex);
+    return tasksStorage.list().stream()
+        .filter(task -> refmaskPattern.matcher(task.ref).matches())
+        .collect(toList());
+  }
+
+  /** Deletes every entry directly under the task storage directory; delete failures are ignored. */
+  private void cleanupReplicationTasks() throws IOException {
+    try (DirectoryStream<Path> files = Files.newDirectoryStream(storagePath)) {
+      for (Path path : files) {
+        path.toFile().delete();
+      }
+    }
+  }
+
+  private boolean projectExists(Project.NameKey name) {
+    try (Repository r = repoManager.openRepository(name)) {
+      return true;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java index 881a282..2a395ca 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java
@@ -56,13 +56,13 @@ private FileBasedConfig config; @Override - public void setUp() throws Exception { + public void setUpTestPlugin() throws Exception { gitPath = sitePaths.site_path.resolve("git"); config = new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED); setReplicationDestination("foo", "replica"); - super.setUp(); + super.setUpTestPlugin(); } @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtil.java b/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtil.java new file mode 100644 index 0000000..586b56c --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtil.java
@@ -0,0 +1,48 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import com.google.common.base.Stopwatch;
+import java.time.Duration;
+import java.util.function.Supplier;
+
+/** Polling helper for tests that have to wait for a condition to become true. */
+public class WaitUtil {
+  /**
+   * Polls {@code waitCondition} every 50 ms until it returns {@code true}.
+   *
+   * The condition is always evaluated before the timeout is checked, so a condition that already
+   * holds returns immediately whatever {@code timeout} is.
+   *
+   * @param waitCondition condition to poll; it is unboxed, so it must not return {@code null}
+   * @param timeout maximum time to keep polling
+   * @throws InterruptedException if the condition is still unmet once {@code timeout} has
+   *     elapsed, or if the thread is interrupted while sleeping between polls
+   */
+  public static void waitUntil(Supplier<Boolean> waitCondition, Duration timeout)
+      throws InterruptedException {
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    while (!waitCondition.get()) {
+      if (stopwatch.elapsed().compareTo(timeout) > 0) {
+        // Callers assert on InterruptedException, so the type stays; the message tells a timeout
+        // apart from a genuine thread interruption.
+        throw new InterruptedException("Condition not met within " + timeout);
+      }
+      MILLISECONDS.sleep(50);
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtilTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtilTest.java new file mode 100644 index 0000000..0ccb0af --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/WaitUtilTest.java
@@ -0,0 +1,38 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static com.googlesource.gerrit.plugins.replication.WaitUtil.waitUntil;
+
+import java.time.Duration;
+import org.junit.Test;
+
+/** Tests for {@link WaitUtil#waitUntil}. */
+public class WaitUtilTest {
+  private static final Duration TIMEOUT = Duration.ofSeconds(1);
+
+  @Test
+  public void shouldFailWhenConditionNotMetWithinTimeout() throws Exception {
+    // A condition that never holds must exhaust the timeout and surface as InterruptedException.
+    assertThrows(InterruptedException.class, () -> waitUntil(() -> false, TIMEOUT));
+  }
+
+  @Test
+  public void shouldNotFailWhenConditionIsMetWithinTimeout() throws Exception {
+    // A condition that already holds must return without throwing.
+    waitUntil(() -> true, TIMEOUT);
+  }
+}