Kill the MergeQueue
The MergeQueue is a historical artifact of Gerrit from the time when
accepting submissions and merging was performed on different machines.
Nowadays this is done in the same process, so we do not require an
intermediate queue anymore (also the SUBMITTED state is superfluous).
This change will directly integrate a submission or refuse to do so
and complain to the user. Many user complaints about Gerrit at Google can
be translated to the existence of the MergeQueue as when something goes
wrong in the back end, all the user sees is a "SUBMITTED, MERGE PENDING"
state with delayed information when this is actually merged or if
there is a problem they need to address. This is an unfortunate user
experience, as patience is hard.
A common work flow is to submit a set of changes on one branch by submitting
the later changes of the branch first, such that these enter the merge
pending state and on submitting the first patch of the branch, all the
changes get integrated at once (such as by a single merge commit performed
by Gerrit).
This behavior doesn't work any more with this change, but you'd rather get
a notification when trying to merge a later change of the branch.
That work flow however will be enabled again in another patch of this
topic, when change submission also submits all changes it depends on.
I just don't want to bundle that with this change (as (a) this is already
a large change and (b) it is changing fundamental behavior).
Change-Id: I0c43b16a216db2a82aa0e59375a8a43482bb3b45
diff --git a/gerrit-acceptance-tests/src/test/java/com/google/gerrit/acceptance/GerritServer.java b/gerrit-acceptance-tests/src/test/java/com/google/gerrit/acceptance/GerritServer.java
index de6cd16..948f178 100644
--- a/gerrit-acceptance-tests/src/test/java/com/google/gerrit/acceptance/GerritServer.java
+++ b/gerrit-acceptance-tests/src/test/java/com/google/gerrit/acceptance/GerritServer.java
@@ -24,7 +24,6 @@
import com.google.gerrit.server.config.FactoryModule;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.git.AsyncReceiveCommits;
-import com.google.gerrit.server.git.SubmoduleOp;
import com.google.gerrit.server.index.ChangeSchemas;
import com.google.gerrit.server.ssh.NoSshModule;
import com.google.gerrit.server.util.SocketUtil;
@@ -207,7 +206,6 @@
protected void configure() {
bind(AccountCreator.class);
factory(PushOneCommit.Factory.class);
- factory(SubmoduleOp.Factory.class);
install(InProcessProtocol.module());
install(new NoSshModule());
install(new AsyncReceiveCommits.Module());
diff --git a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java
index cb8e9ba..dcb6c85 100644
--- a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java
+++ b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java
@@ -51,7 +51,6 @@
import com.google.gerrit.server.config.CanonicalWebUrlProvider;
import com.google.gerrit.server.config.GerritGlobalModule;
import com.google.gerrit.server.config.GerritServerConfig;
-import com.google.gerrit.server.config.MasterNodeStartup;
import com.google.gerrit.server.config.RestCacheAdminModule;
import com.google.gerrit.server.contact.ContactStoreModule;
import com.google.gerrit.server.contact.HttpContactStoreConnection;
@@ -359,9 +358,6 @@
} else {
modules.add(NoSshKeyCache.module());
}
- if (!slave) {
- modules.add(new MasterNodeStartup());
- }
modules.add(new AbstractModule() {
@Override
protected void configure() {
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/change/Submit.java b/gerrit-server/src/main/java/com/google/gerrit/server/change/Submit.java
index bb0e594..e13b199 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/change/Submit.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/change/Submit.java
@@ -55,11 +55,13 @@
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.LabelNormalizer;
-import com.google.gerrit.server.git.MergeQueue;
+import com.google.gerrit.server.git.MergeException;
+import com.google.gerrit.server.git.MergeOp;
import com.google.gerrit.server.git.VersionedMetaData.BatchMetaDataUpdate;
import com.google.gerrit.server.index.ChangeIndexer;
import com.google.gerrit.server.notedb.ChangeUpdate;
import com.google.gerrit.server.project.ChangeControl;
+import com.google.gerrit.server.project.NoSuchChangeException;
import com.google.gerrit.server.project.SubmitRuleEvaluator;
import com.google.gerrit.server.query.change.ChangeData;
import com.google.gerrit.server.query.change.InternalChangeQuery;
@@ -126,7 +128,7 @@
private final ChangeUpdate.Factory updateFactory;
private final ApprovalsUtil approvalsUtil;
private final ChangeMessagesUtil cmUtil;
- private final MergeQueue mergeQueue;
+ private final MergeOp.Factory mergeOpFactory;
private final ChangeIndexer indexer;
private final LabelNormalizer labelNormalizer;
private final AccountsCollection accounts;
@@ -147,7 +149,7 @@
ChangeUpdate.Factory updateFactory,
ApprovalsUtil approvalsUtil,
ChangeMessagesUtil cmUtil,
- MergeQueue mergeQueue,
+ MergeOp.Factory mergeOpFactory,
AccountsCollection accounts,
ChangesCollection changes,
ChangeIndexer indexer,
@@ -162,7 +164,7 @@
this.updateFactory = updateFactory;
this.approvalsUtil = approvalsUtil;
this.cmUtil = cmUtil;
- this.mergeQueue = mergeQueue;
+ this.mergeOpFactory = mergeOpFactory;
this.accounts = accounts;
this.changes = changes;
this.indexer = indexer;
@@ -212,17 +214,21 @@
List<Change> submittedChanges = submit(rsrc, caller, false);
- if (input.waitForMerge) {
- for (Change c : submittedChanges) {
- // TODO(sbeller): We should make schedule return a Future, then we
- // could do these all in parallel and still block until they're done.
- mergeQueue.merge(c.getDest());
+ try {
+ if (input.waitForMerge) {
+ for (Change c : submittedChanges) {
+ // TODO(sbeller): We should make schedule return a Future, then we
+ // could do these all in parallel and still block until they're done.
+ mergeOpFactory.create(c.getDest()).merge();
+ }
+ change = dbProvider.get().changes().get(change.getId());
+ } else {
+ for (Change c : submittedChanges) {
+ mergeOpFactory.create(c.getDest()).merge();
+ }
}
- change = dbProvider.get().changes().get(change.getId());
- } else {
- for (Change c : submittedChanges) {
- mergeQueue.schedule(c.getDest());
- }
+ } catch (MergeException | NoSuchChangeException e) {
+ throw new OrmException("Submission failed", e);
}
if (change == null) {
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/config/GerritGlobalModule.java b/gerrit-server/src/main/java/com/google/gerrit/server/config/GerritGlobalModule.java
index 908a388..82372cc 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/config/GerritGlobalModule.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/config/GerritGlobalModule.java
@@ -73,13 +73,13 @@
import com.google.gerrit.server.change.MergeabilityCacheImpl;
import com.google.gerrit.server.events.EventFactory;
import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
-import com.google.gerrit.server.git.ChangeMergeQueue;
import com.google.gerrit.server.git.GitModule;
-import com.google.gerrit.server.git.MergeQueue;
+import com.google.gerrit.server.git.MergeOp;
import com.google.gerrit.server.git.MergeUtil;
import com.google.gerrit.server.git.NotesBranchUtil;
import com.google.gerrit.server.git.ReceivePackInitializer;
import com.google.gerrit.server.git.SignedPushModule;
+import com.google.gerrit.server.git.SubmoduleOp;
import com.google.gerrit.server.git.TagCache;
import com.google.gerrit.server.git.TransferConfig;
import com.google.gerrit.server.git.validators.CommitValidationListener;
@@ -224,8 +224,6 @@
bind(ChangeCleanupConfig.class);
bind(ApprovalsUtil.class);
- bind(ChangeMergeQueue.class).in(SINGLETON);
- bind(MergeQueue.class).to(ChangeMergeQueue.class).in(SINGLETON);
bind(RuntimeInstance.class)
.toProvider(VelocityRuntimeProvider.class)
@@ -235,6 +233,8 @@
bind(Boolean.class).annotatedWith(DisableReverseDnsLookup.class)
.toProvider(DisableReverseDnsLookupProvider.class).in(SINGLETON);
+ factory(MergeOp.Factory.class);
+ factory(SubmoduleOp.Factory.class);
bind(PatchSetInfoFactory.class);
bind(IdentifiedUser.GenericFactory.class).in(SINGLETON);
bind(ChangeControl.GenericFactory.class);
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/config/GerritRequestModule.java b/gerrit-server/src/main/java/com/google/gerrit/server/config/GerritRequestModule.java
index cdfad8d..d481b44 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/config/GerritRequestModule.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/config/GerritRequestModule.java
@@ -18,8 +18,6 @@
import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.RequestCleanup;
-import com.google.gerrit.server.git.MergeOp;
-import com.google.gerrit.server.git.SubmoduleOp;
import com.google.gerrit.server.project.PerRequestProjectControlCache;
import com.google.gerrit.server.project.ProjectControl;
import com.google.inject.servlet.RequestScoped;
@@ -34,8 +32,5 @@
bind(PerRequestProjectControlCache.class).in(RequestScoped.class);
bind(ProjectControl.Factory.class).in(SINGLETON);
-
- factory(SubmoduleOp.Factory.class);
- factory(MergeOp.Factory.class);
}
}
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/config/MasterNodeStartup.java b/gerrit-server/src/main/java/com/google/gerrit/server/config/MasterNodeStartup.java
deleted file mode 100644
index a7360de..0000000
--- a/gerrit-server/src/main/java/com/google/gerrit/server/config/MasterNodeStartup.java
+++ /dev/null
@@ -1,72 +0,0 @@
-// Copyright (C) 2009 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.gerrit.server.config;
-
-import static java.util.concurrent.TimeUnit.MINUTES;
-import static java.util.concurrent.TimeUnit.SECONDS;
-
-import com.google.gerrit.extensions.events.LifecycleListener;
-import com.google.gerrit.lifecycle.LifecycleModule;
-import com.google.gerrit.server.git.ReloadSubmitQueueOp;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-import org.eclipse.jgit.lib.Config;
-
-import java.util.concurrent.ScheduledFuture;
-
-/** Configuration for a master node in a cluster of servers. */
-public class MasterNodeStartup extends LifecycleModule {
- @Override
- public void configure() {
- listener().to(Lifecycle.class);
- }
-
- @Singleton
- static class Lifecycle implements LifecycleListener {
- private static final int INITIAL_DELAY_S = 15;
-
- private final ReloadSubmitQueueOp submit;
- private final long delay;
- private volatile ScheduledFuture<?> handle;
-
- @Inject
- Lifecycle(ReloadSubmitQueueOp submit,
- @GerritServerConfig Config config) {
- this.submit = submit;
- this.delay = ConfigUtil.getTimeUnit(config,
- "changeMerge", null, "checkFrequency",
- SECONDS.convert(5, MINUTES), SECONDS);
- }
-
- @Override
- public void start() {
- if (delay > 0) {
- handle = submit.startWithFixedDelay(INITIAL_DELAY_S, delay, SECONDS);
- } else {
- handle = submit.start(INITIAL_DELAY_S, SECONDS);
- }
- }
-
- @Override
- public void stop() {
- ScheduledFuture<?> f = handle;
- if (f != null) {
- handle = null;
- f.cancel(true);
- }
- }
- }
-}
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/ChangeMergeQueue.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/ChangeMergeQueue.java
deleted file mode 100644
index be3902c..0000000
--- a/gerrit-server/src/main/java/com/google/gerrit/server/git/ChangeMergeQueue.java
+++ /dev/null
@@ -1,277 +0,0 @@
-// Copyright (C) 2009 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.gerrit.server.git;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
-
-import com.google.gerrit.common.TimeUtil;
-import com.google.gerrit.reviewdb.client.Branch;
-import com.google.gerrit.reviewdb.client.Project;
-import com.google.gerrit.reviewdb.server.ReviewDb;
-import com.google.gerrit.server.CurrentUser;
-import com.google.gerrit.server.RemotePeer;
-import com.google.gerrit.server.config.GerritRequestModule;
-import com.google.gerrit.server.config.RequestScopedReviewDbProvider;
-import com.google.gerrit.server.ssh.SshInfo;
-import com.google.gerrit.server.util.RequestContext;
-import com.google.gerrit.server.util.RequestScopePropagator;
-import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.OutOfScopeException;
-import com.google.inject.Provider;
-import com.google.inject.Provides;
-import com.google.inject.Singleton;
-import com.google.inject.servlet.RequestScoped;
-
-import com.jcraft.jsch.HostKey;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.SocketAddress;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-@Singleton
-public class ChangeMergeQueue implements MergeQueue {
- private static final Logger log =
- LoggerFactory.getLogger(ChangeMergeQueue.class);
-
- private final Map<Branch.NameKey, MergeEntry> active = new HashMap<>();
- private final Map<Branch.NameKey, RecheckJob> recheck = new HashMap<>();
-
- private final WorkQueue workQueue;
- private final Provider<MergeOp.Factory> bgFactory;
- private final PerThreadRequestScope.Scoper threadScoper;
-
- @Inject
- ChangeMergeQueue(final WorkQueue wq, Injector parent) {
- workQueue = wq;
-
- Injector child = parent.createChildInjector(new AbstractModule() {
- @Override
- protected void configure() {
- bindScope(RequestScoped.class, PerThreadRequestScope.REQUEST);
- bind(RequestScopePropagator.class)
- .to(PerThreadRequestScope.Propagator.class);
- bind(PerThreadRequestScope.Propagator.class);
- install(new GerritRequestModule());
-
- bind(SocketAddress.class).annotatedWith(RemotePeer.class).toProvider(
- new Provider<SocketAddress>() {
- @Override
- public SocketAddress get() {
- throw new OutOfScopeException("No remote peer on merge thread");
- }
- });
- bind(SshInfo.class).toInstance(new SshInfo() {
- @Override
- public List<HostKey> getHostKeys() {
- return Collections.emptyList();
- }
- });
- }
-
- @Provides
- public PerThreadRequestScope.Scoper provideScoper(
- final PerThreadRequestScope.Propagator propagator,
- final Provider<RequestScopedReviewDbProvider> dbProvider) {
- final RequestContext requestContext = new RequestContext() {
- @Override
- public CurrentUser getCurrentUser() {
- throw new OutOfScopeException("No user on merge thread");
- }
-
- @Override
- public Provider<ReviewDb> getReviewDbProvider() {
- return dbProvider.get();
- }
- };
- return new PerThreadRequestScope.Scoper() {
- @Override
- public <T> Callable<T> scope(Callable<T> callable) {
- return propagator.scope(requestContext, callable);
- }
- };
- }
- });
- bgFactory = child.getProvider(MergeOp.Factory.class);
- threadScoper = child.getInstance(PerThreadRequestScope.Scoper.class);
- }
-
- @Override
- public void merge(Branch.NameKey branch) {
- if (start(branch)) {
- mergeImpl(branch);
- }
- }
-
- private synchronized boolean start(final Branch.NameKey branch) {
- final MergeEntry e = active.get(branch);
- if (e == null) {
- // Let the caller attempt this merge, its the only one interested
- // in processing this branch right now.
- //
- active.put(branch, new MergeEntry(branch));
- return true;
- } else {
- // Request that the job queue handle this merge later.
- //
- e.needMerge = true;
- return false;
- }
- }
-
- @Override
- public synchronized void schedule(final Branch.NameKey branch) {
- MergeEntry e = active.get(branch);
- if (e == null) {
- e = new MergeEntry(branch);
- active.put(branch, e);
- e.needMerge = true;
- scheduleJob(e);
- } else {
- e.needMerge = true;
- }
- }
-
- @Override
- public synchronized void recheckAfter(final Branch.NameKey branch,
- final long delay, final TimeUnit delayUnit) {
- final long now = TimeUtil.nowMs();
- final long at = now + MILLISECONDS.convert(delay, delayUnit);
- RecheckJob e = recheck.get(branch);
- if (e == null) {
- e = new RecheckJob(branch);
- workQueue.getDefaultQueue().schedule(e, now - at, MILLISECONDS);
- recheck.put(branch, e);
- }
- e.recheckAt = Math.max(at, e.recheckAt);
- }
-
- private synchronized void finish(final Branch.NameKey branch) {
- final MergeEntry e = active.get(branch);
- if (e == null) {
- // Not registered? Shouldn't happen but ignore it.
- //
- return;
- }
-
- if (!e.needMerge) {
- // No additional merges are in progress, we can delete it.
- //
- active.remove(branch);
- return;
- }
-
- scheduleJob(e);
- }
-
- private void scheduleJob(final MergeEntry e) {
- if (!e.jobScheduled) {
- // No job has been scheduled to execute this branch, but it needs
- // to run a merge again.
- //
- e.jobScheduled = true;
- workQueue.getDefaultQueue().schedule(e, 0, TimeUnit.SECONDS);
- }
- }
-
- private synchronized void unschedule(final MergeEntry e) {
- e.jobScheduled = false;
- e.needMerge = false;
- }
-
- private void mergeImpl(final Branch.NameKey branch) {
- try {
- threadScoper.scope(new Callable<Void>(){
- @Override
- public Void call() throws Exception {
- bgFactory.get().create(branch).merge();
- return null;
- }
- }).call();
- } catch (Throwable e) {
- log.error("Merge attempt for " + branch + " failed", e);
- } finally {
- finish(branch);
- }
- }
-
- private synchronized void recheck(final RecheckJob e) {
- final long remainingDelay = e.recheckAt - TimeUtil.nowMs();
- if (MILLISECONDS.convert(10, SECONDS) < remainingDelay) {
- // Woke up too early, the job deadline was pushed back.
- // Reschedule for the new deadline. We allow for a small
- // amount of fuzz due to multiple reschedule attempts in
- // a short period of time being caused by MergeOp.
- //
- workQueue.getDefaultQueue().schedule(e, remainingDelay, MILLISECONDS);
- } else {
- // Schedule a merge attempt on this branch to see if we can
- // actually complete it this time.
- //
- schedule(e.dest);
- }
- }
-
- private class MergeEntry implements Runnable {
- final Branch.NameKey dest;
- boolean needMerge;
- boolean jobScheduled;
-
- MergeEntry(final Branch.NameKey d) {
- dest = d;
- }
-
- @Override
- public void run() {
- unschedule(this);
- mergeImpl(dest);
- }
-
- @Override
- public String toString() {
- final Project.NameKey project = dest.getParentKey();
- return "submit " + project.get() + " " + dest.getShortName();
- }
- }
-
- private class RecheckJob implements Runnable {
- final Branch.NameKey dest;
- long recheckAt;
-
- RecheckJob(final Branch.NameKey d) {
- dest = d;
- }
-
- @Override
- public void run() {
- recheck(this);
- }
-
- @Override
- public String toString() {
- final Project.NameKey project = dest.getParentKey();
- return "recheck " + project.get() + " " + dest.getShortName();
- }
- }
-}
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/MergeOp.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/MergeOp.java
index 2f53ce4..b2223f8 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/git/MergeOp.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/git/MergeOp.java
@@ -17,7 +17,6 @@
import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static org.eclipse.jgit.lib.RefDatabase.ALL;
import com.google.common.collect.ArrayListMultimap;
@@ -43,8 +42,12 @@
import com.google.gerrit.server.ApprovalsUtil;
import com.google.gerrit.server.ChangeMessagesUtil;
import com.google.gerrit.server.ChangeUtil;
+import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.IdentifiedUser;
+import com.google.gerrit.server.RemotePeer;
import com.google.gerrit.server.account.AccountCache;
+import com.google.gerrit.server.config.GerritRequestModule;
+import com.google.gerrit.server.config.RequestScopedReviewDbProvider;
import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
import com.google.gerrit.server.git.strategy.SubmitStrategy;
import com.google.gerrit.server.git.strategy.SubmitStrategyFactory;
@@ -65,13 +68,22 @@
import com.google.gerrit.server.project.SubmitRuleEvaluator;
import com.google.gerrit.server.query.change.ChangeData;
import com.google.gerrit.server.query.change.InternalChangeQuery;
+import com.google.gerrit.server.ssh.SshInfo;
+import com.google.gerrit.server.util.RequestContext;
import com.google.gerrit.server.util.RequestScopePropagator;
import com.google.gwtorm.server.AtomicUpdate;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.SchemaFactory;
+import com.google.inject.AbstractModule;
import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.OutOfScopeException;
import com.google.inject.Provider;
+import com.google.inject.Provides;
import com.google.inject.assistedinject.Assisted;
+import com.google.inject.servlet.RequestScoped;
+
+import com.jcraft.jsch.HostKey;
import org.eclipse.jgit.errors.IncorrectObjectTypeException;
import org.eclipse.jgit.errors.RepositoryNotFoundException;
@@ -91,7 +103,9 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.net.SocketAddress;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -99,6 +113,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.Callable;
/**
* Merges changes in submission order into a single branch.
@@ -125,9 +140,6 @@
private static final long DEPENDENCY_DELAY =
MILLISECONDS.convert(15, MINUTES);
- private static final long LOCK_FAILURE_RETRY_DELAY =
- MILLISECONDS.convert(15, SECONDS);
-
private static final long MAX_SUBMIT_WINDOW =
MILLISECONDS.convert(12, HOURS);
@@ -145,12 +157,10 @@
private final IdentifiedUser.GenericFactory identifiedUserFactory;
private final MergedSender.Factory mergedSenderFactory;
private final MergeFailSender.Factory mergeFailSenderFactory;
- private final MergeQueue mergeQueue;
private final MergeValidators.Factory mergeValidatorsFactory;
private final PatchSetInfoFactory patchSetInfoFactory;
private final ProjectCache projectCache;
private final Provider<InternalChangeQuery> queryProvider;
- private final RequestScopePropagator requestScopePropagator;
private final SchemaFactory<ReviewDb> schemaFactory;
private final SubmitStrategyFactory submitStrategyFactory;
private final SubmoduleOp.Factory subOpFactory;
@@ -163,6 +173,7 @@
private final List<CodeReviewCommit> potentiallyStillSubmittable;
private final Map<Change.Id, CodeReviewCommit> commits;
private final List<Change> toUpdate;
+ private final PerThreadRequestScope.Scoper threadScoper;
private ProjectState destProject;
private ReviewDb db;
@@ -181,6 +192,7 @@
ChangeData.Factory changeDataFactory,
ChangeHooks hooks,
ChangeIndexer indexer,
+ Injector injector,
ChangeMessagesUtil cmUtil,
ChangeNotes.Factory notesFactory,
ChangeUpdate.Factory updateFactory,
@@ -189,12 +201,10 @@
IdentifiedUser.GenericFactory identifiedUserFactory,
MergedSender.Factory mergedSenderFactory,
MergeFailSender.Factory mergeFailSenderFactory,
- MergeQueue mergeQueue,
MergeValidators.Factory mergeValidatorsFactory,
PatchSetInfoFactory patchSetInfoFactory,
ProjectCache projectCache,
Provider<InternalChangeQuery> queryProvider,
- RequestScopePropagator requestScopePropagator,
SchemaFactory<ReviewDb> schemaFactory,
SubmitStrategyFactory submitStrategyFactory,
SubmoduleOp.Factory subOpFactory,
@@ -215,12 +225,10 @@
this.identifiedUserFactory = identifiedUserFactory;
this.mergedSenderFactory = mergedSenderFactory;
this.mergeFailSenderFactory = mergeFailSenderFactory;
- this.mergeQueue = mergeQueue;
this.mergeValidatorsFactory = mergeValidatorsFactory;
this.patchSetInfoFactory = patchSetInfoFactory;
this.projectCache = projectCache;
this.queryProvider = queryProvider;
- this.requestScopePropagator = requestScopePropagator;
this.schemaFactory = schemaFactory;
this.submitStrategyFactory = submitStrategyFactory;
this.subOpFactory = subOpFactory;
@@ -233,6 +241,55 @@
potentiallyStillSubmittable = new ArrayList<>();
commits = new HashMap<>();
toUpdate = Lists.newArrayList();
+
+ Injector child = injector.createChildInjector(new AbstractModule() {
+ @Override
+ protected void configure() {
+ bindScope(RequestScoped.class, PerThreadRequestScope.REQUEST);
+ bind(RequestScopePropagator.class)
+ .to(PerThreadRequestScope.Propagator.class);
+ bind(PerThreadRequestScope.Propagator.class);
+ install(new GerritRequestModule());
+
+ bind(SocketAddress.class).annotatedWith(RemotePeer.class).toProvider(
+ new Provider<SocketAddress>() {
+ @Override
+ public SocketAddress get() {
+ throw new OutOfScopeException("No remote peer on merge thread");
+ }
+ });
+ bind(SshInfo.class).toInstance(new SshInfo() {
+ @Override
+ public List<HostKey> getHostKeys() {
+ return Collections.emptyList();
+ }
+ });
+ }
+
+ @Provides
+ public PerThreadRequestScope.Scoper provideScoper(
+ final PerThreadRequestScope.Propagator propagator,
+ final Provider<RequestScopedReviewDbProvider> dbProvider) {
+ final RequestContext requestContext = new RequestContext() {
+ @Override
+ public CurrentUser getCurrentUser() {
+ throw new OutOfScopeException("No user on merge thread");
+ }
+
+ @Override
+ public Provider<ReviewDb> getReviewDbProvider() {
+ return dbProvider.get();
+ }
+ };
+ return new PerThreadRequestScope.Scoper() {
+ @Override
+ public <T> Callable<T> scope(Callable<T> callable) {
+ return propagator.scope(requestContext, callable);
+ }
+ };
+ }
+ });
+ threadScoper = child.getInstance(PerThreadRequestScope.Scoper.class);
}
private void setDestProject() throws MergeException {
@@ -277,7 +334,7 @@
}
SubmitStrategy strategy = createStrategy(submitType);
MergeTip mergeTip = preMerge(strategy, toMerge.get(submitType));
- RefUpdate update = updateBranch(strategy, branchUpdate);
+ RefUpdate update = updateBranch(branchUpdate);
reopen = true;
updateChangeStatus(toSubmit.get(submitType), mergeTip);
@@ -656,8 +713,8 @@
}
}
- private RefUpdate updateBranch(SubmitStrategy strategy,
- RefUpdate branchUpdate) throws MergeException {
+ private RefUpdate updateBranch(RefUpdate branchUpdate)
+ throws MergeException {
CodeReviewCommit currentTip =
mergeTip != null ? mergeTip.getCurrentTip() : null;
if (Objects.equals(branchTip, currentTip)) {
@@ -712,17 +769,7 @@
return branchUpdate;
case LOCK_FAILURE:
- String msg;
- if (strategy.retryOnLockFailure()) {
- mergeQueue.recheckAfter(destBranch, LOCK_FAILURE_RETRY_DELAY,
- MILLISECONDS);
- msg = "will retry";
- } else {
- msg = "will not retry";
- }
- // TODO(dborowitz): Implement RefUpdate.toString().
- throw new IOException(branchUpdate.getResult().name() + ", " + msg
- + '\n' + branchUpdate);
+ throw new MergeException("Failed to lock " + branchUpdate.getName());
default:
throw new IOException(branchUpdate.getResult().name()
+ '\n' + branchUpdate);
@@ -855,7 +902,8 @@
}
}
- private Capable isSubmitStillPossible(CodeReviewCommit commit) {
+ private Capable isSubmitStillPossible(CodeReviewCommit commit)
+ throws MergeException {
Capable capable;
Change c = commit.change();
boolean submitStillPossible =
@@ -866,11 +914,7 @@
long recheckIn = waitUntil - now;
logDebug("Submit for {} is still possible; rechecking in {}ms",
c.getId(), recheckIn);
- // If we waited a short while we might still be able to get
- // this change submitted. Reschedule an attempt in a bit.
- //
- mergeQueue.recheckAfter(destBranch, recheckIn, MILLISECONDS);
- capable = Capable.OK;
+ throw new MergeException("Cannot integrate " + c);
} else if (submitStillPossible) {
// It would be possible to submit the change if the missing
// dependencies are also submitted. Perhaps the user just
@@ -961,7 +1005,7 @@
throws OrmException, IOException {
logDebug("Setting change {} merged", c.getId());
ChangeUpdate update = null;
- PatchSetApproval submitter;
+ final PatchSetApproval submitter;
PatchSet merged;
try {
db.changes().beginTransaction(c.getId());
@@ -988,7 +1032,19 @@
db.rollback();
}
update.commit();
- sendMergedEmail(c, submitter);
+ final Change change = c;
+ try {
+ threadScoper.scope(new Callable<Void>(){
+ @Override
+ public Void call() throws Exception {
+ sendMergedEmail(change, submitter);
+ return null;
+ }
+ }).call();
+ } catch (Exception e) {
+ logError("internal server error", e);
+ }
+
indexer.index(db, c);
if (submitter != null && mergeResultRev != null) {
try {
@@ -1025,12 +1081,17 @@
private void sendMergedEmail(final Change c, final PatchSetApproval from) {
workQueue.getDefaultQueue()
- .submit(requestScopePropagator.wrap(new Runnable() {
+ .submit(new Runnable() {
@Override
public void run() {
PatchSet patchSet;
- try (ReviewDb reviewDb = schemaFactory.open()) {
- patchSet = reviewDb.patchSets().get(c.currentPatchSetId());
+ try {
+ ReviewDb reviewDb = schemaFactory.open();
+ try {
+ patchSet = reviewDb.patchSets().get(c.currentPatchSetId());
+ } finally {
+ reviewDb.close();
+ }
} catch (Exception e) {
logError("Cannot send email for submitted patch set " + c.getId(), e);
return;
@@ -1052,7 +1113,7 @@
public String toString() {
return "send-email merged";
}
- }));
+ });
}
private ChangeControl changeControl(Change c) throws NoSuchChangeException {
@@ -1181,36 +1242,51 @@
indexer.index(db, change);
final PatchSetApproval from = submitter;
- workQueue.getDefaultQueue()
- .submit(requestScopePropagator.wrap(new Runnable() {
- @Override
- public void run() {
- PatchSet patchSet;
- try (ReviewDb reviewDb = schemaFactory.open()) {
- patchSet = reviewDb.patchSets().get(c.currentPatchSetId());
- } catch (Exception e) {
- logError("Cannot send email notifications about merge failure", e);
- return;
- }
+ try {
+ threadScoper.scope(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ workQueue.getDefaultQueue()
+ .submit(new Runnable() {
+ @Override
+ public void run() {
+ PatchSet patchSet;
+ try {
+ ReviewDb reviewDb = schemaFactory.open();
+ try {
+ patchSet = reviewDb.patchSets().get(c.currentPatchSetId());
+ } finally {
+ reviewDb.close();
+ }
+ } catch (Exception e) {
+ logError("Cannot send email notifications about merge failure", e);
+ return;
+ }
- try {
- MergeFailSender cm = mergeFailSenderFactory.create(c.getId());
- if (from != null) {
- cm.setFrom(from.getAccountId());
- }
- cm.setPatchSet(patchSet);
- cm.setChangeMessage(msg);
- cm.send();
- } catch (Exception e) {
- logError("Cannot send email notifications about merge failure", e);
- }
- }
+ try {
+ MergeFailSender cm = mergeFailSenderFactory.create(c.getId());
+ if (from != null) {
+ cm.setFrom(from.getAccountId());
+ }
+ cm.setPatchSet(patchSet);
+ cm.setChangeMessage(msg);
+ cm.send();
+ } catch (Exception e) {
+ logError("Cannot send email notifications about merge failure", e);
+ }
+ }
- @Override
- public String toString() {
- return "send-email merge-failed";
- }
- }));
+ @Override
+ public String toString() {
+ return "send-email merge-failed";
+ }
+ });
+ return null;
+ }
+ }).call();
+ } catch (Exception e) {
+ logError("internal server error", e);
+ }
if (submitter != null) {
try {
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/MergeQueue.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/MergeQueue.java
deleted file mode 100644
index a53b04c..0000000
--- a/gerrit-server/src/main/java/com/google/gerrit/server/git/MergeQueue.java
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright (C) 2009 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.gerrit.server.git;
-
-import com.google.gerrit.reviewdb.client.Branch;
-
-import java.util.concurrent.TimeUnit;
-
-public interface MergeQueue {
- void merge(Branch.NameKey branch);
- void schedule(Branch.NameKey branch);
- void recheckAfter(Branch.NameKey branch, long delay, TimeUnit delayUnit);
-}
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java
index 323b01b..af14964 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java
@@ -329,7 +329,7 @@
private final SubmoduleOp.Factory subOpFactory;
private final Provider<Submit> submitProvider;
- private final MergeQueue mergeQueue;
+ private final MergeOp.Factory mergeFactory;
private final DynamicMap<ProjectConfigEntry> pluginConfigEntries;
private final NotesMigration notesMigration;
private final ChangeEditUtil editUtil;
@@ -379,7 +379,7 @@
@Assisted final Repository repo,
final SubmoduleOp.Factory subOpFactory,
final Provider<Submit> submitProvider,
- final MergeQueue mergeQueue,
+ final MergeOp.Factory mergeFactory,
final ChangeKindCache changeKindCache,
final DynamicMap<ProjectConfigEntry> pluginConfigEntries,
final NotesMigration notesMigration,
@@ -426,7 +426,7 @@
this.subOpFactory = subOpFactory;
this.submitProvider = submitProvider;
- this.mergeQueue = mergeQueue;
+ this.mergeFactory = mergeFactory;
this.pluginConfigEntries = pluginConfigEntries;
this.notesMigration = notesMigration;
@@ -1763,7 +1763,7 @@
}
private void submit(ChangeControl changeCtl, PatchSet ps)
- throws OrmException, IOException {
+ throws OrmException, IOException{
Submit submit = submitProvider.get();
RevisionResource rsrc = new RevisionResource(changes.parse(changeCtl), ps);
List<Change> changes;
@@ -1775,7 +1775,11 @@
}
addMessage("");
for (Change c : changes) {
- mergeQueue.merge(c.getDest());
+ try {
+ mergeFactory.create(c.getDest()).merge();
+ } catch (MergeException | NoSuchChangeException e) {
+ throw new OrmException(e);
+ }
c = db.changes().get(c.getId());
switch (c.getStatus()) {
case SUBMITTED:
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReloadSubmitQueueOp.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReloadSubmitQueueOp.java
deleted file mode 100644
index 734512f..0000000
--- a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReloadSubmitQueueOp.java
+++ /dev/null
@@ -1,76 +0,0 @@
-// Copyright (C) 2009 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.gerrit.server.git;
-
-import com.google.gerrit.reviewdb.client.Branch;
-import com.google.gerrit.server.query.change.ChangeData;
-import com.google.gerrit.server.query.change.InternalChangeQuery;
-import com.google.gerrit.server.util.OneOffRequestContext;
-import com.google.gwtorm.server.OrmException;
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import com.google.inject.Singleton;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashSet;
-
-@Singleton
-public class ReloadSubmitQueueOp extends DefaultQueueOp {
- private static final Logger log =
- LoggerFactory.getLogger(ReloadSubmitQueueOp.class);
-
- private final OneOffRequestContext requestContext;
- private final Provider<InternalChangeQuery> queryProvider;
- private final MergeQueue mergeQueue;
-
- @Inject
- ReloadSubmitQueueOp(
- OneOffRequestContext rc,
- WorkQueue wq,
- Provider<InternalChangeQuery> qp,
- MergeQueue mq) {
- super(wq);
- requestContext = rc;
- queryProvider = qp;
- mergeQueue = mq;
- }
-
- @Override
- public void run() {
- try (AutoCloseable ctx = requestContext.open()) {
- HashSet<Branch.NameKey> pending = new HashSet<>();
- for (ChangeData cd : queryProvider.get().allSubmitted()) {
- try {
- pending.add(cd.change().getDest());
- } catch (OrmException e) {
- log.error("Error reading submitted change", e);
- }
- }
-
- for (Branch.NameKey branch : pending) {
- mergeQueue.schedule(branch);
- }
- } catch (Exception e) {
- log.error("Cannot reload MergeQueue", e);
- }
- }
-
- @Override
- public String toString() {
- return "Reload Submit Queue";
- }
-}
diff --git a/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java b/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java
index dde4923..8968c81 100644
--- a/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java
+++ b/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java
@@ -35,7 +35,6 @@
import com.google.gerrit.server.config.GerritGlobalModule;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.config.GerritServerConfigModule;
-import com.google.gerrit.server.config.MasterNodeStartup;
import com.google.gerrit.server.config.RestCacheAdminModule;
import com.google.gerrit.server.config.SitePath;
import com.google.gerrit.server.contact.ContactStoreModule;
@@ -316,7 +315,6 @@
}
});
modules.add(SshKeyCacheImpl.module());
- modules.add(new MasterNodeStartup());
modules.add(new AbstractModule() {
@Override
protected void configure() {