Initial commit
Change-Id: I9404351df2b4222699b7aa78e04ec23792f5a03b
diff --git a/BUILD b/BUILD
new file mode 100644
index 0000000..7fa956c
--- /dev/null
+++ b/BUILD
@@ -0,0 +1,30 @@
+load("//tools/bzl:junit.bzl", "junit_tests")
+load("//tools/bzl:plugin.bzl", "PLUGIN_DEPS", "PLUGIN_TEST_DEPS", "gerrit_plugin")
+
+gerrit_plugin(
+ name = "pull-replication",
+ srcs = glob(["src/main/java/**/*.java"]),
+ manifest_entries = [
+ "Implementation-Title: Pull Replication plugin",
+ "Implementation-URL: https://gerrit-review.googlesource.com/#/admin/projects/plugins/pull-replication",
+ "Gerrit-PluginName: pull-replication",
+ "Gerrit-Module: com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
+ "Gerrit-SshModule: com.googlesource.gerrit.plugins.replication.pull.SshModule",
+ ],
+ resources = glob(["src/main/resources/**/*"]),
+ deps = [
+ "//lib/commons:io",
+ "//plugins/replication:replication",
+ ],
+)
+
+junit_tests(
+ name = "pull_replication_tests",
+ srcs = glob(["src/test/java/**/*Test.java"]),
+ tags = ["pull-replication"],
+ visibility = ["//visibility:public"],
+ deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
+ ":pull-replication__plugin",
+ "//plugins/replication:replication",
+ ],
+)
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..11069ed
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
+
+APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+Copyright [yyyy] [name of copyright owner]
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java
new file mode 100644
index 0000000..baffff7
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java
@@ -0,0 +1,101 @@
+// Copyright (C) 2009 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.common.Nullable;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.eclipse.jgit.transport.URIish;
+
+public class FetchAll implements Runnable {
+ private final ReplicationStateListener stateLog;
+
+ public interface Factory {
+ FetchAll create(String urlMatch, ReplicationFilter filter, ReplicationState state, boolean now);
+ }
+
+ private final WorkQueue workQueue;
+ private final ProjectCache projectCache;
+ private final String urlMatch;
+ private final ReplicationFilter filter;
+ private final ReplicationState state;
+ private final boolean now;
+ private final SourcesCollection sources;
+
+ @Inject
+ protected FetchAll(
+ WorkQueue wq,
+ ProjectCache projectCache,
+ ReplicationStateListeners stateLog,
+ SourcesCollection sources,
+ @Assisted @Nullable String urlMatch,
+ @Assisted ReplicationFilter filter,
+ @Assisted ReplicationState state,
+ @Assisted boolean now) {
+ this.workQueue = wq;
+ this.projectCache = projectCache;
+ this.stateLog = stateLog;
+ this.sources = sources;
+ this.urlMatch = urlMatch;
+ this.filter = filter;
+ this.state = state;
+ this.now = now;
+ }
+
+ Future<?> schedule(long delay, TimeUnit unit) {
+ return workQueue.getDefaultQueue().schedule(this, delay, unit);
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (Project.NameKey nameKey : projectCache.all()) {
+ if (filter.matches(nameKey)) {
+ scheduleFullSync(nameKey, urlMatch, state, now);
+ }
+ }
+ } catch (Exception e) {
+ stateLog.error("Cannot enumerate known projects", e, state);
+ }
+ state.markAllFetchTasksScheduled();
+ }
+
+ private void scheduleFullSync(
+ Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
+
+ for (Source cfg : sources.getAll()) {
+ if (cfg.wouldFetchProject(project)) {
+ for (URIish uri : cfg.getURIs(project, urlMatch)) {
+ cfg.schedule(project, FetchOne.ALL_REFS, uri, state, now);
+ }
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ String s = "Replicate All Projects";
+ if (urlMatch != null) {
+ s = s + " from " + urlMatch;
+ }
+ return s;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
new file mode 100644
index 0000000..7ee0da1
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -0,0 +1,470 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.googlesource.gerrit.plugins.replication.pull.PullReplicationLogger.repLog;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Sets;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.metrics.Timer1;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.gerrit.server.git.PerThreadRequestScope;
+import com.google.gerrit.server.git.ProjectRunnable;
+import com.google.gerrit.server.git.WorkQueue.CanceledWhileRunning;
+import com.google.gerrit.server.ioutil.HexFormat;
+import com.google.gerrit.server.util.IdGenerator;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
+import com.jcraft.jsch.JSchException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.eclipse.jgit.errors.NoRemoteRepositoryException;
+import org.eclipse.jgit.errors.NotSupportedException;
+import org.eclipse.jgit.errors.RemoteRepositoryException;
+import org.eclipse.jgit.errors.RepositoryNotFoundException;
+import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.lib.NullProgressMonitor;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.CredentialsProvider;
+import org.eclipse.jgit.transport.FetchResult;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.TrackingRefUpdate;
+import org.eclipse.jgit.transport.Transport;
+import org.eclipse.jgit.transport.URIish;
+import org.slf4j.MDC;
+
+/**
+ * A pull from remote operation started by command-line.
+ *
+ * <p>Instance members are protected by the lock within FetchQueue. Callers must take that lock to
+ * ensure they are working with a current view of the object.
+ */
+class FetchOne implements ProjectRunnable, CanceledWhileRunning {
+ private final ReplicationStateListener stateLog;
+ static final String ALL_REFS = "..all..";
+ static final String ID_MDC_KEY = "fetchOneId";
+
+ interface Factory {
+ FetchOne create(Project.NameKey d, URIish u);
+ }
+
+ private final GitRepositoryManager gitManager;
+ private final Source pool;
+ private final RemoteConfig config;
+ private final CredentialsProvider credentialsProvider;
+ private final PerThreadRequestScope.Scoper threadScoper;
+
+ private final Project.NameKey projectName;
+ private final URIish uri;
+ private final Set<String> delta = Sets.newHashSetWithExpectedSize(4);
+ private boolean fetchAllRefs;
+ private Repository git;
+ private boolean retrying;
+ private int retryCount;
+ private final int maxRetries;
+ private boolean canceled;
+ private final ListMultimap<String, ReplicationState> stateMap = LinkedListMultimap.create();
+ private final int maxLockRetries;
+ private int lockRetryCount;
+ private final int id;
+ private final long createdAt;
+ private final FetchReplicationMetrics metrics;
+ private final AtomicBoolean canceledWhileRunning;
+
+ @Inject
+ FetchOne(
+ GitRepositoryManager grm,
+ Source s,
+ RemoteConfig c,
+ CredentialsFactory cpFactory,
+ PerThreadRequestScope.Scoper ts,
+ IdGenerator ig,
+ ReplicationStateListeners sl,
+ FetchReplicationMetrics m,
+ @Assisted Project.NameKey d,
+ @Assisted URIish u) {
+ gitManager = grm;
+ pool = s;
+ config = c;
+ credentialsProvider = cpFactory.create(c.getName());
+ threadScoper = ts;
+ projectName = d;
+ uri = u;
+ lockRetryCount = 0;
+ maxLockRetries = pool.getLockErrorMaxRetries();
+ id = ig.next();
+ stateLog = sl;
+ createdAt = System.nanoTime();
+ metrics = m;
+ canceledWhileRunning = new AtomicBoolean(false);
+ maxRetries = s.getMaxRetries();
+ }
+
+ @Override
+ public void cancel() {
+ repLog.info("Replication {} was canceled", getURI());
+ canceledByReplication();
+ pool.fetchWasCanceled(this);
+ }
+
+ @Override
+ public void setCanceledWhileRunning() {
+ repLog.info("Replication {} was canceled while being executed", getURI());
+ canceledWhileRunning.set(true);
+ }
+
+ @Override
+ public Project.NameKey getProjectNameKey() {
+ return projectName;
+ }
+
+ @Override
+ public String getRemoteName() {
+ return config.getName();
+ }
+
+ @Override
+ public boolean hasCustomizedPrint() {
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ String print = "[" + HexFormat.fromInt(id) + "] fetch " + uri;
+
+ if (retryCount > 0) {
+ print = "(retry " + retryCount + ") " + print;
+ }
+ return print;
+ }
+
+ boolean isRetrying() {
+ return retrying;
+ }
+
+ boolean setToRetry() {
+ retrying = true;
+ retryCount++;
+ return maxRetries == 0 || retryCount <= maxRetries;
+ }
+
+ void canceledByReplication() {
+ canceled = true;
+ }
+
+ boolean wasCanceled() {
+ return canceled;
+ }
+
+ URIish getURI() {
+ return uri;
+ }
+
+ void addRef(String ref) {
+ if (ALL_REFS.equals(ref)) {
+ delta.clear();
+ fetchAllRefs = true;
+ repLog.trace("Added all refs for replication from {}", uri);
+ } else if (!fetchAllRefs) {
+ delta.add(ref);
+ repLog.trace("Added ref {} for replication from {}", ref, uri);
+ }
+ }
+
+ Set<String> getRefs() {
+ return fetchAllRefs ? Sets.newHashSet(ALL_REFS) : delta;
+ }
+
+ void addRefs(Set<String> refs) {
+ if (!fetchAllRefs) {
+ for (String ref : refs) {
+ addRef(ref);
+ }
+ }
+ }
+
+ void addState(String ref, ReplicationState state) {
+ stateMap.put(ref, state);
+ }
+
+ ListMultimap<String, ReplicationState> getStates() {
+ return stateMap;
+ }
+
+ ReplicationState[] getStatesAsArray() {
+ Set<ReplicationState> statesSet = new HashSet<>();
+ statesSet.addAll(stateMap.values());
+ return statesSet.toArray(new ReplicationState[statesSet.size()]);
+ }
+
+ ReplicationState[] getStatesByRef(String ref) {
+ Collection<ReplicationState> states = stateMap.get(ref);
+ return states.toArray(new ReplicationState[states.size()]);
+ }
+
+ void addStates(ListMultimap<String, ReplicationState> states) {
+ stateMap.putAll(states);
+ }
+
+ void removeStates() {
+ stateMap.clear();
+ }
+
+ private void statesCleanUp() {
+ if (!stateMap.isEmpty() && !isRetrying()) {
+ for (Map.Entry<String, ReplicationState> entry : stateMap.entries()) {
+ entry
+ .getValue()
+ .notifyRefReplicated(
+ projectName.get(),
+ entry.getKey(),
+ uri,
+ ReplicationState.RefFetchResult.FAILED,
+ null);
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ threadScoper
+ .scope(
+ new Callable<Void>() {
+ @Override
+ public Void call() {
+ runFetchOperation();
+ return null;
+ }
+ })
+ .call();
+ } catch (Exception e) {
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ } finally {
+ statesCleanUp();
+ }
+ }
+
+ private void runFetchOperation() {
+ // Lock the queue, and remove ourselves, so we can't be modified once
+ // we start replication (instead a new instance, with the same URI, is
+ // created and scheduled for a future point in time.)
+ //
+ MDC.put(ID_MDC_KEY, HexFormat.fromInt(id));
+ if (!pool.requestRunway(this)) {
+ if (!canceled) {
+ repLog.info(
+ "Rescheduling replication to {} to avoid collision with an in-flight fetch.", uri);
+ pool.reschedule(this, Source.RetryReason.COLLISION);
+ }
+ return;
+ }
+
+ repLog.info("Replication from {} started...", uri);
+ Timer1.Context<String> context = metrics.start(config.getName());
+ try {
+ long startedAt = context.getStartTime();
+ long delay = NANOSECONDS.toMillis(startedAt - createdAt);
+ metrics.record(config.getName(), delay, retryCount);
+ git = gitManager.openRepository(projectName);
+ runImpl();
+ long elapsed = NANOSECONDS.toMillis(context.stop());
+ repLog.info(
+ "Replication from {} completed in {}ms, {}ms delay, {} retries",
+ uri,
+ elapsed,
+ delay,
+ retryCount);
+ } catch (RepositoryNotFoundException e) {
+ stateLog.error(
+ "Cannot replicate " + projectName + "; Local repository error: " + e.getMessage(),
+ getStatesAsArray());
+
+ } catch (NoRemoteRepositoryException | RemoteRepositoryException e) {
+ // Tried to replicate to a remote via anonymous git:// but the repository
+ // does not exist. In this case NoRemoteRepositoryException is not
+ // raised.
+ String msg = e.getMessage();
+ repLog.error("Cannot replicate {}; Remote repository error: {}", projectName, msg);
+ } catch (NotSupportedException e) {
+ stateLog.error("Cannot replicate from " + uri, e, getStatesAsArray());
+ } catch (TransportException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof JSchException && cause.getMessage().startsWith("UnknownHostKey:")) {
+ repLog.error("Cannot replicate from {}: {}", uri, cause.getMessage());
+ } else if (e instanceof LockFailureException) {
+ lockRetryCount++;
+ // The LockFailureException message contains both URI and reason
+ // for this failure.
+ repLog.error("Cannot replicate from {}: {}", uri, e.getMessage());
+
+ // The remote fetch operation should be retried.
+ if (lockRetryCount <= maxLockRetries) {
+ if (canceledWhileRunning.get()) {
+ logCanceledWhileRunningException(e);
+ } else {
+ pool.reschedule(this, Source.RetryReason.TRANSPORT_ERROR);
+ }
+ } else {
+ repLog.error(
+ "Giving up after {} occurrences of this error: {} during replication from {}",
+ lockRetryCount,
+ e.getMessage(),
+ uri);
+ }
+ } else {
+ if (canceledWhileRunning.get()) {
+ logCanceledWhileRunningException(e);
+ } else {
+ repLog.error("Cannot replicate from {}", uri, e);
+ // The remote fetch operation should be retried.
+ pool.reschedule(this, Source.RetryReason.TRANSPORT_ERROR);
+ }
+ }
+ } catch (IOException e) {
+ stateLog.error("Cannot replicate from " + uri, e, getStatesAsArray());
+ } catch (RuntimeException | Error e) {
+ stateLog.error("Unexpected error during replication from " + uri, e, getStatesAsArray());
+ } finally {
+ if (git != null) {
+ git.close();
+ }
+ pool.notifyFinished(this);
+ }
+ }
+
+ private void logCanceledWhileRunningException(TransportException e) {
+ repLog.info("Cannot replicate from {}. It was canceled while running", uri, e);
+ }
+
+ private void runImpl() throws IOException {
+ FetchResult res;
+ try (Transport tn = Transport.open(git, uri)) {
+ res = fetchVia(tn);
+ }
+ updateStates(res.getTrackingRefUpdates());
+ }
+
+ private FetchResult fetchVia(Transport tn) throws IOException {
+ tn.applyConfig(config);
+ tn.setCredentialsProvider(credentialsProvider);
+
+ repLog.info("Fetch references {} from {}", config.getFetchRefSpecs(), uri);
+
+ return tn.fetch(NullProgressMonitor.INSTANCE, config.getFetchRefSpecs());
+ }
+
+ private void updateStates(Collection<TrackingRefUpdate> refUpdates) throws IOException {
+ Set<String> doneRefs = new HashSet<>();
+ boolean anyRefFailed = false;
+ RefUpdate.Result lastRefUpdateResult = RefUpdate.Result.NO_CHANGE;
+
+ for (TrackingRefUpdate u : refUpdates) {
+ ReplicationState.RefFetchResult fetchStatus = ReplicationState.RefFetchResult.SUCCEEDED;
+ Set<ReplicationState> logStates = new HashSet<>();
+ lastRefUpdateResult = u.getResult();
+
+ logStates.addAll(stateMap.get(u.getRemoteName()));
+ logStates.addAll(stateMap.get(ALL_REFS));
+ ReplicationState[] logStatesArray = logStates.toArray(new ReplicationState[logStates.size()]);
+
+ doneRefs.add(u.getRemoteName());
+ switch (u.getResult()) {
+ case NO_CHANGE:
+ case NEW:
+ case FORCED:
+ case RENAMED:
+ case FAST_FORWARD:
+ break;
+ case NOT_ATTEMPTED:
+ case REJECTED:
+ case REJECTED_CURRENT_BRANCH:
+ case REJECTED_MISSING_OBJECT:
+ stateLog.error(
+ String.format(
+ "Failed replicate %s from %s: result %s", uri, u.getRemoteName(), u.getResult()),
+ logStatesArray);
+ fetchStatus = ReplicationState.RefFetchResult.FAILED;
+ anyRefFailed = true;
+ break;
+
+ case LOCK_FAILURE:
+ throw new LockFailureException(uri, u.toString());
+ case IO_FAILURE:
+ throw new IOException(u.toString());
+
+ case REJECTED_OTHER_REASON:
+ stateLog.error(
+ String.format(
+ "Failed replicate %s from %s, reason: %s", uri, u.getRemoteName(), u.toString()),
+ logStatesArray);
+
+ fetchStatus = ReplicationState.RefFetchResult.FAILED;
+ anyRefFailed = true;
+ break;
+ }
+
+ for (ReplicationState rs : getStatesByRef(u.getRemoteName())) {
+ rs.notifyRefReplicated(
+ projectName.get(), u.getRemoteName(), uri, fetchStatus, u.getResult());
+ }
+ }
+
+ doneRefs.add(ALL_REFS);
+ for (ReplicationState rs : getStatesByRef(ALL_REFS)) {
+ rs.notifyRefReplicated(
+ projectName.get(),
+ ALL_REFS,
+ uri,
+ anyRefFailed
+ ? ReplicationState.RefFetchResult.FAILED
+ : ReplicationState.RefFetchResult.SUCCEEDED,
+ lastRefUpdateResult);
+ }
+ for (Map.Entry<String, ReplicationState> entry : stateMap.entries()) {
+ if (!doneRefs.contains(entry.getKey())) {
+ entry
+ .getValue()
+ .notifyRefReplicated(
+ projectName.get(),
+ entry.getKey(),
+ uri,
+ ReplicationState.RefFetchResult.NOT_ATTEMPTED,
+ null);
+ }
+ }
+ stateMap.clear();
+ }
+
+ public static class LockFailureException extends TransportException {
+ private static final long serialVersionUID = 1L;
+
+ LockFailureException(URIish uri, String message) {
+ super(uri, message);
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefReplicatedEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefReplicatedEvent.java
new file mode 100644
index 0000000..eb825b8
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefReplicatedEvent.java
@@ -0,0 +1,80 @@
+// Copyright (C) 2013 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.events.RefEvent;
+import java.util.Objects;
+import org.eclipse.jgit.lib.RefUpdate;
+
+public class FetchRefReplicatedEvent extends RefEvent {
+ static final String TYPE = "fetch-ref-replicated";
+
+ final String project;
+ final String ref;
+ final String sourceNode;
+ final String status;
+ final RefUpdate.Result refUpdateResult;
+
+ public FetchRefReplicatedEvent(
+ String project,
+ String ref,
+ String sourceNode,
+ ReplicationState.RefFetchResult status,
+ RefUpdate.Result refUpdateResult) {
+ super(TYPE);
+ this.project = project;
+ this.ref = ref;
+ this.sourceNode = sourceNode;
+ this.status = status.toString();
+ this.refUpdateResult = refUpdateResult;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(project, ref, status, refUpdateResult);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof FetchRefReplicatedEvent)) {
+ return false;
+ }
+ FetchRefReplicatedEvent event = (FetchRefReplicatedEvent) other;
+ if (!Objects.equals(event.project, this.project)) {
+ return false;
+ }
+ if (!Objects.equals(event.ref, this.ref)) {
+ return false;
+ }
+ if (!Objects.equals(event.sourceNode, this.sourceNode)) {
+ return false;
+ }
+ if (!Objects.equals(event.status, this.status)) {
+ return false;
+ }
+ return Objects.equals(event.refUpdateResult, this.refUpdateResult);
+ }
+
+ @Override
+ public Project.NameKey getProjectNameKey() {
+ return Project.nameKey(project);
+ }
+
+ @Override
+ public String getRefName() {
+ return ref;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefReplicationDoneEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefReplicationDoneEvent.java
new file mode 100644
index 0000000..a434702
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefReplicationDoneEvent.java
@@ -0,0 +1,65 @@
+// Copyright (C) 2013 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.events.RefEvent;
+import java.util.Objects;
+
+public class FetchRefReplicationDoneEvent extends RefEvent {
+ static final String TYPE = "fetch-ref-replication-done";
+
+ final String project;
+ final String ref;
+ final int nodesCount;
+
+ public FetchRefReplicationDoneEvent(String project, String ref, int nodesCount) {
+ super(TYPE);
+ this.project = project;
+ this.ref = ref;
+ this.nodesCount = nodesCount;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(TYPE, project, ref, nodesCount);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof FetchRefReplicationDoneEvent)) {
+ return false;
+ }
+
+ FetchRefReplicationDoneEvent event = (FetchRefReplicationDoneEvent) other;
+ if (!Objects.equals(event.project, this.project)) {
+ return false;
+ }
+ if (!Objects.equals(event.ref, this.ref)) {
+ return false;
+ }
+ return event.nodesCount == this.nodesCount;
+ }
+
+ @Override
+ public Project.NameKey getProjectNameKey() {
+ return Project.nameKey(project);
+ }
+
+ @Override
+ public String getRefName() {
+ return ref;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationMetrics.java
new file mode 100644
index 0000000..e952252
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationMetrics.java
@@ -0,0 +1,90 @@
+// Copyright (C) 2015 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Field;
+import com.google.gerrit.metrics.Histogram1;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.metrics.Timer1;
+import com.google.gerrit.server.logging.PluginMetadata;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+public class FetchReplicationMetrics {
+ private final Timer1<String> executionTime;
+ private final Histogram1<String> executionDelay;
+ private final Histogram1<String> executionRetries;
+
+ @Inject
+ FetchReplicationMetrics(@PluginName String pluginName, MetricMaker metricMaker) {
+ Field<String> SOURCE_FIELD =
+ Field.ofString(
+ "source",
+ (metadataBuilder, fieldValue) ->
+ metadataBuilder
+ .pluginName(pluginName)
+ .addPluginMetadata(PluginMetadata.create("source", fieldValue)))
+ .build();
+
+ executionTime =
+ metricMaker.newTimer(
+ "replication_latency",
+ new Description("Time spent fetching from remote source.")
+ .setCumulative()
+ .setUnit(Description.Units.MILLISECONDS),
+ SOURCE_FIELD);
+
+ executionDelay =
+ metricMaker.newHistogram(
+ "replication_delay",
+ new Description("Time spent waiting before fetching from remote source")
+ .setCumulative()
+ .setUnit(Description.Units.MILLISECONDS),
+ SOURCE_FIELD);
+
+ executionRetries =
+ metricMaker.newHistogram(
+ "replication_retries",
+ new Description("Number of retries when fetching from remote sources")
+ .setCumulative()
+ .setUnit("retries"),
+ SOURCE_FIELD);
+ }
+
+ /**
+ * Start the replication latency timer from a source.
+ *
+ * @param name the source name.
+ * @return the timer context.
+ */
+ public Timer1.Context<String> start(String name) {
+ return executionTime.start(name);
+ }
+
+ /**
+ * Record the replication delay and retry metrics for a source.
+ *
+ * @param name the source name.
+ * @param delay replication delay in milliseconds.
+ * @param retries number of retries.
+ */
+ public void record(String name, long delay, long retries) {
+ executionDelay.record(name, delay);
+ executionRetries.record(name, retries);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationScheduledEvent.java
new file mode 100644
index 0000000..c3d6aca
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationScheduledEvent.java
@@ -0,0 +1,43 @@
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.events.RefEvent;
+
+public class FetchReplicationScheduledEvent extends RefEvent {
+ static final String TYPE = "fetch-ref-replication-scheduled";
+
+ final String project;
+ final String ref;
+ final String sourceNode;
+
+ public FetchReplicationScheduledEvent(String project, String ref, String sourceNode) {
+ super(TYPE);
+ this.project = project;
+ this.ref = ref;
+ this.sourceNode = sourceNode;
+ }
+
+ @Override
+ public String getRefName() {
+ return ref;
+ }
+
+ @Override
+ public Project.NameKey getProjectNameKey() {
+ return Project.nameKey(project);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java
new file mode 100644
index 0000000..202d7e6
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java
@@ -0,0 +1,194 @@
+// Copyright (C) 2013 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.events.RefEvent;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import java.lang.ref.WeakReference;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.transport.URIish;
+
+public abstract class FetchResultProcessing {
+
+ abstract void onOneProjectReplicationDone(
+ String project,
+ String ref,
+ URIish uri,
+ ReplicationState.RefFetchResult status,
+ RefUpdate.Result refUpdateResult);
+
+ abstract void onRefReplicatedFromAllNodes(String project, String ref, int nodesCount);
+
+ abstract void onAllRefsReplicatedFromAllNodes(int totalFetchTasksCount);
+
+ /**
+ * Write message to standard out.
+ *
+ * @param message message text.
+ */
+ void writeStdOut(String message) {
+ // Default doing nothing
+ }
+
+ /**
+ * Write message to standard error.
+ *
+ * @param message message text.
+ */
+ void writeStdErr(String message) {
+ // Default doing nothing
+ }
+
+ public static String resolveNodeName(URIish uri) {
+ StringBuilder sb = new StringBuilder();
+ if (uri.isRemote()) {
+ sb.append(uri.getHost());
+ if (uri.getPort() != -1) {
+ sb.append(":");
+ sb.append(uri.getPort());
+ }
+ } else {
+ sb.append(uri.getPath());
+ }
+ return sb.toString();
+ }
+
+ public static class CommandProcessing extends FetchResultProcessing {
+ private WeakReference<StartFetchCommand> sshCommand;
+ private AtomicBoolean hasError = new AtomicBoolean();
+
+ CommandProcessing(StartFetchCommand sshCommand) {
+ this.sshCommand = new WeakReference<>(sshCommand);
+ }
+
+ @Override
+ void onOneProjectReplicationDone(
+ String project,
+ String ref,
+ URIish uri,
+ ReplicationState.RefFetchResult status,
+ RefUpdate.Result refUpdateResult) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Replicate ");
+ sb.append(project);
+ sb.append(" ref ");
+ sb.append(ref);
+ sb.append(" from ");
+ sb.append(resolveNodeName(uri));
+ sb.append(", ");
+ switch (status) {
+ case SUCCEEDED:
+ sb.append("Succeeded!");
+ break;
+ case FAILED:
+ sb.append("FAILED!");
+ hasError.compareAndSet(false, true);
+ break;
+ case NOT_ATTEMPTED:
+ sb.append("NOT ATTEMPTED!");
+ break;
+ default:
+ sb.append("UNKNOWN RESULT!");
+ break;
+ }
+ sb.append(" (");
+ sb.append(refUpdateResult.toString());
+ sb.append(")");
+ writeStdOut(sb.toString());
+ }
+
+ @Override
+ void onRefReplicatedFromAllNodes(String project, String ref, int nodesCount) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Replication of ");
+ sb.append(project);
+ sb.append(" ref ");
+ sb.append(ref);
+ sb.append(" completed from ");
+ sb.append(nodesCount);
+ sb.append(" nodes, ");
+ writeStdOut(sb.toString());
+ }
+
+ @Override
+ void onAllRefsReplicatedFromAllNodes(int totalFetchTasksCount) {
+ if (totalFetchTasksCount == 0) {
+ return;
+ }
+ writeStdOut("----------------------------------------------");
+ if (hasError.get()) {
+ writeStdOut("Replication completed with some errors!");
+ } else {
+ writeStdOut("Replication completed successfully!");
+ }
+ }
+
+ @Override
+ void writeStdOut(String message) {
+ StartFetchCommand command = sshCommand.get();
+ if (command != null) {
+ command.writeStdOutSync(message);
+ }
+ }
+
+ @Override
+ void writeStdErr(String message) {
+ StartFetchCommand command = sshCommand.get();
+ if (command != null) {
+ command.writeStdErrSync(message);
+ }
+ }
+ }
+
+ public static class GitUpdateProcessing extends FetchResultProcessing {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ private final EventDispatcher dispatcher;
+
+ public GitUpdateProcessing(EventDispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ @Override
+ void onOneProjectReplicationDone(
+ String project,
+ String ref,
+ URIish uri,
+ ReplicationState.RefFetchResult result,
+ RefUpdate.Result refUpdateResult) {
+ postEvent(
+ new FetchRefReplicatedEvent(project, ref, resolveNodeName(uri), result, refUpdateResult));
+ }
+
+ @Override
+ void onRefReplicatedFromAllNodes(String project, String ref, int nodesCount) {
+ postEvent(new FetchRefReplicationDoneEvent(project, ref, nodesCount));
+ }
+
+ @Override
+ void onAllRefsReplicatedFromAllNodes(int totalFetchTasksCount) {}
+
+ private void postEvent(RefEvent event) {
+ try {
+ dispatcher.postEvent(event);
+ } catch (PermissionBackendException e) {
+ logger.atSevere().withCause(e).log("Cannot post event");
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ListCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ListCommand.java
new file mode 100644
index 0000000..d7d55ec
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ListCommand.java
@@ -0,0 +1,126 @@
+// Copyright (C) 2015 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.common.base.Strings;
+import com.google.gerrit.common.data.GlobalCapability;
+import com.google.gerrit.extensions.annotations.RequiresCapability;
+import com.google.gerrit.sshd.CommandMetaData;
+import com.google.gerrit.sshd.SshCommand;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import com.google.inject.Inject;
+import java.util.Collection;
+import java.util.List;
+import org.kohsuke.args4j.Option;
+
+@RequiresCapability(GlobalCapability.ADMINISTRATE_SERVER)
+@CommandMetaData(name = "list", description = "List remote source information")
+final class ListCommand extends SshCommand {
+ @Option(name = "--remote", metaVar = "PATTERN", usage = "pattern to match remote name on")
+ private String remote;
+
+ @Option(name = "--detail", usage = "output detailed information")
+ private boolean detail;
+
+ @Option(name = "--json", usage = "output in json format")
+ private boolean json;
+
+ @Inject private SourcesCollection sourcesCollection;
+
+ @Override
+ protected void run() {
+ for (Source s : sourcesCollection.getAll()) {
+ if (matches(s.getRemoteConfigName())) {
+ printRemote(s);
+ }
+ }
+ }
+
+ private boolean matches(String name) {
+ return (Strings.isNullOrEmpty(remote) || name.contains(remote) || name.matches(remote));
+ }
+
+ private void addProperty(JsonObject obj, String key, List<String> values) {
+ if (!values.isEmpty()) {
+ JsonArray list = new JsonArray();
+ for (String v : values) {
+ list.add(new JsonPrimitive(v));
+ }
+ obj.add(key, list);
+ }
+ }
+
+ private void addQueueDetails(StringBuilder out, Collection<FetchOne> values) {
+ for (FetchOne f : values) {
+ out.append(" ").append(f.toString()).append("\n");
+ }
+ }
+
+ private void addQueueDetails(JsonObject obj, String key, Collection<FetchOne> values) {
+ if (values.size() > 0) {
+ JsonArray list = new JsonArray();
+ for (FetchOne f : values) {
+ list.add(new JsonPrimitive(f.toString()));
+ }
+ obj.add(key, list);
+ }
+ }
+
+ private void printRemote(Source s) {
+ if (json) {
+ JsonObject obj = new JsonObject();
+ obj.addProperty("Remote", s.getRemoteConfigName());
+ addProperty(obj, "Url", s.getUrls());
+ if (detail) {
+ addProperty(obj, "AdminUrl", s.getAdminUrls());
+ addProperty(obj, "AuthGroup", s.getAuthGroupNames());
+ addProperty(obj, "Project", s.getProjects());
+ Source.QueueInfo q = s.getQueueInfo();
+ addQueueDetails(obj, "InFlight", q.inFlight.values());
+ addQueueDetails(obj, "Pending", q.pending.values());
+ }
+ stdout.print(obj.toString() + "\n");
+ } else {
+ StringBuilder out = new StringBuilder();
+ out.append("Remote: ").append(s.getRemoteConfigName()).append("\n");
+ for (String url : s.getUrls()) {
+ out.append("Url: ").append(url).append("\n");
+ }
+
+ if (detail) {
+ for (String adminUrl : s.getAdminUrls()) {
+ out.append("AdminUrl: ").append(adminUrl).append("\n");
+ }
+
+ for (String authGroup : s.getAuthGroupNames()) {
+ out.append("AuthGroup: ").append(authGroup).append("\n");
+ }
+
+ for (String project : s.getProjects()) {
+ out.append("Project: ").append(project).append("\n");
+ }
+
+ Source.QueueInfo q = s.getQueueInfo();
+ out.append("In Flight: ").append(q.inFlight.size()).append("\n");
+ addQueueDetails(out, q.inFlight.values());
+ out.append("Pending: ").append(q.pending.size()).append("\n");
+ addQueueDetails(out, q.pending.values());
+ }
+ stdout.print(out.toString() + "\n");
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/NoopObservableQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/NoopObservableQueue.java
new file mode 100644
index 0000000..0e752c7
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/NoopObservableQueue.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.googlesource.gerrit.plugins.replication.ObservableQueue;
+
+public class NoopObservableQueue implements ObservableQueue {
+
+ @Override
+ public boolean isRunning() {
+ return true;
+ }
+
+ @Override
+ public boolean isReplaying() {
+ return false;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
new file mode 100644
index 0000000..d8c4a8d
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
@@ -0,0 +1,82 @@
+// Copyright (C) 2012 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.common.util.concurrent.Atomics;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.extensions.systemstatus.ServerInformation;
+import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
+import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class OnStartStop implements LifecycleListener {
+ private final AtomicReference<Future<?>> fetchAllFuture;
+ private final ServerInformation srvInfo;
+ private final FetchAll.Factory fetchAll;
+ private final ReplicationConfig config;
+ private final DynamicItem<EventDispatcher> eventDispatcher;
+ private final ReplicationState.Factory replicationStateFactory;
+ private final SourcesCollection sourcesCollection;
+ private final WorkQueue workQueue;
+
+ @Inject
+ protected OnStartStop(
+ ServerInformation srvInfo,
+ FetchAll.Factory fetchAll,
+ ReplicationConfig config,
+ DynamicItem<EventDispatcher> eventDispatcher,
+ ReplicationState.Factory replicationStateFactory,
+ SourcesCollection sourcesCollection,
+ WorkQueue workQueue) {
+ this.srvInfo = srvInfo;
+ this.fetchAll = fetchAll;
+ this.config = config;
+ this.eventDispatcher = eventDispatcher;
+ this.replicationStateFactory = replicationStateFactory;
+ this.fetchAllFuture = Atomics.newReference();
+ this.sourcesCollection = sourcesCollection;
+ this.workQueue = workQueue;
+ }
+
+ @Override
+ public void start() {
+ if (srvInfo.getState() == ServerInformation.State.STARTUP
+ && config.isReplicateAllOnPluginStart()) {
+ ReplicationState state =
+ replicationStateFactory.create(
+ new FetchResultProcessing.GitUpdateProcessing(eventDispatcher.get()));
+ fetchAllFuture.set(
+ fetchAll
+ .create(null, ReplicationFilter.all(), state, false)
+ .schedule(30, TimeUnit.SECONDS));
+ }
+
+ sourcesCollection.startup(workQueue);
+ }
+
+ @Override
+ public void stop() {
+ Future<?> f = fetchAllFuture.getAndSet(null);
+ if (f != null) {
+ f.cancel(true);
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationLogFile.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationLogFile.java
new file mode 100644
index 0000000..299648d
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationLogFile.java
@@ -0,0 +1,33 @@
+// Copyright (C) 2014 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.extensions.systemstatus.ServerInformation;
+import com.google.gerrit.server.util.PluginLogFile;
+import com.google.gerrit.server.util.SystemLog;
+import com.google.inject.Inject;
+import org.apache.log4j.PatternLayout;
+
+public class PullReplicationLogFile extends PluginLogFile {
+
+ @Inject
+ public PullReplicationLogFile(SystemLog systemLog, ServerInformation serverInfo) {
+ super(
+ systemLog,
+ serverInfo,
+ PullReplicationLogger.PULL_REPLICATION_LOG_NAME,
+ new PatternLayout("[%d] [%X{" + FetchOne.ID_MDC_KEY + "}] %m%n"));
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationLogger.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationLogger.java
new file mode 100644
index 0000000..25f883f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationLogger.java
@@ -0,0 +1,23 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PullReplicationLogger {
+ public static final String PULL_REPLICATION_LOG_NAME = "pull_replication_log";
+ public static final Logger repLog = LoggerFactory.getLogger(PULL_REPLICATION_LOG_NAME);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
new file mode 100644
index 0000000..236c8b6
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -0,0 +1,107 @@
+// Copyright (C) 2012 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.googlesource.gerrit.plugins.replication.StartReplicationCapability.START_REPLICATION;
+
+import com.google.common.eventbus.EventBus;
+import com.google.gerrit.extensions.annotations.Exports;
+import com.google.gerrit.extensions.config.CapabilityDefinition;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.server.events.EventTypes;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.ProvisionException;
+import com.google.inject.Scopes;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.google.inject.internal.UniqueAnnotations;
+import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
+import com.googlesource.gerrit.plugins.replication.AutoReloadSecureCredentialsFactoryDecorator;
+import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
+import com.googlesource.gerrit.plugins.replication.ObservableQueue;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfigValidator;
+import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig;
+import com.googlesource.gerrit.plugins.replication.StartReplicationCapability;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+
+class PullReplicationModule extends AbstractModule {
+ private final Path cfgPath;
+
+ @Inject
+ public PullReplicationModule(SitePaths site) {
+ cfgPath = site.etc_dir.resolve("replication.config");
+ }
+
+ @Override
+ protected void configure() {
+ install(new FactoryModuleBuilder().build(Source.Factory.class));
+ bind(FetchReplicationMetrics.class).in(Scopes.SINGLETON);
+
+ bind(OnStartStop.class).in(Scopes.SINGLETON);
+ bind(LifecycleListener.class).annotatedWith(UniqueAnnotations.create()).to(OnStartStop.class);
+ bind(LifecycleListener.class)
+ .annotatedWith(UniqueAnnotations.create())
+ .to(PullReplicationLogFile.class);
+ bind(CredentialsFactory.class)
+ .to(AutoReloadSecureCredentialsFactoryDecorator.class)
+ .in(Scopes.SINGLETON);
+ bind(CapabilityDefinition.class)
+ .annotatedWith(Exports.named(START_REPLICATION))
+ .to(StartReplicationCapability.class);
+
+ install(new FactoryModuleBuilder().build(FetchAll.Factory.class));
+ install(new FactoryModuleBuilder().build(ReplicationState.Factory.class));
+
+ bind(EventBus.class).in(Scopes.SINGLETON);
+ bind(ReplicationSources.class).to(SourcesCollection.class);
+ bind(ObservableQueue.class).to(NoopObservableQueue.class).in(Scopes.SINGLETON);
+
+ bind(ReplicationConfigValidator.class).to(SourcesCollection.class);
+
+ if (getReplicationConfig().getBoolean("gerrit", "autoReload", false)) {
+ bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class);
+ bind(LifecycleListener.class)
+ .annotatedWith(UniqueAnnotations.create())
+ .to(AutoReloadConfigDecorator.class);
+ } else {
+ bind(ReplicationConfig.class).to(ReplicationFileBasedConfig.class);
+ }
+
+ DynamicSet.setOf(binder(), ReplicationStateListener.class);
+ DynamicSet.bind(binder(), ReplicationStateListener.class).to(PullReplicationStateLogger.class);
+ EventTypes.register(FetchRefReplicatedEvent.TYPE, FetchRefReplicatedEvent.class);
+ EventTypes.register(FetchRefReplicationDoneEvent.TYPE, FetchRefReplicationDoneEvent.class);
+ EventTypes.register(FetchReplicationScheduledEvent.TYPE, FetchReplicationScheduledEvent.class);
+ }
+
+ private FileBasedConfig getReplicationConfig() {
+ File replicationConfigFile = cfgPath.toFile();
+ FileBasedConfig config = new FileBasedConfig(replicationConfigFile, FS.DETECTED);
+ try {
+ config.load();
+ } catch (IOException | ConfigInvalidException e) {
+ throw new ProvisionException("Unable to load " + replicationConfigFile.getAbsolutePath(), e);
+ }
+ return config;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationStateLogger.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationStateLogger.java
new file mode 100644
index 0000000..7e60323
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationStateLogger.java
@@ -0,0 +1,56 @@
+// Copyright (C) 2012 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.googlesource.gerrit.plugins.replication.pull.PullReplicationLogger.repLog;
+
+import com.google.inject.Singleton;
+
+/**
+ * Wrapper around a Logger that also logs out the replication state.
+ *
+ * <p>When logging replication errors it is useful to know the current replication state. This
+ * utility class wraps the methods from Logger and logs additional information about the replication
+ * state to the stderr console.
+ */
+@Singleton
+class PullReplicationStateLogger implements ReplicationStateListener {
+
+ @Override
+ public void warn(String msg, ReplicationState... states) {
+ stateWriteErr("Warning: " + msg, states);
+ repLog.warn(msg);
+ }
+
+ @Override
+ public void error(String msg, ReplicationState... states) {
+ stateWriteErr("Error: " + msg, states);
+ repLog.error(msg);
+ }
+
+ @Override
+ public void error(String msg, Throwable t, ReplicationState... states) {
+ stateWriteErr("Error: " + msg, states);
+ repLog.error(msg, t);
+ }
+
+ private void stateWriteErr(String msg, ReplicationState[] states) {
+ for (ReplicationState rs : states) {
+ if (rs != null) {
+ rs.writeStdErr(msg);
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationSources.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationSources.java
new file mode 100644
index 0000000..25f42cd
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationSources.java
@@ -0,0 +1,46 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.server.git.WorkQueue;
+import java.util.List;
+
+/** Git destinations currently active for replication. */
+public interface ReplicationSources {
+
+ /**
+ * List of currently active replication sources.
+ *
+ * @return the list of active sources
+ */
+ List<Source> getAll();
+
+ /** @return true if there are no destinations, false otherwise. */
+ boolean isEmpty();
+
+ /**
+ * Start replicating from all sources.
+ *
+ * @param workQueue execution queue for scheduling the replication events.
+ */
+ void startup(WorkQueue workQueue);
+
+ /**
+ * Stop the replication from all sources.
+ *
+ * @return number of events cancelled during shutdown.
+ */
+ int shutdown();
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationState.java
new file mode 100644
index 0000000..5044496
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationState.java
@@ -0,0 +1,184 @@
+// Copyright (C) 2012 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.AssistedInject;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.transport.URIish;
+
+public class ReplicationState {
+
+ public interface Factory {
+ ReplicationState create(FetchResultProcessing processing);
+ }
+
+ private boolean allScheduled;
+ private final FetchResultProcessing fetchResultProcessing;
+
+ private final Lock countingLock = new ReentrantLock();
+ private final CountDownLatch allFetchTasksFinished = new CountDownLatch(1);
+
+ private static class RefReplicationStatus {
+ private final String project;
+ private final String ref;
+ private int projectsToReplicateCount;
+ private int replicatedNodesCount;
+
+ RefReplicationStatus(String project, String ref) {
+ this.project = project;
+ this.ref = ref;
+ }
+
+ public boolean allDone() {
+ return replicatedNodesCount == projectsToReplicateCount;
+ }
+ }
+
+ private final Table<String, String, RefReplicationStatus> statusByProjectRef;
+ private int totalFetchTasksCount;
+ private int finishedFetchTasksCount;
+
+ @AssistedInject
+ ReplicationState(@Assisted FetchResultProcessing processing) {
+ fetchResultProcessing = processing;
+ statusByProjectRef = HashBasedTable.create();
+ }
+
+ public void increaseFetchTaskCount(String project, String ref) {
+ countingLock.lock();
+ try {
+ getRefStatus(project, ref).projectsToReplicateCount++;
+ totalFetchTasksCount++;
+ } finally {
+ countingLock.unlock();
+ }
+ }
+
+ public boolean hasFetchTask() {
+ return totalFetchTasksCount != 0;
+ }
+
+ public void notifyRefReplicated(
+ String project,
+ String ref,
+ URIish uri,
+ RefFetchResult status,
+ RefUpdate.Result refUpdateResult) {
+ fetchResultProcessing.onOneProjectReplicationDone(project, ref, uri, status, refUpdateResult);
+
+ RefReplicationStatus completedRefStatus = null;
+ boolean allFetchTasksCompleted = false;
+ countingLock.lock();
+ try {
+ RefReplicationStatus refStatus = getRefStatus(project, ref);
+ refStatus.replicatedNodesCount++;
+ finishedFetchTasksCount++;
+
+ if (allScheduled) {
+ if (refStatus.allDone()) {
+ completedRefStatus = statusByProjectRef.remove(project, ref);
+ }
+ allFetchTasksCompleted = finishedFetchTasksCount == totalFetchTasksCount;
+ }
+ } finally {
+ countingLock.unlock();
+ }
+
+ if (completedRefStatus != null) {
+ doRefFetchTasksCompleted(completedRefStatus);
+ }
+
+ if (allFetchTasksCompleted) {
+ doAllFetchTasksCompleted();
+ }
+ }
+
+ public void markAllFetchTasksScheduled() {
+ countingLock.lock();
+ try {
+ allScheduled = true;
+ if (finishedFetchTasksCount < totalFetchTasksCount) {
+ return;
+ }
+ } finally {
+ countingLock.unlock();
+ }
+
+ doAllFetchTasksCompleted();
+ }
+
+ private void doAllFetchTasksCompleted() {
+ fireRemainingOnRefReplicatedFromAllNodes();
+ fetchResultProcessing.onAllRefsReplicatedFromAllNodes(totalFetchTasksCount);
+ allFetchTasksFinished.countDown();
+ }
+
+ /**
+ * Some could be remaining if replication of a ref is completed before all tasks are scheduled.
+ */
+ private void fireRemainingOnRefReplicatedFromAllNodes() {
+ for (RefReplicationStatus refStatus : statusByProjectRef.values()) {
+ doRefFetchTasksCompleted(refStatus);
+ }
+ }
+
+ private void doRefFetchTasksCompleted(RefReplicationStatus refStatus) {
+ fetchResultProcessing.onRefReplicatedFromAllNodes(
+ refStatus.project, refStatus.ref, refStatus.projectsToReplicateCount);
+ }
+
+ private RefReplicationStatus getRefStatus(String project, String ref) {
+ if (!statusByProjectRef.contains(project, ref)) {
+ RefReplicationStatus refStatus = new RefReplicationStatus(project, ref);
+ statusByProjectRef.put(project, ref, refStatus);
+ return refStatus;
+ }
+ return statusByProjectRef.get(project, ref);
+ }
+
+ public void waitForReplication() throws InterruptedException {
+ allFetchTasksFinished.await();
+ }
+
+ public void writeStdOut(String message) {
+ fetchResultProcessing.writeStdOut(message);
+ }
+
+ public void writeStdErr(String message) {
+ fetchResultProcessing.writeStdErr(message);
+ }
+
+ public enum RefFetchResult {
+ /** The ref was not successfully replicated. */
+ FAILED,
+
+ /** The ref is not configured to be replicated. */
+ NOT_ATTEMPTED,
+
+ /** The ref was successfully replicated. */
+ SUCCEEDED;
+
+ @Override
+ public String toString() {
+ return name().toLowerCase().replace("_", "-");
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationStateListener.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationStateListener.java
new file mode 100644
index 0000000..c4b11b7
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationStateListener.java
@@ -0,0 +1,50 @@
+// Copyright (C) 2015 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+/** Interface for notifying replication status updates. */
+public interface ReplicationStateListener {
+
+ /**
+ * Notify a non-fatal replication error.
+ *
+ * <p>Replication states received a non-fatal error with an associated warning message.
+ *
+ * @param msg message description of the error
+ * @param states replication states impacted
+ */
+ void warn(String msg, ReplicationState... states);
+
+ /**
+ * Notify a fatal replication error.
+ *
+ * <p>Replication states have received a fatal error and replication has failed.
+ *
+ * @param msg message description of the error
+ * @param states replication states impacted
+ */
+ void error(String msg, ReplicationState... states);
+
+ /**
+ * Notify a fatal replication error with the associated exception.
+ *
+ * <p>Replication states have received a fatal exception and replication has failed.
+ *
+ * @param msg message description of the error
+ * @param t exception that caused the replication to fail
+ * @param states replication states impacted
+ */
+ void error(String msg, Throwable t, ReplicationState... states);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationStateListeners.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationStateListeners.java
new file mode 100644
index 0000000..f1db73c
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationStateListeners.java
@@ -0,0 +1,48 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.inject.Inject;
+
+public class ReplicationStateListeners implements ReplicationStateListener {
+ private final DynamicSet<ReplicationStateListener> listeners;
+
+ @Inject
+ ReplicationStateListeners(DynamicSet<ReplicationStateListener> stateListeners) {
+ this.listeners = stateListeners;
+ }
+
+ @Override
+ public void warn(String msg, ReplicationState... states) {
+ for (ReplicationStateListener listener : listeners) {
+ listener.warn(msg, states);
+ }
+ }
+
+ @Override
+ public void error(String msg, ReplicationState... states) {
+ for (ReplicationStateListener listener : listeners) {
+ listener.error(msg, states);
+ }
+ }
+
+ @Override
+ public void error(String msg, Throwable t, ReplicationState... states) {
+ for (ReplicationStateListener listener : listeners) {
+ listener.error(msg, t, states);
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
new file mode 100644
index 0000000..dbe8459
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -0,0 +1,636 @@
+// Copyright (C) 2009 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName;
+import static com.googlesource.gerrit.plugins.replication.pull.FetchResultProcessing.resolveNodeName;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSet.Builder;
+import com.google.common.collect.Lists;
+import com.google.gerrit.common.data.GroupReference;
+import com.google.gerrit.entities.AccountGroup;
+import com.google.gerrit.entities.BranchNameKey;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.extensions.config.FactoryModule;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.extensions.restapi.AuthException;
+import com.google.gerrit.server.CurrentUser;
+import com.google.gerrit.server.PluginUser;
+import com.google.gerrit.server.account.GroupBackend;
+import com.google.gerrit.server.account.GroupBackends;
+import com.google.gerrit.server.account.GroupIncludeCache;
+import com.google.gerrit.server.account.ListGroupMembership;
+import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.gerrit.server.git.PerThreadRequestScope;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.permissions.PermissionBackend;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.google.gerrit.server.permissions.ProjectPermission;
+import com.google.gerrit.server.permissions.RefPermission;
+import com.google.gerrit.server.project.NoSuchProjectException;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.gerrit.server.project.ProjectState;
+import com.google.gerrit.server.util.RequestContext;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Provider;
+import com.google.inject.Provides;
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.google.inject.servlet.RequestScoped;
+import com.googlesource.gerrit.plugins.replication.RemoteSiteUser;
+import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FilenameUtils;
+import org.eclipse.jgit.lib.Constants;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.URIish;
+import org.slf4j.Logger;
+
+public class Source {
+ private static final Logger repLog = PullReplicationLogger.repLog;
+
+ public interface Factory {
+ Source create(SourceConfiguration config);
+ }
+
+ private final ReplicationStateListener stateLog;
+ private final Object stateLock = new Object();
+ private final Map<URIish, FetchOne> pending = new HashMap<>();
+ private final Map<URIish, FetchOne> inFlight = new HashMap<>();
+ private final FetchOne.Factory opFactory;
+ private final GitRepositoryManager gitManager;
+ private final PermissionBackend permissionBackend;
+ private final Provider<CurrentUser> userProvider;
+ private final ProjectCache projectCache;
+ private volatile ScheduledExecutorService pool;
+ private final PerThreadRequestScope.Scoper threadScoper;
+ private final SourceConfiguration config;
+ private final DynamicItem<EventDispatcher> eventDispatcher;
+
+ protected enum RetryReason {
+ TRANSPORT_ERROR,
+ COLLISION,
+ REPOSITORY_MISSING
+ }
+
+ public static class QueueInfo {
+ public final Map<URIish, FetchOne> pending;
+ public final Map<URIish, FetchOne> inFlight;
+
+ public QueueInfo(Map<URIish, FetchOne> pending, Map<URIish, FetchOne> inFlight) {
+ this.pending = ImmutableMap.copyOf(pending);
+ this.inFlight = ImmutableMap.copyOf(inFlight);
+ }
+ }
+
+ @Inject
+ protected Source(
+ Injector injector,
+ @Assisted SourceConfiguration cfg,
+ PluginUser pluginUser,
+ GitRepositoryManager gitRepositoryManager,
+ PermissionBackend permissionBackend,
+ Provider<CurrentUser> userProvider,
+ ProjectCache projectCache,
+ GroupBackend groupBackend,
+ ReplicationStateListeners stateLog,
+ GroupIncludeCache groupIncludeCache,
+ DynamicItem<EventDispatcher> eventDispatcher) {
+ config = cfg;
+ this.eventDispatcher = eventDispatcher;
+ gitManager = gitRepositoryManager;
+ this.permissionBackend = permissionBackend;
+ this.userProvider = userProvider;
+ this.projectCache = projectCache;
+ this.stateLog = stateLog;
+
+ CurrentUser remoteUser;
+ if (!cfg.getAuthGroupNames().isEmpty()) {
+ Builder<AccountGroup.UUID> builder = ImmutableSet.builder();
+ for (String name : cfg.getAuthGroupNames()) {
+ GroupReference g = GroupBackends.findExactSuggestion(groupBackend, name);
+ if (g != null) {
+ builder.add(g.getUUID());
+ addRecursiveParents(g.getUUID(), builder, groupIncludeCache);
+ } else {
+ repLog.warn("Group \"{}\" not recognized, removing from authGroup", name);
+ }
+ }
+ remoteUser = new RemoteSiteUser(new ListGroupMembership(builder.build()));
+ } else {
+ remoteUser = pluginUser;
+ }
+
+ Injector child =
+ injector.createChildInjector(
+ new FactoryModule() {
+ @Override
+ protected void configure() {
+ bindScope(RequestScoped.class, PerThreadRequestScope.REQUEST);
+ bind(PerThreadRequestScope.Propagator.class);
+
+ bind(Source.class).toInstance(Source.this);
+ bind(RemoteConfig.class).toInstance(config.getRemoteConfig());
+ install(new FactoryModuleBuilder().build(FetchOne.Factory.class));
+ }
+
+ @Provides
+ public PerThreadRequestScope.Scoper provideScoper(
+ final PerThreadRequestScope.Propagator propagator) {
+ final RequestContext requestContext =
+ new RequestContext() {
+ @Override
+ public CurrentUser getUser() {
+ return remoteUser;
+ }
+ };
+ return new PerThreadRequestScope.Scoper() {
+ @Override
+ public <T> Callable<T> scope(Callable<T> callable) {
+ return propagator.scope(requestContext, callable);
+ }
+ };
+ }
+ });
+
+ opFactory = child.getInstance(FetchOne.Factory.class);
+ threadScoper = child.getInstance(PerThreadRequestScope.Scoper.class);
+ }
+
+ private void addRecursiveParents(
+ AccountGroup.UUID g,
+ Builder<AccountGroup.UUID> builder,
+ GroupIncludeCache groupIncludeCache) {
+ for (AccountGroup.UUID p : groupIncludeCache.parentGroupsOf(g)) {
+ if (builder.build().contains(p)) {
+ continue;
+ }
+ builder.add(p);
+ addRecursiveParents(p, builder, groupIncludeCache);
+ }
+ }
+
+ public QueueInfo getQueueInfo() {
+ synchronized (stateLock) {
+ return new QueueInfo(pending, inFlight);
+ }
+ }
+
+ public void start(WorkQueue workQueue) {
+ String poolName = "ReplicateFrom-" + config.getRemoteConfig().getName();
+ pool = workQueue.createQueue(config.getPoolThreads(), poolName);
+ }
+
+ public int shutdown() {
+ int cnt = 0;
+ if (pool != null) {
+ cnt = pool.shutdownNow().size();
+ pool = null;
+ }
+ return cnt;
+ }
+
+ private boolean shouldReplicate(ProjectState state, CurrentUser user)
+ throws PermissionBackendException {
+ if (!config.replicateHiddenProjects()
+ && state.getProject().getState()
+ == com.google.gerrit.extensions.client.ProjectState.HIDDEN) {
+ return false;
+ }
+
+ // Hidden projects(permitsRead = false) should only be accessible by the project owners.
+ // READ_CONFIG is checked here because it's only allowed to project owners(ACCESS may also
+ // be allowed for other users).
+ ProjectPermission permissionToCheck =
+ state.statePermitsRead() ? ProjectPermission.ACCESS : ProjectPermission.READ_CONFIG;
+ try {
+ permissionBackend.user(user).project(state.getNameKey()).check(permissionToCheck);
+ return true;
+ } catch (AuthException e) {
+ return false;
+ }
+ }
+
+ private boolean shouldReplicate(
+ final Project.NameKey project, String ref, ReplicationState... states) {
+ try {
+ return threadScoper
+ .scope(
+ new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws NoSuchProjectException, PermissionBackendException {
+ ProjectState projectState;
+ try {
+ projectState = projectCache.checkedGet(project);
+ } catch (IOException e) {
+ return false;
+ }
+ if (projectState == null) {
+ throw new NoSuchProjectException(project);
+ }
+ if (!projectState.statePermitsRead()) {
+ return false;
+ }
+ if (!shouldReplicate(projectState, userProvider.get())) {
+ return false;
+ }
+ if (FetchOne.ALL_REFS.equals(ref)) {
+ return true;
+ }
+ try {
+ permissionBackend
+ .user(userProvider.get())
+ .project(project)
+ .ref(ref)
+ .check(RefPermission.READ);
+ } catch (AuthException e) {
+ return false;
+ }
+ return true;
+ }
+ })
+ .call();
+ } catch (NoSuchProjectException err) {
+ stateLog.error(String.format("source project %s not available", project), err, states);
+ } catch (Exception e) {
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ }
+ return false;
+ }
+
+ private boolean shouldReplicate(Project.NameKey project, ReplicationState... states) {
+ try {
+ return threadScoper
+ .scope(
+ new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws NoSuchProjectException, PermissionBackendException {
+ ProjectState projectState;
+ try {
+ projectState = projectCache.checkedGet(project);
+ } catch (IOException e) {
+ return false;
+ }
+ if (projectState == null) {
+ throw new NoSuchProjectException(project);
+ }
+ return shouldReplicate(projectState, userProvider.get());
+ }
+ })
+ .call();
+ } catch (NoSuchProjectException err) {
+ stateLog.error(String.format("source project %s not available", project), err, states);
+ } catch (Exception e) {
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ }
+ return false;
+ }
+
+ void schedule(Project.NameKey project, String ref, URIish uri, ReplicationState state) {
+ schedule(project, ref, uri, state, false);
+ }
+
+ void schedule(
+ Project.NameKey project, String ref, URIish uri, ReplicationState state, boolean now) {
+ repLog.info("scheduling replication {}:{} => {}", uri, ref, project);
+ if (!shouldReplicate(project, ref, state)) {
+ return;
+ }
+
+ if (!config.replicatePermissions()) {
+ FetchOne e;
+ synchronized (stateLock) {
+ e = pending.get(uri);
+ }
+ if (e == null) {
+ try (Repository git = gitManager.openRepository(project)) {
+ try {
+ Ref head = git.exactRef(Constants.HEAD);
+ if (head != null
+ && head.isSymbolic()
+ && RefNames.REFS_CONFIG.equals(head.getLeaf().getName())) {
+ return;
+ }
+ } catch (IOException err) {
+ stateLog.error(String.format("cannot check type of project %s", project), err, state);
+ return;
+ }
+ } catch (IOException err) {
+ stateLog.error(String.format("source project %s not available", project), err, state);
+ return;
+ }
+ }
+ }
+
+ synchronized (stateLock) {
+ FetchOne e = pending.get(uri);
+ if (e == null) {
+ e = opFactory.create(project, uri);
+ addRef(e, ref);
+ e.addState(ref, state);
+ pool.schedule(e, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
+ pending.put(uri, e);
+ } else if (!e.getRefs().contains(ref)) {
+ addRef(e, ref);
+ e.addState(ref, state);
+ }
+ state.increaseFetchTaskCount(project.get(), ref);
+ repLog.info("scheduled {}:{} => {} to run after {}s", e, ref, project, config.getDelay());
+ }
+ }
+
+ void fetchWasCanceled(FetchOne fetchOp) {
+ synchronized (stateLock) {
+ URIish uri = fetchOp.getURI();
+ pending.remove(uri);
+ }
+ }
+
+ private void addRef(FetchOne e, String ref) {
+ e.addRef(ref);
+ postReplicationScheduledEvent(e, ref);
+ }
+
+ /**
+ * It schedules again a FetchOp instance.
+ *
+ * <p>If the reason for rescheduling is to avoid a collision with an in-flight push to the same
+ * URI, we don't mark the operation as "retrying," and we schedule using the replication delay,
+ * rather than the retry delay. Otherwise, the operation is marked as "retrying" and scheduled to
+ * run following the minutes count determined by class attribute retryDelay.
+ *
+ * <p>In case the FetchOp instance to be scheduled has same URI than one marked as "retrying," it
+ * adds to the one pending the refs list of the parameter instance.
+ *
+ * <p>In case the FetchOp instance to be scheduled has the same URI as one pending, but not marked
+ * "retrying," it indicates the one pending should be canceled when it starts executing, removes
+ * it from pending list, and adds its refs to the parameter instance. The parameter instance is
+ * scheduled for retry.
+ *
+ * <p>Notice all operations to indicate a FetchOp should be canceled, or it is retrying, or
+ * remove/add it from/to pending Map should be protected by synchronizing on the stateLock object.
+ *
+ * @param fetchOp The FetchOp instance to be scheduled.
+ */
+ void reschedule(FetchOne fetchOp, RetryReason reason) {
+ synchronized (stateLock) {
+ URIish uri = fetchOp.getURI();
+ FetchOne pendingFetchOp = pending.get(uri);
+
+ if (pendingFetchOp != null) {
+ // There is one FetchOp instance already pending to same URI.
+
+ if (pendingFetchOp.isRetrying()) {
+ // The one pending is one already retrying, so it should
+ // maintain it and add to it the refs of the one passed
+ // as parameter to the method.
+
+ // This scenario would happen if a FetchOp has started running
+ // and then before it failed due transport exception, another
+ // one to same URI started. The first one would fail and would
+ // be rescheduled, being present in pending list. When the
+ // second one fails, it will also be rescheduled and then,
+ // here, find out replication to its URI is already pending
+ // for retry (blocking).
+ pendingFetchOp.addRefs(fetchOp.getRefs());
+ pendingFetchOp.addStates(fetchOp.getStates());
+ fetchOp.removeStates();
+
+ } else {
+ // The one pending is one that is NOT retrying, it was just
+ // scheduled believing no problem would happen. The one pending
+ // should be canceled, and this is done by setting its canceled
+ // flag, removing it from pending list, and adding its refs to
+ // the fetchOp instance that should then, later, in this method,
+ // be scheduled for retry.
+
+ // Notice that the FetchOp found pending will start running and,
+ // when notifying it is starting (with pending lock protection),
+ // it will see it was canceled and then it will do nothing with
+ // pending list and it will not execute its run implementation.
+ pendingFetchOp.canceledByReplication();
+ pending.remove(uri);
+
+ fetchOp.addRefs(pendingFetchOp.getRefs());
+ fetchOp.addStates(pendingFetchOp.getStates());
+ pendingFetchOp.removeStates();
+ }
+ }
+
+ if (pendingFetchOp == null || !pendingFetchOp.isRetrying()) {
+ pending.put(uri, fetchOp);
+ switch (reason) {
+ case COLLISION:
+ pool.schedule(fetchOp, config.getRescheduleDelay(), TimeUnit.SECONDS);
+ break;
+ case TRANSPORT_ERROR:
+ case REPOSITORY_MISSING:
+ default:
+ RefUpdate.Result trackingRefUpdate =
+ RetryReason.REPOSITORY_MISSING.equals(reason)
+ ? RefUpdate.Result.NOT_ATTEMPTED
+ : RefUpdate.Result.REJECTED_OTHER_REASON;
+ postReplicationFailedEvent(fetchOp, trackingRefUpdate);
+ if (fetchOp.setToRetry()) {
+ postReplicationScheduledEvent(fetchOp);
+ pool.schedule(fetchOp, config.getRetryDelay(), TimeUnit.MINUTES);
+ } else {
+ fetchOp.canceledByReplication();
+ pending.remove(uri);
+ stateLog.error(
+ "Fetch from " + fetchOp.getURI() + " cancelled after maximum number of retries",
+ fetchOp.getStatesAsArray());
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ boolean requestRunway(FetchOne op) {
+ synchronized (stateLock) {
+ if (op.wasCanceled()) {
+ return false;
+ }
+ pending.remove(op.getURI());
+ if (inFlight.containsKey(op.getURI())) {
+ return false;
+ }
+ inFlight.put(op.getURI(), op);
+ }
+ return true;
+ }
+
+ void notifyFinished(FetchOne op) {
+ synchronized (stateLock) {
+ inFlight.remove(op.getURI());
+ }
+ }
+
+ boolean wouldFetchProject(Project.NameKey project) {
+ if (!shouldReplicate(project)) {
+ return false;
+ }
+
+ // by default fetch all projects
+ List<String> projects = config.getProjects();
+ if (projects.isEmpty()) {
+ return true;
+ }
+
+ return (new ReplicationFilter(projects)).matches(project);
+ }
+
+ public boolean isSingleProjectMatch() {
+ return config.isSingleProjectMatch();
+ }
+
+ List<URIish> getURIs(Project.NameKey project, String urlMatch) {
+ List<URIish> r = Lists.newArrayListWithCapacity(config.getRemoteConfig().getURIs().size());
+ for (URIish uri : config.getRemoteConfig().getURIs()) {
+ if (matches(uri, urlMatch)) {
+ String name = project.get();
+ if (needsUrlEncoding(uri)) {
+ name = encode(name);
+ }
+ String remoteNameStyle = config.getRemoteNameStyle();
+ if (remoteNameStyle.equals("dash")) {
+ name = name.replace("/", "-");
+ } else if (remoteNameStyle.equals("underscore")) {
+ name = name.replace("/", "_");
+ } else if (remoteNameStyle.equals("basenameOnly")) {
+ name = FilenameUtils.getBaseName(name);
+ } else if (!remoteNameStyle.equals("slash")) {
+ repLog.debug("Unknown remoteNameStyle: {}, falling back to slash", remoteNameStyle);
+ }
+ String replacedPath = replaceName(uri.getPath(), name, isSingleProjectMatch());
+ if (replacedPath != null) {
+ uri = uri.setPath(replacedPath);
+ r.add(uri);
+ }
+ }
+ }
+ return r;
+ }
+
+ static boolean needsUrlEncoding(URIish uri) {
+ return "http".equalsIgnoreCase(uri.getScheme())
+ || "https".equalsIgnoreCase(uri.getScheme())
+ || "amazon-s3".equalsIgnoreCase(uri.getScheme());
+ }
+
+ static String encode(String str) {
+ try {
+ // Some cleanup is required. The '/' character is always encoded as %2F
+ // however remote servers will expect it to be not encoded as part of the
+ // path used to the repository. Space is incorrectly encoded as '+' for this
+ // context. In the path part of a URI space should be %20, but in form data
+ // space is '+'. Our cleanup replace fixes these two issues.
+ return URLEncoder.encode(str, "UTF-8").replaceAll("%2[fF]", "/").replace("+", "%20");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ ImmutableList<String> getAdminUrls() {
+ return config.getAdminUrls();
+ }
+
+ ImmutableList<String> getUrls() {
+ return config.getUrls();
+ }
+
+ ImmutableList<String> getAuthGroupNames() {
+ return config.getAuthGroupNames();
+ }
+
+ ImmutableList<String> getProjects() {
+ return config.getProjects();
+ }
+
+ int getLockErrorMaxRetries() {
+ return config.getLockErrorMaxRetries();
+ }
+
+ String getRemoteConfigName() {
+ return config.getRemoteConfig().getName();
+ }
+
+ public int getMaxRetries() {
+ return config.getMaxRetries();
+ }
+
+ private static boolean matches(URIish uri, String urlMatch) {
+ if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
+ return true;
+ }
+ return uri.toString().contains(urlMatch);
+ }
+
+ private void postReplicationScheduledEvent(FetchOne fetchOp) {
+ postReplicationScheduledEvent(fetchOp, null);
+ }
+
+ private void postReplicationScheduledEvent(FetchOne fetchOp, String inputRef) {
+ Set<String> refs = inputRef == null ? fetchOp.getRefs() : ImmutableSet.of(inputRef);
+ Project.NameKey project = fetchOp.getProjectNameKey();
+ String targetNode = resolveNodeName(fetchOp.getURI());
+ for (String ref : refs) {
+ FetchReplicationScheduledEvent event =
+ new FetchReplicationScheduledEvent(project.get(), ref, targetNode);
+ try {
+ eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
+ } catch (PermissionBackendException e) {
+ repLog.error("error posting event", e);
+ }
+ }
+ }
+
+ private void postReplicationFailedEvent(FetchOne fetchOp, RefUpdate.Result result) {
+ Project.NameKey project = fetchOp.getProjectNameKey();
+ String sourceNode = resolveNodeName(fetchOp.getURI());
+ for (String ref : fetchOp.getRefs()) {
+ FetchRefReplicatedEvent event =
+ new FetchRefReplicatedEvent(
+ project.get(), ref, sourceNode, ReplicationState.RefFetchResult.FAILED, result);
+ try {
+ eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
+ } catch (PermissionBackendException e) {
+ repLog.error("error posting event", e);
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
new file mode 100644
index 0000000..8ef68b0
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
@@ -0,0 +1,153 @@
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.config.ConfigUtil;
+import com.googlesource.gerrit.plugins.replication.RemoteConfiguration;
+import java.util.concurrent.TimeUnit;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.transport.RemoteConfig;
+
+public class SourceConfiguration implements RemoteConfiguration {
+ static final int DEFAULT_REPLICATION_DELAY = 15;
+ static final int DEFAULT_RESCHEDULE_DELAY = 3;
+ static final int DEFAULT_SLOW_LATENCY_THRESHOLD_SECS = 900;
+
+ private final int delay;
+ private final int rescheduleDelay;
+ private final int retryDelay;
+ private final int lockErrorMaxRetries;
+ private final ImmutableList<String> adminUrls;
+ private final int poolThreads;
+ private final boolean replicatePermissions;
+ private final boolean replicateHiddenProjects;
+ private final String remoteNameStyle;
+ private final ImmutableList<String> urls;
+ private final ImmutableList<String> projects;
+ private final ImmutableList<String> authGroupNames;
+ private final RemoteConfig remoteConfig;
+ private final int maxRetries;
+ private int slowLatencyThreshold;
+
+ public SourceConfiguration(RemoteConfig remoteConfig, Config cfg) {
+ this.remoteConfig = remoteConfig;
+ String name = remoteConfig.getName();
+ urls = ImmutableList.copyOf(cfg.getStringList("remote", name, "url"));
+ delay = Math.max(0, getInt(remoteConfig, cfg, "replicationdelay", DEFAULT_REPLICATION_DELAY));
+ rescheduleDelay =
+ Math.max(3, getInt(remoteConfig, cfg, "rescheduledelay", DEFAULT_RESCHEDULE_DELAY));
+ projects = ImmutableList.copyOf(cfg.getStringList("remote", name, "projects"));
+ adminUrls = ImmutableList.copyOf(cfg.getStringList("remote", name, "adminUrl"));
+ retryDelay = Math.max(0, getInt(remoteConfig, cfg, "replicationretry", 1));
+ poolThreads = Math.max(0, getInt(remoteConfig, cfg, "threads", 1));
+ authGroupNames = ImmutableList.copyOf(cfg.getStringList("remote", name, "authGroup"));
+ lockErrorMaxRetries = cfg.getInt("replication", "lockErrorMaxRetries", 0);
+
+ replicatePermissions = cfg.getBoolean("remote", name, "replicatePermissions", true);
+ replicateHiddenProjects = cfg.getBoolean("remote", name, "replicateHiddenProjects", false);
+ remoteNameStyle =
+ MoreObjects.firstNonNull(cfg.getString("remote", name, "remoteNameStyle"), "slash");
+ maxRetries =
+ getInt(
+ remoteConfig, cfg, "replicationMaxRetries", cfg.getInt("replication", "maxRetries", 0));
+ slowLatencyThreshold =
+ (int)
+ ConfigUtil.getTimeUnit(
+ cfg,
+ "remote",
+ remoteConfig.getName(),
+ "slowLatencyThreshold",
+ DEFAULT_SLOW_LATENCY_THRESHOLD_SECS,
+ TimeUnit.SECONDS);
+ }
+
+ @Override
+ public int getDelay() {
+ return delay;
+ }
+
+ @Override
+ public int getRescheduleDelay() {
+ return rescheduleDelay;
+ }
+
+ @Override
+ public int getRetryDelay() {
+ return retryDelay;
+ }
+
+ public int getPoolThreads() {
+ return poolThreads;
+ }
+
+ public int getLockErrorMaxRetries() {
+ return lockErrorMaxRetries;
+ }
+
+ @Override
+ public ImmutableList<String> getUrls() {
+ return urls;
+ }
+
+ @Override
+ public ImmutableList<String> getAdminUrls() {
+ return adminUrls;
+ }
+
+ @Override
+ public ImmutableList<String> getProjects() {
+ return projects;
+ }
+
+ @Override
+ public ImmutableList<String> getAuthGroupNames() {
+ return authGroupNames;
+ }
+
+ @Override
+ public String getRemoteNameStyle() {
+ return remoteNameStyle;
+ }
+
+ @Override
+ public boolean replicatePermissions() {
+ return replicatePermissions;
+ }
+
+ public boolean replicateHiddenProjects() {
+ return replicateHiddenProjects;
+ }
+
+ @Override
+ public RemoteConfig getRemoteConfig() {
+ return remoteConfig;
+ }
+
+ @Override
+ public int getMaxRetries() {
+ return maxRetries;
+ }
+
+ private static int getInt(RemoteConfig rc, Config cfg, String name, int defValue) {
+ return cfg.getInt("remote", rc.getName(), name, defValue);
+ }
+
+ @Override
+ public int getSlowLatencyThreshold() {
+ return slowLatencyThreshold;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java
new file mode 100644
index 0000000..f24d25e
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java
@@ -0,0 +1,181 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.replication.RemoteConfiguration;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfigValidator;
+import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.URIish;
+
+@Singleton
+public class SourcesCollection implements ReplicationSources, ReplicationConfigValidator {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ private final Source.Factory sourceFactory;
+ private volatile List<Source> sources;
+ private boolean shuttingDown;
+
+ @Inject
+ public SourcesCollection(
+ ReplicationFileBasedConfig replicationConfig, Source.Factory sourceFactory, EventBus eventBus)
+ throws ConfigInvalidException {
+ this.sourceFactory = sourceFactory;
+ this.sources = allSources(sourceFactory, validateConfig(replicationConfig));
+ eventBus.register(this);
+ }
+
+ @Override
+ public List<Source> getAll() {
+ return sources.stream().filter(Objects::nonNull).collect(toList());
+ }
+
+ private List<Source> allSources(
+ Source.Factory sourceFactory, List<RemoteConfiguration> sourceConfigurations) {
+ return sourceConfigurations.stream()
+ .filter((c) -> c instanceof SourceConfiguration)
+ .map((c) -> (SourceConfiguration) c)
+ .map(sourceFactory::create)
+ .collect(toList());
+ }
+
+ @Override
+ public void startup(WorkQueue workQueue) {
+ shuttingDown = false;
+ for (Source cfg : sources) {
+ cfg.start(workQueue);
+ }
+ }
+
+ /* shutdown() cannot be set as a synchronized method because
+ * it may need to wait for pending events to complete;
+ * e.g. when enabling the drain of replication events before
+ * shutdown.
+ *
+ * As a rule of thumb for synchronized methods, because they
+ * implicitly define a critical section and associated lock,
+ * they should never hold waiting for another resource, otherwise
+ * the risk of deadlock is very high.
+ *
+ * See more background about deadlocks, what they are and how to
+ * prevent them at: https://en.wikipedia.org/wiki/Deadlock
+ */
+ @Override
+ public int shutdown() {
+ synchronized (this) {
+ shuttingDown = true;
+ }
+
+ int discarded = 0;
+ for (Source cfg : sources) {
+ discarded += cfg.shutdown();
+ }
+ return discarded;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return sources.isEmpty();
+ }
+
+ @Subscribe
+ public synchronized void onReload(List<RemoteConfiguration> sourceConfigurations) {
+ if (shuttingDown) {
+ logger.atWarning().log("Shutting down: configuration reload ignored");
+ return;
+ }
+
+ sources = allSources(sourceFactory, sourceConfigurations);
+ logger.atInfo().log("Configuration reloaded: %d sources", getAll().size());
+ }
+
+ @Override
+ public List<RemoteConfiguration> validateConfig(ReplicationFileBasedConfig newConfig)
+ throws ConfigInvalidException {
+
+ try {
+ newConfig.getConfig().load();
+ } catch (IOException e) {
+ throw new ConfigInvalidException(
+ String.format("Cannot read %s: %s", newConfig.getConfig().getFile(), e.getMessage()), e);
+ }
+
+ ImmutableList.Builder<RemoteConfiguration> sourceConfigs = ImmutableList.builder();
+ for (RemoteConfig c : allFetchRemotes(newConfig.getConfig())) {
+ if (c.getURIs().isEmpty()) {
+ continue;
+ }
+
+ // fetch source has to be specified.
+ if (c.getFetchRefSpecs().isEmpty()) {
+ throw new ConfigInvalidException(
+ String.format("You must specify a valid refSpec for this remote"));
+ }
+
+ SourceConfiguration sourceConfig = new SourceConfiguration(c, newConfig.getConfig());
+
+ if (!sourceConfig.isSingleProjectMatch()) {
+ for (URIish u : c.getURIs()) {
+ if (u.getPath() == null || !u.getPath().contains("${name}")) {
+ throw new ConfigInvalidException(
+ String.format("remote.%s.url \"%s\" lacks ${name} placeholder", c.getName(), u));
+ }
+ }
+ }
+ sourceConfigs.add(sourceConfig);
+ }
+ return sourceConfigs.build();
+ }
+
+ private static List<RemoteConfig> allFetchRemotes(FileBasedConfig cfg)
+ throws ConfigInvalidException {
+
+ Set<String> names = cfg.getSubsections("remote");
+ List<RemoteConfig> result = Lists.newArrayListWithCapacity(names.size());
+ for (String name : names) {
+ try {
+ final RemoteConfig remoteConfig = new RemoteConfig(cfg, name);
+ if (!remoteConfig.getFetchRefSpecs().isEmpty()) {
+ result.add(remoteConfig);
+ } else {
+ logger.atWarning().log(
+ "Skip loading of remote [remote \"%s\"], since it has no 'fetch' configuration",
+ name);
+ }
+ } catch (URISyntaxException e) {
+ throw new ConfigInvalidException(
+ String.format("remote %s has invalid URL in %s", name, cfg.getFile()));
+ }
+ }
+ return result;
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SshModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SshModule.java
new file mode 100644
index 0000000..9c89781
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SshModule.java
@@ -0,0 +1,25 @@
+// Copyright (C) 2012 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.sshd.PluginCommandModule;
+
+class SshModule extends PluginCommandModule {
+ @Override
+ protected void configureCommands() {
+ command(StartFetchCommand.class);
+ command(ListCommand.class);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/StartFetchCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/StartFetchCommand.java
new file mode 100644
index 0000000..bec0ffa
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/StartFetchCommand.java
@@ -0,0 +1,120 @@
+// Copyright (C) 2009 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.extensions.annotations.RequiresCapability;
+import com.google.gerrit.sshd.CommandMetaData;
+import com.google.gerrit.sshd.SshCommand;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
+import com.googlesource.gerrit.plugins.replication.StartReplicationCapability;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.kohsuke.args4j.Argument;
+import org.kohsuke.args4j.Option;
+
+@RequiresCapability(StartReplicationCapability.START_REPLICATION)
+@CommandMetaData(
+ name = "start",
+ description = "Start replication for specific project or all projects")
+public final class StartFetchCommand extends SshCommand {
+ @Inject private PullReplicationStateLogger fetchStateLog;
+
+ @Option(name = "--all", usage = "fetch all known projects")
+ private boolean all;
+
+ @Option(name = "--url", metaVar = "PATTERN", usage = "pattern to match URL on")
+ private String urlMatch;
+
+ @Option(name = "--wait", usage = "wait for replication to finish before exiting")
+ private boolean wait;
+
+ @Option(name = "--now", usage = "start replication without waiting for replicationDelay")
+ private boolean now;
+
+ @Argument(index = 0, multiValued = true, metaVar = "PATTERN", usage = "project name pattern")
+ private List<String> projectPatterns = new ArrayList<>(2);
+
+ @Inject private FetchAll.Factory fetchFactory;
+
+ @Inject private ReplicationState.Factory fetchReplicationStateFactory;
+
+ @Override
+ protected void run() throws Failure {
+ if (all && projectPatterns.size() > 0) {
+ throw new UnloggedFailure(1, "error: cannot combine --all and PROJECT");
+ }
+
+ ReplicationState state =
+ fetchReplicationStateFactory.create(new FetchResultProcessing.CommandProcessing(this));
+ Future<?> future = null;
+
+ ReplicationFilter projectFilter;
+
+ if (all) {
+ projectFilter = ReplicationFilter.all();
+ } else {
+ projectFilter = new ReplicationFilter(projectPatterns);
+ }
+
+ future = fetchFactory.create(urlMatch, projectFilter, state, now).schedule(0, TimeUnit.SECONDS);
+
+ if (wait) {
+ if (future != null) {
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ fetchStateLog.error(
+ "Thread was interrupted while waiting for FetchAll operation to finish", e, state);
+ return;
+ } catch (ExecutionException e) {
+ fetchStateLog.error("An exception was thrown in FetchAll operation", e, state);
+ return;
+ }
+ }
+
+ if (state.hasFetchTask()) {
+ try {
+ state.waitForReplication();
+ } catch (InterruptedException e) {
+ writeStdErrSync("We are interrupted while waiting replication to complete");
+ }
+ } else {
+ writeStdOutSync("Nothing to replicate");
+ }
+ }
+ }
+
+ public void writeStdOutSync(String message) {
+ if (wait) {
+ synchronized (stdout) {
+ stdout.println(message);
+ stdout.flush();
+ }
+ }
+ }
+
+ public void writeStdErrSync(String message) {
+ if (wait) {
+ synchronized (stderr) {
+ stderr.println(message);
+ stderr.flush();
+ }
+ }
+ }
+}
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
new file mode 100644
index 0000000..46ec740
--- /dev/null
+++ b/src/main/resources/Documentation/about.md
@@ -0,0 +1,10 @@
+This plugin can automatically mirror repositories from other systems.
+
+Typically replication should be done over SSH, with a passwordless
+public/private key pair. On a trusted network it is also possible to
+use replication over the insecure (but much faster due to no
+authentication overhead or encryption) git:// protocol, by enabling
+the `upload-pack` service on the receiving system, but this
+configuration is not recommended. It is also possible to specify a
+local path as replication source. This makes e.g. sense if a network
+share is mounted to which the repositories should be replicated from.
diff --git a/src/main/resources/Documentation/cmd-list.md b/src/main/resources/Documentation/cmd-list.md
new file mode 100644
index 0000000..7142ae1
--- /dev/null
+++ b/src/main/resources/Documentation/cmd-list.md
@@ -0,0 +1,74 @@
+@PLUGIN@ list
+==============
+
+NAME
+----
+@PLUGIN@ list - List remote destination information.
+
+SYNOPSIS
+--------
+```
+ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ list
+ [--remote <PATTERN>]
+ [--detail]
+ [--json]
+```
+
+DESCRIPTION
+-----------
+Lists the name and URL for remote sources.
+
+ACCESS
+------
+Caller must be a member of the privileged 'Administrators' group.
+
+SCRIPTING
+---------
+This command is intended to be used in scripts.
+
+OPTIONS
+-------
+
+`--remote <PATTERN>`
+: Only print information for sources whose remote name matches
+ the `PATTERN`.
+
+`--detail`
+: Print additional detailed information: AdminUrl, AuthGroup, Project
+ and queue (pending and in-flight).
+
+`--json`
+: Output in json format.
+
+EXAMPLES
+--------
+List all sources:
+
+```
+ $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ list
+```
+
+List all sources detail information:
+
+```
+ $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ list --detail
+```
+
+List all sources detail information in json format:
+
+```
+ $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ list --detail --json
+```
+
+List sources whose name contains mirror:
+
+```
+ $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ list --remote mirror
+ $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ list --remote ^.*mirror.*
+```
+
+SEE ALSO
+--------
+
+* [Replication Configuration](config.md)
+* [Access Control](../../../Documentation/access-control.html)
diff --git a/src/main/resources/Documentation/cmd-start.md b/src/main/resources/Documentation/cmd-start.md
new file mode 100644
index 0000000..51d0a53
--- /dev/null
+++ b/src/main/resources/Documentation/cmd-start.md
@@ -0,0 +1,100 @@
+@PLUGIN@ start
+==============
+
+NAME
+----
+@PLUGIN@ start - Manually trigger replication, to recover a node
+
+SYNOPSIS
+--------
+```
+ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start
+ [--now]
+ [--wait]
+ [--url <PATTERN>]
+ {--all | <PROJECT PATTERN> ...}
+```
+
+DESCRIPTION
+-----------
+Schedules pull replication of the specified projects to all configured
+replication sources, or only those whose URLs match the pattern
+given on the command line.
+
+If you get message "Nothing to replicate" while running this command,
+it may be caused by several reasons, such as you give a wrong url
+pattern in command options, or the authGroup in the @PLUGIN@.config
+has no read access for the replicated projects.
+
+If one or several project patterns are supplied, only those projects
+conforming to both this/these pattern(s) and those defined in
+@PLUGIN@.config for the target host(s) are queued for replication.
+
+The patterns follow the same format as those in @PLUGIN@.config,
+where wildcard or regular expression patterns can be given.
+Regular expression patterns must match a complete project name to be
+considered a match.
+
+A regular expression pattern starts with `^` and a wildcard pattern ends
+with a `*`. If the pattern starts with `^` and ends with `*`, it is
+treated as a regular expression.
+
+ACCESS
+------
+Caller must be a member of the privileged 'Administrators' group,
+or have been granted the 'Start Replication' plugin-owned capability.
+
+SCRIPTING
+---------
+This command is intended to be used in scripts.
+
+OPTIONS
+-------
+
+`--now`
+: Start replicating right away without waiting the per remote
+ replication delay.
+
+`--wait`
+: Wait for replication to finish before exiting.
+
+`--all`
+: Schedule replication for all projects.
+
+`--url <PATTERN>`
+: Replicate only from replication sources whose URL contains
+ the substring `PATTERN`. This can be useful to replicate
+ only from a previously down node, which has been brought back
+ online.
+
+EXAMPLES
+--------
+Replicate every project, from every configured remote:
+
+```
+ $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start --all
+```
+
+Replicate only from `srv2` now that it is back online:
+
+```
+ $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start --url srv2 --all
+```
+
+Replicate only projects located in the `documentation` subdirectory:
+
+```
+ $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start documentation/*
+```
+
+Replicate projects whose path includes a folder named `vendor` to host slave1:
+
+```
+ $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start --url slave1 ^(|.*/)vendor(|/.*)
+```
+
+SEE ALSO
+--------
+
+* [Replication Configuration](config.md)
+* [Access Control](../../../Documentation/access-control.html)
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
new file mode 100644
index 0000000..aa59a2d
--- /dev/null
+++ b/src/main/resources/Documentation/config.md
@@ -0,0 +1,329 @@
+@PLUGIN@ Configuration
+=========================
+
+Enabling Replication
+--------------------
+
+If replicating over SSH, ensure the host key of the
+remote system(s) is already in the Gerrit user's `~/.ssh/known_hosts`
+file. The easiest way to add the host key is to connect once by hand
+with the command line:
+
+```
+ sudo su -c 'ssh mirror1.us.some.org echo' gerrit2
+```
+
+<a name="example_file">
+Next, create `$site_path/etc/@PLUGIN@.config` as a Git-style config
+file, for example to replicate in parallel from four different hosts:</a>
+
+```
+ [remote "host-one"]
+ url = gerrit2@host-one.example.com:/some/path/${name}.git
+
+ [remote "pubmirror"]
+ url = mirror1.us.some.org:/pub/git/${name}.git
+ url = mirror2.us.some.org:/pub/git/${name}.git
+ url = mirror3.us.some.org:/pub/git/${name}.git
+ fetch = +refs/heads/*:refs/heads/*
+ fetch = +refs/tags/*:refs/tags/*
+ threads = 3
+ authGroup = Public Mirror Group
+ authGroup = Second Public Mirror Group
+```
+
+Then reload the replication plugin to pick up the new configuration:
+
+```
+ ssh -p 29418 localhost gerrit plugin reload @PLUGIN@
+```
+
+To manually trigger replication at runtime, see
+SSH command [start](cmd-start.md).
+
+File `@PLUGIN@.config`
+-------------------------
+
+The optional file `$site_path/etc/@PLUGIN@.config` is a Git-style
+config file that controls the replication settings for the replication
+plugin.
+
+The file is composed of one or more `remote` sections, each remote
+section provides common configuration settings for one or more
+source URLs.
+
+Each remote section uses its own thread pool. If fetching from
+multiple remotes, over differing types of network connections
+(e.g. LAN and also public Internet), its a good idea to put them
+into different remote sections, so that replication to the slower
+connection does not starve out the faster local one. The example
+file above does this.
+
+In the keys below, the `NAME` portion is unused by this plugin, but
+must be unique to distinguish the different sections if more than one
+remote section appears in the file.
+
+gerrit.replicateOnStartup
+: If true, replicates from all remotes on startup to ensure they
+ are in-sync with this server. By default, false.
+
+gerrit.autoReload
+: If true, automatically reloads replication sources and settings
+ after `replication.config` file is updated, without the need to restart
+ the replication plugin. When the reload takes place, pending replication
+ events based on old settings are discarded. By default, false.
+
+replication.lockErrorMaxRetries
+: Number of times to retry a replication operation if a lock
+ error is detected.
+
+ If two or more replication operations (to the same GIT and Ref)
+ are scheduled at approximately the same time (and end up on different
+ replication threads), there is a large probability that the last
+ fetch to complete will fail with a remote "failure to lock" error.
+ This option allows Gerrit to retry the replication fetch when the
+ "failure to lock" error is detected.
+
+ A good value would be 3 retries or less, depending on how often
+ you see lockError collisions in your server logs. A too highly set
+ value risks keeping around the replication operations in the queue
+ for a long time, and the number of items in the queue will increase
+ with time.
+
+ Normally Gerrit will succeed with the replication during its first
+ retry, but in certain edge cases (e.g. a mirror introduces a ref
+ namespace with the same name as a branch on the master) the retry
+ will never succeed.
+
+ The issue can also be mitigated somewhat by increasing the
+ replicationDelay.
+
+ Default: 0 (disabled, i.e. never retry)
+
+replication.maxRetries
+: Maximum number of times to retry a fetch operation that previously
+ failed.
+
+ When a fetch operation reaches its maximum number of retries,
+ the replication event is discarded from the queue.
+
+ Can be overridden at remote-level by setting replicationMaxRetries.
+
+ By default, fetchs are retried indefinitely.
+
+remote.NAME.url
+: Address of the remote server to fetch from. Multiple URLs may be
+ specified within a single remote block, listing different
+ destinations which share the same settings. Assuming
+ sufficient threads in the thread pool, Gerrit fetchs from all
+ URLs in parallel, using one thread per URL.
+
+ TODO: This doesn't apply here: you can only replicate from one url
+
+ Within each URL value the magic placeholder `${name}` is
+ replaced with the Gerrit project name. This is a Gerrit
+ specific extension to the otherwise standard Git URL syntax
+ and it must be included in each URL so that Gerrit can figure
+ out where each project needs to be replicated. `${name}` may
+ only be omitted if the remote refers to a single repository
+ (i.e.: Exactly one [remote.NAME.projects][3] and that name's
+ value is a single project match.).
+
+ See [git fetch][1] for details on Git URL syntax.
+
+[1]: http://www.git-scm.com/docs/git-fetch#URLS
+[3]: #remote.NAME.projects
+
+remote.NAME.uploadpack
+: Path of the `git-upload-pack` executable on the remote system,
+ if using the SSH transport.
+
+ Defaults to `git-upload-pack`.
+
+remote.NAME.fetch
+: Standard Git refspec denoting what should be replicated.
+ Setting this to `+refs/heads/*:refs/heads/*` would mirror only
+ the active branches, but not the change refs under
+ `refs/changes/`, or the tags under `refs/tags/`.
+
+ Multiple fetch keys can be supplied, to specify multiple
+ patterns to match against. In the [example above][2], remote
+ "pubmirror" uses two fetch keys to match both `refs/heads/*`
+ and `refs/tags/*`, but excludes all others, including
+ `refs/changes/*`.
+
+ Note that the `refs/meta/config` branch is only replicated
+ when `replicatePermissions` is true, even if the push refspec
+ is 'all refs'.
+
+[2]: #example_file
+
+remote.NAME.timeout
+: Number of seconds to wait for a network read or write to
+ complete before giving up and declaring the remote side is not
+ responding. If 0, there is no timeout, and the push client
+ waits indefinitely.
+
+ A timeout should be large enough to mostly transfer the
+ objects from the other side. 1 second may be too small for
+ larger projects, especially over a WAN link, while 10-30
+ seconds is a much more reasonable timeout value.
+
+ Defaults to 0 seconds, wait indefinitely.
+
+remote.NAME.rescheduleDelay
+: Delay when rescheduling a fetch operation due to an in-flight fetch
+ running for the same project.
+
+ Cannot be set to a value lower than 3 seconds to avoid a tight loop
+ of schedule/run which could cause 1K+ retries per second.
+
+ A configured value lower than 3 seconds will be rounded to 3 seconds.
+
+ By default, 3 seconds.
+
+remote.NAME.replicationRetry
+: Time to wait before scheduling a remote fetch operation previously
+ failed due to a remote server error.
+
+ If a remote fetch operation fails because a remote server was
+ offline, all fetch operations from the same source URL are
+ blocked, and the remote fetch is continuously retried unless
+ the replicationMaxRetries value is set.
+
+ This is a Gerrit specific extension to the Git remote block.
+
+ By default, 1 minute.
+
+remote.NAME.replicationMaxRetries
+: Maximum number of times to retry a fetch operation that previously
+ failed.
+
+ When a fetch operation reaches its maximum number of retries
+ the replication event is discarded from the queue.
+
+ This is a Gerrit specific extension to the Git remote block.
+
+ By default, use replication.maxRetries.
+
+remote.NAME.threads
+: Number of worker threads to dedicate to fetching to the
+ repositories described by this remote. Each thread can fetch
+ one project at a time, from one source URL. Scheduling
+ within the thread pool is done on a per-project basis. If a
+ remote block describes 4 URLs, allocating 4 threads in the
+ pool will permit some level of parallel fetching.
+
+ By default, 1 thread.
+
+remote.NAME.authGroup
+: Specifies the name of a group that the remote should use to
+ access the repositories. Multiple authGroups may be specified
+ within a single remote block to signify a wider access right.
+ In the project administration web interface the read access
+ can be specified for this group to control if a project should
+ be replicated or not to the remote.
+
+ By default, replicates without group control, i.e. replicates
+ everything from all remotes.
+
+remote.NAME.remoteNameStyle
+: Provides possibilities to influence the name of the source
+ repository, e.g. by replacing slashes in the `${name}`
+ placeholder.
+
+ Github and Gitorious do not permit slashes "/" in repository
+ names and will change them to dashes "-" at repository creation
+ time.
+
+ If this setting is set to "dash", slashes will be replaced with
+ dashes in the remote repository name. If set to "underscore",
+ slashes will be replaced with underscores in the repository name.
+
+ Option "basenameOnly" makes `${name}` to be only the basename
+ (the part after the last slash) of the repository path on the
+ Gerrit server, e.g. `${name}` of `foo/bar/my-repo.git` would
+ be `my-repo`.
+
+ By default, "slash", i.e. remote names will contain slashes as
+ they do in Gerrit.
+
+<a name="remote.NAME.projects">remote.NAME.projects</a>
+: Specifies which repositories should be replicated from the
+ remote. It can be provided more than once, and supports three
+ formats: regular expressions, wildcard matching, and single
+ project matching. All three formats match case-sensitive.
+
+ Values starting with a caret `^` are treated as regular
+ expressions. `^foo/(bar|baz)` would match the projects
+ `foo/bar`, and `foo/baz`. Regular expressions have to fully
+ match the project name. So the above example would not match
+ `foo/bar2`, while `^foo/(bar|baz).*` would.
+
+ Projects may be excluded from replication by using a regular
+ expression with inverse match. `^(?:(?!PATTERN).)*$` will
+ exclude any project that matches.
+
+ Values that are not regular expressions and end in `*` are
+ treated as wildcard matches. Wildcards match projects whose
+ name agrees from the beginning until the trailing `*`. So
+ `foo/b*` would match the projects `foo/b`, `foo/bar`, and
+ `foo/baz`, but neither `foobar`, nor `bar/foo/baz`.
+
+ Values that are neither regular expressions nor wildcards are
+ treated as single project matches. So `foo/bar` matches only
+ the project `foo/bar`, but no other project.
+
+ By default, replicates without matching, i.e. replicates
+ everything from all remotes.
+
+File `secure.config`
+--------------------
+
+The optional file `$site_path/secure.config` is a Git-style config
+file that provides secure values that should not be world-readable,
+such as passwords. Passwords for HTTP remotes can be obtained from
+this file.
+
+remote.NAME.username
+: Username to use for HTTP authentication on this remote, if not
+ given in the URL.
+
+remote.NAME.password
+: Password to use for HTTP authentication on this remote.
+
+File `~/.ssh/config`
+--------------------
+
+If present, Gerrit reads and caches `~/.ssh/config` at startup, and
+supports most SSH configuration options. For example:
+
+```
+ Host host-one.example.com
+ IdentityFile ~/.ssh/id_hostone
+ PreferredAuthentications publickey
+
+ Host mirror*.us.some.org
+ User mirror-updater
+ IdentityFile ~/.ssh/id_pubmirror
+ PreferredAuthentications publickey
+```
+
+Supported options:
+
+ * Host
+ * Hostname
+ * User
+ * Port
+ * IdentityFile
+ * PreferredAuthentications
+ * StrictHostKeyChecking
+
+SSH authentication must be by passwordless public key, as there is no
+facility to read passphrases on startup or passwords during the SSH
+connection setup, and SSH agents are not supported from Java.
+
+Host keys for any destination SSH servers must appear in the user's
+`~/.ssh/known_hosts` file, and must be added in advance, before Gerrit
+starts. If a host key is not listed, Gerrit will be unable to connect
+to that destination, and replication to that URL will fail.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchGitUpdateProcessingTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchGitUpdateProcessingTest.java
new file mode 100644
index 0000000..f0f1c54
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchGitUpdateProcessingTest.java
@@ -0,0 +1,88 @@
+// Copyright (C) 2013 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.replication.pull.FetchResultProcessing.GitUpdateProcessing;
+import com.googlesource.gerrit.plugins.replication.pull.ReplicationState.RefFetchResult;
+import java.net.URISyntaxException;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FetchGitUpdateProcessingTest {
+ private EventDispatcher dispatcherMock;
+ private GitUpdateProcessing gitUpdateProcessing;
+
+ @Before
+ public void setUp() throws Exception {
+ dispatcherMock = mock(EventDispatcher.class);
+ gitUpdateProcessing = new GitUpdateProcessing(dispatcherMock);
+ }
+
+ @Test
+ public void headRefReplicated() throws URISyntaxException, PermissionBackendException {
+ FetchRefReplicatedEvent expectedEvent =
+ new FetchRefReplicatedEvent(
+ "someProject",
+ "refs/heads/master",
+ "someHost",
+ RefFetchResult.SUCCEEDED,
+ RefUpdate.Result.NEW);
+
+ gitUpdateProcessing.onOneProjectReplicationDone(
+ "someProject",
+ "refs/heads/master",
+ new URIish("git://someHost/someProject.git"),
+ RefFetchResult.SUCCEEDED,
+ RefUpdate.Result.NEW);
+ verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent));
+ }
+
+ @Test
+ public void changeRefReplicated() throws URISyntaxException, PermissionBackendException {
+ FetchRefReplicatedEvent expectedEvent =
+ new FetchRefReplicatedEvent(
+ "someProject",
+ "refs/changes/01/1/1",
+ "someHost",
+ RefFetchResult.FAILED,
+ RefUpdate.Result.REJECTED_OTHER_REASON);
+
+ gitUpdateProcessing.onOneProjectReplicationDone(
+ "someProject",
+ "refs/changes/01/1/1",
+ new URIish("git://someHost/someProject.git"),
+ RefFetchResult.FAILED,
+ RefUpdate.Result.REJECTED_OTHER_REASON);
+ verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent));
+ }
+
+ @Test
+ public void onAllNodesReplicated() throws PermissionBackendException {
+ FetchRefReplicationDoneEvent expectedDoneEvent =
+ new FetchRefReplicationDoneEvent("someProject", "refs/heads/master", 5);
+
+ gitUpdateProcessing.onRefReplicatedFromAllNodes("someProject", "refs/heads/master", 5);
+ verify(dispatcherMock, times(1)).postEvent(eq(expectedDoneEvent));
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationTest.java
new file mode 100644
index 0000000..c1c0981
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationTest.java
@@ -0,0 +1,46 @@
+// Copyright (C) 2011 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.googlesource.gerrit.plugins.replication.pull.Source.encode;
+import static com.googlesource.gerrit.plugins.replication.pull.Source.needsUrlEncoding;
+
+import java.net.URISyntaxException;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.Test;
+
+public class FetchReplicationTest {
+
+ @Test
+ public void testNeedsUrlEncoding() throws URISyntaxException {
+ assertThat(needsUrlEncoding(new URIish("http://host/path"))).isTrue();
+ assertThat(needsUrlEncoding(new URIish("https://host/path"))).isTrue();
+ assertThat(needsUrlEncoding(new URIish("amazon-s3://config/bucket/path"))).isTrue();
+
+ assertThat(needsUrlEncoding(new URIish("host:path"))).isFalse();
+ assertThat(needsUrlEncoding(new URIish("user@host:path"))).isFalse();
+ assertThat(needsUrlEncoding(new URIish("git://host/path"))).isFalse();
+ assertThat(needsUrlEncoding(new URIish("ssh://host/path"))).isFalse();
+ }
+
+ @Test
+ public void urlEncoding() {
+ assertThat(encode("foo/bar/thing")).isEqualTo("foo/bar/thing");
+ assertThat(encode("-- All Projects --")).isEqualTo("--%20All%20Projects%20--");
+ assertThat(encode("name/with a space")).isEqualTo("name/with%20a%20space");
+ assertThat(encode("name\nwith-LF")).isEqualTo("name%0Awith-LF");
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationStateTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationStateTest.java
new file mode 100644
index 0000000..eb52493
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationStateTest.java
@@ -0,0 +1,222 @@
+// Copyright (C) 2013 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.net.URISyntaxException;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ReplicationStateTest {
+
+ private ReplicationState replicationState;
+ private FetchResultProcessing fetchResultProcessingMock;
+
+ @Before
+ public void setUp() throws Exception {
+ fetchResultProcessingMock = mock(FetchResultProcessing.class);
+ replicationState = new ReplicationState(fetchResultProcessingMock);
+ }
+
+ @Test
+ public void shouldNotHavePushTask() {
+ assertThat(replicationState.hasFetchTask()).isFalse();
+ }
+
+ @Test
+ public void shouldHavePushTask() {
+ replicationState.increaseFetchTaskCount("someProject", "someRef");
+ assertThat(replicationState.hasFetchTask()).isTrue();
+ }
+
+ @Test
+ public void shouldFireOneReplicationEventWhenNothingToReplicate() {
+ // actual test
+ replicationState.markAllFetchTasksScheduled();
+
+ // expected event
+ verify(fetchResultProcessingMock).onAllRefsReplicatedFromAllNodes(0);
+ }
+
+ @Test
+ public void shouldFireEventsForReplicationOfOneRefToOneNode() throws URISyntaxException {
+ URIish uri = new URIish("git://someHost/someRepo.git");
+
+ // actual test
+ replicationState.increaseFetchTaskCount("someProject", "someRef");
+ replicationState.markAllFetchTasksScheduled();
+ replicationState.notifyRefReplicated(
+ "someProject",
+ "someRef",
+ uri,
+ ReplicationState.RefFetchResult.SUCCEEDED,
+ RefUpdate.Result.NEW);
+
+ // expected events
+ fetchResultProcessingMock.onOneProjectReplicationDone(
+ "someProject",
+ "someRef",
+ uri,
+ ReplicationState.RefFetchResult.SUCCEEDED,
+ RefUpdate.Result.NEW);
+
+ verify(fetchResultProcessingMock).onRefReplicatedFromAllNodes("someProject", "someRef", 1);
+ verify(fetchResultProcessingMock).onAllRefsReplicatedFromAllNodes(1);
+ }
+
+ @Test
+ public void shouldFireEventsForReplicationOfMultipleRefsToMultipleNodes()
+ throws URISyntaxException {
+ URIish uri1 = new URIish("git://host1/someRepo.git");
+ URIish uri2 = new URIish("git://host2/someRepo.git");
+ URIish uri3 = new URIish("git://host3/someRepo.git");
+
+ // actual test
+ replicationState.increaseFetchTaskCount("someProject", "ref1");
+ replicationState.increaseFetchTaskCount("someProject", "ref1");
+ replicationState.increaseFetchTaskCount("someProject", "ref1");
+ replicationState.increaseFetchTaskCount("someProject", "ref2");
+ replicationState.increaseFetchTaskCount("someProject", "ref2");
+ replicationState.markAllFetchTasksScheduled();
+
+ replicationState.notifyRefReplicated(
+ "someProject",
+ "ref1",
+ uri1,
+ ReplicationState.RefFetchResult.SUCCEEDED,
+ RefUpdate.Result.NEW);
+ replicationState.notifyRefReplicated(
+ "someProject",
+ "ref1",
+ uri2,
+ ReplicationState.RefFetchResult.SUCCEEDED,
+ RefUpdate.Result.NEW);
+ replicationState.notifyRefReplicated(
+ "someProject",
+ "ref1",
+ uri3,
+ ReplicationState.RefFetchResult.SUCCEEDED,
+ RefUpdate.Result.NEW);
+
+ replicationState.notifyRefReplicated(
+ "someProject",
+ "ref2",
+ uri1,
+ ReplicationState.RefFetchResult.SUCCEEDED,
+ RefUpdate.Result.NEW);
+ replicationState.notifyRefReplicated(
+ "someProject",
+ "ref2",
+ uri2,
+ ReplicationState.RefFetchResult.SUCCEEDED,
+ RefUpdate.Result.NEW);
+
+ // expected events
+ verify(fetchResultProcessingMock)
+ .onOneProjectReplicationDone(
+ "someProject",
+ "ref1",
+ uri1,
+ ReplicationState.RefFetchResult.SUCCEEDED,
+ RefUpdate.Result.NEW);
+ verify(fetchResultProcessingMock)
+ .onOneProjectReplicationDone(
+ "someProject",
+ "ref1",
+ uri2,
+ ReplicationState.RefFetchResult.SUCCEEDED,
+ RefUpdate.Result.NEW);
+ verify(fetchResultProcessingMock)
+ .onOneProjectReplicationDone(
+ "someProject",
+ "ref1",
+ uri3,
+ ReplicationState.RefFetchResult.SUCCEEDED,
+ RefUpdate.Result.NEW);
+ verify(fetchResultProcessingMock)
+ .onOneProjectReplicationDone(
+ "someProject",
+ "ref2",
+ uri1,
+ ReplicationState.RefFetchResult.SUCCEEDED,
+ RefUpdate.Result.NEW);
+ verify(fetchResultProcessingMock)
+ .onOneProjectReplicationDone(
+ "someProject",
+ "ref2",
+ uri2,
+ ReplicationState.RefFetchResult.SUCCEEDED,
+ RefUpdate.Result.NEW);
+
+ verify(fetchResultProcessingMock).onRefReplicatedFromAllNodes("someProject", "ref1", 3);
+ verify(fetchResultProcessingMock).onRefReplicatedFromAllNodes("someProject", "ref2", 2);
+ verify(fetchResultProcessingMock).onAllRefsReplicatedFromAllNodes(5);
+ }
+
+ @Test
+ public void shouldFireEventsWhenSomeReplicationCompleteBeforeAllTasksAreScheduled()
+ throws URISyntaxException {
+ URIish uri1 = new URIish("git://host1/someRepo.git");
+
+ // actual test
+ replicationState.increaseFetchTaskCount("someProject", "ref1");
+ replicationState.increaseFetchTaskCount("someProject", "ref2");
+ replicationState.notifyRefReplicated(
+ "someProject",
+ "ref1",
+ uri1,
+ ReplicationState.RefFetchResult.SUCCEEDED,
+ RefUpdate.Result.NEW);
+ replicationState.notifyRefReplicated(
+ "someProject",
+ "ref2",
+ uri1,
+ ReplicationState.RefFetchResult.SUCCEEDED,
+ RefUpdate.Result.NEW);
+ replicationState.markAllFetchTasksScheduled();
+
+ // expected events
+ verify(fetchResultProcessingMock)
+ .onOneProjectReplicationDone(
+ "someProject",
+ "ref1",
+ uri1,
+ ReplicationState.RefFetchResult.SUCCEEDED,
+ RefUpdate.Result.NEW);
+ verify(fetchResultProcessingMock)
+ .onOneProjectReplicationDone(
+ "someProject",
+ "ref2",
+ uri1,
+ ReplicationState.RefFetchResult.SUCCEEDED,
+ RefUpdate.Result.NEW);
+ verify(fetchResultProcessingMock).onRefReplicatedFromAllNodes("someProject", "ref1", 1);
+ verify(fetchResultProcessingMock).onRefReplicatedFromAllNodes("someProject", "ref2", 1);
+ verify(fetchResultProcessingMock).onAllRefsReplicatedFromAllNodes(2);
+ }
+
+ @Test
+ public void toStringRefPushResult() throws Exception {
+ assertEquals("failed", ReplicationState.RefFetchResult.FAILED.toString());
+ assertEquals("not-attempted", ReplicationState.RefFetchResult.NOT_ATTEMPTED.toString());
+ assertEquals("succeeded", ReplicationState.RefFetchResult.SUCCEEDED.toString());
+ }
+}