|  | # Copyright (C) 2019 The Android Open Source Project | 
|  | # | 
|  | # Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | # you may not use this file except in compliance with the License. | 
|  | # You may obtain a copy of the License at | 
|  | # | 
|  | #      http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, software | 
|  | # distributed under the License is distributed on an "AS IS" BASIS, | 
|  | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | # See the License for the specific language governing permissions and | 
|  | # limitations under the License. | 
|  |  | 
|  | """Unittests for the project.py module.""" | 
|  |  | 
|  | import contextlib | 
|  | import os | 
|  | import shutil | 
|  | import subprocess | 
|  | import tempfile | 
|  | import unittest | 
|  |  | 
|  | import error | 
|  | import git_command | 
|  | import git_config | 
|  | import platform_utils | 
|  | import project | 
|  |  | 
|  |  | 
@contextlib.contextmanager
def TempGitTree():
    """Create a new empty git checkout for testing.

    The checkout is initialized so that tests can assume `main` is the
    initial branch.

    Yields:
      The path of the temporary directory holding the new checkout.  That
      directory, and the helper template directory when one was needed, are
      removed once the context exits.
    """
    # TODO(vapier): Convert this to tempfile.TemporaryDirectory once we drop
    # Python 2 support entirely.
    # Create the directory before entering the try block: if mkdtemp itself
    # fails there is nothing to clean up, and the finally clause must never
    # trip over an unbound name and hide the real error.
    tempdir = tempfile.mkdtemp(prefix='repo-tests')
    templatedir = None
    try:
        # Tests need to assume that main is the default branch at init,
        # which is not supported in config until 2.28.
        cmd = ['git', 'init']
        if git_command.git_require((2, 28, 0)):
            cmd += ['--initial-branch=main']
        else:
            # Use template dir for init.
            templatedir = tempfile.mkdtemp(prefix='.test-template')
            with open(os.path.join(templatedir, 'HEAD'), 'w') as fp:
                fp.write('ref: refs/heads/main\n')
            cmd += ['--template', templatedir]
        subprocess.check_call(cmd, cwd=tempdir)
        yield tempdir
    finally:
        # mkdtemp leaves deletion to the caller, so drop the template
        # directory too instead of leaking one per test run.
        if templatedir is not None:
            platform_utils.rmtree(templatedir)
        platform_utils.rmtree(tempdir)
|  |  | 
|  |  | 
class FakeProject(object):
    """A fake for Project for basic functionality.

    Only the attributes assigned in __init__ are available.
    """

    def __init__(self, worktree):
        """Set up the fake around the git checkout at |worktree|."""
        gitdir = os.path.join(worktree, '.git')
        git_exec = project.Project._GitGetByExec

        self.worktree = worktree
        self.gitdir = gitdir
        self.name = 'fakeproject'
        # Both helpers operate on the same gitdir; only the bare flag differs.
        self.work_git = git_exec(self, bare=False, gitdir=gitdir)
        self.bare_git = git_exec(self, bare=True, gitdir=gitdir)
        self.config = git_config.GitConfig.ForRepository(gitdir=gitdir)
|  |  | 
|  |  | 
class ReviewableBranchTests(unittest.TestCase):
    """Check ReviewableBranch behavior."""

    def test_smoke(self):
        """A quick run through everything."""
        with TempGitTree() as checkout:
            proj = FakeProject(checkout)
            git = proj.work_git

            # Generate some commits: add a file on the initial branch, then
            # delete it again on a new 'work' branch.
            readme = os.path.join(checkout, 'readme')
            with open(readme, 'w') as fp:
                fp.write('txt')
            git.add('readme')
            git.commit('-mAdd file')
            git.checkout('-b', 'work')
            git.rm('-f', 'readme')
            git.commit('-mDel file')

            # Start off with the normal details.
            branch = project.ReviewableBranch(
                proj, proj.config.GetBranch('work'), 'main')
            self.assertEqual('work', branch.name)
            self.assertEqual(1, len(branch.commits))
            self.assertIn('Del file', branch.commits[0])
            unabbrev = branch.unabbrev_commits
            self.assertEqual(1, len(unabbrev))
            # The single key has to be a prefix of its value.
            abbrev, full = next(iter(unabbrev.items()))
            self.assertTrue(full.startswith(abbrev))
            self.assertTrue(branch.base_exists)
            # Hard to assert anything useful about this.
            self.assertTrue(branch.date)

            # Now delete the tracking branch!
            git.branch('-D', 'main')
            branch = project.ReviewableBranch(
                proj, proj.config.GetBranch('work'), 'main')
            self.assertEqual(0, len(branch.commits))
            self.assertFalse(branch.base_exists)
            # Hard to assert anything useful about this.
            self.assertTrue(branch.date)
|  |  | 
|  |  | 
class CopyLinkTestCase(unittest.TestCase):
    """TestCase for stub repo client checkouts.

    It'll have a layout like:
      tempdir/          # self.tempdir
        checkout/       # self.topdir
          git-project/  # self.worktree

    Attributes:
      tempdir: A dedicated temporary directory.
      topdir: The top of the repo client checkout.
      worktree: The top of a project checkout.
    """

    def setUp(self):
        """Create the tempdir/checkout/git-project directory layout."""
        self.tempdir = tempfile.mkdtemp(prefix='repo_tests')
        self.topdir = os.path.join(self.tempdir, 'checkout')
        self.worktree = os.path.join(self.topdir, 'git-project')
        os.makedirs(self.topdir)
        os.makedirs(self.worktree)

    def tearDown(self):
        """Remove the whole temporary tree, ignoring any errors."""
        shutil.rmtree(self.tempdir, ignore_errors=True)

    @staticmethod
    def touch(path):
        """Create |path| as an empty file, truncating it if it exists."""
        with open(path, 'w'):
            pass

    def assertExists(self, path, msg=None):
        """Make sure |path| exists.

        Args:
          path: The path to check.
          msg: Optional failure message.  When omitted, a diagnostic is
              built by walking up from |path| until an existing ancestor is
              found, and that ancestor's directory listing is included.

        Raises:
          self.failureException: If |path| does not exist.
        """
        if os.path.exists(path):
            return

        if msg is None:
            lines = ['path is missing: %s' % path]
            while True:
                parent = os.path.dirname(path)
                # Stop once we run out of ancestors: "" for a bare relative
                # name like "foo", or a root whose dirname is itself ("/",
                # or a drive root on Windows).  Checking for a fixed point
                # rather than only "/" keeps this from looping forever when
                # such a root does not exist.
                if not parent or parent == path:
                    break
                path = parent
                result = os.path.exists(path)
                lines.append('\tos.path.exists(%s): %s' % (path, result))
                if result:
                    lines.append('\tcontents: %r' % os.listdir(path))
                    break
            msg = '\n'.join(lines)

        raise self.failureException(msg)
|  |  | 
|  |  | 
class CopyFile(CopyLinkTestCase):
    """Check _CopyFile handling."""

    def CopyFile(self, src, dest):
        """Return a project._CopyFile for |src| -> |dest| in the stub layout."""
        return project._CopyFile(self.worktree, src, self.topdir, dest)

    def test_basic(self):
        """A file in the project can be copied to the toplevel."""
        self.touch(os.path.join(self.worktree, 'foo.txt'))
        copier = self.CopyFile('foo.txt', 'foo')
        copier._Copy()
        self.assertExists(os.path.join(self.topdir, 'foo'))

    def test_src_subdir(self):
        """The source may live in a subdir of the project."""
        subdir = os.path.join(self.worktree, 'bar')
        os.makedirs(subdir)
        self.touch(os.path.join(subdir, 'foo.txt'))
        copier = self.CopyFile('bar/foo.txt', 'new.txt')
        copier._Copy()
        self.assertExists(os.path.join(self.topdir, 'new.txt'))

    def test_dest_subdir(self):
        """The destination may be in a not-yet-existing subdir of the checkout."""
        self.touch(os.path.join(self.worktree, 'foo.txt'))
        copier = self.CopyFile('foo.txt', 'sub/dir/new.txt')
        # The intermediate directories must not exist before the copy.
        self.assertFalse(os.path.exists(os.path.join(self.topdir, 'sub')))
        copier._Copy()
        self.assertExists(os.path.join(self.topdir, 'sub', 'dir', 'new.txt'))

    def test_update(self):
        """Make sure changed files get copied again."""
        source = os.path.join(self.worktree, 'foo.txt')
        target = os.path.join(self.topdir, 'bar')

        with open(source, 'w') as fp:
            fp.write('1st')
        copier = self.CopyFile('foo.txt', 'bar')
        copier._Copy()
        self.assertExists(target)
        with open(target) as fp:
            self.assertEqual(fp.read(), '1st')

        # Rewrite the source and copy a second time with the same object.
        with open(source, 'w') as fp:
            fp.write('2nd!')
        copier._Copy()
        with open(target) as fp:
            self.assertEqual(fp.read(), '2nd!')

    def test_src_block_symlink(self):
        """Do not allow reading from a symlinked path."""
        link = os.path.join(self.worktree, 'sym')
        self.touch(os.path.join(self.worktree, 'foo.txt'))
        platform_utils.symlink('foo.txt', link)
        self.assertExists(link)
        copier = self.CopyFile('sym', 'foo')
        with self.assertRaises(error.ManifestInvalidPathError):
            copier._Copy()

    def test_src_block_symlink_traversal(self):
        """Do not allow reading through a symlink dir."""
        self.touch(os.path.join(self.tempdir, 'file.txt'))
        linked_dir = os.path.join(self.worktree, 'bar')
        # 'bar' points back at the tempdir, so bar/file.txt resolves to the
        # file created above.
        platform_utils.symlink(self.tempdir, linked_dir)
        self.assertExists(os.path.join(linked_dir, 'file.txt'))
        copier = self.CopyFile('bar/file.txt', 'foo')
        with self.assertRaises(error.ManifestInvalidPathError):
            copier._Copy()

    def test_src_block_copy_from_dir(self):
        """Do not allow copying from a directory."""
        os.makedirs(os.path.join(self.worktree, 'dir'))
        copier = self.CopyFile('dir', 'foo')
        with self.assertRaises(error.ManifestInvalidPathError):
            copier._Copy()

    def test_dest_block_symlink(self):
        """Do not allow writing to a symlink."""
        self.touch(os.path.join(self.worktree, 'foo.txt'))
        platform_utils.symlink('dest', os.path.join(self.topdir, 'sym'))
        copier = self.CopyFile('foo.txt', 'sym')
        with self.assertRaises(error.ManifestInvalidPathError):
            copier._Copy()

    def test_dest_block_symlink_traversal(self):
        """Do not allow writing through a symlink dir."""
        self.touch(os.path.join(self.worktree, 'foo.txt'))
        link = os.path.join(self.topdir, 'sym')
        platform_utils.symlink(tempfile.gettempdir(), link)
        copier = self.CopyFile('foo.txt', 'sym/foo.txt')
        with self.assertRaises(error.ManifestInvalidPathError):
            copier._Copy()

    def test_src_block_copy_to_dir(self):
        """Do not allow copying to a directory."""
        self.touch(os.path.join(self.worktree, 'foo.txt'))
        os.makedirs(os.path.join(self.topdir, 'dir'))
        copier = self.CopyFile('foo.txt', 'dir')
        with self.assertRaises(error.ManifestInvalidPathError):
            copier._Copy()
|  |  | 
|  |  | 
class LinkFile(CopyLinkTestCase):
    """Check _LinkFile handling."""

    def LinkFile(self, src, dest):
        """Return a project._LinkFile for |src| -> |dest| in the stub layout."""
        return project._LinkFile(self.worktree, src, self.topdir, dest)

    def test_basic(self):
        """A file in the project can be linked into the toplevel."""
        self.touch(os.path.join(self.worktree, 'foo.txt'))
        linker = self.LinkFile('foo.txt', 'foo')
        linker._Link()
        link = os.path.join(self.topdir, 'foo')
        self.assertExists(link)
        self.assertTrue(os.path.islink(link))
        expected = os.path.join('git-project', 'foo.txt')
        self.assertEqual(expected, os.readlink(link))

    def test_src_subdir(self):
        """Link to a file in a subdir of a project."""
        subdir = os.path.join(self.worktree, 'bar')
        os.makedirs(subdir)
        self.touch(os.path.join(subdir, 'foo.txt'))
        linker = self.LinkFile('bar/foo.txt', 'foo')
        linker._Link()
        self.assertExists(os.path.join(self.topdir, 'foo'))

    def test_src_self(self):
        """Link to the project itself."""
        link = os.path.join(self.topdir, 'foo', 'bar')
        linker = self.LinkFile('.', 'foo/bar')
        linker._Link()
        self.assertExists(link)
        expected = os.path.join('..', 'git-project')
        self.assertEqual(expected, os.readlink(link))

    def test_dest_subdir(self):
        """Link a file to a subdir of a checkout."""
        self.touch(os.path.join(self.worktree, 'foo.txt'))
        linker = self.LinkFile('foo.txt', 'sub/dir/foo/bar')
        # The intermediate directories must not exist before linking.
        self.assertFalse(os.path.exists(os.path.join(self.topdir, 'sub')))
        linker._Link()
        self.assertExists(
            os.path.join(self.topdir, 'sub', 'dir', 'foo', 'bar'))

    def test_src_block_relative(self):
        """Do not allow relative symlinks."""
        bad_sources = (
            './',
            '..',
            '../',
            'foo/.',
            'foo/./bar',
            'foo/..',
            'foo/../foo',
        )
        for bad_src in bad_sources:
            linker = self.LinkFile(bad_src, 'foo')
            with self.assertRaises(error.ManifestInvalidPathError):
                linker._Link()

    def test_update(self):
        """Make sure changed targets get updated."""
        link = os.path.join(self.topdir, 'sym')
        expected = os.path.join('git-project', 'foo.txt')

        self.touch(os.path.join(self.worktree, 'foo.txt'))
        linker = self.LinkFile('foo.txt', 'sym')
        linker._Link()
        self.assertEqual(expected, os.readlink(link))

        # Point the symlink somewhere else, then link again and make sure
        # it is restored to the expected target.
        os.unlink(link)
        platform_utils.symlink(self.tempdir, link)
        linker._Link()
        self.assertEqual(expected, os.readlink(link))