Merge branch 'stable-3.11' into stable-3.12
* stable-3.11:
Fix concurrent pushes to the same target URI
Change-Id: I7b5778de0a757023180e5d02f934bbf57e354936
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index f9e1081..030ba1c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -126,7 +126,6 @@
private final URIish uri;
private final Set<ImmutableSet<String>> refBatchesToPush = Sets.newConcurrentHashSet();
private boolean pushAllRefs;
- private Repository git;
private boolean isCollision;
private boolean retrying;
private int retryCount;
@@ -429,12 +428,11 @@
repLog.atInfo().log("Replication to %s started...", uri);
Timer1.Context<String> destinationContext = metrics.start(config.getName());
- try {
+ try (Repository git = gitManager.openRepository(projectName)) {
long startedAt = destinationContext.getStartTime();
long delay = NANOSECONDS.toMillis(startedAt - createdAt);
metrics.record(config.getName(), delay, retryCount);
- git = gitManager.openRepository(projectName);
- runImpl();
+ runImpl(git);
long elapsed = NANOSECONDS.toMillis(destinationContext.stop());
if (elapsed > SECONDS.toMillis(pool.getSlowLatencyThreshold())) {
@@ -505,9 +503,6 @@
stateLog.error("Unexpected error during replication to " + uri, e, getStatesAsArray());
} finally {
pool.notifyFinished(this);
- if (git != null) {
- git.close();
- }
}
}
@@ -518,13 +513,16 @@
private void createRepository() {
if (pool.isCreateMissingRepos()) {
try {
- Ref head = git.exactRef(Constants.HEAD);
- if (createProject(projectName, head != null ? getName(head) : null)) {
- repLog.atWarning().log("Missing repository created; retry replication to %s", uri);
- pool.reschedule(this, Destination.RetryReason.REPOSITORY_MISSING);
- } else {
- repLog.atWarning().log(
- "Missing repository could not be created when replicating %s", uri);
+ try (Repository git = gitManager.openRepository(projectName)) {
+ Ref head = git.exactRef(Constants.HEAD);
+
+ if (createProject(projectName, head != null ? getName(head) : null)) {
+ repLog.atWarning().log("Missing repository created; retry replication to %s", uri);
+ pool.reschedule(this, Destination.RetryReason.REPOSITORY_MISSING);
+ } else {
+ repLog.atWarning().log(
+ "Missing repository could not be created when replicating %s", uri);
+ }
}
} catch (IOException ioe) {
stateLog.error(
@@ -549,19 +547,20 @@
return target.getName();
}
- private void runImpl() throws IOException, PermissionBackendException {
+ private void runImpl(Repository git) throws IOException, PermissionBackendException {
PushResult res;
try (Transport tn = transportFactory.open(git, uri)) {
- res = pushVia(tn);
+ res = pushVia(git, tn);
}
updateStates(res.getRemoteUpdates());
}
- private PushResult pushVia(Transport tn) throws IOException, PermissionBackendException {
+ private PushResult pushVia(Repository git, Transport tn)
+ throws IOException, PermissionBackendException {
tn.applyConfig(config);
tn.setCredentialsProvider(credentialsFactory.create(config.getName()));
- List<RemoteRefUpdate> todo = generateUpdates(tn);
+ List<RemoteRefUpdate> todo = generateUpdates(git, tn);
if (todo.isEmpty()) {
// If we have no commands selected, we have nothing to do.
// Calling JGit at this point would just redo the work we
@@ -640,7 +639,7 @@
return b ? "yes" : "no";
}
- private List<RemoteRefUpdate> generateUpdates(Transport tn)
+ private List<RemoteRefUpdate> generateUpdates(Repository git, Transport tn)
throws IOException, PermissionBackendException {
Optional<ProjectState> projectState = projectCache.get(projectName);
if (!projectState.isPresent()) {
@@ -680,14 +679,15 @@
}
List<RemoteRefUpdate> remoteUpdatesList =
- pushAllRefs ? doPushAll(tn, local) : doPushDelta(local);
+ pushAllRefs ? doPushAll(git, tn, local) : doPushDelta(git, local);
return replicationPushFilter == null || replicationPushFilter.get() == null
? remoteUpdatesList
: replicationPushFilter.get().filter(projectName.get(), remoteUpdatesList);
}
- private List<RemoteRefUpdate> doPushAll(Transport tn, Map<String, Ref> local) throws IOException {
+ private List<RemoteRefUpdate> doPushAll(Repository git, Transport tn, Map<String, Ref> local)
+ throws IOException {
List<RemoteRefUpdate> cmds = new ArrayList<>();
boolean noPerms = !pool.isReplicatePermissions();
Map<String, Ref> remote = listRemote(tn);
@@ -702,7 +702,7 @@
Ref dst = remote.get(spec.getDestination());
if (dst == null || !src.getObjectId().equals(dst.getObjectId())) {
// Doesn't exist yet, or isn't the same value, request to push.
- push(cmds, spec, src);
+ push(git, cmds, spec, src);
}
}
}
@@ -716,14 +716,15 @@
RefSpec spec = matchDst(ref.getName());
if (spec != null && !local.containsKey(spec.getSource())) {
// No longer on local side, request removal.
- delete(cmds, spec);
+ delete(git, cmds, spec);
}
}
}
return cmds;
}
- private List<RemoteRefUpdate> doPushDelta(Map<String, Ref> local) throws IOException {
+ private List<RemoteRefUpdate> doPushDelta(Repository git, Map<String, Ref> local)
+ throws IOException {
List<RemoteRefUpdate> cmds = new ArrayList<>();
boolean noPerms = !pool.isReplicatePermissions();
Set<String> refs = flattenRefBatchesToPush();
@@ -739,13 +740,13 @@
}
if (srcRef != null) {
- if (canPushRef(srcRef.getName(), noPerms)) {
- push(cmds, spec, srcRef);
+ if (canPushRef(src, noPerms)) {
+ push(git, cmds, spec, srcRef);
} else {
repLog.atFine().log("Skipping push of ref %s", srcRef.getName());
}
} else if (config.isMirror()) {
- delete(cmds, spec);
+ delete(git, cmds, spec);
}
}
}
@@ -787,13 +788,13 @@
}
@VisibleForTesting
- void push(List<RemoteRefUpdate> cmds, RefSpec spec, Ref src) throws IOException {
+ void push(Repository git, List<RemoteRefUpdate> cmds, RefSpec spec, Ref src) throws IOException {
String dst = spec.getDestination();
boolean force = spec.isForceUpdate();
cmds.add(new RemoteRefUpdate(git, src, dst, force, null, null));
}
- private void delete(List<RemoteRefUpdate> cmds, RefSpec spec) throws IOException {
+ private void delete(Repository git, List<RemoteRefUpdate> cmds, RefSpec spec) throws IOException {
String dst = spec.getDestination();
boolean force = spec.isForceUpdate();
cmds.add(new RemoteRefUpdate(git, (Ref) null, dst, force, null, null));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
index 3b07f49..339123a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -262,7 +262,7 @@
isCallFinished.await(10, TimeUnit.SECONDS);
verify(transportMock, atLeastOnce()).push(any(), any());
- verify(pushOne, times(2)).push(any(), any(), any());
+ verify(pushOne, times(2)).push(any(), any(), any(), any());
}
@Test
@@ -292,7 +292,7 @@
isCallFinished.await(10, TimeUnit.SECONDS);
verify(transportMock, atLeastOnce()).push(any(), any());
- verify(pushOne, times(1)).push(any(), any(), any());
+ verify(pushOne, times(1)).push(any(), any(), any(), any());
}
@Test
@@ -325,7 +325,7 @@
ArgumentCaptor<Ref> refCaptor = ArgumentCaptor.forClass(Ref.class);
verify(transportMock, atLeastOnce()).push(any(), any());
- verify(pushOne, times(1)).push(any(), any(), refCaptor.capture());
+ verify(pushOne, times(1)).push(any(), any(), any(), refCaptor.capture());
assertThat(refCaptor.getValue().getName()).isEqualTo("refs/heads/master");
}
@@ -347,7 +347,7 @@
isCallFinished.await(10, TimeUnit.SECONDS);
verify(transportMock, times(1)).push(any(), any());
- verify(pushOne, times(1)).push(any(), any(), any());
+ verify(pushOne, times(1)).push(any(), any(), any(), any());
}
@Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
index ba4a958..11b7718 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
@@ -26,7 +26,6 @@
import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.api.changes.NotifyHandling;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
-import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.git.LocalDiskRepositoryManager;
import com.google.inject.Inject;
import java.io.IOException;
@@ -68,8 +67,8 @@
(TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60)
+ TEST_PROJECT_CREATION_SECONDS);
- @Inject protected SitePaths sitePaths;
@Inject private ProjectOperations projectOperations;
+ @Inject private LocalDiskRepositoryManager localDiskRepositoryManager;
protected Path gitPath;
protected FileBasedConfig config;
@@ -79,7 +78,7 @@
}
protected String getProjectUri(Project.NameKey project) throws Exception {
- return ((LocalDiskRepositoryManager) repoManager)
+ return localDiskRepositoryManager
.getBasePath(project)
.resolve(project.get() + ".git")
.toString();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 4414cec..e25a5d6 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -34,7 +34,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -266,30 +266,32 @@
.schedule(0, TimeUnit.SECONDS);
CountDownLatch latch = new CountDownLatch(1);
- Executor service = Executors.newSingleThreadExecutor();
- service.execute(
- new Runnable() {
- @Override
- public void run() {
- try {
- future.get();
- state.waitForReplication();
- latch.countDown();
- } catch (Exception e) {
- // fails the test because we don't countDown
+ try (ExecutorService service = Executors.newSingleThreadExecutor()) {
+ service.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ future.get();
+ state.waitForReplication();
+ latch.countDown();
+ } catch (Exception e) {
+ // fails the test because we don't countDown
+ }
}
- }
- });
+ });
- // Cancel the replication task
- waitUntil(() -> getProjectTasks().size() != 0);
- WorkQueue.Task<?> task = getProjectTasks().get(0);
- assertThat(task.getState()).isAnyOf(WorkQueue.Task.State.READY, WorkQueue.Task.State.SLEEPING);
- task.cancel(false);
+ // Cancel the replication task
+ waitUntil(() -> getProjectTasks().size() != 0);
+ WorkQueue.Task<?> task = getProjectTasks().get(0);
+ assertThat(task.getState())
+ .isAnyOf(WorkQueue.Task.State.READY, WorkQueue.Task.State.SLEEPING);
+ task.cancel(false);
- // Confirm our waiting thread completed
- boolean receivedSignal = latch.await(5, TimeUnit.SECONDS); // FIXME Choose a good timeout
- assertThat(receivedSignal).isTrue();
+ // Confirm our waiting thread completed
+ boolean receivedSignal = latch.await(5, TimeUnit.SECONDS); // FIXME Choose a good timeout
+ assertThat(receivedSignal).isTrue();
+ }
}
private List<WorkQueue.Task<?>> getProjectTasks() {