Merge branch 'stable-2.16' into stable-3.0
* stable-2.16:
ReplicationIT: fix flakiness of shouldReplicateNewBranch
ReplicationIT: remove dependency of ReviewDb/NoteDb
ReplicationIT: Add a test that HEAD update is replicated
ReplicationIT#shouldReplicateNewBranch: Create branch on local project
ReplicationIT: Don't swallow exceptions on failed repository operations
ReplicationIT: Fix typo in method name
AdminApi: Reintroduce return value of methods
Store replication tasks instead of ref-update events
ReplicationIT: Don't swallow exceptions on failed repository operations
ReplicationIT: Add explicit test for replication of new branch
ReplicationIT: Change test method name to reflect actual operation
Allow AdminApiFactory to be replaced dynamically
Adapt project creation to use ProjectOperations.
Change-Id: I3d9cd916be200f1935be2a60d9a4e715e4b859a3
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
index 79362fd..acbf763 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AdminApi.java
@@ -19,7 +19,7 @@
public interface AdminApi {
public boolean createProject(Project.NameKey project, String head);
- public void deleteProject(Project.NameKey project);
+ public boolean deleteProject(Project.NameKey project);
- public void updateHead(Project.NameKey project, String newHead);
+ public boolean updateHead(Project.NameKey project, String newHead);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 0a6d9d2..2a329e2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -60,6 +60,7 @@
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.servlet.RequestScoped;
import com.googlesource.gerrit.plugins.replication.ReplicationState.RefPushResult;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
@@ -105,6 +106,7 @@
private final PerThreadRequestScope.Scoper threadScoper;
private final DestinationConfiguration config;
private final DynamicItem<EventDispatcher> eventDispatcher;
+ private final Provider<ReplicationTasksStorage> replicationTasksStorage;
protected enum RetryReason {
TRANSPORT_ERROR,
@@ -134,6 +136,7 @@
ReplicationStateListeners stateLog,
GroupIncludeCache groupIncludeCache,
DynamicItem<EventDispatcher> eventDispatcher,
+ Provider<ReplicationTasksStorage> rts,
@Assisted DestinationConfiguration cfg) {
this.eventDispatcher = eventDispatcher;
gitManager = gitRepositoryManager;
@@ -141,6 +144,7 @@
this.userProvider = userProvider;
this.projectCache = projectCache;
this.stateLog = stateLog;
+ this.replicationTasksStorage = rts;
config = cfg;
CurrentUser remoteUser;
if (!cfg.getAuthGroupNames().isEmpty()) {
@@ -398,21 +402,21 @@
}
synchronized (stateLock) {
- PushOne e = getPendingPush(uri);
- if (e == null) {
- e = opFactory.create(project, uri);
- addRef(e, ref);
- e.addState(ref, state);
+ PushOne task = getPendingPush(uri);
+ if (task == null) {
+ task = opFactory.create(project, uri);
+ addRef(task, ref);
+ task.addState(ref, state);
@SuppressWarnings("unused")
ScheduledFuture<?> ignored =
- pool.schedule(e, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
- pending.put(uri, e);
- } else if (!e.getRefs().contains(ref)) {
- addRef(e, ref);
- e.addState(ref, state);
+ pool.schedule(task, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
+ pending.put(uri, task);
+ } else if (!task.getRefs().contains(ref)) {
+ addRef(task, ref);
+ task.addState(ref, state);
}
state.increasePushTaskCount(project.get(), ref);
- repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, e, config.getDelay());
+ repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, task, config.getDelay());
}
}
@@ -563,12 +567,31 @@
return RunwayStatus.allowed();
}
- void notifyFinished(PushOne op) {
+ void notifyFinished(PushOne task) {
synchronized (stateLock) {
- inFlight.remove(op.getURI());
+ inFlight.remove(task.getURI());
+ if (!task.wasCanceled()) {
+ for (String ref : task.getRefs()) {
+ if (!refHasPendingPush(task.getURI(), ref)) {
+ replicationTasksStorage
+ .get()
+ .delete(
+ new ReplicateRefUpdate(
+ task.getProjectNameKey().get(), ref, task.getURI(), getRemoteConfigName()));
+ }
+ }
+ }
}
}
+ private boolean refHasPendingPush(URIish opUri, String ref) {
+ return pushContainsRef(pending.get(opUri), ref) || pushContainsRef(inFlight.get(opUri), ref);
+ }
+
+ private boolean pushContainsRef(PushOne op, String ref) {
+ return op != null && op.getRefs().contains(ref);
+ }
+
boolean wouldPushProject(Project.NameKey project) {
if (!shouldReplicate(project)) {
return false;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
index 55fbed1..aaf2b15 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
@@ -75,18 +75,20 @@
}
@Override
- public void deleteProject(Project.NameKey project) {
+ public boolean deleteProject(Project.NameKey project) {
repLog.info("Deleting project {} on {}", project, uri);
String url = String.format("%s/a/projects/%s", toHttpUri(uri), Url.encode(project.get()));
try {
httpClient.execute(new HttpDelete(url), new HttpResponseHandler(), getContext());
+ return true;
} catch (IOException e) {
repLog.error("Couldn't perform project deletion on {}", uri, e);
}
+ return false;
}
@Override
- public void updateHead(Project.NameKey project, String newHead) {
+ public boolean updateHead(Project.NameKey project, String newHead) {
repLog.info("Updating head of {} on {}", project, uri);
String url = String.format("%s/a/projects/%s/HEAD", toHttpUri(uri), Url.encode(project.get()));
try {
@@ -95,9 +97,11 @@
new StringEntity(String.format("{\"ref\": \"%s\"}", newHead), Charsets.UTF_8.name()));
req.addHeader(new BasicHeader("Content-Type", "application/json"));
httpClient.execute(req, new HttpResponseHandler(), getContext());
+ return true;
} catch (IOException e) {
repLog.error("Couldn't perform update head on {}", uri, e);
}
+ return false;
}
private HttpClientContext getContext() {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
index 9a92d4e..56cff5a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritSshApi.java
@@ -54,7 +54,7 @@
}
@Override
- public void deleteProject(Project.NameKey projectName) {
+ public boolean deleteProject(Project.NameKey projectName) {
if (!withoutDeleteProjectPlugin.contains(uri)) {
OutputStream errStream = sshHelper.newErrorBufferStream();
String cmd = "deleteproject delete --yes-really-delete --force " + projectName.get();
@@ -63,6 +63,7 @@
exitCode = execute(uri, cmd, errStream);
} catch (IOException e) {
logError("deleting", uri, errStream, cmd, e);
+ return false;
}
if (exitCode == 1) {
logger.atInfo().log(
@@ -71,17 +72,20 @@
withoutDeleteProjectPlugin.add(uri);
}
}
+ return true;
}
@Override
- public void updateHead(Project.NameKey projectName, String newHead) {
+ public boolean updateHead(Project.NameKey projectName, String newHead) {
OutputStream errStream = sshHelper.newErrorBufferStream();
String cmd = "gerrit set-head " + projectName.get() + " --new-head " + newHead;
try {
execute(uri, cmd, errStream);
} catch (IOException e) {
logError("updating HEAD of", uri, errStream, cmd, e);
+ return false;
}
+ return true;
}
private URIish toSshUri(URIish uri) throws URISyntaxException {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
index 908e7e0..aa6e16c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
@@ -52,17 +52,19 @@
}
@Override
- public void deleteProject(Project.NameKey project) {
+ public boolean deleteProject(Project.NameKey project) {
try {
recursivelyDelete(new File(uri.getPath()));
repLog.info("Deleted local repository: {}", uri);
} catch (IOException e) {
repLog.error("Error deleting local repository {}:\n", uri.getPath(), e);
+ return false;
}
+ return true;
}
@Override
- public void updateHead(Project.NameKey project, String newHead) {
+ public boolean updateHead(Project.NameKey project, String newHead) {
try (Repository repo = new FileRepository(uri.getPath())) {
if (newHead != null) {
RefUpdate u = repo.updateRef(Constants.HEAD);
@@ -70,7 +72,9 @@
}
} catch (IOException e) {
repLog.error("Failed to update HEAD of repository {} to {}", uri.getPath(), newHead, e);
+ return false;
}
+ return true;
}
private static void recursivelyDelete(File dir) throws IOException {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
index ac0262d..8b0aa3d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/OnStartStop.java
@@ -21,7 +21,6 @@
import com.google.gerrit.server.events.EventDispatcher;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
-import com.googlesource.gerrit.plugins.replication.ReplicationState.Factory;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -32,20 +31,17 @@
private final PushAll.Factory pushAll;
private final ReplicationConfig config;
private final DynamicItem<EventDispatcher> eventDispatcher;
- private final Factory replicationStateFactory;
@Inject
protected OnStartStop(
ServerInformation srvInfo,
PushAll.Factory pushAll,
ReplicationConfig config,
- DynamicItem<EventDispatcher> eventDispatcher,
- ReplicationState.Factory replicationStateFactory) {
+ DynamicItem<EventDispatcher> eventDispatcher) {
this.srvInfo = srvInfo;
this.pushAll = pushAll;
this.config = config;
this.eventDispatcher = eventDispatcher;
- this.replicationStateFactory = replicationStateFactory;
this.pushAllFuture = Atomics.newReference();
}
@@ -53,8 +49,7 @@
public void start() {
if (srvInfo.getState() == ServerInformation.State.STARTUP
&& config.isReplicateAllOnPluginStart()) {
- ReplicationState state =
- replicationStateFactory.create(new GitUpdateProcessing(eventDispatcher.get()));
+ ReplicationState state = new ReplicationState(new GitUpdateProcessing(eventDispatcher.get()));
pushAllFuture.set(
pushAll
.create(null, ReplicationFilter.all(), state, false)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
index 6dfa117..ee9d4c0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
@@ -60,7 +60,7 @@
}
@Override
- public void deleteProject(Project.NameKey project) {
+ public boolean deleteProject(Project.NameKey project) {
String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
String cmd = "rm -rf " + quotedPath;
OutputStream errStream = sshHelper.newErrorBufferStream();
@@ -78,11 +78,13 @@
cmd,
errStream,
e);
+ return false;
}
+ return true;
}
@Override
- public void updateHead(Project.NameKey project, String newHead) {
+ public boolean updateHead(Project.NameKey project, String newHead) {
String quotedPath = QuotedString.BOURNE.quote(uri.getPath());
String cmd =
"cd " + quotedPath + " && git symbolic-ref HEAD " + QuotedString.BOURNE.quote(newHead);
@@ -101,6 +103,8 @@
cmd,
errStream,
e);
+ return false;
}
+ return true;
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
index f9a2b2c..835d068 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationModule.java
@@ -56,7 +56,6 @@
.to(StartReplicationCapability.class);
install(new FactoryModuleBuilder().build(PushAll.Factory.class));
- install(new FactoryModuleBuilder().build(ReplicationState.Factory.class));
bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class);
DynamicSet.setOf(binder(), ReplicationStateListener.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 6229686..63ae355 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -25,7 +25,10 @@
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing;
import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.util.HashSet;
import java.util.Optional;
+import java.util.Set;
import org.eclipse.jgit.transport.URIish;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,8 +47,7 @@
private final WorkQueue workQueue;
private final DynamicItem<EventDispatcher> dispatcher;
private final ReplicationConfig config;
- private final ReplicationState.Factory replicationStateFactory;
- private final EventsStorage eventsStorage;
+ private final ReplicationTasksStorage replicationTasksStorage;
private volatile boolean running;
private volatile boolean replaying;
@@ -55,14 +57,12 @@
ReplicationConfig rc,
DynamicItem<EventDispatcher> dis,
ReplicationStateListeners sl,
- ReplicationState.Factory rsf,
- EventsStorage es) {
+ ReplicationTasksStorage rts) {
workQueue = wq;
dispatcher = dis;
config = rc;
stateLog = sl;
- replicationStateFactory = rsf;
- eventsStorage = es;
+ replicationTasksStorage = rts;
}
@Override
@@ -117,8 +117,7 @@
}
private void onGitReferenceUpdated(String projectName, String refName) {
- ReplicationState state =
- replicationStateFactory.create(new GitUpdateProcessing(dispatcher.get()));
+ ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
if (!running) {
stateLog.warn("Replication plugin did not finish startup before event", state);
return;
@@ -127,9 +126,9 @@
Project.NameKey project = new Project.NameKey(projectName);
for (Destination cfg : config.getDestinations(FilterType.ALL)) {
if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) {
- String eventKey = eventsStorage.persist(projectName, refName);
- state.setEventKey(eventKey);
for (URIish uri : cfg.getURIs(project, null)) {
+ replicationTasksStorage.persist(
+ new ReplicateRefUpdate(projectName, refName, uri, cfg.getRemoteConfigName()));
cfg.schedule(project, refName, uri, state);
}
}
@@ -140,9 +139,15 @@
private void firePendingEvents() {
replaying = true;
try {
- for (EventsStorage.ReplicateRefUpdate e : eventsStorage.list()) {
- repLog.info("Firing pending event {}", e);
- onGitReferenceUpdated(e.project, e.ref);
+ Set<String> eventsReplayed = new HashSet<>();
+ replaying = true;
+ for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.list()) {
+ String eventKey = String.format("%s:%s", t.project, t.ref);
+ if (!eventsReplayed.contains(eventKey)) {
+ repLog.info("Firing pending task {}", eventKey);
+ onGitReferenceUpdated(t.project, t.ref);
+ eventsReplayed.add(eventKey);
+ }
}
} finally {
replaying = false;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
index 6f0803a..df8f3f4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationState.java
@@ -16,8 +16,6 @@
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
-import com.google.inject.assistedinject.Assisted;
-import com.google.inject.assistedinject.AssistedInject;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -26,12 +24,7 @@
public class ReplicationState {
- public interface Factory {
- ReplicationState create(PushResultProcessing processing);
- }
-
private boolean allScheduled;
- private final EventsStorage eventsStorage;
private final PushResultProcessing pushResultProcessing;
private final Lock countingLock = new ReentrantLock();
@@ -57,11 +50,7 @@
private int totalPushTasksCount;
private int finishedPushTasksCount;
- private String eventKey;
-
- @AssistedInject
- ReplicationState(EventsStorage storage, @Assisted PushResultProcessing processing) {
- eventsStorage = storage;
+ ReplicationState(PushResultProcessing processing) {
pushResultProcessing = processing;
statusByProjectRef = HashBasedTable.create();
}
@@ -86,7 +75,6 @@
URIish uri,
RefPushResult status,
RemoteRefUpdate.Status refUpdateStatus) {
- deleteEvent();
pushResultProcessing.onRefReplicatedToOneNode(project, ref, uri, status, refUpdateStatus);
RefReplicationStatus completedRefStatus = null;
@@ -116,12 +104,6 @@
}
}
- private void deleteEvent() {
- if (eventKey != null) {
- eventsStorage.delete(eventKey);
- }
- }
-
public void markAllPushTasksScheduled() {
countingLock.lock();
try {
@@ -192,8 +174,4 @@
return name().toLowerCase().replace("_", "-");
}
}
-
- public void setEventKey(String eventKey) {
- this.eventKey = eventKey;
- }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
similarity index 69%
rename from src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java
rename to src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index 0efa726..a8d075d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/EventsStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -29,18 +29,28 @@
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.transport.URIish;
@Singleton
-public class EventsStorage {
+public class ReplicationTasksStorage {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
public static class ReplicateRefUpdate {
- public String project;
- public String ref;
+ public final String project;
+ public final String ref;
+ public final String uri;
+ public final String remote;
+
+ public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) {
+ this.project = project;
+ this.ref = ref;
+ this.uri = uri.toASCIIString();
+ this.remote = remote;
+ }
@Override
public String toString() {
- return "ref-update " + project + ":" + ref;
+ return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote;
}
}
@@ -49,15 +59,11 @@
private final Path refUpdates;
@Inject
- EventsStorage(ReplicationConfig config) {
+ ReplicationTasksStorage(ReplicationConfig config) {
refUpdates = config.getEventsDirectory().resolve("ref-updates");
}
- public String persist(String project, String ref) {
- ReplicateRefUpdate r = new ReplicateRefUpdate();
- r.project = project;
- r.ref = ref;
-
+ public String persist(ReplicateRefUpdate r) {
String json = GSON.toJson(r) + "\n";
String eventKey = sha1(json).name();
Path file = refUpdates().resolve(eventKey);
@@ -67,20 +73,24 @@
}
try {
+ logger.atFine().log("CREATE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
Files.write(file, json.getBytes(UTF_8));
} catch (IOException e) {
- logger.atWarning().log("Couldn't persist event %s", json);
+ logger.atWarning().withCause(e).log("Couldn't persist event %s", json);
}
return eventKey;
}
- public void delete(String eventKey) {
- if (eventKey != null) {
- try {
- Files.delete(refUpdates().resolve(eventKey));
- } catch (IOException e) {
- logger.atSevere().withCause(e).log("Error while deleting event %s", eventKey);
- }
+ public void delete(ReplicateRefUpdate r) {
+ String taskJson = GSON.toJson(r) + "\n";
+ String taskKey = sha1(taskJson).name();
+ Path file = refUpdates().resolve(taskKey);
+
+ try {
+ logger.atFine().log("DELETE %s (%s:%s => %s)", file, r.project, r.ref, r.uri);
+ Files.delete(file);
+ } catch (IOException e) {
+ logger.atSevere().withCause(e).log("Error while deleting event %s", taskKey);
}
}
@@ -91,7 +101,6 @@
if (Files.isRegularFile(e)) {
String json = new String(Files.readAllBytes(e), UTF_8);
result.add(GSON.fromJson(json, ReplicateRefUpdate.class));
- Files.delete(e);
}
}
} catch (IOException e) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
index 20bdf65..7115d5b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
@@ -51,8 +51,6 @@
@Inject private PushAll.Factory pushFactory;
- @Inject private ReplicationState.Factory replicationStateFactory;
-
private final Object lock = new Object();
@Override
@@ -61,7 +59,7 @@
throw new UnloggedFailure(1, "error: cannot combine --all and PROJECT");
}
- ReplicationState state = replicationStateFactory.create(new CommandProcessing(this));
+ ReplicationState state = new ReplicationState(new CommandProcessing(this));
Future<?> future = null;
ReplicationFilter projectFilter;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index fe800b0..7517c14 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -15,22 +15,39 @@
package com.googlesource.gerrit.plugins.replication;
import static com.google.common.truth.Truth.assertThat;
+import static java.util.stream.Collectors.toList;
import com.google.common.base.Stopwatch;
+import com.google.common.flogger.FluentLogger;
import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
import com.google.gerrit.acceptance.PushOneCommit.Result;
import com.google.gerrit.acceptance.TestPlugin;
import com.google.gerrit.acceptance.UseLocalDisk;
import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
+import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.gerrit.extensions.api.projects.BranchInput;
import com.google.gerrit.extensions.common.ProjectInfo;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.server.config.SitePaths;
+import com.google.gson.Gson;
import com.google.inject.Inject;
+import com.google.inject.Key;
+import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import java.util.regex.Pattern;
+import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.revwalk.RevCommit;
@@ -43,30 +60,46 @@
name = "replication",
sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
public class ReplicationIT extends LightweightPluginDaemonTest {
- private static final int TEST_REPLICATION_DELAY = 1;
+ private static final Optional<String> ALL_PROJECTS = Optional.empty();
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+ private static final int TEST_REPLICATION_DELAY = 5;
private static final Duration TEST_TIMEMOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 10);
@Inject private SitePaths sitePaths;
@Inject private ProjectOperations projectOperations;
+ private Path pluginDataDir;
private Path gitPath;
+ private Path storagePath;
private FileBasedConfig config;
+ private Gson GSON = new Gson();
@Override
public void setUpTestPlugin() throws Exception {
- config =
- new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
- config.save();
-
gitPath = sitePaths.site_path.resolve("git");
+ config =
+ new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+ setReplicationDestination(
+ "remote1",
+ "suffix1",
+ Optional.of("not-used-project")); // Simulates a full replication.config initialization
+ config.save();
+
super.setUpTestPlugin();
+
+ pluginDataDir = plugin.getSysInjector().getInstance(Key.get(Path.class, PluginData.class));
+ storagePath = pluginDataDir.resolve("ref-updates");
}
@Test
public void shouldReplicateNewProject() throws Exception {
- setReplicationDestination("foo", "replica");
+ setReplicationDestination("foo", "replica", ALL_PROJECTS);
+ reloadConfig();
+ waitForEmptyTasks();
Project.NameKey sourceProject = projectOperations.newProject().name("foo").create();
+
+ assertThat(listReplicationTasks("refs/meta/config")).hasSize(1);
waitUntil(() -> gitPath.resolve(sourceProject + "replica.git").toFile().isDirectory());
ProjectInfo replicaProject = gApi.projects().name(sourceProject + "replica").get();
@@ -74,51 +107,159 @@
}
@Test
- public void shouldReplicateNewBranch() throws Exception {
- setReplicationDestination("foo", "replica");
-
+ public void shouldReplicateNewChangeRef() throws Exception {
Project.NameKey targetProject =
projectOperations.newProject().name(project + "replica").create();
+ setReplicationDestination("foo", "replica", ALL_PROJECTS);
+ reloadConfig();
+ waitForEmptyTasks();
+
Result pushResult = createChange();
RevCommit sourceCommit = pushResult.getCommit();
String sourceRef = pushResult.getPatchSet().getRefName();
- waitUntil(() -> getRef(getRepo(targetProject), sourceRef) != null);
+ assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(1);
- Ref targetBranchRef = getRef(getRepo(targetProject), sourceRef);
- assertThat(targetBranchRef).isNotNull();
- assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
- }
+ try (Repository repo = repoManager.openRepository(targetProject)) {
+ waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
- private Repository getRepo(Project.NameKey targetProject) {
- try {
- return repoManager.openRepository(targetProject);
- } catch (Exception e) {
- e.printStackTrace();
- return null;
+ Ref targetBranchRef = getRef(repo, sourceRef);
+ assertThat(targetBranchRef).isNotNull();
+ assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
}
}
- private Ref getRef(Repository repo, String branchName) {
+ @Test
+ public void shouldReplicateNewBranch() throws Exception {
+ setReplicationDestination("foo", "replica", ALL_PROJECTS);
+ reloadConfig();
+ waitForEmptyTasks();
+
+ Project.NameKey targetProject =
+ projectOperations.newProject().name(project + "replica").create();
+ String newBranch = "refs/heads/mybranch";
+ String master = "refs/heads/master";
+ BranchInput input = new BranchInput();
+ input.revision = master;
+ gApi.projects().name(project.get()).branch(newBranch).create(input);
+
+ assertThat(listReplicationTasks("refs/heads/(mybranch|master)")).hasSize(2);
+
+ try (Repository repo = repoManager.openRepository(targetProject);
+ Repository sourceRepo = repoManager.openRepository(project)) {
+ waitUntil(() -> checkedGetRef(repo, newBranch) != null);
+
+ Ref masterRef = getRef(sourceRepo, master);
+ Ref targetBranchRef = getRef(repo, newBranch);
+ assertThat(targetBranchRef).isNotNull();
+ assertThat(targetBranchRef.getObjectId()).isEqualTo(masterRef.getObjectId());
+ }
+ }
+
+ @Test
+ public void shouldReplicateNewBranchToTwoRemotes() throws Exception {
+ Project.NameKey targetProject1 =
+ projectOperations.newProject().name(project + "replica1").create();
+ Project.NameKey targetProject2 =
+ projectOperations.newProject().name(project + "replica2").create();
+
+ setReplicationDestination("foo1", "replica1", ALL_PROJECTS);
+ setReplicationDestination("foo2", "replica2", ALL_PROJECTS);
+ reloadConfig();
+ waitForEmptyTasks();
+
+ Result pushResult = createChange();
+ RevCommit sourceCommit = pushResult.getCommit();
+ String sourceRef = pushResult.getPatchSet().getRefName();
+
+ assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(2);
+
+ try (Repository repo1 = repoManager.openRepository(targetProject1);
+ Repository repo2 = repoManager.openRepository(targetProject2)) {
+ waitUntil(
+ () ->
+ (checkedGetRef(repo1, sourceRef) != null && checkedGetRef(repo2, sourceRef) != null));
+
+ Ref targetBranchRef1 = getRef(repo1, sourceRef);
+ assertThat(targetBranchRef1).isNotNull();
+ assertThat(targetBranchRef1.getObjectId()).isEqualTo(sourceCommit.getId());
+
+ Ref targetBranchRef2 = getRef(repo2, sourceRef);
+ assertThat(targetBranchRef2).isNotNull();
+ assertThat(targetBranchRef2.getObjectId()).isEqualTo(sourceCommit.getId());
+ }
+ }
+
+ @Test
+ public void shouldCreateIndividualReplicationTasksForEveryRemoteUrlPair() throws Exception {
+ List<String> replicaSuffixes = Arrays.asList("replica1", "replica2");
+ projectOperations.newProject().name(project + "replica1").create();
+ projectOperations.newProject().name(project + "replica2").create();
+
+ setReplicationDestination("foo1", replicaSuffixes, ALL_PROJECTS);
+ setReplicationDestination("foo2", replicaSuffixes, ALL_PROJECTS);
+ reloadConfig();
+ waitForEmptyTasks();
+
+ createChange();
+
+ assertThat(listReplicationTasks("refs/changes/\\d*/\\d*/\\d*")).hasSize(4);
+ }
+
+ @Test
+ public void shouldReplicateHeadUpdate() throws Exception {
+ setReplicationDestination("foo", "replica", ALL_PROJECTS);
+ reloadConfig();
+
+ Project.NameKey targetProject =
+ projectOperations.newProject().name(project + "replica").create();
+ String newHead = "refs/heads/newhead";
+ String master = "refs/heads/master";
+ BranchInput input = new BranchInput();
+ input.revision = master;
+ gApi.projects().name(project.get()).branch(newHead).create(input);
+ gApi.projects().name(project.get()).head(newHead);
+
+ try (Repository repo = repoManager.openRepository(targetProject)) {
+ waitUntil(() -> checkedGetRef(repo, newHead) != null);
+
+ Ref targetProjectHead = getRef(repo, Constants.HEAD);
+ assertThat(targetProjectHead).isNotNull();
+ assertThat(targetProjectHead.getTarget().getName()).isEqualTo(newHead);
+ }
+ }
+
+ private Ref getRef(Repository repo, String branchName) throws IOException {
+ return repo.getRefDatabase().exactRef(branchName);
+ }
+
+ private Ref checkedGetRef(Repository repo, String branchName) {
try {
return repo.getRefDatabase().exactRef(branchName);
- } catch (IOException e) {
- e.printStackTrace();
+ } catch (Exception e) {
+ logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
return null;
}
}
- private void setReplicationDestination(String remoteName, String replicaSuffix)
+ private void setReplicationDestination(
+ String remoteName, String replicaSuffix, Optional<String> project) throws IOException {
+ setReplicationDestination(remoteName, Arrays.asList(replicaSuffix), project);
+ }
+
+ private void setReplicationDestination(
+ String remoteName, List<String> replicaSuffixes, Optional<String> project)
throws IOException {
- config.setString(
- "remote",
- remoteName,
- "url",
- gitPath.resolve("${name}" + replicaSuffix + ".git").toString());
+
+ List<String> replicaUrls =
+ replicaSuffixes.stream()
+ .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
+ .collect(toList());
+ config.setStringList("remote", remoteName, "url", replicaUrls);
config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+ project.ifPresent(prj -> config.setString("remote", remoteName, "projects", prj));
config.save();
- reloadConfig();
}
private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
@@ -131,4 +272,40 @@
private void reloadConfig() {
plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).forceReload();
}
+
+ private void waitForEmptyTasks() throws InterruptedException {
+ waitUntil(
+ () -> {
+ try {
+ return listReplicationTasks(".*").size() == 0;
+ } catch (Exception e) {
+ logger.atSevere().withCause(e).log("Failed to list replication tasks");
+ throw new IllegalStateException(e);
+ }
+ });
+ }
+
+ private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) throws IOException {
+ Pattern refmaskPattern = Pattern.compile(refRegex);
+ List<ReplicateRefUpdate> tasks = new ArrayList<>();
+ try (DirectoryStream<Path> files = Files.newDirectoryStream(storagePath)) {
+ for (Path path : files) {
+ ReplicateRefUpdate task = readTask(path);
+ if (refmaskPattern.matcher(task.ref).matches()) {
+ tasks.add(readTask(path));
+ }
+ }
+ }
+
+ return tasks;
+ }
+
+ private ReplicateRefUpdate readTask(Path file) {
+ try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
+ return GSON.fromJson(reader, ReplicateRefUpdate.class);
+ } catch (Exception e) {
+ logger.atSevere().withCause(e).log("failed to read replication task %s", file);
+ throw new IllegalStateException(e);
+ }
+ }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
index cf6715e..2767f53 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStateTest.java
@@ -32,15 +32,15 @@
private ReplicationState replicationState;
private PushResultProcessing pushResultProcessingMock;
- private EventsStorage eventsStorage;
+ private ReplicationTasksStorage eventsStorage;
@Before
public void setUp() throws Exception {
pushResultProcessingMock = createNiceMock(PushResultProcessing.class);
replay(pushResultProcessingMock);
- eventsStorage = createNiceMock(EventsStorage.class);
+ eventsStorage = createNiceMock(ReplicationTasksStorage.class);
replay(eventsStorage);
- replicationState = new ReplicationState(eventsStorage, pushResultProcessingMock);
+ replicationState = new ReplicationState(pushResultProcessingMock);
}
@Test