Merge branch 'stable-3.11' into stable-3.12

* stable-3.11:
  Fix concurrent pushes to the same target URI

Change-Id: I7b5778de0a757023180e5d02f934bbf57e354936
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java index f9e1081..030ba1c 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -126,7 +126,6 @@ private final URIish uri; private final Set<ImmutableSet<String>> refBatchesToPush = Sets.newConcurrentHashSet(); private boolean pushAllRefs; - private Repository git; private boolean isCollision; private boolean retrying; private int retryCount; @@ -429,12 +428,11 @@ repLog.atInfo().log("Replication to %s started...", uri); Timer1.Context<String> destinationContext = metrics.start(config.getName()); - try { + try (Repository git = gitManager.openRepository(projectName)) { long startedAt = destinationContext.getStartTime(); long delay = NANOSECONDS.toMillis(startedAt - createdAt); metrics.record(config.getName(), delay, retryCount); - git = gitManager.openRepository(projectName); - runImpl(); + runImpl(git); long elapsed = NANOSECONDS.toMillis(destinationContext.stop()); if (elapsed > SECONDS.toMillis(pool.getSlowLatencyThreshold())) { @@ -505,9 +503,6 @@ stateLog.error("Unexpected error during replication to " + uri, e, getStatesAsArray()); } finally { pool.notifyFinished(this); - if (git != null) { - git.close(); - } } } @@ -518,13 +513,16 @@ private void createRepository() { if (pool.isCreateMissingRepos()) { try { - Ref head = git.exactRef(Constants.HEAD); - if (createProject(projectName, head != null ? getName(head) : null)) { - repLog.atWarning().log("Missing repository created; retry replication to %s", uri); - pool.reschedule(this, Destination.RetryReason.REPOSITORY_MISSING); - } else { - repLog.atWarning().log( - "Missing repository could not be created when replicating %s", uri); + try (Repository git = gitManager.openRepository(projectName)) { + Ref head = git.exactRef(Constants.HEAD); + + if (createProject(projectName, head != null ? 
getName(head) : null)) { + repLog.atWarning().log("Missing repository created; retry replication to %s", uri); + pool.reschedule(this, Destination.RetryReason.REPOSITORY_MISSING); + } else { + repLog.atWarning().log( + "Missing repository could not be created when replicating %s", uri); + } } } catch (IOException ioe) { stateLog.error( @@ -549,19 +547,20 @@ return target.getName(); } - private void runImpl() throws IOException, PermissionBackendException { + private void runImpl(Repository git) throws IOException, PermissionBackendException { PushResult res; try (Transport tn = transportFactory.open(git, uri)) { - res = pushVia(tn); + res = pushVia(git, tn); } updateStates(res.getRemoteUpdates()); } - private PushResult pushVia(Transport tn) throws IOException, PermissionBackendException { + private PushResult pushVia(Repository git, Transport tn) + throws IOException, PermissionBackendException { tn.applyConfig(config); tn.setCredentialsProvider(credentialsFactory.create(config.getName())); - List<RemoteRefUpdate> todo = generateUpdates(tn); + List<RemoteRefUpdate> todo = generateUpdates(git, tn); if (todo.isEmpty()) { // If we have no commands selected, we have nothing to do. // Calling JGit at this point would just redo the work we @@ -640,7 +639,7 @@ return b ? "yes" : "no"; } - private List<RemoteRefUpdate> generateUpdates(Transport tn) + private List<RemoteRefUpdate> generateUpdates(Repository git, Transport tn) throws IOException, PermissionBackendException { Optional<ProjectState> projectState = projectCache.get(projectName); if (!projectState.isPresent()) { @@ -680,14 +679,15 @@ } List<RemoteRefUpdate> remoteUpdatesList = - pushAllRefs ? doPushAll(tn, local) : doPushDelta(local); + pushAllRefs ? doPushAll(git, tn, local) : doPushDelta(git, local); return replicationPushFilter == null || replicationPushFilter.get() == null ? 
remoteUpdatesList : replicationPushFilter.get().filter(projectName.get(), remoteUpdatesList); } - private List<RemoteRefUpdate> doPushAll(Transport tn, Map<String, Ref> local) throws IOException { + private List<RemoteRefUpdate> doPushAll(Repository git, Transport tn, Map<String, Ref> local) + throws IOException { List<RemoteRefUpdate> cmds = new ArrayList<>(); boolean noPerms = !pool.isReplicatePermissions(); Map<String, Ref> remote = listRemote(tn); @@ -702,7 +702,7 @@ Ref dst = remote.get(spec.getDestination()); if (dst == null || !src.getObjectId().equals(dst.getObjectId())) { // Doesn't exist yet, or isn't the same value, request to push. - push(cmds, spec, src); + push(git, cmds, spec, src); } } } @@ -716,14 +716,15 @@ RefSpec spec = matchDst(ref.getName()); if (spec != null && !local.containsKey(spec.getSource())) { // No longer on local side, request removal. - delete(cmds, spec); + delete(git, cmds, spec); } } } return cmds; } - private List<RemoteRefUpdate> doPushDelta(Map<String, Ref> local) throws IOException { + private List<RemoteRefUpdate> doPushDelta(Repository git, Map<String, Ref> local) + throws IOException { List<RemoteRefUpdate> cmds = new ArrayList<>(); boolean noPerms = !pool.isReplicatePermissions(); Set<String> refs = flattenRefBatchesToPush(); @@ -739,13 +740,13 @@ } if (srcRef != null) { - if (canPushRef(srcRef.getName(), noPerms)) { - push(cmds, spec, srcRef); + if (canPushRef(src, noPerms)) { + push(git, cmds, spec, srcRef); } else { repLog.atFine().log("Skipping push of ref %s", srcRef.getName()); } } else if (config.isMirror()) { - delete(cmds, spec); + delete(git, cmds, spec); } } } @@ -787,13 +788,13 @@ } @VisibleForTesting - void push(List<RemoteRefUpdate> cmds, RefSpec spec, Ref src) throws IOException { + void push(Repository git, List<RemoteRefUpdate> cmds, RefSpec spec, Ref src) throws IOException { String dst = spec.getDestination(); boolean force = spec.isForceUpdate(); cmds.add(new RemoteRefUpdate(git, src, dst, force, null, 
null)); } - private void delete(List<RemoteRefUpdate> cmds, RefSpec spec) throws IOException { + private void delete(Repository git, List<RemoteRefUpdate> cmds, RefSpec spec) throws IOException { String dst = spec.getDestination(); boolean force = spec.isForceUpdate(); cmds.add(new RemoteRefUpdate(git, (Ref) null, dst, force, null, null));
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java index 3b07f49..339123a 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -262,7 +262,7 @@ isCallFinished.await(10, TimeUnit.SECONDS); verify(transportMock, atLeastOnce()).push(any(), any()); - verify(pushOne, times(2)).push(any(), any(), any()); + verify(pushOne, times(2)).push(any(), any(), any(), any()); } @Test @@ -292,7 +292,7 @@ isCallFinished.await(10, TimeUnit.SECONDS); verify(transportMock, atLeastOnce()).push(any(), any()); - verify(pushOne, times(1)).push(any(), any(), any()); + verify(pushOne, times(1)).push(any(), any(), any(), any()); } @Test @@ -325,7 +325,7 @@ ArgumentCaptor<Ref> refCaptor = ArgumentCaptor.forClass(Ref.class); verify(transportMock, atLeastOnce()).push(any(), any()); - verify(pushOne, times(1)).push(any(), any(), refCaptor.capture()); + verify(pushOne, times(1)).push(any(), any(), any(), refCaptor.capture()); assertThat(refCaptor.getValue().getName()).isEqualTo("refs/heads/master"); } @@ -347,7 +347,7 @@ isCallFinished.await(10, TimeUnit.SECONDS); verify(transportMock, times(1)).push(any(), any()); - verify(pushOne, times(1)).push(any(), any(), any()); + verify(pushOne, times(1)).push(any(), any(), any(), any()); } @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java index ba4a958..11b7718 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationDaemon.java
@@ -26,7 +26,6 @@ import com.google.gerrit.entities.Project; import com.google.gerrit.extensions.api.changes.NotifyHandling; import com.google.gerrit.extensions.events.ProjectDeletedListener; -import com.google.gerrit.server.config.SitePaths; import com.google.gerrit.server.git.LocalDiskRepositoryManager; import com.google.inject.Inject; import java.io.IOException; @@ -68,8 +67,8 @@ (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60) + TEST_PROJECT_CREATION_SECONDS); - @Inject protected SitePaths sitePaths; @Inject private ProjectOperations projectOperations; + @Inject private LocalDiskRepositoryManager localDiskRepositoryManager; protected Path gitPath; protected FileBasedConfig config; @@ -79,7 +78,7 @@ } protected String getProjectUri(Project.NameKey project) throws Exception { - return ((LocalDiskRepositoryManager) repoManager) + return localDiskRepositoryManager .getBasePath(project) .resolve(project.get() + ".git") .toString();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java index 4414cec..e25a5d6 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -34,7 +34,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -266,30 +266,32 @@ .schedule(0, TimeUnit.SECONDS); CountDownLatch latch = new CountDownLatch(1); - Executor service = Executors.newSingleThreadExecutor(); - service.execute( - new Runnable() { - @Override - public void run() { - try { - future.get(); - state.waitForReplication(); - latch.countDown(); - } catch (Exception e) { - // fails the test because we don't countDown + try (ExecutorService service = Executors.newSingleThreadExecutor()) { + service.execute( + new Runnable() { + @Override + public void run() { + try { + future.get(); + state.waitForReplication(); + latch.countDown(); + } catch (Exception e) { + // fails the test because we don't countDown + } } - } - }); + }); - // Cancel the replication task - waitUntil(() -> getProjectTasks().size() != 0); - WorkQueue.Task<?> task = getProjectTasks().get(0); - assertThat(task.getState()).isAnyOf(WorkQueue.Task.State.READY, WorkQueue.Task.State.SLEEPING); - task.cancel(false); + // Cancel the replication task + waitUntil(() -> getProjectTasks().size() != 0); + WorkQueue.Task<?> task = getProjectTasks().get(0); + assertThat(task.getState()) + .isAnyOf(WorkQueue.Task.State.READY, WorkQueue.Task.State.SLEEPING); + task.cancel(false); - // Confirm our waiting thread completed - boolean receivedSignal = latch.await(5, TimeUnit.SECONDS); // FIXME Choose a good timeout - assertThat(receivedSignal).isTrue(); + // Confirm our waiting thread completed + boolean receivedSignal = latch.await(5, TimeUnit.SECONDS); // FIXME Choose a good timeout + assertThat(receivedSignal).isTrue(); + } } private List<WorkQueue.Task<?>> getProjectTasks() {