| /* |
| * Copyright (C) 2016, Google Inc. |
| * and other copyright owners as documented in the project's IP log. |
| * |
| * This program and the accompanying materials are made available |
| * under the terms of the Eclipse Distribution License v1.0 which |
| * accompanies this distribution, is reproduced below, and is |
| * available at http://www.eclipse.org/org/documents/edl-v10.php |
| * |
| * All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or |
| * without modification, are permitted provided that the following |
| * conditions are met: |
| * |
| * - Redistributions of source code must retain the above copyright |
| * notice, this list of conditions and the following disclaimer. |
| * |
| * - Redistributions in binary form must reproduce the above |
| * copyright notice, this list of conditions and the following |
| * disclaimer in the documentation and/or other materials provided |
| * with the distribution. |
| * |
| * - Neither the name of the Eclipse Foundation, Inc. nor the |
| * names of its contributors may be used to endorse or promote |
| * products derived from this software without specific prior |
| * written permission. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND |
| * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, |
| * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES |
| * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR |
| * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT |
| * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; |
| * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER |
| * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, |
| * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF |
| * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| package org.eclipse.jgit.internal.ketch; |
| |
| import static org.eclipse.jgit.internal.ketch.Proposal.State.RUNNING; |
| |
| import java.io.IOException; |
| import java.time.Duration; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.TimeoutException; |
| import java.util.stream.Collectors; |
| |
| import org.eclipse.jgit.annotations.Nullable; |
| import org.eclipse.jgit.internal.storage.reftree.Command; |
| import org.eclipse.jgit.internal.storage.reftree.RefTree; |
| import org.eclipse.jgit.lib.CommitBuilder; |
| import org.eclipse.jgit.lib.ObjectId; |
| import org.eclipse.jgit.lib.ObjectInserter; |
| import org.eclipse.jgit.lib.PersonIdent; |
| import org.eclipse.jgit.lib.Ref; |
| import org.eclipse.jgit.lib.Repository; |
| import org.eclipse.jgit.revwalk.RevCommit; |
| import org.eclipse.jgit.revwalk.RevWalk; |
| import org.eclipse.jgit.transport.ReceiveCommand; |
| import org.eclipse.jgit.util.time.ProposedTimestamp; |
| |
| /** A {@link Round} that aggregates and sends user {@link Proposal}s. */ |
| class ProposalRound extends Round { |
| private final List<Proposal> todo; |
| private RefTree queuedTree; |
| |
| ProposalRound(KetchLeader leader, LogIndex head, List<Proposal> todo, |
| @Nullable RefTree tree) { |
| super(leader, head); |
| this.todo = todo; |
| |
| if (tree != null && canCombine(todo)) { |
| this.queuedTree = tree; |
| } else { |
| leader.roundHoldsReferenceToRefTree = false; |
| } |
| } |
| |
| private static boolean canCombine(List<Proposal> todo) { |
| Proposal first = todo.get(0); |
| for (int i = 1; i < todo.size(); i++) { |
| if (!canCombine(first, todo.get(i))) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| private static boolean canCombine(Proposal a, Proposal b) { |
| String aMsg = nullToEmpty(a.getMessage()); |
| String bMsg = nullToEmpty(b.getMessage()); |
| return aMsg.equals(bMsg) && canCombine(a.getAuthor(), b.getAuthor()); |
| } |
| |
| private static String nullToEmpty(@Nullable String str) { |
| return str != null ? str : ""; //$NON-NLS-1$ |
| } |
| |
| private static boolean canCombine(@Nullable PersonIdent a, |
| @Nullable PersonIdent b) { |
| if (a != null && b != null) { |
| // Same name and email address. Combine timestamp as the two |
| // proposals are running concurrently and appear together or |
| // not at all from the point of view of an outside reader. |
| return a.getName().equals(b.getName()) |
| && a.getEmailAddress().equals(b.getEmailAddress()); |
| } |
| |
| // If a and b are null, both will be the system identity. |
| return a == null && b == null; |
| } |
| |
| @Override |
| void start() throws IOException { |
| for (Proposal p : todo) { |
| p.notifyState(RUNNING); |
| } |
| try { |
| ObjectId id; |
| try (Repository git = leader.openRepository(); |
| ProposedTimestamp ts = getSystem().getClock().propose()) { |
| id = insertProposals(git, ts); |
| blockUntil(ts); |
| } |
| runAsync(id); |
| } catch (NoOp e) { |
| for (Proposal p : todo) { |
| p.success(); |
| } |
| leader.lock.lock(); |
| try { |
| leader.nextRound(); |
| } finally { |
| leader.lock.unlock(); |
| } |
| } catch (IOException e) { |
| abort(); |
| throw e; |
| } |
| } |
| |
| private ObjectId insertProposals(Repository git, ProposedTimestamp ts) |
| throws IOException, NoOp { |
| ObjectId id; |
| try (ObjectInserter inserter = git.newObjectInserter()) { |
| // TODO(sop) Process signed push certificates. |
| |
| if (queuedTree != null) { |
| id = insertSingleProposal(git, ts, inserter); |
| } else { |
| id = insertMultiProposal(git, ts, inserter); |
| } |
| |
| stageCommands = makeStageList(git, inserter); |
| inserter.flush(); |
| } |
| return id; |
| } |
| |
| private ObjectId insertSingleProposal(Repository git, ProposedTimestamp ts, |
| ObjectInserter inserter) throws IOException, NoOp { |
| // Fast path: tree is passed in with all proposals applied. |
| ObjectId treeId = queuedTree.writeTree(inserter); |
| queuedTree = null; |
| leader.roundHoldsReferenceToRefTree = false; |
| |
| if (!ObjectId.zeroId().equals(acceptedOldIndex)) { |
| try (RevWalk rw = new RevWalk(git)) { |
| RevCommit c = rw.parseCommit(acceptedOldIndex); |
| if (treeId.equals(c.getTree())) { |
| throw new NoOp(); |
| } |
| } |
| } |
| |
| Proposal p = todo.get(0); |
| CommitBuilder b = new CommitBuilder(); |
| b.setTreeId(treeId); |
| if (!ObjectId.zeroId().equals(acceptedOldIndex)) { |
| b.setParentId(acceptedOldIndex); |
| } |
| b.setCommitter(leader.getSystem().newCommitter(ts)); |
| b.setAuthor(p.getAuthor() != null ? p.getAuthor() : b.getCommitter()); |
| b.setMessage(message(p)); |
| return inserter.insert(b); |
| } |
| |
| private ObjectId insertMultiProposal(Repository git, ProposedTimestamp ts, |
| ObjectInserter inserter) throws IOException, NoOp { |
| // The tree was not passed in, or there are multiple proposals |
| // each needing their own commit. Reset the tree and replay each |
| // proposal in order as individual commits. |
| ObjectId lastIndex = acceptedOldIndex; |
| ObjectId oldTreeId; |
| RefTree tree; |
| if (ObjectId.zeroId().equals(lastIndex)) { |
| oldTreeId = ObjectId.zeroId(); |
| tree = RefTree.newEmptyTree(); |
| } else { |
| try (RevWalk rw = new RevWalk(git)) { |
| RevCommit c = rw.parseCommit(lastIndex); |
| oldTreeId = c.getTree(); |
| tree = RefTree.read(rw.getObjectReader(), c.getTree()); |
| } |
| } |
| |
| PersonIdent committer = leader.getSystem().newCommitter(ts); |
| for (Proposal p : todo) { |
| if (!tree.apply(p.getCommands())) { |
| // This should not occur, previously during queuing the |
| // commands were successfully applied to the pending tree. |
| // Abort the entire round. |
| throw new IOException( |
| KetchText.get().queuedProposalFailedToApply); |
| } |
| |
| ObjectId treeId = tree.writeTree(inserter); |
| if (treeId.equals(oldTreeId)) { |
| continue; |
| } |
| |
| CommitBuilder b = new CommitBuilder(); |
| b.setTreeId(treeId); |
| if (!ObjectId.zeroId().equals(lastIndex)) { |
| b.setParentId(lastIndex); |
| } |
| b.setAuthor(p.getAuthor() != null ? p.getAuthor() : committer); |
| b.setCommitter(committer); |
| b.setMessage(message(p)); |
| lastIndex = inserter.insert(b); |
| } |
| if (lastIndex.equals(acceptedOldIndex)) { |
| throw new NoOp(); |
| } |
| return lastIndex; |
| } |
| |
| private String message(Proposal p) { |
| StringBuilder m = new StringBuilder(); |
| String msg = p.getMessage(); |
| if (msg != null && !msg.isEmpty()) { |
| m.append(msg); |
| while (m.length() < 2 || m.charAt(m.length() - 2) != '\n' |
| || m.charAt(m.length() - 1) != '\n') { |
| m.append('\n'); |
| } |
| } |
| m.append(KetchConstants.TERM.getName()) |
| .append(": ") //$NON-NLS-1$ |
| .append(leader.getTerm()); |
| return m.toString(); |
| } |
| |
| void abort() { |
| for (Proposal p : todo) { |
| p.abort(); |
| } |
| } |
| |
| @Override |
| void success() { |
| for (Proposal p : todo) { |
| p.success(); |
| } |
| } |
| |
| private List<ReceiveCommand> makeStageList(Repository git, |
| ObjectInserter inserter) throws IOException { |
| // For each branch, collapse consecutive updates to only most recent, |
| // avoiding sending multiple objects in a rapid fast-forward chain, or |
| // rewritten content. |
| Map<String, ObjectId> byRef = new HashMap<>(); |
| for (Proposal p : todo) { |
| for (Command c : p.getCommands()) { |
| Ref n = c.getNewRef(); |
| if (n != null && !n.isSymbolic()) { |
| byRef.put(n.getName(), n.getObjectId()); |
| } |
| } |
| } |
| if (byRef.isEmpty()) { |
| return Collections.emptyList(); |
| } |
| |
| Set<ObjectId> newObjs = new HashSet<>(byRef.values()); |
| StageBuilder b = new StageBuilder( |
| leader.getSystem().getTxnStage(), |
| acceptedNewIndex); |
| return b.makeStageList(newObjs, git, inserter); |
| } |
| |
| private void blockUntil(ProposedTimestamp ts) |
| throws TimeIsUncertainException { |
| List<ProposedTimestamp> times = todo.stream() |
| .flatMap(p -> p.getProposedTimestamps().stream()) |
| .collect(Collectors.toCollection(ArrayList::new)); |
| times.add(ts); |
| |
| try { |
| Duration maxWait = getSystem().getMaxWaitForMonotonicClock(); |
| ProposedTimestamp.blockUntil(times, maxWait); |
| } catch (InterruptedException | TimeoutException e) { |
| throw new TimeIsUncertainException(e); |
| } |
| } |
| |
| private static class NoOp extends Exception { |
| private static final long serialVersionUID = 1L; |
| } |
| } |