Merge branch 'stable-3.0'
* stable-3.0:
Clarify that starred-changes ref is not needed on slaves
Change-Id: I4248889146c31d8f956dd0b3ff7a3c4881b97773
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
index acbf763..f3d2103 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
@@ -14,7 +14,7 @@
package com.googlesource.gerrit.plugins.replication;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
public interface AdminApi {
public boolean createProject(Project.NameKey project, String head);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index 945f869..c043baf 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -14,127 +14,45 @@
package com.googlesource.gerrit.plugins.replication;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Multimap;
-import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.common.FileUtil;
-import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
import com.google.gerrit.extensions.annotations.PluginName;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
-import com.google.inject.Provider;
import com.google.inject.Singleton;
-import java.io.IOException;
import java.nio.file.Path;
-import java.util.List;
-import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import org.eclipse.jgit.errors.ConfigInvalidException;
-import org.eclipse.jgit.transport.URIish;
@Singleton
-public class AutoReloadConfigDecorator implements ReplicationConfig {
- private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+public class AutoReloadConfigDecorator implements ReplicationConfig, LifecycleListener {
private static final long RELOAD_DELAY = 120;
private static final long RELOAD_INTERVAL = 60;
private volatile ReplicationFileBasedConfig currentConfig;
- private long currentConfigTs;
- private long lastFailedConfigTs;
- private final SitePaths site;
- private final Destination.Factory destinationFactory;
- private final Path pluginDataDir;
- // Use Provider<> instead of injecting the ReplicationQueue because of circular dependency with
- // ReplicationConfig
- private final Provider<ReplicationQueue> replicationQueue;
private final ScheduledExecutorService autoReloadExecutor;
private ScheduledFuture<?> autoReloadRunnable;
-
- private volatile boolean shuttingDown;
+ private final AutoReloadRunnable reloadRunner;
@Inject
public AutoReloadConfigDecorator(
- SitePaths site,
- Destination.Factory destinationFactory,
- Provider<ReplicationQueue> replicationQueue,
- @PluginData Path pluginDataDir,
@PluginName String pluginName,
- WorkQueue workQueue)
- throws ConfigInvalidException, IOException {
- this.site = site;
- this.destinationFactory = destinationFactory;
- this.pluginDataDir = pluginDataDir;
- this.currentConfig = loadConfig();
- this.currentConfigTs = getLastModified(currentConfig);
- this.replicationQueue = replicationQueue;
+ WorkQueue workQueue,
+ ReplicationFileBasedConfig replicationConfig,
+ AutoReloadRunnable reloadRunner,
+ EventBus eventBus) {
+ this.currentConfig = replicationConfig;
this.autoReloadExecutor = workQueue.createQueue(1, pluginName + "_auto-reload-config");
- }
-
- private static long getLastModified(ReplicationFileBasedConfig cfg) {
- return FileUtil.lastModified(cfg.getCfgPath());
- }
-
- private ReplicationFileBasedConfig loadConfig() throws ConfigInvalidException, IOException {
- return new ReplicationFileBasedConfig(site, destinationFactory, pluginDataDir);
- }
-
- private synchronized boolean isAutoReload() {
- return currentConfig.getConfig().getBoolean("gerrit", "autoReload", false);
- }
-
- @Override
- public synchronized List<Destination> getDestinations(FilterType filterType) {
- return currentConfig.getDestinations(filterType);
- }
-
- @Override
- public synchronized Multimap<Destination, URIish> getURIs(
- Optional<String> remoteName, Project.NameKey projectName, FilterType filterType) {
- return currentConfig.getURIs(remoteName, projectName, filterType);
- }
-
- private synchronized void reloadIfNeeded() {
- reload(false);
+ this.reloadRunner = reloadRunner;
+ eventBus.register(this);
}
@VisibleForTesting
- public void forceReload() {
- reload(true);
- }
-
- private void reload(boolean force) {
- if (force || isAutoReload()) {
- ReplicationQueue queue = replicationQueue.get();
-
- long lastModified = getLastModified(currentConfig);
- try {
- if (force
- || (!shuttingDown
- && lastModified > currentConfigTs
- && lastModified > lastFailedConfigTs
- && queue.isRunning()
- && !queue.isReplaying())) {
- queue.stop();
- currentConfig = loadConfig();
- currentConfigTs = lastModified;
- lastFailedConfigTs = 0;
- logger.atInfo().log(
- "Configuration reloaded: %d destinations",
- currentConfig.getDestinations(FilterType.ALL).size());
- }
- } catch (Exception e) {
- logger.atSevere().withCause(e).log(
- "Cannot reload replication configuration: keeping existing settings");
- lastFailedConfigTs = lastModified;
- return;
- } finally {
- queue.start();
- }
- }
+ public void reload() {
+ reloadRunner.reload();
}
@Override
@@ -148,13 +66,13 @@
}
@Override
- public synchronized int getMaxRefsToLog() {
- return currentConfig.getMaxRefsToLog();
+ public int getDistributionInterval() {
+ return currentConfig.getDistributionInterval();
}
@Override
- public synchronized boolean isEmpty() {
- return currentConfig.isEmpty();
+ public synchronized int getMaxRefsToLog() {
+ return currentConfig.getMaxRefsToLog();
}
@Override
@@ -162,36 +80,32 @@
return currentConfig.getEventsDirectory();
}
- /* shutdown() cannot be set as a synchronized method because
- * it may need to wait for pending events to complete;
- * e.g. when enabling the drain of replication events before
- * shutdown.
- *
- * As a rule of thumb for synchronized methods, because they
- * implicitly define a critical section and associated lock,
- * they should never hold waiting for another resource, otherwise
- * the risk of deadlock is very high.
- *
- * See more background about deadlocks, what they are and how to
- * prevent them at: https://en.wikipedia.org/wiki/Deadlock
- */
@Override
- public int shutdown() {
- this.shuttingDown = true;
- if (autoReloadRunnable != null) {
- autoReloadRunnable.cancel(false);
- autoReloadRunnable = null;
- }
- return currentConfig.shutdown();
+ public synchronized void start() {
+ autoReloadRunnable =
+ autoReloadExecutor.scheduleAtFixedRate(
+ reloadRunner, RELOAD_DELAY, RELOAD_INTERVAL, TimeUnit.SECONDS);
}
@Override
- public synchronized void startup(WorkQueue workQueue) {
- shuttingDown = false;
- currentConfig.startup(workQueue);
- autoReloadRunnable =
- autoReloadExecutor.scheduleAtFixedRate(
- this::reloadIfNeeded, RELOAD_DELAY, RELOAD_INTERVAL, TimeUnit.SECONDS);
+ public synchronized void stop() {
+ if (autoReloadRunnable != null) {
+ if (!autoReloadRunnable.cancel(true)) {
+ throw new IllegalStateException(
+ "Unable to cancel replication reload task: cannot guarantee orderly shutdown");
+ }
+ autoReloadRunnable = null;
+ }
+ }
+
+ @Override
+ public String getVersion() {
+ return currentConfig.getVersion();
+ }
+
+ @Subscribe
+ public void onReload(ReplicationFileBasedConfig newConfig) {
+ currentConfig = newConfig;
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnable.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnable.java
new file mode 100644
index 0000000..a1084a8
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnable.java
@@ -0,0 +1,87 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import java.nio.file.Path;
+import java.util.List;
+
+public class AutoReloadRunnable implements Runnable {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ private final SitePaths site;
+ private final Path pluginDataDir;
+ private final EventBus eventBus;
+ private final Provider<ObservableQueue> queueObserverProvider;
+ private final ReplicationConfigValidator configValidator;
+
+ private ReplicationFileBasedConfig loadedConfig;
+ private String loadedConfigVersion;
+ private String lastFailedConfigVersion;
+
+ @Inject
+ public AutoReloadRunnable(
+ ReplicationConfigValidator configValidator,
+ ReplicationFileBasedConfig config,
+ SitePaths site,
+ @PluginData Path pluginDataDir,
+ EventBus eventBus,
+ Provider<ObservableQueue> queueObserverProvider) {
+ this.loadedConfig = config;
+ this.loadedConfigVersion = config.getVersion();
+ this.lastFailedConfigVersion = "";
+ this.site = site;
+ this.pluginDataDir = pluginDataDir;
+ this.eventBus = eventBus;
+ this.queueObserverProvider = queueObserverProvider;
+ this.configValidator = configValidator;
+ }
+
+ @Override
+ public synchronized void run() {
+ String pendingConfigVersion = loadedConfig.getVersion();
+ ObservableQueue queue = queueObserverProvider.get();
+ if (pendingConfigVersion.equals(loadedConfigVersion)
+ || pendingConfigVersion.equals(lastFailedConfigVersion)
+ || !queue.isRunning()
+ || queue.isReplaying()) {
+ return;
+ }
+
+ reload();
+ }
+
+ synchronized void reload() {
+ String pendingConfigVersion = loadedConfig.getVersion();
+ try {
+ ReplicationFileBasedConfig newConfig = new ReplicationFileBasedConfig(site, pluginDataDir);
+ final List<RemoteConfiguration> newValidDestinations =
+ configValidator.validateConfig(newConfig);
+ loadedConfig = newConfig;
+ loadedConfigVersion = newConfig.getVersion();
+ lastFailedConfigVersion = "";
+ eventBus.post(newValidDestinations);
+ } catch (Exception e) {
+ logger.atSevere().withCause(e).log(
+ "Cannot reload replication configuration: keeping existing settings");
+ lastFailedConfigVersion = pendingConfigVersion;
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
index a8dede3..fa26e82 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
@@ -16,8 +16,8 @@
import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.reviewdb.client.Project;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
@@ -31,7 +31,7 @@
}
private final RemoteConfig config;
- private final ReplicationConfig replicationConfig;
+ private final DestinationsCollection destinations;
private final DynamicItem<AdminApiFactory> adminApiFactory;
private final Project.NameKey project;
private final String head;
@@ -39,21 +39,20 @@
@Inject
CreateProjectTask(
RemoteConfig config,
- ReplicationConfig replicationConfig,
+ DestinationsCollection destinations,
DynamicItem<AdminApiFactory> adminApiFactory,
@Assisted Project.NameKey project,
@Assisted String head) {
this.config = config;
- this.replicationConfig = replicationConfig;
+ this.destinations = destinations;
this.adminApiFactory = adminApiFactory;
this.project = project;
this.head = head;
}
public boolean create() {
- return replicationConfig
- .getURIs(Optional.of(config.getName()), project, FilterType.PROJECT_CREATION).values()
- .stream()
+ return destinations.getURIs(Optional.of(config.getName()), project, FilterType.PROJECT_CREATION)
+ .values().stream()
.map(u -> createProject(u, project, head))
.reduce(true, (a, b) -> a && b);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
index f9b2ad7..4617672 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
@@ -16,8 +16,8 @@
import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.server.ioutil.HexFormat;
import com.google.gerrit.server.util.IdGenerator;
import com.google.inject.Inject;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 679776f..1524c2a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -26,13 +26,13 @@
import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.collect.Lists;
import com.google.gerrit.common.data.GroupReference;
+import com.google.gerrit.entities.AccountGroup;
+import com.google.gerrit.entities.BranchNameKey;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
import com.google.gerrit.extensions.config.FactoryModule;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.restapi.AuthException;
-import com.google.gerrit.reviewdb.client.AccountGroup;
-import com.google.gerrit.reviewdb.client.Branch;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.reviewdb.client.RefNames;
import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.PluginUser;
import com.google.gerrit.server.account.GroupBackend;
@@ -60,11 +60,11 @@
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.servlet.RequestScoped;
import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
-import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -522,6 +522,7 @@
pending.put(uri, pushOp);
switch (reason) {
case COLLISION:
+ replicationTasksStorage.get().reset(pushOp);
@SuppressWarnings("unused")
ScheduledFuture<?> ignored =
pool.schedule(pushOp, config.getRescheduleDelay(), TimeUnit.SECONDS);
@@ -536,6 +537,7 @@
postReplicationFailedEvent(pushOp, status);
if (pushOp.setToRetry()) {
postReplicationScheduledEvent(pushOp);
+ replicationTasksStorage.get().reset(pushOp);
@SuppressWarnings("unused")
ScheduledFuture<?> ignored2 =
pool.schedule(pushOp, config.getRetryDelay(), TimeUnit.MINUTES);
@@ -562,34 +564,30 @@
if (inFlightOp != null) {
return RunwayStatus.denied(inFlightOp.getId());
}
+ if (!replicationTasksStorage.get().start(op)) {
+ return RunwayStatus.deniedExternal();
+ }
inFlight.put(op.getURI(), op);
}
return RunwayStatus.allowed();
}
- void notifyFinished(PushOne task) {
+ void notifyFinished(PushOne op) {
synchronized (stateLock) {
- inFlight.remove(task.getURI());
- if (!task.wasCanceled()) {
- for (String ref : task.getRefs()) {
- if (!refHasPendingPush(task.getURI(), ref)) {
- replicationTasksStorage
- .get()
- .delete(
- new ReplicateRefUpdate(
- task.getProjectNameKey().get(), ref, task.getURI(), getRemoteConfigName()));
- }
- }
- }
+ replicationTasksStorage.get().finish(op);
+ inFlight.remove(op.getURI());
}
}
- private boolean refHasPendingPush(URIish opUri, String ref) {
- return pushContainsRef(pending.get(opUri), ref) || pushContainsRef(inFlight.get(opUri), ref);
- }
-
- private boolean pushContainsRef(PushOne op, String ref) {
- return op != null && op.getRefs().contains(ref);
+ public Set<String> getPrunableTaskNames() {
+ Set<String> names = new HashSet<>();
+ for (PushOne push : pending.values()) {
+ if (!replicationTasksStorage.get().isWaiting(push)) {
+ repLog.debug("No longer isWaiting, can prune " + push.getURI());
+ names.add(push.toString());
+ }
+ }
+ return names;
}
boolean wouldPushProject(Project.NameKey project) {
@@ -607,27 +605,16 @@
}
boolean isSingleProjectMatch() {
- List<String> projects = config.getProjects();
- boolean ret = (projects.size() == 1);
- if (ret) {
- String projectMatch = projects.get(0);
- if (ReplicationFilter.getPatternType(projectMatch)
- != ReplicationFilter.PatternType.EXACT_MATCH) {
- // projectMatch is either regular expression, or wild-card.
- //
- // Even though they might refer to a single project now, they need not
- // after new projects have been created. Hence, we do not treat them as
- // matching a single project.
- ret = false;
- }
- }
- return ret;
+ return config.isSingleProjectMatch();
}
boolean wouldPushRef(String ref) {
if (!config.replicatePermissions() && RefNames.REFS_CONFIG.equals(ref)) {
return false;
}
+ if (PushOne.ALL_REFS.equals(ref)) {
+ return true;
+ }
for (RefSpec s : config.getRemoteConfig().getPushRefSpecs()) {
if (s.matchSource(ref)) {
return true;
@@ -666,7 +653,7 @@
} else if (!remoteNameStyle.equals("slash")) {
repLog.debug("Unknown remoteNameStyle: {}, falling back to slash", remoteNameStyle);
}
- String replacedPath = replaceName(uri.getPath(), name, isSingleProjectMatch());
+ String replacedPath = replaceName(uri.getPath(), name, config.isSingleProjectMatch());
if (replacedPath != null) {
uri = uri.setPath(replacedPath);
r.add(uri);
@@ -731,6 +718,10 @@
return config.getDelay() * 1000;
}
+ int getSlowLatencyThreshold() {
+ return config.getSlowLatencyThreshold();
+ }
+
private static boolean matches(URIish uri, String urlMatch) {
if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
return true;
@@ -750,7 +741,7 @@
ReplicationScheduledEvent event =
new ReplicationScheduledEvent(project.get(), ref, targetNode);
try {
- eventDispatcher.get().postEvent(new Branch.NameKey(project, ref), event);
+ eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
} catch (PermissionBackendException e) {
repLog.error("error posting event", e);
}
@@ -764,7 +755,7 @@
RefReplicatedEvent event =
new RefReplicatedEvent(project.get(), ref, targetNode, RefPushResult.FAILED, status);
try {
- eventDispatcher.get().postEvent(new Branch.NameKey(project, ref), event);
+ eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
} catch (PermissionBackendException e) {
repLog.error("error posting event", e);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
index f688cfc..4b757ea 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -16,13 +16,16 @@
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.config.ConfigUtil;
+import java.util.concurrent.TimeUnit;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.transport.RemoteConfig;
-public class DestinationConfiguration {
+public class DestinationConfiguration implements RemoteConfiguration {
static final int DEFAULT_REPLICATION_DELAY = 15;
static final int DEFAULT_RESCHEDULE_DELAY = 3;
static final int DEFAULT_DRAIN_QUEUE_ATTEMPTS = 0;
+ private static final int DEFAULT_SLOW_LATENCY_THRESHOLD_SECS = 900;
private final int delay;
private final int rescheduleDelay;
@@ -41,6 +44,7 @@
private final ImmutableList<String> authGroupNames;
private final RemoteConfig remoteConfig;
private final int maxRetries;
+ private final int slowLatencyThreshold;
protected DestinationConfiguration(RemoteConfig remoteConfig, Config cfg) {
this.remoteConfig = remoteConfig;
@@ -67,16 +71,29 @@
maxRetries =
getInt(
remoteConfig, cfg, "replicationMaxRetries", cfg.getInt("replication", "maxRetries", 0));
+
+ slowLatencyThreshold =
+ (int)
+ ConfigUtil.getTimeUnit(
+ cfg,
+ "remote",
+ remoteConfig.getName(),
+ "slowLatencyThreshold",
+ DEFAULT_SLOW_LATENCY_THRESHOLD_SECS,
+ TimeUnit.SECONDS);
}
+ @Override
public int getDelay() {
return delay;
}
+ @Override
public int getRescheduleDelay() {
return rescheduleDelay;
}
+ @Override
public int getRetryDelay() {
return retryDelay;
}
@@ -93,26 +110,32 @@
return lockErrorMaxRetries;
}
+ @Override
public ImmutableList<String> getUrls() {
return urls;
}
+ @Override
public ImmutableList<String> getAdminUrls() {
return adminUrls;
}
+ @Override
public ImmutableList<String> getProjects() {
return projects;
}
+ @Override
public ImmutableList<String> getAuthGroupNames() {
return authGroupNames;
}
+ @Override
public String getRemoteNameStyle() {
return remoteNameStyle;
}
+ @Override
public boolean replicatePermissions() {
return replicatePermissions;
}
@@ -129,10 +152,12 @@
return replicateHiddenProjects;
}
+ @Override
public RemoteConfig getRemoteConfig() {
return remoteConfig;
}
+ @Override
public int getMaxRetries() {
return maxRetries;
}
@@ -140,4 +165,9 @@
private static int getInt(RemoteConfig rc, Config cfg, String name, int defValue) {
return cfg.getInt("remote", rc.getName(), name, defValue);
}
+
+ @Override
+ public int getSlowLatencyThreshold() {
+ return slowLatencyThreshold;
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
new file mode 100644
index 0000000..8aa3919
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
@@ -0,0 +1,348 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerrit;
+import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerritHttp;
+import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH;
+import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName;
+import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.SetMultimap;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.replication.Destination.Factory;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.URIish;
+
+@Singleton
+public class DestinationsCollection implements ReplicationDestinations, ReplicationConfigValidator {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ private final Factory destinationFactory;
+ private final Provider<ReplicationQueue> replicationQueue;
+ private volatile List<Destination> destinations;
+ private boolean shuttingDown;
+
+ public static class EventQueueNotEmptyException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ public EventQueueNotEmptyException(String errorMessage) {
+ super(errorMessage);
+ }
+ }
+
+ @Inject
+ public DestinationsCollection(
+ Destination.Factory destinationFactory,
+ Provider<ReplicationQueue> replicationQueue,
+ ReplicationFileBasedConfig replicationConfig,
+ EventBus eventBus)
+ throws ConfigInvalidException {
+ this.destinationFactory = destinationFactory;
+ this.replicationQueue = replicationQueue;
+ this.destinations = allDestinations(destinationFactory, validateConfig(replicationConfig));
+ eventBus.register(this);
+ }
+
+ @Override
+ public Multimap<Destination, URIish> getURIs(
+ Optional<String> remoteName, Project.NameKey projectName, FilterType filterType) {
+ if (getAll(filterType).isEmpty()) {
+ return ImmutableMultimap.of();
+ }
+
+ SetMultimap<Destination, URIish> uris = HashMultimap.create();
+ for (Destination config : getAll(filterType)) {
+ if (!config.wouldPushProject(projectName)) {
+ continue;
+ }
+
+ if (remoteName.isPresent() && !config.getRemoteConfigName().equals(remoteName.get())) {
+ continue;
+ }
+
+ boolean adminURLUsed = false;
+
+ for (String url : config.getAdminUrls()) {
+ if (Strings.isNullOrEmpty(url)) {
+ continue;
+ }
+
+ URIish uri;
+ try {
+ uri = new URIish(url);
+ } catch (URISyntaxException e) {
+ repLog.warn("adminURL '{}' is invalid: {}", url, e.getMessage());
+ continue;
+ }
+
+ if (!isGerrit(uri) && !isGerritHttp(uri)) {
+ String path =
+ replaceName(uri.getPath(), projectName.get(), config.isSingleProjectMatch());
+ if (path == null) {
+ repLog.warn("adminURL {} does not contain ${name}", uri);
+ continue;
+ }
+
+ uri = uri.setPath(path);
+ if (!isSSH(uri)) {
+ repLog.warn("adminURL '{}' is invalid: only SSH and HTTP are supported", uri);
+ continue;
+ }
+ }
+ uris.put(config, uri);
+ adminURLUsed = true;
+ }
+
+ if (!adminURLUsed) {
+ for (URIish uri : config.getURIs(projectName, "*")) {
+ uris.put(config, uri);
+ }
+ }
+ }
+ return uris;
+ }
+
+ @Override
+ public synchronized List<Destination> getAll(FilterType filterType) {
+ Predicate<? super Destination> filter;
+ switch (filterType) {
+ case PROJECT_CREATION:
+ filter = dest -> dest.isCreateMissingRepos();
+ break;
+ case PROJECT_DELETION:
+ filter = dest -> dest.isReplicateProjectDeletions();
+ break;
+ case ALL:
+ default:
+ filter = dest -> true;
+ break;
+ }
+ return destinations.stream().filter(Objects::nonNull).filter(filter).collect(toList());
+ }
+
+ @Override
+ public synchronized boolean isEmpty() {
+ return destinations.isEmpty();
+ }
+
+ @Override
+ public synchronized void startup(WorkQueue workQueue) {
+ shuttingDown = false;
+ for (Destination cfg : destinations) {
+ cfg.start(workQueue);
+ }
+ }
+
+ /* shutdown() cannot be set as a synchronized method because
+ * it may need to wait for pending events to complete;
+ * e.g. when enabling the drain of replication events before
+ * shutdown.
+ *
+ * As a rule of thumb for synchronized methods, because they
+ * implicitly define a critical section and associated lock,
+ * they should never hold waiting for another resource, otherwise
+ * the risk of deadlock is very high.
+ *
+ * See more background about deadlocks, what they are and how to
+ * prevent them at: https://en.wikipedia.org/wiki/Deadlock
+ */
+ @Override
+ public int shutdown() {
+ synchronized (this) {
+ shuttingDown = true;
+ }
+
+ int discarded = 0;
+ for (Destination cfg : destinations) {
+ try {
+ drainReplicationEvents(cfg);
+ } catch (EventQueueNotEmptyException e) {
+ logger.atWarning().log("Event queue not empty: %s", e.getMessage());
+ } finally {
+ discarded += cfg.shutdown();
+ }
+ }
+ return discarded;
+ }
+
+ void drainReplicationEvents(Destination destination) throws EventQueueNotEmptyException {
+ int drainQueueAttempts = destination.getDrainQueueAttempts();
+ if (drainQueueAttempts == 0) {
+ return;
+ }
+ int pending = destination.getQueueInfo().pending.size();
+ int inFlight = destination.getQueueInfo().inFlight.size();
+ while ((inFlight > 0 || pending > 0) && drainQueueAttempts > 0) {
+ try {
+ logger.atInfo().log(
+ "Draining replication events, postpone shutdown. Events left: inFlight %d, pending %d",
+ inFlight, pending);
+ Thread.sleep(destination.getReplicationDelaySeconds());
+ } catch (InterruptedException ie) {
+ logger.atWarning().withCause(ie).log(
+ "Wait for replication events to drain has been interrupted");
+ }
+ pending = destination.getQueueInfo().pending.size();
+ inFlight = destination.getQueueInfo().inFlight.size();
+ drainQueueAttempts--;
+ }
+ if (pending > 0 || inFlight > 0) {
+ throw new EventQueueNotEmptyException(
+ String.format("Pending: %d - InFlight: %d", pending, inFlight));
+ }
+ }
+
+ @Subscribe
+ public synchronized void onReload(List<RemoteConfiguration> remoteConfigurations) {
+ if (shuttingDown) {
+ logger.atWarning().log("Shutting down: configuration reload ignored");
+ return;
+ }
+
+ try {
+ replicationQueue.get().stop();
+ destinations = allDestinations(destinationFactory, remoteConfigurations);
+ logger.atInfo().log("Configuration reloaded: %d destinations", getAll(FilterType.ALL).size());
+ } finally {
+ replicationQueue.get().start();
+ }
+ }
+
+ @Override
+ public List<RemoteConfiguration> validateConfig(ReplicationFileBasedConfig replicationConfig)
+ throws ConfigInvalidException {
+ if (!replicationConfig.getConfig().getFile().exists()) {
+ logger.atWarning().log(
+ "Config file %s does not exist; not replicating",
+ replicationConfig.getConfig().getFile());
+ return Collections.emptyList();
+ }
+ if (replicationConfig.getConfig().getFile().length() == 0) {
+ logger.atInfo().log(
+ "Config file %s is empty; not replicating", replicationConfig.getConfig().getFile());
+ return Collections.emptyList();
+ }
+
+ try {
+ replicationConfig.getConfig().load();
+ } catch (ConfigInvalidException e) {
+ throw new ConfigInvalidException(
+ String.format(
+ "Config file %s is invalid: %s",
+ replicationConfig.getConfig().getFile(), e.getMessage()),
+ e);
+ } catch (IOException e) {
+ throw new ConfigInvalidException(
+ String.format(
+ "Cannot read %s: %s", replicationConfig.getConfig().getFile(), e.getMessage()),
+ e);
+ }
+
+ boolean defaultForceUpdate =
+ replicationConfig.getConfig().getBoolean("gerrit", "defaultForceUpdate", false);
+
+ ImmutableList.Builder<RemoteConfiguration> confs = ImmutableList.builder();
+ for (RemoteConfig c : allRemotes(replicationConfig.getConfig())) {
+ if (c.getURIs().isEmpty()) {
+ continue;
+ }
+
+ // If destination for push is not set assume equal to source.
+ for (RefSpec ref : c.getPushRefSpecs()) {
+ if (ref.getDestination() == null) {
+ ref.setDestination(ref.getSource());
+ }
+ }
+
+ if (c.getPushRefSpecs().isEmpty()) {
+ c.addPushRefSpec(
+ new RefSpec()
+ .setSourceDestination("refs/*", "refs/*")
+ .setForceUpdate(defaultForceUpdate));
+ }
+
+ DestinationConfiguration destinationConfiguration =
+ new DestinationConfiguration(c, replicationConfig.getConfig());
+
+ if (!destinationConfiguration.isSingleProjectMatch()) {
+ for (URIish u : c.getURIs()) {
+ if (u.getPath() == null || !u.getPath().contains("${name}")) {
+ throw new ConfigInvalidException(
+ String.format(
+ "remote.%s.url \"%s\" lacks ${name} placeholder in %s",
+ c.getName(), u, replicationConfig.getConfig().getFile()));
+ }
+ }
+ }
+
+ confs.add(destinationConfiguration);
+ }
+
+ return confs.build();
+ }
+
+ private List<Destination> allDestinations(
+ Destination.Factory destinationFactory, List<RemoteConfiguration> remoteConfigurations) {
+
+ ImmutableList.Builder<Destination> dest = ImmutableList.builder();
+ for (RemoteConfiguration c : remoteConfigurations) {
+ if (c instanceof DestinationConfiguration) {
+ dest.add(destinationFactory.create((DestinationConfiguration) c));
+ }
+ }
+ return dest.build();
+ }
+
+ private static List<RemoteConfig> allRemotes(FileBasedConfig cfg) throws ConfigInvalidException {
+ Set<String> names = cfg.getSubsections("remote");
+ List<RemoteConfig> result = Lists.newArrayListWithCapacity(names.size());
+ for (String name : names) {
+ try {
+ result.add(new RemoteConfig(cfg, name));
+ } catch (URISyntaxException e) {
+ throw new ConfigInvalidException(
+ String.format("remote %s has invalid URL in %s", name, cfg.getFile()));
+ }
+ }
+ return result;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
index aaf2b15..f910a40 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
@@ -18,8 +18,8 @@
import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
import com.google.common.base.Charsets;
+import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.restapi.Url;
-import com.google.gerrit.reviewdb.client.Project;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import java.io.IOException;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
index 56cff5a..d37321a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
@@ -15,7 +15,7 @@
package com.googlesource.gerrit.plugins.replication;
import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
import com.google.gerrit.server.ssh.SshAddressesModule;
import java.io.IOException;
import java.io.OutputStream;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java
index fa17dce..d337883 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java
@@ -40,11 +40,11 @@
@Option(name = "--json", usage = "output in json format")
private boolean json;
- @Inject private ReplicationConfig config;
+ @Inject private ReplicationDestinations destinations;
@Override
protected void run() {
- for (Destination d : config.getDestinations(FilterType.ALL)) {
+ for (Destination d : destinations.getAll(FilterType.ALL)) {
if (matches(d.getRemoteConfigName())) {
printRemote(d);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
index aa6e16c..da960e6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
@@ -16,7 +16,7 @@
import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
import java.io.File;
import java.io.IOException;
import org.eclipse.jgit.internal.storage.file.FileRepository;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ObservableQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ObservableQueue.java
new file mode 100644
index 0000000..1007ae5
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ObservableQueue.java
@@ -0,0 +1,32 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+/** Allows a queue activity to be observed */
+public interface ObservableQueue {
+ /**
+ * Indicates whether the observed queue is running
+ *
+ * @return true, when the queue is running, false otherwise
+ */
+ boolean isRunning();
+
+ /**
+ * Indicates whether the observed queue is replaying queued events
+ *
+ * @return true, when the queue is replaying, false otherwise
+ */
+ boolean isReplaying();
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
index 833b02b..4f60319 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushAll.java
@@ -15,7 +15,7 @@
package com.googlesource.gerrit.plugins.replication;
import com.google.gerrit.common.Nullable;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.project.ProjectCache;
import com.google.inject.Inject;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 56cecfe..d31ae3b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -16,19 +16,20 @@
import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toMap;
import com.google.common.base.Throwables;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Sets;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.restapi.AuthException;
import com.google.gerrit.extensions.restapi.ResourceConflictException;
import com.google.gerrit.metrics.Timer1;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.reviewdb.client.RefNames;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.PerThreadRequestScope;
import com.google.gerrit.server.git.ProjectRunnable;
@@ -318,6 +319,8 @@
if (!status.isAllowed()) {
if (status.isCanceled()) {
repLog.info("PushOp for replication to {} was canceled and thus won't be rescheduled", uri);
+ } else if (status.isExternalInflight()) {
+ repLog.info("PushOp for replication to {} was denied externally", uri);
} else {
repLog.info(
"Rescheduling replication to {} to avoid collision with the in-flight push [{}].",
@@ -329,14 +332,19 @@
}
repLog.info("Replication to {} started...", uri);
- Timer1.Context context = metrics.start(config.getName());
+ Timer1.Context<String> destinationContext = metrics.start(config.getName());
try {
- long startedAt = context.getStartTime();
+ long startedAt = destinationContext.getStartTime();
long delay = NANOSECONDS.toMillis(startedAt - createdAt);
metrics.record(config.getName(), delay, retryCount);
git = gitManager.openRepository(projectName);
runImpl();
- long elapsed = NANOSECONDS.toMillis(context.stop());
+ long elapsed = NANOSECONDS.toMillis(destinationContext.stop());
+
+ if (elapsed > SECONDS.toMillis(pool.getSlowLatencyThreshold())) {
+ metrics.recordSlowProjectReplication(
+ config.getName(), projectName.get(), pool.getSlowLatencyThreshold(), elapsed);
+ }
repLog.info(
"Replication to {} completed in {}ms, {}ms delay, {} retries",
uri,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
index fccdb7b..d1ab790 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
@@ -14,9 +14,10 @@
package com.googlesource.gerrit.plugins.replication;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
import com.google.gerrit.server.events.RefEvent;
import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import java.util.Objects;
import org.eclipse.jgit.transport.RemoteRefUpdate;
import org.eclipse.jgit.transport.RemoteRefUpdate.Status;
@@ -45,11 +46,40 @@
@Override
public Project.NameKey getProjectNameKey() {
- return new Project.NameKey(project);
+ return Project.nameKey(project);
}
@Override
public String getRefName() {
return ref;
}
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof RefReplicatedEvent)) {
+ return false;
+ }
+ RefReplicatedEvent event = (RefReplicatedEvent) other;
+ if (!Objects.equals(event.project, this.project)) {
+ return false;
+ }
+ if (!Objects.equals(event.ref, this.ref)) {
+ return false;
+ }
+ if (!Objects.equals(event.targetNode, this.targetNode)) {
+ return false;
+ }
+ if (!Objects.equals(event.status, this.status)) {
+ return false;
+ }
+ if (!Objects.equals(event.refStatus, this.refStatus)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
index 4789a96..e663194 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
@@ -14,8 +14,9 @@
package com.googlesource.gerrit.plugins.replication;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
import com.google.gerrit.server.events.RefEvent;
+import java.util.Objects;
public class RefReplicationDoneEvent extends RefEvent {
public static final String TYPE = "ref-replication-done";
@@ -33,11 +34,35 @@
@Override
public Project.NameKey getProjectNameKey() {
- return new Project.NameKey(project);
+ return Project.nameKey(project);
}
@Override
public String getRefName() {
return ref;
}
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof RefReplicationDoneEvent)) {
+ return false;
+ }
+
+ RefReplicationDoneEvent event = (RefReplicationDoneEvent) other;
+ if (!Objects.equals(event.project, this.project)) {
+ return false;
+ }
+ if (!Objects.equals(event.ref, this.ref)) {
+ return false;
+ }
+ if (event.nodesCount != this.nodesCount) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
new file mode 100644
index 0000000..b66e73c
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteConfiguration.java
@@ -0,0 +1,122 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.eclipse.jgit.transport.RemoteConfig;
+
+/** Remote configuration for a replication endpoint */
+public interface RemoteConfiguration {
+ /**
+ * Time to wait before scheduling a remote replication operation. Setting to 0 effectively
+ * disables the delay.
+ *
+ * @return the delay value in seconds
+ */
+ int getDelay();
+ /**
+ * Time to wait before rescheduling a remote replication operation, which might have failed the
+ * first time round. Setting to 0 effectively disables the delay.
+ *
+ * @return the delay value in seconds
+ */
+ int getRescheduleDelay();
+ /**
+ * Time to wait before retrying a failed remote replication operation, Setting to 0 effectively
+ * disables the delay.
+ *
+ * @return the delay value in seconds
+ */
+ int getRetryDelay();
+ /**
+ * List of the remote endpoint addresses used for replication.
+ *
+ * @return list of remote URL strings
+ */
+ ImmutableList<String> getUrls();
+ /**
+ * List of alternative remote endpoint addresses, used for admin operations, such as repository
+ * creation
+ *
+ * @return list of remote URL strings
+ */
+ ImmutableList<String> getAdminUrls();
+ /**
+ * List of repositories that should be replicated
+ *
+ * @return list of project strings
+ */
+ ImmutableList<String> getProjects();
+ /**
+ * List of groups that should be used to access the repositories.
+ *
+ * @return list of group strings
+ */
+ ImmutableList<String> getAuthGroupNames();
+ /**
+ * Influence how the name of the remote repository should be computed.
+ *
+ * @return a string representing a remote style name
+ */
+ String getRemoteNameStyle();
+ /**
+ * If true, permissions-only projects and the refs/meta/config branch will also be replicated
+ *
+ * @return a string representing a remote style name
+ */
+ boolean replicatePermissions();
+ /**
+ * the JGIT remote configuration representing the replication for this endpoint
+ *
+ * @return The remote config {@link RemoteConfig}
+ */
+ RemoteConfig getRemoteConfig();
+ /**
+ * Number of times to retry a replication operation
+ *
+ * @return the number of retries
+ */
+ int getMaxRetries();
+
+ /**
+ * the time duration after which the replication for a project should be considered “slow”
+ *
+ * @return the slow latency threshold
+ */
+ int getSlowLatencyThreshold();
+
+ /**
+ * Whether the remote configuration is for a single project only
+ *
+ * @return true, when configuration is for a single project, false otherwise
+ */
+ default boolean isSingleProjectMatch() {
+ List<String> projects = getProjects();
+ boolean ret = (projects.size() == 1);
+ if (ret) {
+ String projectMatch = projects.get(0);
+ if (ReplicationFilter.getPatternType(projectMatch)
+ != ReplicationFilter.PatternType.EXACT_MATCH) {
+ // projectMatch is either regular expression, or wild-card.
+ //
+ // Even though they might refer to a single project now, they need not
+ // after new projects have been created. Hence, we do not treat them as
+ // matching a single project.
+ ret = false;
+ }
+ }
+ return ret;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
index ee9d4c0..d4d979f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
@@ -16,7 +16,7 @@
import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
import java.io.IOException;
import java.io.OutputStream;
import org.eclipse.jgit.transport.URIish;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index 929c538..68fe430 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -11,44 +11,70 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+
package com.googlesource.gerrit.plugins.replication;
-import com.google.common.collect.Multimap;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.server.git.WorkQueue;
import java.nio.file.Path;
-import java.util.List;
-import java.util.Optional;
-import org.eclipse.jgit.transport.URIish;
+/** Configuration of all the replication end points. */
public interface ReplicationConfig {
+ /** Filter for accessing replication projects. */
enum FilterType {
PROJECT_CREATION,
PROJECT_DELETION,
ALL
}
- List<Destination> getDestinations(FilterType filterType);
-
- Multimap<Destination, URIish> getURIs(
- Optional<String> remoteName, Project.NameKey projectName, FilterType filterType);
-
+ /**
+ * Returns current replication configuration of whether to replicate or not all the projects when
+ * the plugin starts.
+ *
+ * @return true if replication at plugin start, false otherwise.
+ */
boolean isReplicateAllOnPluginStart();
+ /**
+ * Returns the default behaviour of the replication plugin when pushing to remote replication
+ * ends. Even though the property name has the 'update' suffix, it actually refers to Git push
+ * operation and not to a Git update.
+ *
+ * @return true if forced push is the default, false otherwise.
+ */
boolean isDefaultForceUpdate();
+ /**
+ * Returns the interval in seconds for running task distribution.
+ *
+ * @return number of seconds, zero if never.
+ */
+ int getDistributionInterval();
+
+ /**
+ * Returns the maximum number of ref-specs to log into the replication_log whenever a push
+ * operation is completed against a replication end.
+ *
+ * @return maximum number of refs to log, zero if unlimited.
+ */
int getMaxRefsToLog();
- boolean isEmpty();
-
+ /**
+ * Configured location where the replication events are stored on the filesystem for being resumed
+ * and kept across restarts.
+ *
+ * @return path to store persisted events.
+ */
Path getEventsDirectory();
- int shutdown();
-
- void startup(WorkQueue workQueue);
-
int getSshConnectionTimeout();
int getSshCommandTimeout();
+
+ /**
+ * Current logical version string of the current configuration loaded in memory, depending on the
+ * actual implementation of the configuration on the persistent storage.
+ *
+ * @return current logical version number.
+ */
+ String getVersion();
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfigValidator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfigValidator.java
new file mode 100644
index 0000000..7883114
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfigValidator.java
@@ -0,0 +1,31 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import java.util.List;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+
+public interface ReplicationConfigValidator {
+
+ /**
+ * validate the new replication.config
+ *
+ * @param newConfig new configuration detected
+ * @return List of validated {@link RemoteConfiguration}
+ * @throws ConfigInvalidException if the new configuration is not valid.
+ */
+ List<RemoteConfiguration> validateConfig(ReplicationFileBasedConfig newConfig)
+ throws ConfigInvalidException;
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
new file mode 100644
index 0000000..b191d7d
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationDestinations.java
@@ -0,0 +1,63 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import com.google.common.collect.Multimap;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.git.WorkQueue;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
+import java.util.List;
+import java.util.Optional;
+import org.eclipse.jgit.transport.URIish;
+
+/** Git destinations currently active for replication. */
+public interface ReplicationDestinations {
+
+ /**
+ * Return all the URIs associated to a project and a filter criteria.
+ *
+ * @param remoteName name of the replication end or empty if selecting all ends.
+ * @param projectName name of the project
+ * @param filterType type of filter criteria for selecting projects
+ * @return the multi-map of destinations and the associated replication URIs
+ */
+ Multimap<Destination, URIish> getURIs(
+ Optional<String> remoteName, Project.NameKey projectName, FilterType filterType);
+
+ /**
+ * List of currently active replication destinations.
+ *
+ * @param filterType type project filtering
+ * @return the list of active destinations
+ */
+ List<Destination> getAll(FilterType filterType);
+
+ /** @return true if there are no destinations, false otherwise. */
+ boolean isEmpty();
+
+ /**
+ * Start replicating to all destinations.
+ *
+ * @param workQueue execution queue for scheduling the replication events.
+ */
+ void startup(WorkQueue workQueue);
+
+ /**
+ * Stop the replication to all destinations.
+ *
+ * @return number of events cancelled during shutdown.
+ */
+ int shutdown();
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index 6476412..d714376 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -13,51 +13,19 @@
// limitations under the License.
package com.googlesource.gerrit.plugins.replication;
-import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerrit;
-import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerritHttp;
-import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH;
-import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static java.util.stream.Collectors.toList;
-
import com.google.common.base.Strings;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.SetMultimap;
-import com.google.common.flogger.FluentLogger;
import com.google.gerrit.extensions.annotations.PluginData;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.server.config.ConfigUtil;
import com.google.gerrit.server.config.SitePaths;
-import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import java.io.IOException;
-import java.net.URISyntaxException;
import java.nio.file.Path;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Predicate;
-import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.transport.RefSpec;
-import org.eclipse.jgit.transport.RemoteConfig;
-import org.eclipse.jgit.transport.URIish;
import org.eclipse.jgit.util.FS;
@Singleton
public class ReplicationFileBasedConfig implements ReplicationConfig {
- private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final int DEFAULT_SSH_CONNECTION_TIMEOUT_MS = 2 * 60 * 1000; // 2 minutes
- private List<Destination> destinations;
private final SitePaths site;
private Path cfgPath;
private boolean replicateAllOnPluginStart;
@@ -69,177 +37,17 @@
private final Path pluginDataDir;
@Inject
- public ReplicationFileBasedConfig(
- SitePaths site, Destination.Factory destinationFactory, @PluginData Path pluginDataDir)
- throws ConfigInvalidException, IOException {
+ public ReplicationFileBasedConfig(SitePaths site, @PluginData Path pluginDataDir) {
this.site = site;
this.cfgPath = site.etc_dir.resolve("replication.config");
this.config = new FileBasedConfig(cfgPath.toFile(), FS.DETECTED);
- this.destinations = allDestinations(destinationFactory);
+ this.replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false);
+ this.defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false);
+ this.maxRefsToLog = config.getInt("gerrit", "maxRefsToLog", 0);
this.pluginDataDir = pluginDataDir;
}
- /*
- * (non-Javadoc)
- * @see
- * com.googlesource.gerrit.plugins.replication.ReplicationConfig#getDestinations
- * (com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType)
- */
- @Override
- public List<Destination> getDestinations(FilterType filterType) {
- Predicate<? super Destination> filter;
- switch (filterType) {
- case PROJECT_CREATION:
- filter = dest -> dest.isCreateMissingRepos();
- break;
- case PROJECT_DELETION:
- filter = dest -> dest.isReplicateProjectDeletions();
- break;
- case ALL:
- default:
- filter = dest -> true;
- break;
- }
- return destinations.stream().filter(Objects::nonNull).filter(filter).collect(toList());
- }
-
- private List<Destination> allDestinations(Destination.Factory destinationFactory)
- throws ConfigInvalidException, IOException {
- if (!config.getFile().exists()) {
- logger.atWarning().log("Config file %s does not exist; not replicating", config.getFile());
- return Collections.emptyList();
- }
- if (config.getFile().length() == 0) {
- logger.atInfo().log("Config file %s is empty; not replicating", config.getFile());
- return Collections.emptyList();
- }
-
- try {
- config.load();
- } catch (ConfigInvalidException e) {
- throw new ConfigInvalidException(
- String.format("Config file %s is invalid: %s", config.getFile(), e.getMessage()), e);
- } catch (IOException e) {
- throw new IOException(
- String.format("Cannot read %s: %s", config.getFile(), e.getMessage()), e);
- }
-
- replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false);
-
- defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false);
-
- maxRefsToLog = config.getInt("gerrit", "maxRefsToLog", 0);
-
- sshCommandTimeout =
- (int) ConfigUtil.getTimeUnit(config, "gerrit", null, "sshCommandTimeout", 0, SECONDS);
- sshConnectionTimeout =
- (int)
- ConfigUtil.getTimeUnit(
- config,
- "gerrit",
- null,
- "sshConnectionTimeout",
- DEFAULT_SSH_CONNECTION_TIMEOUT_MS,
- MILLISECONDS);
-
- ImmutableList.Builder<Destination> dest = ImmutableList.builder();
- for (RemoteConfig c : allRemotes(config)) {
- if (c.getURIs().isEmpty()) {
- continue;
- }
-
- // If destination for push is not set assume equal to source.
- for (RefSpec ref : c.getPushRefSpecs()) {
- if (ref.getDestination() == null) {
- ref.setDestination(ref.getSource());
- }
- }
-
- if (c.getPushRefSpecs().isEmpty()) {
- c.addPushRefSpec(
- new RefSpec()
- .setSourceDestination("refs/*", "refs/*")
- .setForceUpdate(defaultForceUpdate));
- }
-
- Destination destination = destinationFactory.create(new DestinationConfiguration(c, config));
-
- if (!destination.isSingleProjectMatch()) {
- for (URIish u : c.getURIs()) {
- if (u.getPath() == null || !u.getPath().contains("${name}")) {
- throw new ConfigInvalidException(
- String.format(
- "remote.%s.url \"%s\" lacks ${name} placeholder in %s",
- c.getName(), u, config.getFile()));
- }
- }
- }
-
- dest.add(destination);
- }
- return dest.build();
- }
-
- @Override
- public Multimap<Destination, URIish> getURIs(
- Optional<String> remoteName, Project.NameKey projectName, FilterType filterType) {
- if (getDestinations(filterType).isEmpty()) {
- return ImmutableMultimap.of();
- }
-
- SetMultimap<Destination, URIish> uris = HashMultimap.create();
- for (Destination config : getDestinations(filterType)) {
- if (!config.wouldPushProject(projectName)) {
- continue;
- }
-
- if (remoteName.isPresent() && !config.getRemoteConfigName().equals(remoteName.get())) {
- continue;
- }
-
- boolean adminURLUsed = false;
-
- for (String url : config.getAdminUrls()) {
- if (Strings.isNullOrEmpty(url)) {
- continue;
- }
-
- URIish uri;
- try {
- uri = new URIish(url);
- } catch (URISyntaxException e) {
- repLog.warn("adminURL '{}' is invalid: {}", url, e.getMessage());
- continue;
- }
-
- if (!isGerrit(uri) && !isGerritHttp(uri)) {
- String path =
- replaceName(uri.getPath(), projectName.get(), config.isSingleProjectMatch());
- if (path == null) {
- repLog.warn("adminURL {} does not contain ${name}", uri);
- continue;
- }
-
- uri = uri.setPath(path);
- if (!isSSH(uri)) {
- repLog.warn("adminURL '{}' is invalid: only SSH and HTTP are supported", uri);
- continue;
- }
- }
- uris.put(config, uri);
- adminURLUsed = true;
- }
-
- if (!adminURLUsed) {
- for (URIish uri : config.getURIs(projectName, "*")) {
- uris.put(config, uri);
- }
- }
- }
- return uris;
- }
-
- static String replaceName(String in, String name, boolean keyIsOptional) {
+ public static String replaceName(String in, String name, boolean keyIsOptional) {
String key = "${name}";
int n = in.indexOf(key);
if (0 <= n) {
@@ -268,32 +76,15 @@
}
@Override
+ public int getDistributionInterval() {
+ return config.getInt("replication", "distributionInterval", 0);
+ }
+
+ @Override
public int getMaxRefsToLog() {
return maxRefsToLog;
}
- private static List<RemoteConfig> allRemotes(FileBasedConfig cfg) throws ConfigInvalidException {
- Set<String> names = cfg.getSubsections("remote");
- List<RemoteConfig> result = Lists.newArrayListWithCapacity(names.size());
- for (String name : names) {
- try {
- result.add(new RemoteConfig(cfg, name));
- } catch (URISyntaxException e) {
- throw new ConfigInvalidException(
- String.format("remote %s has invalid URL in %s", name, cfg.getFile()));
- }
- }
- return result;
- }
-
- /* (non-Javadoc)
- * @see com.googlesource.gerrit.plugins.replication.ReplicationConfig#isEmpty()
- */
- @Override
- public boolean isEmpty() {
- return destinations.isEmpty();
- }
-
@Override
public Path getEventsDirectory() {
String eventsDirectory = config.getString("replication", null, "eventsDirectory");
@@ -307,67 +98,13 @@
return cfgPath;
}
- @Override
- public int shutdown() {
- int discarded = 0;
- for (Destination cfg : destinations) {
- try {
- drainReplicationEvents(cfg);
- } catch (EventQueueNotEmptyException e) {
- logger.atWarning().log("Event queue not empty: %s", e.getMessage());
- } finally {
- discarded += cfg.shutdown();
- }
- }
- return discarded;
- }
-
- void drainReplicationEvents(Destination destination) throws EventQueueNotEmptyException {
- int drainQueueAttempts = destination.getDrainQueueAttempts();
- if (drainQueueAttempts == 0) {
- return;
- }
- int pending = destination.getQueueInfo().pending.size();
- int inFlight = destination.getQueueInfo().inFlight.size();
-
- while ((inFlight > 0 || pending > 0) && drainQueueAttempts > 0) {
- try {
- logger.atInfo().log(
- "Draining replication events, postpone shutdown. Events left: inFlight %d, pending %d",
- inFlight, pending);
- Thread.sleep(destination.getReplicationDelaySeconds());
- } catch (InterruptedException ie) {
- logger.atWarning().withCause(ie).log(
- "Wait for replication events to drain has been interrupted");
- }
- pending = destination.getQueueInfo().pending.size();
- inFlight = destination.getQueueInfo().inFlight.size();
- drainQueueAttempts--;
- }
-
- if (pending > 0 || inFlight > 0) {
- throw new EventQueueNotEmptyException(
- String.format("Pending: %d - InFlight: %d", pending, inFlight));
- }
- }
-
- public static class EventQueueNotEmptyException extends Exception {
- private static final long serialVersionUID = 1L;
-
- public EventQueueNotEmptyException(String errorMessage) {
- super(errorMessage);
- }
- }
-
- FileBasedConfig getConfig() {
+ public FileBasedConfig getConfig() {
return config;
}
@Override
- public void startup(WorkQueue workQueue) {
- for (Destination cfg : destinations) {
- cfg.start(workQueue);
- }
+ public String getVersion() {
+ return Long.toString(config.getFile().lastModified());
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
index 05bbb03..5b4204e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFilter.java
@@ -15,7 +15,7 @@
package com.googlesource.gerrit.plugins.replication;
import com.google.gerrit.common.data.AccessSection;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
import java.util.Collections;
import java.util.List;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java
index afc7926..1bc17ec 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationMetrics.java
@@ -14,11 +14,14 @@
package com.googlesource.gerrit.plugins.replication;
+import com.google.gerrit.extensions.annotations.PluginName;
import com.google.gerrit.metrics.Description;
import com.google.gerrit.metrics.Field;
import com.google.gerrit.metrics.Histogram1;
+import com.google.gerrit.metrics.Histogram3;
import com.google.gerrit.metrics.MetricMaker;
import com.google.gerrit.metrics.Timer1;
+import com.google.gerrit.server.logging.PluginMetadata;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -27,10 +30,37 @@
private final Timer1<String> executionTime;
private final Histogram1<String> executionDelay;
private final Histogram1<String> executionRetries;
+ private final Histogram3<Integer, String, String> slowProjectReplicationLatency;
@Inject
- ReplicationMetrics(MetricMaker metricMaker) {
- Field<String> DEST_FIELD = Field.ofString("destination");
+ ReplicationMetrics(@PluginName String pluginName, MetricMaker metricMaker) {
+ Field<String> DEST_FIELD =
+ Field.ofString(
+ "destination",
+ (metadataBuilder, fieldValue) ->
+ metadataBuilder
+ .pluginName(pluginName)
+ .addPluginMetadata(PluginMetadata.create("destination", fieldValue)))
+ .build();
+
+ Field<String> PROJECT_FIELD =
+ Field.ofString(
+ "project",
+ (metadataBuilder, fieldValue) ->
+ metadataBuilder
+ .pluginName(pluginName)
+ .addPluginMetadata(PluginMetadata.create("project", fieldValue)))
+ .build();
+
+ Field<Integer> SLOW_THRESHOLD_FIELD =
+ Field.ofInteger(
+ "slow_threshold",
+ (metadataBuilder, fieldValue) ->
+ metadataBuilder
+ .pluginName(pluginName)
+ .addPluginMetadata(
+ PluginMetadata.create("slow_threshold", fieldValue.toString())))
+ .build();
executionTime =
metricMaker.newTimer(
@@ -55,6 +85,17 @@
.setCumulative()
.setUnit("retries"),
DEST_FIELD);
+
+ slowProjectReplicationLatency =
+ metricMaker.newHistogram(
+ "latency_slower_than_threshold" + "",
+ new Description(
+ "latency for project to destination, where latency was slower than threshold")
+ .setCumulative()
+ .setUnit(Description.Units.MILLISECONDS),
+ SLOW_THRESHOLD_FIELD,
+ PROJECT_FIELD,
+ DEST_FIELD);
}
/**
@@ -63,7 +104,7 @@
* @param name the destination name.
* @return the timer context.
*/
- Timer1.Context start(String name) {
+ Timer1.Context<String> start(String name) {
return executionTime.start(name);
}
@@ -78,4 +119,17 @@
executionDelay.record(name, delay);
executionRetries.record(name, retries);
}
+
+ /**
+ * Record replication latency for project to destination, where latency was slower than threshold
+ *
+ * @param destinationName the destination name.
+ * @param projectName the project name.
+ * @param slowThreshold replication initialDelay in milliseconds.
+ * @param latency number of retries.
+ */
+ void recordSlowProjectReplication(
+ String destinationName, String projectName, Integer slowThreshold, long latency) {
+ slowProjectReplicationLatency.record(slowThreshold, destinationName, projectName, latency);
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
index 835d068..979c8e3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -16,6 +16,7 @@
import static com.googlesource.gerrit.plugins.replication.StartReplicationCapability.START_REPLICATION;
+import com.google.common.eventbus.EventBus;
import com.google.gerrit.extensions.annotations.Exports;
import com.google.gerrit.extensions.config.CapabilityDefinition;
import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
@@ -23,18 +24,35 @@
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.events.EventTypes;
import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.ProvisionException;
import com.google.inject.Scopes;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.internal.UniqueAnnotations;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
import org.eclipse.jgit.transport.SshSessionFactory;
+import org.eclipse.jgit.util.FS;
class ReplicationModule extends AbstractModule {
+ private final Path cfgPath;
+
+ @Inject
+ public ReplicationModule(SitePaths site) {
+ cfgPath = site.etc_dir.resolve("replication.config");
+ }
+
@Override
protected void configure() {
install(new FactoryModuleBuilder().build(Destination.Factory.class));
bind(ReplicationQueue.class).in(Scopes.SINGLETON);
+ bind(ObservableQueue.class).to(ReplicationQueue.class);
bind(LifecycleListener.class)
.annotatedWith(UniqueAnnotations.create())
.to(ReplicationQueue.class);
@@ -57,7 +75,19 @@
install(new FactoryModuleBuilder().build(PushAll.Factory.class));
- bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class);
+ bind(EventBus.class).in(Scopes.SINGLETON);
+ bind(ReplicationDestinations.class).to(DestinationsCollection.class);
+ bind(ReplicationConfigValidator.class).to(DestinationsCollection.class);
+
+ if (getReplicationConfig().getBoolean("gerrit", "autoReload", false)) {
+ bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class);
+ bind(LifecycleListener.class)
+ .annotatedWith(UniqueAnnotations.create())
+ .to(AutoReloadConfigDecorator.class);
+ } else {
+ bind(ReplicationConfig.class).to(ReplicationFileBasedConfig.class);
+ }
+
DynamicSet.setOf(binder(), ReplicationStateListener.class);
DynamicSet.bind(binder(), ReplicationStateListener.class).to(ReplicationStateLogger.class);
@@ -68,4 +98,15 @@
bind(TransportFactory.class).to(TransportFactoryImpl.class).in(Scopes.SINGLETON);
}
+
+ private FileBasedConfig getReplicationConfig() {
+ File replicationConfigFile = cfgPath.toFile();
+ FileBasedConfig config = new FileBasedConfig(replicationConfigFile, FS.DETECTED);
+ try {
+ config.load();
+ } catch (IOException | ConfigInvalidException e) {
+ throw new ProvisionException("Unable to load " + replicationConfigFile.getAbsolutePath(), e);
+ }
+ return config;
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 8030d28..0dcfc95 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -14,18 +14,21 @@
package com.googlesource.gerrit.plugins.replication;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
+import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
import com.google.gerrit.extensions.events.HeadUpdatedListener;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.server.events.EventDispatcher;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
+import com.google.inject.Provider;
import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
@@ -33,13 +36,16 @@
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.eclipse.jgit.transport.URIish;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Manages automatic replication to remote repositories. */
public class ReplicationQueue
- implements LifecycleListener,
+ implements ObservableQueue,
+ LifecycleListener,
GitReferenceUpdatedListener,
ProjectDeletedListener,
HeadUpdatedListener {
@@ -48,24 +54,28 @@
private final ReplicationStateListener stateLog;
+ private final ReplicationConfig replConfig;
private final WorkQueue workQueue;
private final DynamicItem<EventDispatcher> dispatcher;
- private final ReplicationConfig config;
+ private final Provider<ReplicationDestinations> destinations; // For Guice circular dependency
private final ReplicationTasksStorage replicationTasksStorage;
private volatile boolean running;
private volatile boolean replaying;
private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
+ private Distributor distributor;
@Inject
ReplicationQueue(
- WorkQueue wq,
ReplicationConfig rc,
+ WorkQueue wq,
+ Provider<ReplicationDestinations> rd,
DynamicItem<EventDispatcher> dis,
ReplicationStateListeners sl,
ReplicationTasksStorage rts) {
+ replConfig = rc;
workQueue = wq;
dispatcher = dis;
- config = rc;
+ destinations = rd;
stateLog = sl;
replicationTasksStorage = rts;
beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue();
@@ -74,26 +84,31 @@
@Override
public void start() {
if (!running) {
- config.startup(workQueue);
+ destinations.get().startup(workQueue);
running = true;
+ replicationTasksStorage.resetAll();
firePendingEvents();
fireBeforeStartupEvents();
+ distributor = new Distributor(workQueue);
}
}
@Override
public void stop() {
running = false;
- int discarded = config.shutdown();
+ distributor.stop();
+ int discarded = destinations.get().shutdown();
if (discarded > 0) {
repLog.warn("Canceled {} replication events during shutdown", discarded);
}
}
+ @Override
public boolean isRunning() {
return running;
}
+ @Override
public boolean isReplaying() {
return replaying;
}
@@ -105,49 +120,46 @@
@VisibleForTesting
public void scheduleFullSync(
Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
- if (!running) {
- stateLog.warn("Replication plugin did not finish startup before event", state);
- return;
- }
-
- for (Destination cfg : config.getDestinations(FilterType.ALL)) {
- if (cfg.wouldPushProject(project)) {
- for (URIish uri : cfg.getURIs(project, urlMatch)) {
- cfg.schedule(project, PushOne.ALL_REFS, uri, state, now);
- replicationTasksStorage.persist(
- new ReplicateRefUpdate(
- project.get(), PushOne.ALL_REFS, uri, cfg.getRemoteConfigName()));
- }
- }
- }
+ fire(project, urlMatch, PushOne.ALL_REFS, state, now, false);
}
@Override
public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
- onGitReferenceUpdated(event.getProjectName(), event.getRefName());
+ fire(event.getProjectName(), event.getRefName(), false);
}
- private void onGitReferenceUpdated(String projectName, String refName) {
+ private void fire(String projectName, String refName, boolean isPersisted) {
ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
+ fire(Project.nameKey(projectName), null, refName, state, false, isPersisted);
+ state.markAllPushTasksScheduled();
+ }
+
+ private void fire(
+ Project.NameKey project,
+ String urlMatch,
+ String refName,
+ ReplicationState state,
+ boolean now,
+ boolean isPersisted) {
if (!running) {
stateLog.warn(
"Replication plugin did not finish startup before event, event replication is postponed",
state);
- beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(projectName, refName));
+ beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(project.get(), refName));
return;
}
- Project.NameKey project = new Project.NameKey(projectName);
- for (Destination cfg : config.getDestinations(FilterType.ALL)) {
+ for (Destination cfg : destinations.get().getAll(FilterType.ALL)) {
if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
- for (URIish uri : cfg.getURIs(project, null)) {
- replicationTasksStorage.persist(
- new ReplicateRefUpdate(projectName, refName, uri, cfg.getRemoteConfigName()));
- cfg.schedule(project, refName, uri, state);
+ for (URIish uri : cfg.getURIs(project, urlMatch)) {
+ if (!isPersisted) {
+ replicationTasksStorage.create(
+ new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName()));
+ }
+ cfg.schedule(project, refName, uri, state, now);
}
}
}
- state.markAllPushTasksScheduled();
}
private void firePendingEvents() {
@@ -155,11 +167,11 @@
try {
Set<String> eventsReplayed = new HashSet<>();
replaying = true;
- for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.list()) {
+ for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) {
String eventKey = String.format("%s:%s", t.project, t.ref);
if (!eventsReplayed.contains(eventKey)) {
repLog.info("Firing pending task {}", eventKey);
- onGitReferenceUpdated(t.project, t.ref);
+ fire(t.project, t.ref, true);
eventsReplayed.add(eventKey);
}
}
@@ -168,17 +180,41 @@
}
}
+ private void pruneCompleted() {
+ // Queue tasks have wrappers around them so workQueue.getTasks() does not return the PushOnes.
+ // We also cannot access them by taskId since PushOnes don't have a taskId, they do have
+ // and Id, but it not the id assigned to the task in the queues. The tasks in the queue
+ // do use the same name as returned by toString() though, so that be used to correlate
+ // PushOnes with queue tasks despite their wrappers.
+ Set<String> prunableTaskNames = new HashSet<>();
+ for (Destination destination : destinations.get().getAll(FilterType.ALL)) {
+ prunableTaskNames.addAll(destination.getPrunableTaskNames());
+ }
+
+ for (WorkQueue.Task<?> task : workQueue.getTasks()) {
+ WorkQueue.Task.State state = task.getState();
+ if (state == WorkQueue.Task.State.SLEEPING || state == WorkQueue.Task.State.READY) {
+ if (task instanceof WorkQueue.ProjectTask) {
+ if (prunableTaskNames.contains(task.toString())) {
+ repLog.debug("Pruning externally completed task:" + task);
+ task.cancel(false);
+ }
+ }
+ }
+ }
+ }
+
@Override
public void onProjectDeleted(ProjectDeletedListener.Event event) {
- Project.NameKey p = new Project.NameKey(event.getProjectName());
- config.getURIs(Optional.empty(), p, FilterType.PROJECT_DELETION).entries().stream()
+ Project.NameKey p = Project.nameKey(event.getProjectName());
+ destinations.get().getURIs(Optional.empty(), p, FilterType.PROJECT_DELETION).entries().stream()
.forEach(e -> e.getKey().scheduleDeleteProject(e.getValue(), p));
}
@Override
public void onHeadUpdated(HeadUpdatedListener.Event event) {
- Project.NameKey p = new Project.NameKey(event.getProjectName());
- config.getURIs(Optional.empty(), p, FilterType.ALL).entries().stream()
+ Project.NameKey p = Project.nameKey(event.getProjectName());
+ destinations.get().getURIs(Optional.empty(), p, FilterType.ALL).entries().stream()
.forEach(e -> e.getKey().scheduleUpdateHead(e.getValue(), p, event.getNewHeadName()));
}
@@ -188,7 +224,7 @@
String eventKey = String.format("%s:%s", event.projectName(), event.refName());
if (!eventsReplayed.contains(eventKey)) {
repLog.info("Firing pending task {}", event);
- onGitReferenceUpdated(event.projectName(), event.refName());
+ fire(event.projectName(), event.refName(), false);
eventsReplayed.add(eventKey);
}
}
@@ -205,4 +241,49 @@
public abstract String refName();
}
+
+ protected class Distributor implements WorkQueue.CancelableRunnable {
+ public ScheduledThreadPoolExecutor executor;
+ public ScheduledFuture<?> future;
+
+ public Distributor(WorkQueue wq) {
+ int distributionInterval = replConfig.getDistributionInterval();
+ if (distributionInterval > 0) {
+ executor = wq.createQueue(1, "Replication Distribution", false);
+ future =
+ executor.scheduleWithFixedDelay(
+ this, distributionInterval, distributionInterval, SECONDS);
+ }
+ }
+
+ @Override
+ public void run() {
+ if (!running) {
+ return;
+ }
+ try {
+ firePendingEvents();
+ pruneCompleted();
+ } catch (Exception e) {
+ repLog.error("error distributing tasks", e);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ future.cancel(true);
+ }
+
+ public void stop() {
+ if (executor != null) {
+ cancel();
+ executor.getQueue().remove(this);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Replication Distributor";
+ }
+ }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
index aa965fe..28f6b6b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
@@ -14,7 +14,7 @@
package com.googlesource.gerrit.plugins.replication;
-import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.entities.Project;
import com.google.gerrit.server.events.RefEvent;
public class ReplicationScheduledEvent extends RefEvent {
@@ -38,6 +38,6 @@
@Override
public Project.NameKey getProjectNameKey() {
- return new Project.NameKey(project);
+ return Project.nameKey(project);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 64397f9..992cf5b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -25,63 +25,108 @@
import com.google.inject.Singleton;
import java.io.IOException;
import java.nio.file.DirectoryStream;
+import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.transport.URIish;
+/**
+ * A persistent store for replication tasks.
+ *
+ * <p>The data of this store lives under <replication_data>/ref-updates where replication_data is
+ * determined by the replication.eventsDirectory config option and defaults to
+ * <site_dir>/data/replication. Atomic renames must be supported from anywhere within the store to
+ * anywhere within the store. This generally means that all the contents of the store needs to live
+ * on the same filesystem.
+ *
+ * <p>Individual tasks are stored in files under the following directories using the sha1 of the
+ * task:
+ *
+ * <p><code>
+ * .../building/<tmp_name> new replication tasks under construction
+ * .../running/<uri_sha1>/ lock for URI
+ * .../running/<uri_sha1>/<task_sha1> running replication tasks
+ * .../waiting/<task_sha1> outstanding replication tasks
+ * </code>
+ *
+ * <p>The URI lock is acquired by creating the directory and released by removing it.
+ *
+ * <p>Tasks are moved atomically via a rename between those directories to indicate the current
+ * state of each task.
+ */
@Singleton
public class ReplicationTasksStorage {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private boolean disableDeleteForTesting;
- public static class ReplicateRefUpdate {
- public final String project;
+ public static class ReplicateRefUpdate extends UriUpdate {
public final String ref;
+
+ public ReplicateRefUpdate(UriUpdate update, String ref) {
+ this(update.project, ref, update.uri, update.remote);
+ }
+
+ public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
+ this(project, ref, uri.toASCIIString(), remote);
+ }
+
+ protected ReplicateRefUpdate(String project, String ref, String uri, String remote) {
+ super(project, uri, remote);
+ this.ref = ref;
+ }
+
+ @Override
+ public String toString() {
+ return "ref-update " + ref + " (" + super.toString() + ")";
+ }
+ }
+
+ public static class UriUpdate {
+ public final String project;
public final String uri;
public final String remote;
- public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
+ public UriUpdate(PushOne push) {
+ this(push.getProjectNameKey().get(), push.getURI(), push.getRemoteName());
+ }
+
+ public UriUpdate(String project, URIish uri, String remote) {
+ this(project, uri.toASCIIString(), remote);
+ }
+
+ public UriUpdate(String project, String uri, String remote) {
this.project = project;
- this.ref = ref;
- this.uri = uri.toASCIIString();
+ this.uri = uri;
this.remote = remote;
}
@Override
public String toString() {
- return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote;
+ return "uri-update " + project + " uri:" + uri + " remote:" + remote;
}
}
private static Gson GSON = new Gson();
- private final Path refUpdates;
+ private final Path buildingUpdates;
+ private final Path runningUpdates;
+ private final Path waitingUpdates;
@Inject
ReplicationTasksStorage(ReplicationConfig config) {
- refUpdates = config.getEventsDirectory().resolve("ref-updates");
+ Path refUpdates = config.getEventsDirectory().resolve("ref-updates");
+ buildingUpdates = refUpdates.resolve("building");
+ runningUpdates = refUpdates.resolve("running");
+ waitingUpdates = refUpdates.resolve("waiting");
}
- public String persist(ReplicateRefUpdate r) {
- String json = GSON.toJson(r) + "\n";
- String eventKey = sha1(json).name();
- Path file = refUpdates().resolve(eventKey);
-
- if (Files.exists(file)) {
- return eventKey;
- }
-
- try {
- logger.atFine().log("CREATE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
- Files.write(file, json.getBytes(UTF_8));
- } catch (IOException e) {
- logger.atWarning().withCause(e).log("Couldn't persist event %s", json);
- }
- return eventKey;
+ public String create(ReplicateRefUpdate r) {
+ return new Task(r).create();
}
@VisibleForTesting
@@ -89,37 +134,79 @@
this.disableDeleteForTesting = deleteDisabled;
}
- public void delete(ReplicateRefUpdate r) {
- String taskJson = GSON.toJson(r) + "\n";
- String taskKey = sha1(taskJson).name();
- Path file = refUpdates().resolve(taskKey);
-
- if (disableDeleteForTesting) {
- logger.atFine().log("DELETE %s (%s:%s => %s) DISABLED", file, r.project, r.ref, r.uri);
- return;
+ public boolean start(PushOne push) {
+ UriLock lock = new UriLock(push);
+ if (!lock.acquire()) {
+ return false;
}
- try {
- logger.atFine().log("DELETE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
- Files.delete(file);
- } catch (IOException e) {
- logger.atSevere().withCause(e).log("Error while deleting event %s", taskKey);
+ boolean started = false;
+ for (String ref : push.getRefs()) {
+ started = new Task(lock, ref).start() || started;
}
+
+ if (!started) { // No tasks left, likely replicated externally
+ lock.release();
+ }
+ return started;
}
- public List<ReplicateRefUpdate> list() {
- ArrayList<ReplicateRefUpdate> result = new ArrayList<>();
- try (DirectoryStream<Path> events = Files.newDirectoryStream(refUpdates())) {
- for (Path e : events) {
- if (Files.isRegularFile(e)) {
- String json = new String(Files.readAllBytes(e), UTF_8);
- result.add(GSON.fromJson(json, ReplicateRefUpdate.class));
+ public void reset(PushOne push) {
+ UriLock lock = new UriLock(push);
+ for (String ref : push.getRefs()) {
+ new Task(lock, ref).reset();
+ }
+ lock.release();
+ }
+
+ public void resetAll() {
+ try (DirectoryStream<Path> dirs = Files.newDirectoryStream(createDir(runningUpdates))) {
+ for (Path dir : dirs) {
+ UriLock lock = null;
+ for (ReplicateRefUpdate u : list(dir)) {
+ if (lock == null) {
+ lock = new UriLock(u);
+ }
+ new Task(u).reset();
+ }
+ if (lock != null) {
+ lock.release();
}
}
} catch (IOException e) {
- logger.atSevere().withCause(e).log("Error when firing pending events");
+ logger.atSevere().withCause(e).log("Error while aborting running tasks");
}
- return result;
+ }
+
+ public boolean isWaiting(PushOne push) {
+ return push.getRefs().stream().map(ref -> new Task(push, ref)).anyMatch(Task::isWaiting);
+ }
+
+ public void finish(PushOne push) {
+ UriLock lock = new UriLock(push);
+ for (ReplicateRefUpdate r : list(lock.runningDir)) {
+ new Task(lock, r.ref).finish();
+ }
+ lock.release();
+ }
+
+ public List<ReplicateRefUpdate> listWaiting() {
+ return list(createDir(waitingUpdates));
+ }
+
+ private List<ReplicateRefUpdate> list(Path tasks) {
+ List<ReplicateRefUpdate> results = new ArrayList<>();
+ try (DirectoryStream<Path> events = Files.newDirectoryStream(tasks)) {
+ for (Path e : events) {
+ if (Files.isRegularFile(e)) {
+ String json = new String(Files.readAllBytes(e), UTF_8);
+ results.add(GSON.fromJson(json, ReplicateRefUpdate.class));
+ }
+ }
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log("Error while listing tasks");
+ }
+ return results;
}
@SuppressWarnings("deprecation")
@@ -127,11 +214,140 @@
return ObjectId.fromRaw(Hashing.sha1().hashString(s, UTF_8).asBytes());
}
- private Path refUpdates() {
+ private static Path createDir(Path dir) {
try {
- return Files.createDirectories(refUpdates);
+ return Files.createDirectories(dir);
} catch (IOException e) {
- throw new ProvisionException(String.format("Couldn't create %s", refUpdates), e);
+ throw new ProvisionException(String.format("Couldn't create %s", dir), e);
+ }
+ }
+
+ private class UriLock {
+ public final UriUpdate update;
+ public final String uriKey;
+ public final Path runningDir;
+
+ public UriLock(PushOne push) {
+ this(new UriUpdate(push));
+ }
+
+ public UriLock(UriUpdate update) {
+ this.update = update;
+ uriKey = sha1(update.uri).name();
+ runningDir = createDir(runningUpdates).resolve(uriKey);
+ }
+
+ public boolean acquire() {
+ try {
+ logger.atFine().log("MKDIR %s %s", runningDir, updateLog());
+ Files.createDirectory(runningDir);
+ return true;
+ } catch (FileAlreadyExistsException e) {
+ return false; // already running, likely externally
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log("Error while starting uri %s", uriKey);
+ return true; // safer to risk a duplicate than to skip it
+ }
+ }
+
+ public void release() {
+ if (disableDeleteForTesting) {
+ logger.atFine().log("DELETE %s %s DISABLED", runningDir, updateLog());
+ return;
+ }
+
+ try {
+ logger.atFine().log("DELETE %s %s", runningDir, updateLog());
+ Files.delete(runningDir);
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log("Error while releasing uri %s", uriKey);
+ }
+ }
+
+ private String updateLog() {
+ return String.format("(%s => %s)", update.project, update.uri);
+ }
+ }
+
+ private class Task {
+ public final ReplicateRefUpdate update;
+ public final String json;
+ public final String taskKey;
+ public final Path running;
+ public final Path waiting;
+
+ public Task(ReplicateRefUpdate r) {
+ this(new UriLock(r), r.ref);
+ }
+
+ public Task(PushOne push, String ref) {
+ this(new UriLock(push), ref);
+ }
+
+ public Task(UriLock lock, String ref) {
+ update = new ReplicateRefUpdate(lock.update, ref);
+ json = GSON.toJson(update) + "\n";
+ String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote;
+ taskKey = sha1(key).name();
+ running = lock.runningDir.resolve(taskKey);
+ waiting = createDir(waitingUpdates).resolve(taskKey);
+ }
+
+ public String create() {
+ if (Files.exists(waiting)) {
+ return taskKey;
+ }
+
+ try {
+ Path tmp = Files.createTempFile(createDir(buildingUpdates), taskKey, null);
+ logger.atFine().log("CREATE %s %s", tmp, updateLog());
+ Files.write(tmp, json.getBytes(UTF_8));
+ logger.atFine().log("RENAME %s %s %s", tmp, waiting, updateLog());
+ rename(tmp, waiting);
+ } catch (IOException e) {
+ logger.atWarning().withCause(e).log("Couldn't create task %s", json);
+ }
+ return taskKey;
+ }
+
+ public boolean start() {
+ rename(waiting, running);
+ return Files.exists(running);
+ }
+
+ public void reset() {
+ rename(running, waiting);
+ }
+
+ public boolean isWaiting() {
+ return Files.exists(waiting);
+ }
+
+ public void finish() {
+ if (disableDeleteForTesting) {
+ logger.atFine().log("DELETE %s %s DISABLED", running, updateLog());
+ return;
+ }
+
+ try {
+ logger.atFine().log("DELETE %s %s", running, updateLog());
+ Files.delete(running);
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log("Error while finishing task %s", taskKey);
+ }
+ }
+
+ private void rename(Path from, Path to) {
+ try {
+ logger.atFine().log("RENAME %s to %s %s", from, to, updateLog());
+ Files.move(from, to, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log("Error while renaming task %s", taskKey);
+ }
+ }
+
+ private String updateLog() {
+ return String.format("(%s:%s => %s)", update.project, update.ref, update.uri);
}
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
index bcb1e2f..f7071d8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
@@ -27,6 +27,10 @@
return new RunwayStatus(false, inFlightPushId);
}
+ public static RunwayStatus deniedExternal() {
+ return new RunwayStatus(false, -1);
+ }
+
private final boolean allowed;
private final int inFlightPushId;
@@ -43,6 +47,10 @@
return !allowed && inFlightPushId == 0;
}
+ public boolean isExternalInflight() {
+ return !allowed && inFlightPushId == -1;
+ }
+
public int getInFlightPushId() {
return inFlightPushId;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java
index 18a4cc2..ed15b92 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/SecureCredentialsFactory.java
@@ -26,11 +26,11 @@
import org.eclipse.jgit.util.FS;
/** Looks up a remote's password in secure.config. */
-class SecureCredentialsFactory implements CredentialsFactory {
+public class SecureCredentialsFactory implements CredentialsFactory {
private final Config config;
@Inject
- SecureCredentialsFactory(SitePaths site) throws ConfigInvalidException, IOException {
+ public SecureCredentialsFactory(SitePaths site) throws ConfigInvalidException, IOException {
config = load(site);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
index 70452b4..ffa6be1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
@@ -16,8 +16,8 @@
import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.registration.DynamicItem;
-import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.server.ioutil.HexFormat;
import com.google.gerrit.server.util.IdGenerator;
import com.google.inject.Inject;
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 32bc630..19a478d 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -94,6 +94,20 @@
: Timeout for SSH connections. If 0, there is no timeout and
the client waits indefinitely. By default, 2 minutes.
+replication.distributionInterval
+: Interval in seconds for running the replication distributor. When
+ run, the replication distributor will add all persisted waiting tasks
+ to the queue to ensure that externally loaded tasks are visible to
+ the current process. If zero, turn off the replication distributor. By
+ default, zero.
+
+ Turning this on is likely only useful when there are other processes
+ (such as other masters in the same cluster) writing to the same
+ persistence store. To ensure that updates are seen well before their
+ replicationDelay expires when the distributor is used, the recommended
+ value for this is approximately the smallest remote.NAME.replicationDelay
+ divided by 5.
+
replication.lockErrorMaxRetries
: Number of times to retry a replication operation if a lock
error is detected.
@@ -142,6 +156,11 @@
persisted events will not be deleted. When the plugin is started again,
it will trigger all replications found under this directory.
+ For replication to work, is is important that atomic renames be possible
+ from within any subdirectory of the eventsDirectory to within any other
+ subdirectory of the eventsDirectory. This generally means that the entire
+ contents of the eventsDirectory should live on the same filesystem.
+
When not set, defaults to the plugin's data directory.
remote.NAME.url
@@ -419,6 +438,15 @@
By default, replicates without matching, i.e. replicates
everything to all remotes.
+remote.NAME.slowLatencyThreshold
+: the time duration after which the replication of a project to this
+ destination will be considered "slow". A slow project replication
+ will cause additional metrics to be exposed for further investigation.
+ See [metrics.md](metrics.md) for further details.
+
+ default: 15 minutes
+
+
File `secure.config`
--------------------
diff --git a/src/main/resources/Documentation/metrics.md b/src/main/resources/Documentation/metrics.md
new file mode 100644
index 0000000..ef8b150
--- /dev/null
+++ b/src/main/resources/Documentation/metrics.md
@@ -0,0 +1,61 @@
+# Metrics
+
+Some metrics are emitted when replication occurs to a remote destination.
+The granularity of the metrics recorded is at destination level, however when a particular project replication is flagged
+as slow. This happens when the replication took longer than allowed threshold (see _remote.NAME.slowLatencyThreshold_ in [config.md](config.md))
+
+The reason only slow metrics are published, rather than all, is to contain their number, which, on a big Gerrit installation
+could potentially be considerably big.
+
+### Project level
+
+* plugins_replication_latency_slower_than_<threshold>_<destinationName>_<ProjectName> - Time spent pushing <ProjectName> to remote <destinationName> (in ms)
+
+### Destination level
+
+* plugins_replication_replication_delay_<destinationName> - Time spent waiting before pushing to remote <destinationName> (in ms)
+* plugins_replication_replication_retries_<destinationName> - Number of retries when pushing to remote <destinationName>
+* plugins_replication_replication_latency_<destinationName> - Time spent pushing to remote <destinationName> (in ms)
+
+### Example
+```
+# HELP plugins_replication_replication_delay_destination Generated from Dropwizard metric import (metric=plugins/replication/replication_delay/destination, type=com.codahale.metrics.Histogram)
+# TYPE plugins_replication_replication_delay_destination summary
+plugins_replication_replication_delay_destinationName{quantile="0.5",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.75",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.95",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.98",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.99",} 65726.0
+plugins_replication_replication_delay_destinationName{quantile="0.999",} 65726.0
+plugins_replication_replication_delay_destinationName_count 3.0
+
+# HELP plugins_replication_replication_retries_destination Generated from Dropwizard metric import (metric=plugins/replication/replication_retries/destination, type=com.codahale.metrics.Histogram)
+# TYPE plugins_replication_replication_retries_destination summary
+plugins_replication_replication_retries_destinationName{quantile="0.5",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.75",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.95",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.98",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.99",} 1.0
+plugins_replication_replication_retries_destinationName{quantile="0.999",} 1.0
+plugins_replication_replication_retries_destinationName_count 3.0
+
+# HELP plugins_replication_replication_latency_destinationName Generated from Dropwizard metric import (metric=plugins/replication/replication_latency/destinationName, type=com.codahale.metrics.Timer)
+# TYPE plugins_replication_replication_latency_destinationName summary
+plugins_replication_replication_latency_destinationName{quantile="0.5",} 0.21199641400000002
+plugins_replication_replication_latency_destinationName{quantile="0.75",} 0.321083881
+plugins_replication_replication_latency_destinationName{quantile="0.95",} 0.321083881
+plugins_replication_replication_latency_destinationName{quantile="0.98",} 0.321083881
+plugins_replication_replication_latency_destinationName{quantile="0.99",} 0.321083881
+plugins_replication_replication_latency_destinationName{quantile="0.999",} 0.321083881
+plugins_replication_replication_latency_destinationName_count 2.0
+
+# HELP plugins_replication_latency_slower_than_60_destinationName_projectName Generated from Dropwizard metric import (metric=plugins/replication/latency_slower_than/60/destinationName/projectName, type=com.codahale.metrics.Histogram)
+# TYPE plugins_replication_latency_slower_than_60_destinationName_projectName summary
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.5",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.75",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.95",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.98",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.99",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName{quantile="0.999",} 278.0
+plugins_replication_latency_slower_than_60_destinationName_projectName 1.0
+```
\ No newline at end of file
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java
index 77dc1cc..3932fb3 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AbstractConfigTest.java
@@ -16,26 +16,28 @@
import static com.google.common.truth.Truth.assertThat;
import static java.nio.file.Files.createTempDirectory;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.getCurrentArguments;
-import static org.easymock.EasyMock.isA;
-import static org.easymock.EasyMock.replay;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import com.google.common.eventbus.EventBus;
import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Injector;
import com.google.inject.Module;
+import com.google.inject.util.Providers;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
-import org.easymock.IAnswer;
+import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.storage.file.FileBasedConfig;
import org.eclipse.jgit.util.FS;
import org.junit.Before;
import org.junit.Ignore;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
@Ignore
public abstract class AbstractConfigTest {
@@ -43,6 +45,10 @@
protected final SitePaths sitePaths;
protected final Destination.Factory destinationFactoryMock;
protected final Path pluginDataPath;
+ protected ReplicationQueue replicationQueueMock;
+ protected WorkQueue workQueueMock;
+ protected EventBus eventBus = new EventBus();
+ protected FakeExecutorService executorService = new FakeExecutorService();
static class FakeDestination extends Destination {
public final DestinationConfiguration config;
@@ -53,11 +59,9 @@
}
private static Injector injectorMock() {
- Injector injector = createNiceMock(Injector.class);
- Injector childInjectorMock = createNiceMock(Injector.class);
- expect(injector.createChildInjector((Module) anyObject())).andReturn(childInjectorMock);
- replay(childInjectorMock);
- replay(injector);
+ Injector injector = mock(Injector.class);
+ Injector childInjectorMock = mock(Injector.class);
+ when(injector.createChildInjector(any(Module.class))).thenReturn(childInjectorMock);
return injector;
}
}
@@ -66,21 +70,25 @@
sitePath = createTempPath("site");
sitePaths = new SitePaths(sitePath);
pluginDataPath = createTempPath("data");
- destinationFactoryMock = createMock(Destination.Factory.class);
+ destinationFactoryMock = mock(Destination.Factory.class);
}
@Before
public void setup() {
- expect(destinationFactoryMock.create(isA(DestinationConfiguration.class)))
- .andAnswer(
- new IAnswer<Destination>() {
+ when(destinationFactoryMock.create(any(DestinationConfiguration.class)))
+ .thenAnswer(
+ new Answer<Destination>() {
@Override
- public Destination answer() throws Throwable {
- return new FakeDestination((DestinationConfiguration) getCurrentArguments()[0]);
+ public Destination answer(InvocationOnMock invocation) throws Throwable {
+ return new FakeDestination((DestinationConfiguration) invocation.getArguments()[0]);
}
- })
- .anyTimes();
- replay(destinationFactoryMock);
+ });
+
+ replicationQueueMock = mock(ReplicationQueue.class);
+ when(replicationQueueMock.isRunning()).thenReturn(Boolean.TRUE);
+
+ workQueueMock = mock(WorkQueue.class);
+ when(workQueueMock.createQueue(anyInt(), any(String.class))).thenReturn(executorService);
}
protected static Path createTempPath(String prefix) throws IOException {
@@ -113,4 +121,13 @@
assertThatIsDestination(matchingDestinations.get(0), remoteName, remoteUrls);
}
+
+ protected DestinationsCollection newDestinationsCollections(
+ ReplicationFileBasedConfig replicationFileBasedConfig) throws ConfigInvalidException {
+ return new DestinationsCollection(
+ destinationFactoryMock,
+ Providers.of(replicationQueueMock),
+ replicationFileBasedConfig,
+ eventBus);
+ }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java
index 211cafa..d85f622 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecoratorTest.java
@@ -15,127 +15,19 @@
package com.googlesource.gerrit.plugins.replication;
import static com.google.common.truth.Truth.assertThat;
-import static org.easymock.EasyMock.anyInt;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.util.Providers;
import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
import java.io.IOException;
-import java.util.Collection;
import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.storage.file.FileBasedConfig;
import org.junit.Before;
import org.junit.Test;
public class AutoReloadConfigDecoratorTest extends AbstractConfigTest {
- private AutoReloadConfigDecorator autoReloadConfig;
- private ReplicationQueue replicationQueueMock;
- private WorkQueue workQueueMock;
- private FakeExecutorService executorService = new FakeExecutorService();
-
- public class FakeExecutorService implements ScheduledExecutorService {
- public Runnable refreshCommand;
-
- @Override
- public void shutdown() {}
-
- @Override
- public List<Runnable> shutdownNow() {
- return null;
- }
-
- @Override
- public boolean isShutdown() {
- return false;
- }
-
- @Override
- public boolean isTerminated() {
- return false;
- }
-
- @Override
- public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
- return false;
- }
-
- @Override
- public <T> Future<T> submit(Callable<T> task) {
- return null;
- }
-
- @Override
- public <T> Future<T> submit(Runnable task, T result) {
- return null;
- }
-
- @Override
- public Future<?> submit(Runnable task) {
- return null;
- }
-
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- throws InterruptedException {
- return null;
- }
-
- @Override
- public <T> List<Future<T>> invokeAll(
- Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
- throws InterruptedException {
- return null;
- }
-
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
- throws InterruptedException, ExecutionException {
- return null;
- }
-
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- return null;
- }
-
- @Override
- public void execute(Runnable command) {}
-
- @Override
- public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
- return null;
- }
-
- @Override
- public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
- return null;
- }
-
- @Override
- public ScheduledFuture<?> scheduleAtFixedRate(
- Runnable command, long initialDelay, long period, TimeUnit unit) {
- refreshCommand = command;
- return null;
- }
-
- @Override
- public ScheduledFuture<?> scheduleWithFixedDelay(
- Runnable command, long initialDelay, long delay, TimeUnit unit) {
- return null;
- }
- }
+ ReplicationFileBasedConfig replicationFileBasedConfig;
public AutoReloadConfigDecoratorTest() throws IOException {
super();
@@ -146,39 +38,7 @@
public void setup() {
super.setup();
- setupMocks();
- }
-
- private void setupMocks() {
- replicationQueueMock = createNiceMock(ReplicationQueue.class);
- expect(replicationQueueMock.isRunning()).andReturn(true);
- replay(replicationQueueMock);
-
- workQueueMock = createNiceMock(WorkQueue.class);
- expect(workQueueMock.createQueue(anyInt(), anyObject(String.class))).andReturn(executorService);
- replay(workQueueMock);
- }
-
- @Test
- public void shouldLoadNotEmptyInitialReplicationConfig() throws Exception {
- FileBasedConfig replicationConfig = newReplicationConfig();
- String remoteName = "foo";
- String remoteUrl = "ssh://git@git.somewhere.com/${name}";
- replicationConfig.setString("remote", remoteName, "url", remoteUrl);
- replicationConfig.save();
-
- autoReloadConfig =
- new AutoReloadConfigDecorator(
- sitePaths,
- destinationFactoryMock,
- Providers.of(replicationQueueMock),
- pluginDataPath,
- "replication",
- workQueueMock);
-
- List<Destination> destinations = autoReloadConfig.getDestinations(FilterType.ALL);
- assertThat(destinations).hasSize(1);
- assertThatIsDestination(destinations.get(0), remoteName, remoteUrl);
+ replicationFileBasedConfig = newReplicationFileBasedConfig();
}
@Test
@@ -190,17 +50,12 @@
replicationConfig.setString("remote", remoteName1, "url", remoteUrl1);
replicationConfig.save();
- autoReloadConfig =
- new AutoReloadConfigDecorator(
- sitePaths,
- destinationFactoryMock,
- Providers.of(replicationQueueMock),
- pluginDataPath,
- "replication",
- workQueueMock);
- autoReloadConfig.startup(workQueueMock);
+ newAutoReloadConfig().start();
- List<Destination> destinations = autoReloadConfig.getDestinations(FilterType.ALL);
+ DestinationsCollection destinationsCollections =
+ newDestinationsCollections(replicationFileBasedConfig);
+ destinationsCollections.startup(workQueueMock);
+ List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
assertThat(destinations).hasSize(1);
assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
@@ -212,7 +67,7 @@
replicationConfig.save();
executorService.refreshCommand.run();
- destinations = autoReloadConfig.getDestinations(FilterType.ALL);
+ destinations = destinationsCollections.getAll(FilterType.ALL);
assertThat(destinations).hasSize(2);
assertThatContainsDestination(destinations, remoteName1, remoteUrl1);
assertThatContainsDestination(destinations, remoteName2, remoteUrl2);
@@ -227,17 +82,10 @@
replicationConfig.setString("remote", remoteName1, "url", remoteUrl1);
replicationConfig.save();
- autoReloadConfig =
- new AutoReloadConfigDecorator(
- sitePaths,
- destinationFactoryMock,
- Providers.of(replicationQueueMock),
- pluginDataPath,
- "replication",
- workQueueMock);
- autoReloadConfig.startup(workQueueMock);
-
- List<Destination> destinations = autoReloadConfig.getDestinations(FilterType.ALL);
+ DestinationsCollection destinationsCollections =
+ newDestinationsCollections(replicationFileBasedConfig);
+ destinationsCollections.startup(workQueueMock);
+ List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
assertThat(destinations).hasSize(1);
assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
@@ -247,6 +95,23 @@
replicationConfig.save();
executorService.refreshCommand.run();
- assertThat(autoReloadConfig.getDestinations(FilterType.ALL)).isEqualTo(destinations);
+ assertThat(destinationsCollections.getAll(FilterType.ALL)).isEqualTo(destinations);
+ }
+
+ private AutoReloadConfigDecorator newAutoReloadConfig() throws ConfigInvalidException {
+ AutoReloadRunnable autoReloadRunnable =
+ new AutoReloadRunnable(
+ newDestinationsCollections(replicationFileBasedConfig),
+ replicationFileBasedConfig,
+ sitePaths,
+ pluginDataPath,
+ eventBus,
+ Providers.of(replicationQueueMock));
+ return new AutoReloadConfigDecorator(
+ "replication", workQueueMock, replicationFileBasedConfig, autoReloadRunnable, eventBus);
+ }
+
+ private ReplicationFileBasedConfig newReplicationFileBasedConfig() {
+ return new ReplicationFileBasedConfig(sitePaths, pluginDataPath);
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
new file mode 100644
index 0000000..b1aa7c8
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/AutoReloadRunnableTest.java
@@ -0,0 +1,120 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.util.Providers;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AutoReloadRunnableTest {
+
+ private SitePaths sitePaths;
+ private EventBus eventBus;
+ private ReloadTrackerSubscriber onReloadSubscriber;
+ private String pluginName;
+ private ReplicationQueue replicationQueueMock;
+
+ @Before
+ public void setUp() throws IOException {
+ Path tmp = Files.createTempFile(pluginName, "_site");
+ Files.deleteIfExists(tmp);
+ sitePaths = new SitePaths(tmp);
+ pluginName = "replication";
+ eventBus = new EventBus();
+ onReloadSubscriber = new ReloadTrackerSubscriber();
+ eventBus.register(onReloadSubscriber);
+
+ replicationQueueMock = mock(ReplicationQueue.class);
+ when(replicationQueueMock.isRunning()).thenReturn(Boolean.TRUE);
+ }
+
+ @Test
+ public void configurationIsReloadedWhenValidationSucceeds() {
+ ReplicationConfigValidator validator = new TestValidConfigurationListener();
+
+ attemptAutoReload(validator);
+
+ assertThat(onReloadSubscriber.reloaded).isTrue();
+ }
+
+ @Test
+ public void configurationIsNotReloadedWhenValidationFails() {
+ ReplicationConfigValidator validator = new TestInvalidConfigurationListener();
+
+ attemptAutoReload(validator);
+
+ assertThat(onReloadSubscriber.reloaded).isFalse();
+ }
+
+ private void attemptAutoReload(ReplicationConfigValidator validator) {
+ final AutoReloadRunnable autoReloadRunnable =
+ new AutoReloadRunnable(
+ validator,
+ newVersionConfig(),
+ sitePaths,
+ sitePaths.data_dir,
+ eventBus,
+ Providers.of(replicationQueueMock));
+
+ autoReloadRunnable.run();
+ }
+
+ private ReplicationFileBasedConfig newVersionConfig() {
+ return new ReplicationFileBasedConfig(sitePaths, sitePaths.data_dir) {
+ @Override
+ public String getVersion() {
+ return String.format("%s", System.nanoTime());
+ }
+ };
+ }
+
+ private static class ReloadTrackerSubscriber {
+ public boolean reloaded = false;
+
+ @Subscribe
+ public void onReload(
+ @SuppressWarnings("unused") List<DestinationConfiguration> destinationConfigurations) {
+ reloaded = true;
+ }
+ }
+
+ private static class TestValidConfigurationListener implements ReplicationConfigValidator {
+ @Override
+ public List<RemoteConfiguration> validateConfig(ReplicationFileBasedConfig newConfig) {
+ return Collections.emptyList();
+ }
+ }
+
+ private static class TestInvalidConfigurationListener implements ReplicationConfigValidator {
+ @Override
+ public List<RemoteConfiguration> validateConfig(
+ ReplicationFileBasedConfig configurationChangeEvent) throws ConfigInvalidException {
+ throw new ConfigInvalidException("expected test failure");
+ }
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/FakeExecutorService.java b/src/test/java/com/googlesource/gerrit/plugins/replication/FakeExecutorService.java
new file mode 100644
index 0000000..2f7059e
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/FakeExecutorService.java
@@ -0,0 +1,118 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class FakeExecutorService implements ScheduledExecutorService {
+ public Runnable refreshCommand = () -> {};
+
+ @Override
+ public void shutdown() {}
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ return null;
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return false;
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return false;
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return false;
+ }
+
+ @Override
+ public <T> Future<T> submit(Callable<T> task) {
+ return null;
+ }
+
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ return null;
+ }
+
+ @Override
+ public Future<?> submit(Runnable task) {
+ return null;
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException {
+ return null;
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(
+ Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return null;
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException, ExecutionException {
+ return null;
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return null;
+ }
+
+ @Override
+ public void execute(Runnable command) {}
+
+ @Override
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+ return null;
+ }
+
+ @Override
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+ return null;
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(
+ Runnable command, long initialDelay, long period, TimeUnit unit) {
+ refreshCommand = command;
+ return null;
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(
+ Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return null;
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
index 0f6d629..2ee9a39 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/GitUpdateProcessingTest.java
@@ -14,11 +14,10 @@
package com.googlesource.gerrit.plugins.replication;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reset;
-import static org.easymock.EasyMock.verify;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import com.google.gerrit.server.events.EventDispatcher;
import com.google.gerrit.server.permissions.PermissionBackendException;
@@ -36,14 +35,12 @@
@Before
public void setUp() throws Exception {
- dispatcherMock = createMock(EventDispatcher.class);
- replay(dispatcherMock);
+ dispatcherMock = mock(EventDispatcher.class);
gitUpdateProcessing = new GitUpdateProcessing(dispatcherMock);
}
@Test
public void headRefReplicated() throws URISyntaxException, PermissionBackendException {
- reset(dispatcherMock);
RefReplicatedEvent expectedEvent =
new RefReplicatedEvent(
"someProject",
@@ -51,9 +48,6 @@
"someHost",
RefPushResult.SUCCEEDED,
RemoteRefUpdate.Status.OK);
- dispatcherMock.postEvent(RefReplicatedEventEquals.eqEvent(expectedEvent));
- expectLastCall().once();
- replay(dispatcherMock);
gitUpdateProcessing.onRefReplicatedToOneNode(
"someProject",
@@ -61,12 +55,11 @@
new URIish("git://someHost/someProject.git"),
RefPushResult.SUCCEEDED,
RemoteRefUpdate.Status.OK);
- verify(dispatcherMock);
+ verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent));
}
@Test
public void changeRefReplicated() throws URISyntaxException, PermissionBackendException {
- reset(dispatcherMock);
RefReplicatedEvent expectedEvent =
new RefReplicatedEvent(
"someProject",
@@ -74,9 +67,6 @@
"someHost",
RefPushResult.FAILED,
RemoteRefUpdate.Status.REJECTED_NONFASTFORWARD);
- dispatcherMock.postEvent(RefReplicatedEventEquals.eqEvent(expectedEvent));
- expectLastCall().once();
- replay(dispatcherMock);
gitUpdateProcessing.onRefReplicatedToOneNode(
"someProject",
@@ -84,19 +74,15 @@
new URIish("git://someHost/someProject.git"),
RefPushResult.FAILED,
RemoteRefUpdate.Status.REJECTED_NONFASTFORWARD);
- verify(dispatcherMock);
+ verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent));
}
@Test
public void onAllNodesReplicated() throws PermissionBackendException {
- reset(dispatcherMock);
RefReplicationDoneEvent expectedDoneEvent =
new RefReplicationDoneEvent("someProject", "refs/heads/master", 5);
- dispatcherMock.postEvent(RefReplicationDoneEventEquals.eqEvent(expectedDoneEvent));
- expectLastCall().once();
- replay(dispatcherMock);
gitUpdateProcessing.onRefReplicatedToAllNodes("someProject", "refs/heads/master", 5);
- verify(dispatcherMock);
+ verify(dispatcherMock, times(1)).postEvent(eq(expectedDoneEvent));
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
index c010ddb..c09fcd1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -14,19 +14,18 @@
package com.googlesource.gerrit.plugins.replication;
-import static com.googlesource.gerrit.plugins.replication.RemoteRefUpdateCollectionMatcher.eqRemoteRef;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.getCurrentArguments;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
import static org.eclipse.jgit.lib.Ref.Storage.NEW;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
+import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.metrics.Timer1;
-import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.PerThreadRequestScope;
import com.google.gerrit.server.permissions.PermissionBackend;
@@ -45,8 +44,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import junit.framework.AssertionFailedError;
-import org.easymock.IAnswer;
import org.eclipse.jgit.errors.NotSupportedException;
import org.eclipse.jgit.errors.TransportException;
import org.eclipse.jgit.lib.Config;
@@ -68,6 +65,8 @@
import org.eclipse.jgit.util.FS;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class PushOneTest {
private static final int TEST_PUSH_TIMEOUT_SECS = 10;
@@ -86,7 +85,7 @@
private IdGenerator idGeneratorMock;
private ReplicationStateListeners replicationStateListenersMock;
private ReplicationMetrics replicationMetricsMock;
- private Timer1.Context timerContextMock;
+ private Timer1.Context<String> timerContextMock;
private ProjectCache projectCacheMock;
private TransportFactory transportFactoryMock;
private Transport transportMock;
@@ -108,7 +107,7 @@
@Before
public void setup() throws Exception {
- projectNameKey = new Project.NameKey("fooProject");
+ projectNameKey = Project.nameKey("fooProject");
urish = new URIish("http://foo.com/fooProject.git");
newLocalRef =
@@ -126,6 +125,7 @@
setupMocks();
}
+ @SuppressWarnings("unchecked")
private void setupMocks() throws Exception {
FileBasedConfig config = new FileBasedConfig(new Config(), new File("/foo"), FS.DETECTED);
config.setString("remote", "Replication", "push", "foo");
@@ -134,8 +134,8 @@
setupRepositoryMock(config);
setupGitRepoManagerMock();
- projectStateMock = createNiceMock(ProjectState.class);
- forProjectMock = createNiceMock(ForProject.class);
+ projectStateMock = mock(ProjectState.class);
+ forProjectMock = mock(ForProject.class);
setupWithUserMock();
setupPermissionBackedMock();
@@ -144,46 +144,22 @@
setupRefSpecMock();
setupRemoteConfigMock();
- credentialsFactory = createNiceMock(CredentialsFactory.class);
+ credentialsFactory = mock(CredentialsFactory.class);
setupFetchConnectionMock();
setupPushConnectionMock();
setupRequestScopeMock();
- idGeneratorMock = createNiceMock(IdGenerator.class);
- replicationStateListenersMock = createNiceMock(ReplicationStateListeners.class);
+ idGeneratorMock = mock(IdGenerator.class);
+ replicationStateListenersMock = mock(ReplicationStateListeners.class);
- timerContextMock = createNiceMock(Timer1.Context.class);
+ timerContextMock = mock(Timer1.Context.class);
setupReplicationMetricsMock();
setupTransportMock();
setupProjectCacheMock();
- replicationConfigMock = createNiceMock(ReplicationConfig.class);
-
- replay(
- gitRepositoryManagerMock,
- refUpdateMock,
- repositoryMock,
- permissionBackendMock,
- destinationMock,
- remoteConfigMock,
- credentialsFactory,
- threadRequestScoperMock,
- idGeneratorMock,
- replicationStateListenersMock,
- replicationMetricsMock,
- projectCacheMock,
- timerContextMock,
- transportFactoryMock,
- projectStateMock,
- withUserMock,
- forProjectMock,
- fetchConnection,
- pushConnection,
- refSpecMock,
- refDatabaseMock,
- replicationConfigMock);
+ replicationConfigMock = mock(ReplicationConfig.class);
}
@Test
@@ -212,10 +188,7 @@
PushResult pushResult = new PushResult();
- expect(transportMock.push(anyObject(), eqRemoteRef(expectedUpdates)))
- .andReturn(pushResult)
- .once();
- replay(transportMock);
+ when(transportMock.push(any(), eq(expectedUpdates))).thenReturn(pushResult);
PushOne pushOne = createPushOne(replicationPushFilter);
@@ -223,8 +196,6 @@
pushOne.run();
isCallFinished.await(TEST_PUSH_TIMEOUT_SECS, TimeUnit.SECONDS);
-
- verify(transportMock);
}
@Test
@@ -241,12 +212,6 @@
}
});
- // easymock way to check if method was never called
- expect(transportMock.push(anyObject(), anyObject()))
- .andThrow(new AssertionFailedError())
- .anyTimes();
- replay(transportMock);
-
PushOne pushOne = createPushOne(replicationPushFilter);
pushOne.addRef(PushOne.ALL_REFS);
@@ -254,7 +219,7 @@
isCallFinished.await(10, TimeUnit.SECONDS);
- verify(transportMock);
+ verify(transportMock, never()).push(any(), any());
}
private PushOne createPushOne(DynamicItem<ReplicationPushFilter> replicationPushFilter) {
@@ -281,31 +246,31 @@
}
private void setupProjectCacheMock() throws IOException {
- projectCacheMock = createNiceMock(ProjectCache.class);
- expect(projectCacheMock.checkedGet(projectNameKey)).andReturn(projectStateMock);
+ projectCacheMock = mock(ProjectCache.class);
+ when(projectCacheMock.checkedGet(projectNameKey)).thenReturn(projectStateMock);
}
private void setupTransportMock() throws NotSupportedException, TransportException {
- transportMock = createNiceMock(Transport.class);
- expect(transportMock.openFetch()).andReturn(fetchConnection);
- transportFactoryMock = createNiceMock(TransportFactory.class);
- expect(transportFactoryMock.open(repositoryMock, urish)).andReturn(transportMock).anyTimes();
+ transportMock = mock(Transport.class);
+ when(transportMock.openFetch()).thenReturn(fetchConnection);
+ transportFactoryMock = mock(TransportFactory.class);
+ when(transportFactoryMock.open(repositoryMock, urish)).thenReturn(transportMock);
}
private void setupReplicationMetricsMock() {
- replicationMetricsMock = createNiceMock(ReplicationMetrics.class);
- expect(replicationMetricsMock.start(anyObject())).andReturn(timerContextMock);
+ replicationMetricsMock = mock(ReplicationMetrics.class);
+ when(replicationMetricsMock.start(any())).thenReturn(timerContextMock);
}
private void setupRequestScopeMock() {
- threadRequestScoperMock = createNiceMock(PerThreadRequestScope.Scoper.class);
- expect(threadRequestScoperMock.scope(anyObject()))
- .andAnswer(
- new IAnswer<Callable<Object>>() {
+ threadRequestScoperMock = mock(PerThreadRequestScope.Scoper.class);
+ when(threadRequestScoperMock.scope(any()))
+ .thenAnswer(
+ new Answer<Callable<Object>>() {
@SuppressWarnings("unchecked")
@Override
- public Callable<Object> answer() throws Throwable {
- Callable<Object> originalCall = (Callable<Object>) getCurrentArguments()[0];
+ public Callable<Object> answer(InvocationOnMock invocation) throws Throwable {
+ Callable<Object> originalCall = (Callable<Object>) invocation.getArguments()[0];
return new Callable<Object>() {
@Override
@@ -316,66 +281,64 @@
}
};
}
- })
- .anyTimes();
+ });
}
private void setupPushConnectionMock() {
- pushConnection = createNiceMock(PushConnection.class);
- expect(pushConnection.getRefsMap()).andReturn(remoteRefs);
+ pushConnection = mock(PushConnection.class);
+ when(pushConnection.getRefsMap()).thenReturn(remoteRefs);
}
private void setupFetchConnectionMock() {
- fetchConnection = createNiceMock(FetchConnection.class);
- expect(fetchConnection.getRefsMap()).andReturn(remoteRefs);
+ fetchConnection = mock(FetchConnection.class);
+ when(fetchConnection.getRefsMap()).thenReturn(remoteRefs);
}
private void setupRemoteConfigMock() {
- remoteConfigMock = createNiceMock(RemoteConfig.class);
- expect(remoteConfigMock.getPushRefSpecs()).andReturn(ImmutableList.of(refSpecMock));
+ remoteConfigMock = mock(RemoteConfig.class);
+ when(remoteConfigMock.getPushRefSpecs()).thenReturn(ImmutableList.of(refSpecMock));
}
private void setupRefSpecMock() {
- refSpecMock = createNiceMock(RefSpec.class);
- expect(refSpecMock.matchSource(anyObject(String.class))).andReturn(true);
- expect(refSpecMock.expandFromSource(anyObject(String.class))).andReturn(refSpecMock);
- expect(refSpecMock.getDestination()).andReturn("fooProject").anyTimes();
- expect(refSpecMock.isForceUpdate()).andReturn(false).anyTimes();
+ refSpecMock = mock(RefSpec.class);
+ when(refSpecMock.matchSource(any(String.class))).thenReturn(true);
+ when(refSpecMock.expandFromSource(any(String.class))).thenReturn(refSpecMock);
+ when(refSpecMock.getDestination()).thenReturn("fooProject");
+ when(refSpecMock.isForceUpdate()).thenReturn(false);
}
private void setupDestinationMock() {
- destinationMock = createNiceMock(Destination.class);
- expect(destinationMock.requestRunway(anyObject())).andReturn(RunwayStatus.allowed());
+ destinationMock = mock(Destination.class);
+ when(destinationMock.requestRunway(any())).thenReturn(RunwayStatus.allowed());
}
private void setupPermissionBackedMock() {
- permissionBackendMock = createNiceMock(PermissionBackend.class);
- expect(permissionBackendMock.currentUser()).andReturn(withUserMock);
+ permissionBackendMock = mock(PermissionBackend.class);
+ when(permissionBackendMock.currentUser()).thenReturn(withUserMock);
}
private void setupWithUserMock() {
- withUserMock = createNiceMock(WithUser.class);
- expect(withUserMock.project(projectNameKey)).andReturn(forProjectMock);
+ withUserMock = mock(WithUser.class);
+ when(withUserMock.project(projectNameKey)).thenReturn(forProjectMock);
}
private void setupGitRepoManagerMock() throws IOException {
- gitRepositoryManagerMock = createNiceMock(GitRepositoryManager.class);
- expect(gitRepositoryManagerMock.openRepository(projectNameKey)).andReturn(repositoryMock);
+ gitRepositoryManagerMock = mock(GitRepositoryManager.class);
+ when(gitRepositoryManagerMock.openRepository(projectNameKey)).thenReturn(repositoryMock);
}
private void setupRepositoryMock(FileBasedConfig config) throws IOException {
- repositoryMock = createNiceMock(Repository.class);
- refDatabaseMock = createNiceMock(RefDatabase.class);
- expect(repositoryMock.getConfig()).andReturn(config).anyTimes();
- expect(repositoryMock.getRefDatabase()).andReturn(refDatabaseMock);
- expect(refDatabaseMock.getRefs()).andReturn(localRefs);
- expect(repositoryMock.updateRef("fooProject")).andReturn(refUpdateMock);
+ repositoryMock = mock(Repository.class);
+ refDatabaseMock = mock(RefDatabase.class);
+ when(repositoryMock.getConfig()).thenReturn(config);
+ when(repositoryMock.getRefDatabase()).thenReturn(refDatabaseMock);
+ when(refDatabaseMock.getRefs()).thenReturn(localRefs);
+ when(repositoryMock.updateRef("fooProject")).thenReturn(refUpdateMock);
}
private void setupRefUpdateMock() {
- refUpdateMock = createNiceMock(RefUpdate.class);
- expect(refUpdateMock.getOldObjectId())
- .andReturn(ObjectId.fromString("0000000000000000000000000000000000000001"))
- .anyTimes();
+ refUpdateMock = mock(RefUpdate.class);
+ when(refUpdateMock.getOldObjectId())
+ .thenReturn(ObjectId.fromString("0000000000000000000000000000000000000001"));
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEventEquals.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEventEquals.java
deleted file mode 100644
index 983e97f..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEventEquals.java
+++ /dev/null
@@ -1,83 +0,0 @@
-// Copyright (C) 2013 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication;
-
-import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
-
-public class RefReplicatedEventEquals implements IArgumentMatcher {
-
- private RefReplicatedEvent expected;
-
- public RefReplicatedEventEquals(RefReplicatedEvent expected) {
- this.expected = expected;
- }
-
- public static final RefReplicatedEvent eqEvent(RefReplicatedEvent refReplicatedEvent) {
- EasyMock.reportMatcher(new RefReplicatedEventEquals(refReplicatedEvent));
- return null;
- }
-
- @Override
- public boolean matches(Object actual) {
- if (!(actual instanceof RefReplicatedEvent)) {
- return false;
- }
- RefReplicatedEvent actualRefReplicatedEvent = (RefReplicatedEvent) actual;
- if (!equals(expected.project, actualRefReplicatedEvent.project)) {
- return false;
- }
- if (!equals(expected.ref, actualRefReplicatedEvent.ref)) {
- return false;
- }
- if (!equals(expected.targetNode, actualRefReplicatedEvent.targetNode)) {
- return false;
- }
- if (!equals(expected.status, actualRefReplicatedEvent.status)) {
- return false;
- }
- if (!equals(expected.refStatus, actualRefReplicatedEvent.refStatus)) {
- return false;
- }
- return true;
- }
-
- private static boolean equals(Object object1, Object object2) {
- if (object1 == object2) {
- return true;
- }
- if (object1 != null && !object1.equals(object2)) {
- return false;
- }
- return true;
- }
-
- @Override
- public void appendTo(StringBuffer buffer) {
- buffer.append("eqEvent(");
- buffer.append(expected.getClass().getName());
- buffer.append(" with project \"");
- buffer.append(expected.project);
- buffer.append("\" and ref \"");
- buffer.append(expected.ref);
- buffer.append("\" and targetNode \"");
- buffer.append(expected.targetNode);
- buffer.append("\" and status \"");
- buffer.append(expected.status);
- buffer.append("\" and refStatus \"");
- buffer.append(expected.refStatus);
- buffer.append("\")");
- }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEventEquals.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEventEquals.java
deleted file mode 100644
index d1284e1..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEventEquals.java
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright (C) 2013 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication;
-
-import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
-
-public class RefReplicationDoneEventEquals implements IArgumentMatcher {
-
- private RefReplicationDoneEvent expected;
-
- public RefReplicationDoneEventEquals(RefReplicationDoneEvent expected) {
- this.expected = expected;
- }
-
- public static final RefReplicationDoneEvent eqEvent(RefReplicationDoneEvent refReplicatedEvent) {
- EasyMock.reportMatcher(new RefReplicationDoneEventEquals(refReplicatedEvent));
- return null;
- }
-
- @Override
- public boolean matches(Object actual) {
- if (!(actual instanceof RefReplicationDoneEvent)) {
- return false;
- }
- RefReplicationDoneEvent actualRefReplicatedDoneEvent = (RefReplicationDoneEvent) actual;
- if (!equals(expected.project, actualRefReplicatedDoneEvent.project)) {
- return false;
- }
- if (!equals(expected.ref, actualRefReplicatedDoneEvent.ref)) {
- return false;
- }
- if (expected.nodesCount != actualRefReplicatedDoneEvent.nodesCount) {
- return false;
- }
- return true;
- }
-
- private static boolean equals(Object object1, Object object2) {
- if (object1 == object2) {
- return true;
- }
- if (object1 != null && !object1.equals(object2)) {
- return false;
- }
- return true;
- }
-
- @Override
- public void appendTo(StringBuffer buffer) {
- buffer.append("eqEvent(");
- buffer.append(expected.getClass().getName());
- buffer.append(" with project \"");
- buffer.append(expected.project);
- buffer.append("\" and ref \"");
- buffer.append(expected.ref);
- buffer.append("\" and nodesCount \"");
- buffer.append(expected.nodesCount);
- buffer.append("\")");
- }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java b/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java
deleted file mode 100644
index 111a792..0000000
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/RemoteRefUpdateCollectionMatcher.java
+++ /dev/null
@@ -1,64 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication;
-
-import java.util.Collection;
-import java.util.Objects;
-import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
-import org.eclipse.jgit.transport.RemoteRefUpdate;
-
-public class RemoteRefUpdateCollectionMatcher implements IArgumentMatcher {
- Collection<RemoteRefUpdate> expectedRemoteRefs;
-
- public static Collection<RemoteRefUpdate> eqRemoteRef(
- Collection<RemoteRefUpdate> expectedRemoteRefs) {
- EasyMock.reportMatcher(new RemoteRefUpdateCollectionMatcher(expectedRemoteRefs));
- return null;
- }
-
- public RemoteRefUpdateCollectionMatcher(Collection<RemoteRefUpdate> expectedRemoteRefs) {
- this.expectedRemoteRefs = expectedRemoteRefs;
- }
-
- @Override
- public boolean matches(Object argument) {
- if (!(argument instanceof Collection)) return false;
-
- @SuppressWarnings("unchecked")
- Collection<RemoteRefUpdate> refs = (Collection<RemoteRefUpdate>) argument;
-
- if (expectedRemoteRefs.size() != refs.size()) return false;
- return refs.stream()
- .allMatch(
- ref -> expectedRemoteRefs.stream().anyMatch(expectedRef -> compare(ref, expectedRef)));
- }
-
- @Override
- public void appendTo(StringBuffer buffer) {
- buffer.append("expected:" + expectedRemoteRefs.toString());
- }
-
- private boolean compare(RemoteRefUpdate ref, RemoteRefUpdate expectedRef) {
- return Objects.equals(ref.getRemoteName(), expectedRef.getRemoteName())
- && Objects.equals(ref.getStatus(), expectedRef.getStatus())
- && Objects.equals(ref.getExpectedOldObjectId(), expectedRef.getExpectedOldObjectId())
- && Objects.equals(ref.getNewObjectId(), expectedRef.getNewObjectId())
- && Objects.equals(ref.isFastForward(), expectedRef.isFastForward())
- && Objects.equals(ref.getSrcRef(), expectedRef.getSrcRef())
- && Objects.equals(ref.isForceUpdate(), expectedRef.isForceUpdate())
- && Objects.equals(ref.getMessage(), expectedRef.getMessage());
- }
-}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
index 36cc209..f2b027c 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
@@ -19,7 +19,6 @@
import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
import java.io.IOException;
import java.util.List;
-import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.storage.file.FileBasedConfig;
import org.junit.Test;
@@ -37,8 +36,10 @@
config.setString("remote", remoteName, "url", remoteUrl);
config.save();
- ReplicationFileBasedConfig replicationConfig = newReplicationFileBasedConfig();
- List<Destination> destinations = replicationConfig.getDestinations(FilterType.ALL);
+ DestinationsCollection destinationsCollections =
+ newDestinationsCollections(newReplicationFileBasedConfig());
+ destinationsCollections.startup(workQueueMock);
+ List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
assertThat(destinations).hasSize(1);
assertThatIsDestination(destinations.get(0), remoteName, remoteUrl);
@@ -55,18 +56,19 @@
config.setString("remote", remoteName2, "url", remoteUrl2);
config.save();
- ReplicationFileBasedConfig replicationConfig = newReplicationFileBasedConfig();
- List<Destination> destinations = replicationConfig.getDestinations(FilterType.ALL);
+ DestinationsCollection destinationsCollections =
+ newDestinationsCollections(newReplicationFileBasedConfig());
+ destinationsCollections.startup(workQueueMock);
+ List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL);
assertThat(destinations).hasSize(2);
assertThatIsDestination(destinations.get(0), remoteName1, remoteUrl1);
assertThatIsDestination(destinations.get(1), remoteName2, remoteUrl2);
}
- private ReplicationFileBasedConfig newReplicationFileBasedConfig()
- throws ConfigInvalidException, IOException {
+ private ReplicationFileBasedConfig newReplicationFileBasedConfig() {
ReplicationFileBasedConfig replicationConfig =
- new ReplicationFileBasedConfig(sitePaths, destinationFactoryMock, pluginDataPath);
+ new ReplicationFileBasedConfig(sitePaths, pluginDataPath);
return replicationConfig;
}
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index a6a8ac8..19269f0 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -24,10 +24,10 @@
import com.google.gerrit.acceptance.TestPlugin;
import com.google.gerrit.acceptance.UseLocalDisk;
import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
+import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.annotations.PluginData;
import com.google.gerrit.extensions.api.projects.BranchInput;
import com.google.gerrit.extensions.common.ProjectInfo;
-import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.server.config.SitePaths;
import com.google.inject.Inject;
import com.google.inject.Key;
@@ -100,7 +100,7 @@
assertThat(listReplicationTasks("refs/meta/config")).hasSize(1);
- waitUntil(() -> projectExists(new Project.NameKey(sourceProject + "replica.git")));
+ waitUntil(() -> projectExists(Project.nameKey(sourceProject + "replica.git")));
ProjectInfo replicaProject = gApi.projects().name(sourceProject + "replica").get();
assertThat(replicaProject).isNotNull();
@@ -115,7 +115,7 @@
Result pushResult = createChange();
RevCommit sourceCommit = pushResult.getCommit();
- String sourceRef = pushResult.getPatchSet().getRefName();
+ String sourceRef = pushResult.getPatchSet().refName();
assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(1);
@@ -164,7 +164,7 @@
Result pushResult = createChange();
RevCommit sourceCommit = pushResult.getCommit();
- String sourceRef = pushResult.getPatchSet().getRefName();
+ String sourceRef = pushResult.getPatchSet().refName();
assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
@@ -267,10 +267,10 @@
setReplicationDestination(remoteName, "replica", ALL_PROJECTS);
Result pushResult = createChange();
- shutdownConfig();
+ shutdownDestinations();
pushResult.getCommit();
- String sourceRef = pushResult.getPatchSet().getRefName();
+ String sourceRef = pushResult.getPatchSet().refName();
assertThrows(
InterruptedException.class,
@@ -293,10 +293,10 @@
reloadConfig();
Result pushResult = createChange();
- shutdownConfig();
+ shutdownDestinations();
RevCommit sourceCommit = pushResult.getCommit();
- String sourceRef = pushResult.getPatchSet().getRefName();
+ String sourceRef = pushResult.getPatchSet().refName();
try (Repository repo = repoManager.openRepository(targetProject)) {
waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
@@ -318,7 +318,7 @@
replicationQueueStart();
RevCommit sourceCommit = pushResult.getCommit();
- String sourceRef = pushResult.getPatchSet().getRefName();
+ String sourceRef = pushResult.getPatchSet().refName();
try (Repository repo = repoManager.openRepository(targetProject)) {
waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
@@ -357,6 +357,7 @@
config.setStringList("remote", remoteName, "url", replicaUrls);
config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
+ config.setBoolean("gerrit", null, "autoReload", true);
config.save();
}
@@ -365,11 +366,11 @@
}
private void reloadConfig() {
- getAutoReloadConfigDecoratorInstance().forceReload();
+ getAutoReloadConfigDecoratorInstance().reload();
}
- private void shutdownConfig() {
- getAutoReloadConfigDecoratorInstance().shutdown();
+ private void shutdownDestinations() {
+ getInstance(DestinationsCollection.class).shutdown();
}
private void replicationQueueStart() {
@@ -398,7 +399,7 @@
private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
Pattern refmaskPattern = Pattern.compile(refRegex);
- return tasksStorage.list().stream()
+ return tasksStorage.listWaiting().stream()
.filter(task -> refmaskPattern.matcher(task.ref).matches())
.collect(toList());
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
index 193af1e..e0de577 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
@@ -15,11 +15,9 @@
package com.googlesource.gerrit.plugins.replication;
import static com.google.common.truth.Truth.assertThat;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.resetToDefault;
-import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
import java.net.URISyntaxException;
@@ -35,8 +33,7 @@
@Before
public void setUp() throws Exception {
- pushResultProcessingMock = createNiceMock(PushResultProcessing.class);
- replay(pushResultProcessingMock);
+ pushResultProcessingMock = mock(PushResultProcessing.class);
replicationState = new ReplicationState(pushResultProcessingMock);
}
@@ -53,52 +50,36 @@
@Test
public void shouldFireOneReplicationEventWhenNothingToReplicate() {
- resetToDefault(pushResultProcessingMock);
-
- // expected event
- pushResultProcessingMock.onAllRefsReplicatedToAllNodes(0);
- replay(pushResultProcessingMock);
-
// actual test
replicationState.markAllPushTasksScheduled();
- verify(pushResultProcessingMock);
+
+ // expected event
+ verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(0);
}
@Test
public void shouldFireEventsForReplicationOfOneRefToOneNode() throws URISyntaxException {
- resetToDefault(pushResultProcessingMock);
URIish uri = new URIish("git://someHost/someRepo.git");
- // expected events
- pushResultProcessingMock.onRefReplicatedToOneNode(
- "someProject", "someRef", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
- pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "someRef", 1);
- pushResultProcessingMock.onAllRefsReplicatedToAllNodes(1);
- replay(pushResultProcessingMock);
-
// actual test
replicationState.increasePushTaskCount("someProject", "someRef");
replicationState.markAllPushTasksScheduled();
replicationState.notifyRefReplicated(
"someProject", "someRef", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
- verify(pushResultProcessingMock);
+
+ // expected events
+ verify(pushResultProcessingMock)
+ .onRefReplicatedToOneNode(
+ "someProject", "someRef", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+ verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "someRef", 1);
+ verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(1);
}
@Test
public void shouldFireEventsForReplicationOfOneRefToMultipleNodes() throws URISyntaxException {
- resetToDefault(pushResultProcessingMock);
URIish uri1 = new URIish("git://someHost1/someRepo.git");
URIish uri2 = new URIish("git://someHost2/someRepo.git");
- // expected events
- pushResultProcessingMock.onRefReplicatedToOneNode(
- "someProject", "someRef", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
- pushResultProcessingMock.onRefReplicatedToOneNode(
- "someProject", "someRef", uri2, RefPushResult.FAILED, RemoteRefUpdate.Status.NON_EXISTING);
- pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "someRef", 2);
- pushResultProcessingMock.onAllRefsReplicatedToAllNodes(2);
- replay(pushResultProcessingMock);
-
// actual test
replicationState.increasePushTaskCount("someProject", "someRef");
replicationState.increasePushTaskCount("someProject", "someRef");
@@ -107,33 +88,29 @@
"someProject", "someRef", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
replicationState.notifyRefReplicated(
"someProject", "someRef", uri2, RefPushResult.FAILED, RemoteRefUpdate.Status.NON_EXISTING);
- verify(pushResultProcessingMock);
+
+ // expected events
+ verify(pushResultProcessingMock)
+ .onRefReplicatedToOneNode(
+ "someProject", "someRef", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+ verify(pushResultProcessingMock)
+ .onRefReplicatedToOneNode(
+ "someProject",
+ "someRef",
+ uri2,
+ RefPushResult.FAILED,
+ RemoteRefUpdate.Status.NON_EXISTING);
+ verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "someRef", 2);
+ verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(2);
}
@Test
public void shouldFireEventsForReplicationOfMultipleRefsToMultipleNodes()
throws URISyntaxException {
- resetToDefault(pushResultProcessingMock);
URIish uri1 = new URIish("git://host1/someRepo.git");
URIish uri2 = new URIish("git://host2/someRepo.git");
URIish uri3 = new URIish("git://host3/someRepo.git");
- // expected events
- pushResultProcessingMock.onRefReplicatedToOneNode(
- "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
- pushResultProcessingMock.onRefReplicatedToOneNode(
- "someProject", "ref1", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
- pushResultProcessingMock.onRefReplicatedToOneNode(
- "someProject", "ref1", uri3, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
- pushResultProcessingMock.onRefReplicatedToOneNode(
- "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
- pushResultProcessingMock.onRefReplicatedToOneNode(
- "someProject", "ref2", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
- pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref1", 3);
- pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref2", 2);
- pushResultProcessingMock.onAllRefsReplicatedToAllNodes(5);
- replay(pushResultProcessingMock);
-
// actual test
replicationState.increasePushTaskCount("someProject", "ref1");
replicationState.increasePushTaskCount("someProject", "ref1");
@@ -151,24 +128,32 @@
"someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
replicationState.notifyRefReplicated(
"someProject", "ref2", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
- verify(pushResultProcessingMock);
+
+ // expected events
+ verify(pushResultProcessingMock)
+ .onRefReplicatedToOneNode(
+ "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+ verify(pushResultProcessingMock)
+ .onRefReplicatedToOneNode(
+ "someProject", "ref1", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+ verify(pushResultProcessingMock)
+ .onRefReplicatedToOneNode(
+ "someProject", "ref1", uri3, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+ verify(pushResultProcessingMock)
+ .onRefReplicatedToOneNode(
+ "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+ verify(pushResultProcessingMock)
+ .onRefReplicatedToOneNode(
+ "someProject", "ref2", uri2, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+ verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref1", 3);
+ verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref2", 2);
+ verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(5);
}
@Test
public void shouldFireEventsForReplicationSameRefDifferentProjects() throws URISyntaxException {
- resetToDefault(pushResultProcessingMock);
URIish uri = new URIish("git://host1/someRepo.git");
- // expected events
- pushResultProcessingMock.onRefReplicatedToOneNode(
- "project1", "ref1", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
- pushResultProcessingMock.onRefReplicatedToOneNode(
- "project2", "ref2", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
- pushResultProcessingMock.onRefReplicatedToAllNodes("project1", "ref1", 1);
- pushResultProcessingMock.onRefReplicatedToAllNodes("project2", "ref2", 1);
- pushResultProcessingMock.onAllRefsReplicatedToAllNodes(2);
- replay(pushResultProcessingMock);
-
// actual test
replicationState.increasePushTaskCount("project1", "ref1");
replicationState.increasePushTaskCount("project2", "ref2");
@@ -177,25 +162,24 @@
"project1", "ref1", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
replicationState.notifyRefReplicated(
"project2", "ref2", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
- verify(pushResultProcessingMock);
+
+ // expected events
+ verify(pushResultProcessingMock)
+ .onRefReplicatedToOneNode(
+ "project1", "ref1", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+ verify(pushResultProcessingMock)
+ .onRefReplicatedToOneNode(
+ "project2", "ref2", uri, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+ verify(pushResultProcessingMock).onRefReplicatedToAllNodes("project1", "ref1", 1);
+ verify(pushResultProcessingMock).onRefReplicatedToAllNodes("project2", "ref2", 1);
+ verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(2);
}
@Test
public void shouldFireEventsWhenSomeReplicationCompleteBeforeAllTasksAreScheduled()
throws URISyntaxException {
- resetToDefault(pushResultProcessingMock);
URIish uri1 = new URIish("git://host1/someRepo.git");
- // expected events
- pushResultProcessingMock.onRefReplicatedToOneNode(
- "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
- pushResultProcessingMock.onRefReplicatedToOneNode(
- "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
- pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref1", 1);
- pushResultProcessingMock.onRefReplicatedToAllNodes("someProject", "ref2", 1);
- pushResultProcessingMock.onAllRefsReplicatedToAllNodes(2);
- replay(pushResultProcessingMock);
-
// actual test
replicationState.increasePushTaskCount("someProject", "ref1");
replicationState.increasePushTaskCount("someProject", "ref2");
@@ -204,7 +188,17 @@
replicationState.notifyRefReplicated(
"someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
replicationState.markAllPushTasksScheduled();
- verify(pushResultProcessingMock);
+
+ // expected events
+ verify(pushResultProcessingMock)
+ .onRefReplicatedToOneNode(
+ "someProject", "ref1", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+ verify(pushResultProcessingMock)
+ .onRefReplicatedToOneNode(
+ "someProject", "ref2", uri1, RefPushResult.SUCCEEDED, RemoteRefUpdate.Status.OK);
+ verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref1", 1);
+ verify(pushResultProcessingMock).onRefReplicatedToAllNodes("someProject", "ref2", 1);
+ verify(pushResultProcessingMock).onAllRefsReplicatedToAllNodes(2);
}
@Test