| /* |
| * Copyright (C) 2016, Google Inc. and others |
| * |
| * This program and the accompanying materials are made available under the |
| * terms of the Eclipse Distribution License v. 1.0 which is available at |
| * https://www.eclipse.org/org/documents/edl-v10.php. |
| * |
| * SPDX-License-Identifier: BSD-3-Clause |
| */ |
| |
| package org.eclipse.jgit.internal.ketch; |
| |
| import static org.eclipse.jgit.internal.ketch.KetchLeader.State.CANDIDATE; |
| import static org.eclipse.jgit.internal.ketch.KetchLeader.State.LEADER; |
| import static org.eclipse.jgit.internal.ketch.KetchLeader.State.SHUTDOWN; |
| import static org.eclipse.jgit.internal.ketch.KetchReplica.Participation.FOLLOWER_ONLY; |
| import static org.eclipse.jgit.internal.ketch.Proposal.State.QUEUED; |
| |
| import java.io.IOException; |
| import java.text.MessageFormat; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import org.eclipse.jgit.internal.storage.reftree.RefTree; |
| import org.eclipse.jgit.lib.ObjectId; |
| import org.eclipse.jgit.lib.Repository; |
| import org.eclipse.jgit.revwalk.RevCommit; |
| import org.eclipse.jgit.revwalk.RevWalk; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A leader managing consensus across remote followers. |
| * <p> |
| * A leader instance starts up in |
| * {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#CANDIDATE} and tries |
| * to begin a new term by sending an |
| * {@link org.eclipse.jgit.internal.ketch.ElectionRound} to all replicas. Its |
| * term starts if a majority of replicas have accepted this leader instance for |
| * the term. |
| * <p> |
| * Once elected by a majority the instance enters |
| * {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#LEADER} and runs |
| * proposals offered to {@link #queueProposal(Proposal)}. This continues until |
| * the leader is timed out for inactivity, or is deposed by a competing leader |
| * gaining its own majority. |
| * <p> |
| * Once timed out or deposed this {@code KetchLeader} instance should be |
| * discarded, and a new instance takes over. |
| * <p> |
| * Each leader instance coordinates a group of |
| * {@link org.eclipse.jgit.internal.ketch.KetchReplica}s. Replica instances are |
| * owned by the leader instance and must be discarded when the leader is |
| * discarded. |
| * <p> |
| * In Ketch all push requests are issued through the leader. The steps are as |
| * follows (see {@link org.eclipse.jgit.internal.ketch.KetchPreReceive} for an |
| * example): |
| * <ul> |
| * <li>Create a {@link org.eclipse.jgit.internal.ketch.Proposal} with the |
| * {@link org.eclipse.jgit.transport.ReceiveCommand}s that represent the push. |
| * <li>Invoke {@link #queueProposal(Proposal)} on the leader instance. |
| * <li>Wait for consensus with |
| * {@link org.eclipse.jgit.internal.ketch.Proposal#await()}. |
| * <li>To examine the status of the push, check |
| * {@link org.eclipse.jgit.internal.ketch.Proposal#getCommands()}, looking at |
| * {@link org.eclipse.jgit.internal.storage.reftree.Command#getResult()}. |
| * </ul> |
| * <p> |
| * The leader gains consensus by first pushing the needed objects and a |
| * {@link org.eclipse.jgit.internal.storage.reftree.RefTree} representing the |
| * desired target repository state to the {@code refs/txn/accepted} branch on |
| * each of the replicas. Once a majority has succeeded, the leader commits the |
| * state by either pushing the {@code refs/txn/accepted} value to |
| * {@code refs/txn/committed} (for Ketch-aware replicas) or by pushing updates |
| * to {@code refs/heads/master}, etc. for stock Git replicas. |
| * <p> |
| * Internally, the actual transport to replicas is performed on background |
| * threads via the {@link org.eclipse.jgit.internal.ketch.KetchSystem}'s |
| * executor service. For performance, the |
| * {@link org.eclipse.jgit.internal.ketch.KetchLeader}, |
| * {@link org.eclipse.jgit.internal.ketch.KetchReplica} and |
| * {@link org.eclipse.jgit.internal.ketch.Proposal} objects share some state, |
| * and may invoke each other's methods on different threads. This access is |
| * protected by the leader's {@link #lock} object. Care must be taken to prevent |
| * concurrent access by correctly obtaining the leader's lock. |
| */ |
| public abstract class KetchLeader { |
| private static final Logger log = LoggerFactory.getLogger(KetchLeader.class); |
| |
| /** Current state of the leader instance. */ |
| public static enum State { |
| /** Newly created instance trying to elect itself leader. */ |
| CANDIDATE, |
| |
| /** Leader instance elected by a majority. */ |
| LEADER, |
| |
| /** Instance has been deposed by another with a more recent term. */ |
| DEPOSED, |
| |
| /** Leader has been gracefully shutdown, e.g. due to inactivity. */ |
| SHUTDOWN; |
| } |
| |
| private final KetchSystem system; |
| |
| /** Leader's knowledge of replicas for this repository. */ |
| private KetchReplica[] voters; |
| private KetchReplica[] followers; |
| private LocalReplica self; |
| |
| /** |
| * Lock protecting all data within this leader instance. |
| * <p> |
| * This lock extends into the {@link KetchReplica} instances used by the |
| * leader. They share the same lock instance to simplify concurrency. |
| */ |
| final Lock lock; |
| |
| private State state = CANDIDATE; |
| |
| /** Term of this leader, once elected. */ |
| private long term; |
| |
| /** |
| * Pending proposals accepted into the queue in FIFO order. |
| * <p> |
| * These proposals were preflighted and do not contain any conflicts with |
| * each other and their expectations matched the leader's local view of the |
| * agreed upon {@code refs/txn/accepted} tree. |
| */ |
| private final List<Proposal> queued; |
| |
| /** |
| * State of the repository's RefTree after applying all entries in |
| * {@link #queued}. New proposals must be consistent with this tree to be |
| * appended to the end of {@link #queued}. |
| * <p> |
| * Must be deep-copied with {@link RefTree#copy()} if |
| * {@link #roundHoldsReferenceToRefTree} is {@code true}. |
| */ |
| private RefTree refTree; |
| |
| /** |
| * If {@code true} {@link #refTree} must be duplicated before queuing the |
| * next proposal. The {@link #refTree} was passed into the constructor of a |
| * {@link ProposalRound}, and that external reference to the {@link RefTree} |
| * object is held by the proposal until it materializes the tree object in |
| * the object store. This field is set {@code true} when the proposal begins |
| * execution and set {@code false} once tree objects are persisted in the |
| * local repository's object store or {@link #refTree} is replaced with a |
| * copy to isolate it from any running rounds. |
| * <p> |
| * If proposals arrive less frequently than the {@code RefTree} is written |
| * out to the repository the {@link #roundHoldsReferenceToRefTree} behavior |
| * avoids duplicating {@link #refTree}, reducing both time and memory used. |
| * However if proposals arrive more frequently {@link #refTree} must be |
| * duplicated to prevent newly queued proposals from corrupting the |
| * {@link #runningRound}. |
| */ |
| volatile boolean roundHoldsReferenceToRefTree; |
| |
| /** End of the leader's log. */ |
| private LogIndex headIndex; |
| |
| /** Leader knows this (and all prior) states are committed. */ |
| private LogIndex committedIndex; |
| |
| /** |
| * Is the leader idle with no work pending? If {@code true} there is no work |
| * for the leader (normal state). This field is {@code false} when the |
| * leader thread is scheduled for execution, or while {@link #runningRound} |
| * defines a round in progress. |
| */ |
| private boolean idle; |
| |
| /** Current round the leader is preparing and waiting for a vote on. */ |
| private Round runningRound; |
| |
| /** |
| * Construct a leader for a Ketch instance. |
| * |
| * @param system |
| * Ketch system configuration the leader must adhere to. |
| */ |
| protected KetchLeader(KetchSystem system) { |
| this.system = system; |
| this.lock = new ReentrantLock(true /* fair */); |
| this.queued = new ArrayList<>(4); |
| this.idle = true; |
| } |
| |
| /** @return system configuration. */ |
| KetchSystem getSystem() { |
| return system; |
| } |
| |
| /** |
| * Configure the replicas used by this Ketch instance. |
| * <p> |
| * Replicas should be configured once at creation before any proposals are |
| * executed. Once elections happen, <b>reconfiguration is a complicated |
| * concept that is not currently supported</b>. |
| * |
| * @param replicas |
| * members participating with the same repository. |
| */ |
| public void setReplicas(Collection<KetchReplica> replicas) { |
| List<KetchReplica> v = new ArrayList<>(5); |
| List<KetchReplica> f = new ArrayList<>(5); |
| for (KetchReplica r : replicas) { |
| switch (r.getParticipation()) { |
| case FULL: |
| v.add(r); |
| break; |
| |
| case FOLLOWER_ONLY: |
| f.add(r); |
| break; |
| } |
| } |
| |
| Collection<Integer> validVoters = validVoterCounts(); |
| if (!validVoters.contains(Integer.valueOf(v.size()))) { |
| throw new IllegalArgumentException(MessageFormat.format( |
| KetchText.get().unsupportedVoterCount, |
| Integer.valueOf(v.size()), |
| validVoters)); |
| } |
| |
| LocalReplica me = findLocal(v); |
| if (me == null) { |
| throw new IllegalArgumentException( |
| KetchText.get().localReplicaRequired); |
| } |
| |
| lock.lock(); |
| try { |
| voters = v.toArray(new KetchReplica[0]); |
| followers = f.toArray(new KetchReplica[0]); |
| self = me; |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| private static Collection<Integer> validVoterCounts() { |
| @SuppressWarnings("boxing") |
| Integer[] valid = { |
| // An odd number of voting replicas is required. |
| 1, 3, 5, 7, 9 }; |
| return Arrays.asList(valid); |
| } |
| |
| private static LocalReplica findLocal(Collection<KetchReplica> voters) { |
| for (KetchReplica r : voters) { |
| if (r instanceof LocalReplica) { |
| return (LocalReplica) r; |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Get an instance of the repository for use by a leader thread. |
| * <p> |
| * The caller will close the repository. |
| * |
| * @return opened repository for use by the leader thread. |
| * @throws java.io.IOException |
| * cannot reopen the repository for the leader. |
| */ |
| protected abstract Repository openRepository() throws IOException; |
| |
| /** |
| * Queue a reference update proposal for consensus. |
| * <p> |
| * This method does not wait for consensus to be reached. The proposal is |
| * checked to look for risks of conflicts, and then submitted into the queue |
| * for distribution as soon as possible. |
| * <p> |
| * Callers must use {@link org.eclipse.jgit.internal.ketch.Proposal#await()} |
| * to see if the proposal is done. |
| * |
| * @param proposal |
| * the proposed reference updates to queue for consideration. |
| * Once execution is complete the individual reference result |
| * fields will be populated with the outcome. |
| * @throws java.lang.InterruptedException |
| * current thread was interrupted. The proposal may have been |
| * aborted if it was not yet queued for execution. |
| * @throws java.io.IOException |
| * unrecoverable error preventing proposals from being attempted |
| * by this leader. |
| */ |
| public void queueProposal(Proposal proposal) |
| throws InterruptedException, IOException { |
| try { |
| lock.lockInterruptibly(); |
| } catch (InterruptedException e) { |
| proposal.abort(); |
| throw e; |
| } |
| try { |
| if (refTree == null) { |
| initialize(); |
| for (Proposal p : queued) { |
| refTree.apply(p.getCommands()); |
| } |
| } else if (roundHoldsReferenceToRefTree) { |
| refTree = refTree.copy(); |
| roundHoldsReferenceToRefTree = false; |
| } |
| |
| if (!refTree.apply(proposal.getCommands())) { |
| // A conflict exists so abort the proposal. |
| proposal.abort(); |
| return; |
| } |
| |
| queued.add(proposal); |
| proposal.notifyState(QUEUED); |
| |
| if (idle) { |
| scheduleLeader(); |
| } |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| private void initialize() throws IOException { |
| try (Repository git = openRepository(); RevWalk rw = new RevWalk(git)) { |
| self.initialize(git); |
| |
| ObjectId accepted = self.getTxnAccepted(); |
| if (!ObjectId.zeroId().equals(accepted)) { |
| RevCommit c = rw.parseCommit(accepted); |
| headIndex = LogIndex.unknown(accepted); |
| refTree = RefTree.read(rw.getObjectReader(), c.getTree()); |
| } else { |
| headIndex = LogIndex.unknown(ObjectId.zeroId()); |
| refTree = RefTree.newEmptyTree(); |
| } |
| } |
| } |
| |
| private void scheduleLeader() { |
| idle = false; |
| system.getExecutor().execute(this::runLeader); |
| } |
| |
| private void runLeader() { |
| Round round; |
| lock.lock(); |
| try { |
| switch (state) { |
| case CANDIDATE: |
| round = new ElectionRound(this, headIndex); |
| break; |
| |
| case LEADER: |
| round = newProposalRound(); |
| break; |
| |
| case DEPOSED: |
| case SHUTDOWN: |
| default: |
| log.warn("Leader cannot run {}", state); //$NON-NLS-1$ |
| // TODO(sop): Redirect proposals. |
| return; |
| } |
| } finally { |
| lock.unlock(); |
| } |
| |
| try { |
| round.start(); |
| } catch (IOException e) { |
| // TODO(sop) Depose leader if it cannot use its repository. |
| log.error(KetchText.get().leaderFailedToStore, e); |
| lock.lock(); |
| try { |
| nextRound(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| } |
| |
| private ProposalRound newProposalRound() { |
| List<Proposal> todo = new ArrayList<>(queued); |
| queued.clear(); |
| roundHoldsReferenceToRefTree = true; |
| return new ProposalRound(this, headIndex, todo, refTree); |
| } |
| |
| /** @return term of this leader's reign. */ |
| long getTerm() { |
| return term; |
| } |
| |
| /** @return end of the leader's log. */ |
| LogIndex getHead() { |
| return headIndex; |
| } |
| |
| /** |
| * @return state leader knows it has committed across a quorum of replicas. |
| */ |
| LogIndex getCommitted() { |
| return committedIndex; |
| } |
| |
| boolean isIdle() { |
| return idle; |
| } |
| |
| void runAsync(Round round) { |
| lock.lock(); |
| try { |
| // End of the log is this round. Once transport begins it is |
| // reasonable to assume at least one replica will eventually get |
| // this, and there is reasonable probability it commits. |
| headIndex = round.acceptedNewIndex; |
| runningRound = round; |
| |
| for (KetchReplica replica : voters) { |
| replica.pushTxnAcceptedAsync(round); |
| } |
| for (KetchReplica replica : followers) { |
| replica.pushTxnAcceptedAsync(round); |
| } |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * Asynchronous signal from a replica after completion. |
| * <p> |
| * Must be called while {@link #lock} is held by the replica. |
| * |
| * @param replica |
| * replica posting a completion event. |
| */ |
| void onReplicaUpdate(KetchReplica replica) { |
| if (log.isDebugEnabled()) { |
| log.debug("Replica {} finished:\n{}", //$NON-NLS-1$ |
| replica.describeForLog(), snapshot()); |
| } |
| |
| if (replica.getParticipation() == FOLLOWER_ONLY) { |
| // Followers cannot vote, so votes haven't changed. |
| return; |
| } else if (runningRound == null) { |
| // No round running, no need to tally votes. |
| return; |
| } |
| |
| assert headIndex.equals(runningRound.acceptedNewIndex); |
| int matching = 0; |
| for (KetchReplica r : voters) { |
| if (r.hasAccepted(headIndex)) { |
| matching++; |
| } |
| } |
| |
| int quorum = voters.length / 2 + 1; |
| boolean success = matching >= quorum; |
| if (!success) { |
| return; |
| } |
| |
| switch (state) { |
| case CANDIDATE: |
| term = ((ElectionRound) runningRound).getTerm(); |
| state = LEADER; |
| if (log.isDebugEnabled()) { |
| log.debug("Won election, running term " + term); //$NON-NLS-1$ |
| } |
| |
| //$FALL-THROUGH$ |
| case LEADER: |
| committedIndex = headIndex; |
| if (log.isDebugEnabled()) { |
| log.debug("Committed {} in term {}", //$NON-NLS-1$ |
| committedIndex.describeForLog(), |
| Long.valueOf(term)); |
| } |
| nextRound(); |
| commitAsync(replica); |
| notifySuccess(runningRound); |
| if (log.isDebugEnabled()) { |
| log.debug("Leader state:\n{}", snapshot()); //$NON-NLS-1$ |
| } |
| break; |
| |
| default: |
| log.debug("Leader ignoring replica while in {}", state); //$NON-NLS-1$ |
| break; |
| } |
| } |
| |
| private void notifySuccess(Round round) { |
| // Drop the leader lock while notifying Proposal listeners. |
| lock.unlock(); |
| try { |
| round.success(); |
| } finally { |
| lock.lock(); |
| } |
| } |
| |
| private void commitAsync(KetchReplica caller) { |
| for (KetchReplica r : voters) { |
| if (r == caller) { |
| continue; |
| } |
| if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) { |
| r.pushCommitAsync(committedIndex); |
| } |
| } |
| for (KetchReplica r : followers) { |
| if (r == caller) { |
| continue; |
| } |
| if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) { |
| r.pushCommitAsync(committedIndex); |
| } |
| } |
| } |
| |
| /** Schedule the next round; invoked while {@link #lock} is held. */ |
| void nextRound() { |
| runningRound = null; |
| |
| if (queued.isEmpty()) { |
| idle = true; |
| } else { |
| // Caller holds lock. Reschedule leader on a new thread so |
| // the call stack can unwind and lock is not held unexpectedly |
| // during prepare for the next round. |
| scheduleLeader(); |
| } |
| } |
| |
| /** |
| * Snapshot this leader |
| * |
| * @return snapshot of this leader |
| */ |
| public LeaderSnapshot snapshot() { |
| lock.lock(); |
| try { |
| LeaderSnapshot s = new LeaderSnapshot(); |
| s.state = state; |
| s.term = term; |
| s.headIndex = headIndex; |
| s.committedIndex = committedIndex; |
| s.idle = isIdle(); |
| for (KetchReplica r : voters) { |
| s.replicas.add(r.snapshot()); |
| } |
| for (KetchReplica r : followers) { |
| s.replicas.add(r.snapshot()); |
| } |
| return s; |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * Gracefully shutdown this leader and cancel outstanding operations. |
| */ |
| public void shutdown() { |
| lock.lock(); |
| try { |
| if (state != SHUTDOWN) { |
| state = SHUTDOWN; |
| for (KetchReplica r : voters) { |
| r.shutdown(); |
| } |
| for (KetchReplica r : followers) { |
| r.shutdown(); |
| } |
| } |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public String toString() { |
| return snapshot().toString(); |
| } |
| } |