| # Copyright (C) 2019 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Unittests for the project.py module.""" |
| |
| import contextlib |
| import os |
| from pathlib import Path |
| import subprocess |
| import tempfile |
| import unittest |
| |
| import error |
| import manifest_xml |
| import git_command |
| import git_config |
| import platform_utils |
| import project |
| |
| |
| @contextlib.contextmanager |
| def TempGitTree(): |
| """Create a new empty git checkout for testing.""" |
| with tempfile.TemporaryDirectory(prefix='repo-tests') as tempdir: |
| # Tests need to assume, that main is default branch at init, |
| # which is not supported in config until 2.28. |
| cmd = ['git', 'init'] |
| if git_command.git_require((2, 28, 0)): |
| cmd += ['--initial-branch=main'] |
| else: |
| # Use template dir for init. |
| templatedir = tempfile.mkdtemp(prefix='.test-template') |
| with open(os.path.join(templatedir, 'HEAD'), 'w') as fp: |
| fp.write('ref: refs/heads/main\n') |
| cmd += ['--template', templatedir] |
| subprocess.check_call(cmd, cwd=tempdir) |
| yield tempdir |
| |
| |
| class FakeProject(object): |
| """A fake for Project for basic functionality.""" |
| |
| def __init__(self, worktree): |
| self.worktree = worktree |
| self.gitdir = os.path.join(worktree, '.git') |
| self.name = 'fakeproject' |
| self.work_git = project.Project._GitGetByExec( |
| self, bare=False, gitdir=self.gitdir) |
| self.bare_git = project.Project._GitGetByExec( |
| self, bare=True, gitdir=self.gitdir) |
| self.config = git_config.GitConfig.ForRepository(gitdir=self.gitdir) |
| |
| |
| class ReviewableBranchTests(unittest.TestCase): |
| """Check ReviewableBranch behavior.""" |
| |
| def test_smoke(self): |
| """A quick run through everything.""" |
| with TempGitTree() as tempdir: |
| fakeproj = FakeProject(tempdir) |
| |
| # Generate some commits. |
| with open(os.path.join(tempdir, 'readme'), 'w') as fp: |
| fp.write('txt') |
| fakeproj.work_git.add('readme') |
| fakeproj.work_git.commit('-mAdd file') |
| fakeproj.work_git.checkout('-b', 'work') |
| fakeproj.work_git.rm('-f', 'readme') |
| fakeproj.work_git.commit('-mDel file') |
| |
| # Start off with the normal details. |
| rb = project.ReviewableBranch( |
| fakeproj, fakeproj.config.GetBranch('work'), 'main') |
| self.assertEqual('work', rb.name) |
| self.assertEqual(1, len(rb.commits)) |
| self.assertIn('Del file', rb.commits[0]) |
| d = rb.unabbrev_commits |
| self.assertEqual(1, len(d)) |
| short, long = next(iter(d.items())) |
| self.assertTrue(long.startswith(short)) |
| self.assertTrue(rb.base_exists) |
| # Hard to assert anything useful about this. |
| self.assertTrue(rb.date) |
| |
| # Now delete the tracking branch! |
| fakeproj.work_git.branch('-D', 'main') |
| rb = project.ReviewableBranch( |
| fakeproj, fakeproj.config.GetBranch('work'), 'main') |
| self.assertEqual(0, len(rb.commits)) |
| self.assertFalse(rb.base_exists) |
| # Hard to assert anything useful about this. |
| self.assertTrue(rb.date) |
| |
| |
| class CopyLinkTestCase(unittest.TestCase): |
| """TestCase for stub repo client checkouts. |
| |
| It'll have a layout like this: |
| tempdir/ # self.tempdir |
| checkout/ # self.topdir |
| git-project/ # self.worktree |
| |
| Attributes: |
| tempdir: A dedicated temporary directory. |
| worktree: The top of the repo client checkout. |
| topdir: The top of a project checkout. |
| """ |
| |
| def setUp(self): |
| self.tempdirobj = tempfile.TemporaryDirectory(prefix='repo_tests') |
| self.tempdir = self.tempdirobj.name |
| self.topdir = os.path.join(self.tempdir, 'checkout') |
| self.worktree = os.path.join(self.topdir, 'git-project') |
| os.makedirs(self.topdir) |
| os.makedirs(self.worktree) |
| |
| def tearDown(self): |
| self.tempdirobj.cleanup() |
| |
| @staticmethod |
| def touch(path): |
| with open(path, 'w'): |
| pass |
| |
| def assertExists(self, path, msg=None): |
| """Make sure |path| exists.""" |
| if os.path.exists(path): |
| return |
| |
| if msg is None: |
| msg = ['path is missing: %s' % path] |
| while path != '/': |
| path = os.path.dirname(path) |
| if not path: |
| # If we're given something like "foo", abort once we get to "". |
| break |
| result = os.path.exists(path) |
| msg.append('\tos.path.exists(%s): %s' % (path, result)) |
| if result: |
| msg.append('\tcontents: %r' % os.listdir(path)) |
| break |
| msg = '\n'.join(msg) |
| |
| raise self.failureException(msg) |
| |
| |
| class CopyFile(CopyLinkTestCase): |
| """Check _CopyFile handling.""" |
| |
| def CopyFile(self, src, dest): |
| return project._CopyFile(self.worktree, src, self.topdir, dest) |
| |
| def test_basic(self): |
| """Basic test of copying a file from a project to the toplevel.""" |
| src = os.path.join(self.worktree, 'foo.txt') |
| self.touch(src) |
| cf = self.CopyFile('foo.txt', 'foo') |
| cf._Copy() |
| self.assertExists(os.path.join(self.topdir, 'foo')) |
| |
| def test_src_subdir(self): |
| """Copy a file from a subdir of a project.""" |
| src = os.path.join(self.worktree, 'bar', 'foo.txt') |
| os.makedirs(os.path.dirname(src)) |
| self.touch(src) |
| cf = self.CopyFile('bar/foo.txt', 'new.txt') |
| cf._Copy() |
| self.assertExists(os.path.join(self.topdir, 'new.txt')) |
| |
| def test_dest_subdir(self): |
| """Copy a file to a subdir of a checkout.""" |
| src = os.path.join(self.worktree, 'foo.txt') |
| self.touch(src) |
| cf = self.CopyFile('foo.txt', 'sub/dir/new.txt') |
| self.assertFalse(os.path.exists(os.path.join(self.topdir, 'sub'))) |
| cf._Copy() |
| self.assertExists(os.path.join(self.topdir, 'sub', 'dir', 'new.txt')) |
| |
| def test_update(self): |
| """Make sure changed files get copied again.""" |
| src = os.path.join(self.worktree, 'foo.txt') |
| dest = os.path.join(self.topdir, 'bar') |
| with open(src, 'w') as f: |
| f.write('1st') |
| cf = self.CopyFile('foo.txt', 'bar') |
| cf._Copy() |
| self.assertExists(dest) |
| with open(dest) as f: |
| self.assertEqual(f.read(), '1st') |
| |
| with open(src, 'w') as f: |
| f.write('2nd!') |
| cf._Copy() |
| with open(dest) as f: |
| self.assertEqual(f.read(), '2nd!') |
| |
| def test_src_block_symlink(self): |
| """Do not allow reading from a symlinked path.""" |
| src = os.path.join(self.worktree, 'foo.txt') |
| sym = os.path.join(self.worktree, 'sym') |
| self.touch(src) |
| platform_utils.symlink('foo.txt', sym) |
| self.assertExists(sym) |
| cf = self.CopyFile('sym', 'foo') |
| self.assertRaises(error.ManifestInvalidPathError, cf._Copy) |
| |
| def test_src_block_symlink_traversal(self): |
| """Do not allow reading through a symlink dir.""" |
| realfile = os.path.join(self.tempdir, 'file.txt') |
| self.touch(realfile) |
| src = os.path.join(self.worktree, 'bar', 'file.txt') |
| platform_utils.symlink(self.tempdir, os.path.join(self.worktree, 'bar')) |
| self.assertExists(src) |
| cf = self.CopyFile('bar/file.txt', 'foo') |
| self.assertRaises(error.ManifestInvalidPathError, cf._Copy) |
| |
| def test_src_block_copy_from_dir(self): |
| """Do not allow copying from a directory.""" |
| src = os.path.join(self.worktree, 'dir') |
| os.makedirs(src) |
| cf = self.CopyFile('dir', 'foo') |
| self.assertRaises(error.ManifestInvalidPathError, cf._Copy) |
| |
| def test_dest_block_symlink(self): |
| """Do not allow writing to a symlink.""" |
| src = os.path.join(self.worktree, 'foo.txt') |
| self.touch(src) |
| platform_utils.symlink('dest', os.path.join(self.topdir, 'sym')) |
| cf = self.CopyFile('foo.txt', 'sym') |
| self.assertRaises(error.ManifestInvalidPathError, cf._Copy) |
| |
| def test_dest_block_symlink_traversal(self): |
| """Do not allow writing through a symlink dir.""" |
| src = os.path.join(self.worktree, 'foo.txt') |
| self.touch(src) |
| platform_utils.symlink(tempfile.gettempdir(), |
| os.path.join(self.topdir, 'sym')) |
| cf = self.CopyFile('foo.txt', 'sym/foo.txt') |
| self.assertRaises(error.ManifestInvalidPathError, cf._Copy) |
| |
| def test_src_block_copy_to_dir(self): |
| """Do not allow copying to a directory.""" |
| src = os.path.join(self.worktree, 'foo.txt') |
| self.touch(src) |
| os.makedirs(os.path.join(self.topdir, 'dir')) |
| cf = self.CopyFile('foo.txt', 'dir') |
| self.assertRaises(error.ManifestInvalidPathError, cf._Copy) |
| |
| |
| class LinkFile(CopyLinkTestCase): |
| """Check _LinkFile handling.""" |
| |
| def LinkFile(self, src, dest): |
| return project._LinkFile(self.worktree, src, self.topdir, dest) |
| |
| def test_basic(self): |
| """Basic test of linking a file from a project into the toplevel.""" |
| src = os.path.join(self.worktree, 'foo.txt') |
| self.touch(src) |
| lf = self.LinkFile('foo.txt', 'foo') |
| lf._Link() |
| dest = os.path.join(self.topdir, 'foo') |
| self.assertExists(dest) |
| self.assertTrue(os.path.islink(dest)) |
| self.assertEqual(os.path.join('git-project', 'foo.txt'), os.readlink(dest)) |
| |
| def test_src_subdir(self): |
| """Link to a file in a subdir of a project.""" |
| src = os.path.join(self.worktree, 'bar', 'foo.txt') |
| os.makedirs(os.path.dirname(src)) |
| self.touch(src) |
| lf = self.LinkFile('bar/foo.txt', 'foo') |
| lf._Link() |
| self.assertExists(os.path.join(self.topdir, 'foo')) |
| |
| def test_src_self(self): |
| """Link to the project itself.""" |
| dest = os.path.join(self.topdir, 'foo', 'bar') |
| lf = self.LinkFile('.', 'foo/bar') |
| lf._Link() |
| self.assertExists(dest) |
| self.assertEqual(os.path.join('..', 'git-project'), os.readlink(dest)) |
| |
| def test_dest_subdir(self): |
| """Link a file to a subdir of a checkout.""" |
| src = os.path.join(self.worktree, 'foo.txt') |
| self.touch(src) |
| lf = self.LinkFile('foo.txt', 'sub/dir/foo/bar') |
| self.assertFalse(os.path.exists(os.path.join(self.topdir, 'sub'))) |
| lf._Link() |
| self.assertExists(os.path.join(self.topdir, 'sub', 'dir', 'foo', 'bar')) |
| |
| def test_src_block_relative(self): |
| """Do not allow relative symlinks.""" |
| BAD_SOURCES = ( |
| './', |
| '..', |
| '../', |
| 'foo/.', |
| 'foo/./bar', |
| 'foo/..', |
| 'foo/../foo', |
| ) |
| for src in BAD_SOURCES: |
| lf = self.LinkFile(src, 'foo') |
| self.assertRaises(error.ManifestInvalidPathError, lf._Link) |
| |
| def test_update(self): |
| """Make sure changed targets get updated.""" |
| dest = os.path.join(self.topdir, 'sym') |
| |
| src = os.path.join(self.worktree, 'foo.txt') |
| self.touch(src) |
| lf = self.LinkFile('foo.txt', 'sym') |
| lf._Link() |
| self.assertEqual(os.path.join('git-project', 'foo.txt'), os.readlink(dest)) |
| |
| # Point the symlink somewhere else. |
| os.unlink(dest) |
| platform_utils.symlink(self.tempdir, dest) |
| lf._Link() |
| self.assertEqual(os.path.join('git-project', 'foo.txt'), os.readlink(dest)) |
| |
| |
| class MigrateWorkTreeTests(unittest.TestCase): |
| """Check _MigrateOldWorkTreeGitDir handling.""" |
| |
| _SYMLINKS = { |
| 'config', 'description', 'hooks', 'info', 'logs', 'objects', |
| 'packed-refs', 'refs', 'rr-cache', 'shallow', 'svn', |
| } |
| _FILES = { |
| 'COMMIT_EDITMSG', 'FETCH_HEAD', 'HEAD', 'index', 'ORIG_HEAD', |
| 'unknown-file-should-be-migrated', |
| } |
| _CLEAN_FILES = { |
| 'a-vim-temp-file~', '#an-emacs-temp-file#', |
| } |
| |
| @classmethod |
| @contextlib.contextmanager |
| def _simple_layout(cls): |
| """Create a simple repo client checkout to test against.""" |
| with tempfile.TemporaryDirectory() as tempdir: |
| tempdir = Path(tempdir) |
| |
| gitdir = tempdir / '.repo/projects/src/test.git' |
| gitdir.mkdir(parents=True) |
| cmd = ['git', 'init', '--bare', str(gitdir)] |
| subprocess.check_call(cmd) |
| |
| dotgit = tempdir / 'src/test/.git' |
| dotgit.mkdir(parents=True) |
| for name in cls._SYMLINKS: |
| (dotgit / name).symlink_to(f'../../../.repo/projects/src/test.git/{name}') |
| for name in cls._FILES | cls._CLEAN_FILES: |
| (dotgit / name).write_text(name) |
| |
| yield tempdir |
| |
| def test_standard(self): |
| """Migrate a standard checkout that we expect.""" |
| with self._simple_layout() as tempdir: |
| dotgit = tempdir / 'src/test/.git' |
| project.Project._MigrateOldWorkTreeGitDir(str(dotgit)) |
| |
| # Make sure the dir was transformed into a symlink. |
| self.assertTrue(dotgit.is_symlink()) |
| self.assertEqual(os.readlink(dotgit), os.path.normpath('../../.repo/projects/src/test.git')) |
| |
| # Make sure files were moved over. |
| gitdir = tempdir / '.repo/projects/src/test.git' |
| for name in self._FILES: |
| self.assertEqual(name, (gitdir / name).read_text()) |
| # Make sure files were removed. |
| for name in self._CLEAN_FILES: |
| self.assertFalse((gitdir / name).exists()) |
| |
| def test_unknown(self): |
| """A checkout with unknown files should abort.""" |
| with self._simple_layout() as tempdir: |
| dotgit = tempdir / 'src/test/.git' |
| (tempdir / '.repo/projects/src/test.git/random-file').write_text('one') |
| (dotgit / 'random-file').write_text('two') |
| with self.assertRaises(error.GitError): |
| project.Project._MigrateOldWorkTreeGitDir(str(dotgit)) |
| |
| # Make sure no content was actually changed. |
| self.assertTrue(dotgit.is_dir()) |
| for name in self._FILES: |
| self.assertTrue((dotgit / name).is_file()) |
| for name in self._CLEAN_FILES: |
| self.assertTrue((dotgit / name).is_file()) |
| for name in self._SYMLINKS: |
| self.assertTrue((dotgit / name).is_symlink()) |
| |
| |
| class ManifestPropertiesFetchedCorrectly(unittest.TestCase): |
| """Ensure properties are fetched properly.""" |
| |
| def setUpManifest(self, tempdir): |
| repodir = os.path.join(tempdir, '.repo') |
| manifest_dir = os.path.join(repodir, 'manifests') |
| manifest_file = os.path.join( |
| repodir, manifest_xml.MANIFEST_FILE_NAME) |
| local_manifest_dir = os.path.join( |
| repodir, manifest_xml.LOCAL_MANIFESTS_DIR_NAME) |
| os.mkdir(repodir) |
| os.mkdir(manifest_dir) |
| manifest = manifest_xml.XmlManifest(repodir, manifest_file) |
| |
| return project.ManifestProject( |
| manifest, 'test/manifest', os.path.join(tempdir, '.git'), tempdir) |
| |
| def test_manifest_config_properties(self): |
| """Test we are fetching the manifest config properties correctly.""" |
| |
| with TempGitTree() as tempdir: |
| fakeproj = self.setUpManifest(tempdir) |
| |
| # Set property using the expected Set method, then ensure |
| # the porperty functions are using the correct Get methods. |
| fakeproj.config.SetString( |
| 'manifest.standalone', 'https://chicken/manifest.git') |
| self.assertEqual( |
| fakeproj.standalone_manifest_url, 'https://chicken/manifest.git') |
| |
| fakeproj.config.SetString('manifest.groups', 'test-group, admin-group') |
| self.assertEqual(fakeproj.manifest_groups, 'test-group, admin-group') |
| |
| fakeproj.config.SetString('repo.reference', 'mirror/ref') |
| self.assertEqual(fakeproj.reference, 'mirror/ref') |
| |
| fakeproj.config.SetBoolean('repo.dissociate', False) |
| self.assertFalse(fakeproj.dissociate) |
| |
| fakeproj.config.SetBoolean('repo.archive', False) |
| self.assertFalse(fakeproj.archive) |
| |
| fakeproj.config.SetBoolean('repo.mirror', False) |
| self.assertFalse(fakeproj.mirror) |
| |
| fakeproj.config.SetBoolean('repo.worktree', False) |
| self.assertFalse(fakeproj.use_worktree) |
| |
| fakeproj.config.SetBoolean('repo.clonebundle', False) |
| self.assertFalse(fakeproj.clone_bundle) |
| |
| fakeproj.config.SetBoolean('repo.submodules', False) |
| self.assertFalse(fakeproj.submodules) |
| |
| fakeproj.config.SetBoolean('repo.git-lfs', False) |
| self.assertFalse(fakeproj.git_lfs) |
| |
| fakeproj.config.SetBoolean('repo.superproject', False) |
| self.assertFalse(fakeproj.use_superproject) |
| |
| fakeproj.config.SetBoolean('repo.partialclone', False) |
| self.assertFalse(fakeproj.partial_clone) |
| |
| fakeproj.config.SetString('repo.depth', '48') |
| self.assertEqual(fakeproj.depth, '48') |
| |
| fakeproj.config.SetString('repo.clonefilter', 'blob:limit=10M') |
| self.assertEqual(fakeproj.clone_filter, 'blob:limit=10M') |
| |
| fakeproj.config.SetString('repo.partialcloneexclude', 'third_party/big_repo') |
| self.assertEqual(fakeproj.partial_clone_exclude, 'third_party/big_repo') |
| |
| fakeproj.config.SetString('manifest.platform', 'auto') |
| self.assertEqual(fakeproj.manifest_platform, 'auto') |