| # Copyright (C) 2019 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Unittests for the project.py module.""" |
| |
| import contextlib |
| import os |
| from pathlib import Path |
| import subprocess |
| import tempfile |
| import unittest |
| |
| import error |
| import git_command |
| import git_config |
| import manifest_xml |
| import platform_utils |
| import project |
| |
| |
| @contextlib.contextmanager |
| def TempGitTree(): |
| """Create a new empty git checkout for testing.""" |
| with tempfile.TemporaryDirectory(prefix="repo-tests") as tempdir: |
| # Tests need to assume, that main is default branch at init, |
| # which is not supported in config until 2.28. |
| cmd = ["git", "init"] |
| if git_command.git_require((2, 28, 0)): |
| cmd += ["--initial-branch=main"] |
| else: |
| # Use template dir for init. |
| templatedir = tempfile.mkdtemp(prefix=".test-template") |
| with open(os.path.join(templatedir, "HEAD"), "w") as fp: |
| fp.write("ref: refs/heads/main\n") |
| cmd += ["--template", templatedir] |
| subprocess.check_call(cmd, cwd=tempdir) |
| yield tempdir |
| |
| |
| class FakeProject: |
| """A fake for Project for basic functionality.""" |
| |
| def __init__(self, worktree): |
| self.worktree = worktree |
| self.gitdir = os.path.join(worktree, ".git") |
| self.name = "fakeproject" |
| self.work_git = project.Project._GitGetByExec( |
| self, bare=False, gitdir=self.gitdir |
| ) |
| self.bare_git = project.Project._GitGetByExec( |
| self, bare=True, gitdir=self.gitdir |
| ) |
| self.config = git_config.GitConfig.ForRepository(gitdir=self.gitdir) |
| |
| |
| class ReviewableBranchTests(unittest.TestCase): |
| """Check ReviewableBranch behavior.""" |
| |
| def test_smoke(self): |
| """A quick run through everything.""" |
| with TempGitTree() as tempdir: |
| fakeproj = FakeProject(tempdir) |
| |
| # Generate some commits. |
| with open(os.path.join(tempdir, "readme"), "w") as fp: |
| fp.write("txt") |
| fakeproj.work_git.add("readme") |
| fakeproj.work_git.commit("-mAdd file") |
| fakeproj.work_git.checkout("-b", "work") |
| fakeproj.work_git.rm("-f", "readme") |
| fakeproj.work_git.commit("-mDel file") |
| |
| # Start off with the normal details. |
| rb = project.ReviewableBranch( |
| fakeproj, fakeproj.config.GetBranch("work"), "main" |
| ) |
| self.assertEqual("work", rb.name) |
| self.assertEqual(1, len(rb.commits)) |
| self.assertIn("Del file", rb.commits[0]) |
| d = rb.unabbrev_commits |
| self.assertEqual(1, len(d)) |
| short, long = next(iter(d.items())) |
| self.assertTrue(long.startswith(short)) |
| self.assertTrue(rb.base_exists) |
| # Hard to assert anything useful about this. |
| self.assertTrue(rb.date) |
| |
| # Now delete the tracking branch! |
| fakeproj.work_git.branch("-D", "main") |
| rb = project.ReviewableBranch( |
| fakeproj, fakeproj.config.GetBranch("work"), "main" |
| ) |
| self.assertEqual(0, len(rb.commits)) |
| self.assertFalse(rb.base_exists) |
| # Hard to assert anything useful about this. |
| self.assertTrue(rb.date) |
| |
| |
| class ProjectTests(unittest.TestCase): |
| """Check Project behavior.""" |
| |
| def test_encode_patchset_description(self): |
| self.assertEqual( |
| project.Project._encode_patchset_description("abcd00!! +"), |
| "abcd00%21%21_%2b", |
| ) |
| |
| |
| class CopyLinkTestCase(unittest.TestCase): |
| """TestCase for stub repo client checkouts. |
| |
| It'll have a layout like this: |
| tempdir/ # self.tempdir |
| checkout/ # self.topdir |
| git-project/ # self.worktree |
| |
| Attributes: |
| tempdir: A dedicated temporary directory. |
| worktree: The top of the repo client checkout. |
| topdir: The top of a project checkout. |
| """ |
| |
| def setUp(self): |
| self.tempdirobj = tempfile.TemporaryDirectory(prefix="repo_tests") |
| self.tempdir = self.tempdirobj.name |
| self.topdir = os.path.join(self.tempdir, "checkout") |
| self.worktree = os.path.join(self.topdir, "git-project") |
| os.makedirs(self.topdir) |
| os.makedirs(self.worktree) |
| |
| def tearDown(self): |
| self.tempdirobj.cleanup() |
| |
| @staticmethod |
| def touch(path): |
| with open(path, "w"): |
| pass |
| |
| def assertExists(self, path, msg=None): |
| """Make sure |path| exists.""" |
| if os.path.exists(path): |
| return |
| |
| if msg is None: |
| msg = ["path is missing: %s" % path] |
| while path != "/": |
| path = os.path.dirname(path) |
| if not path: |
| # If we're given something like "foo", abort once we get to |
| # "". |
| break |
| result = os.path.exists(path) |
| msg.append(f"\tos.path.exists({path}): {result}") |
| if result: |
| msg.append("\tcontents: %r" % os.listdir(path)) |
| break |
| msg = "\n".join(msg) |
| |
| raise self.failureException(msg) |
| |
| |
| class CopyFile(CopyLinkTestCase): |
| """Check _CopyFile handling.""" |
| |
| def CopyFile(self, src, dest): |
| return project._CopyFile(self.worktree, src, self.topdir, dest) |
| |
| def test_basic(self): |
| """Basic test of copying a file from a project to the toplevel.""" |
| src = os.path.join(self.worktree, "foo.txt") |
| self.touch(src) |
| cf = self.CopyFile("foo.txt", "foo") |
| cf._Copy() |
| self.assertExists(os.path.join(self.topdir, "foo")) |
| |
| def test_src_subdir(self): |
| """Copy a file from a subdir of a project.""" |
| src = os.path.join(self.worktree, "bar", "foo.txt") |
| os.makedirs(os.path.dirname(src)) |
| self.touch(src) |
| cf = self.CopyFile("bar/foo.txt", "new.txt") |
| cf._Copy() |
| self.assertExists(os.path.join(self.topdir, "new.txt")) |
| |
| def test_dest_subdir(self): |
| """Copy a file to a subdir of a checkout.""" |
| src = os.path.join(self.worktree, "foo.txt") |
| self.touch(src) |
| cf = self.CopyFile("foo.txt", "sub/dir/new.txt") |
| self.assertFalse(os.path.exists(os.path.join(self.topdir, "sub"))) |
| cf._Copy() |
| self.assertExists(os.path.join(self.topdir, "sub", "dir", "new.txt")) |
| |
| def test_update(self): |
| """Make sure changed files get copied again.""" |
| src = os.path.join(self.worktree, "foo.txt") |
| dest = os.path.join(self.topdir, "bar") |
| with open(src, "w") as f: |
| f.write("1st") |
| cf = self.CopyFile("foo.txt", "bar") |
| cf._Copy() |
| self.assertExists(dest) |
| with open(dest) as f: |
| self.assertEqual(f.read(), "1st") |
| |
| with open(src, "w") as f: |
| f.write("2nd!") |
| cf._Copy() |
| with open(dest) as f: |
| self.assertEqual(f.read(), "2nd!") |
| |
| def test_src_block_symlink(self): |
| """Do not allow reading from a symlinked path.""" |
| src = os.path.join(self.worktree, "foo.txt") |
| sym = os.path.join(self.worktree, "sym") |
| self.touch(src) |
| platform_utils.symlink("foo.txt", sym) |
| self.assertExists(sym) |
| cf = self.CopyFile("sym", "foo") |
| self.assertRaises(error.ManifestInvalidPathError, cf._Copy) |
| |
| def test_src_block_symlink_traversal(self): |
| """Do not allow reading through a symlink dir.""" |
| realfile = os.path.join(self.tempdir, "file.txt") |
| self.touch(realfile) |
| src = os.path.join(self.worktree, "bar", "file.txt") |
| platform_utils.symlink(self.tempdir, os.path.join(self.worktree, "bar")) |
| self.assertExists(src) |
| cf = self.CopyFile("bar/file.txt", "foo") |
| self.assertRaises(error.ManifestInvalidPathError, cf._Copy) |
| |
| def test_src_block_copy_from_dir(self): |
| """Do not allow copying from a directory.""" |
| src = os.path.join(self.worktree, "dir") |
| os.makedirs(src) |
| cf = self.CopyFile("dir", "foo") |
| self.assertRaises(error.ManifestInvalidPathError, cf._Copy) |
| |
| def test_dest_block_symlink(self): |
| """Do not allow writing to a symlink.""" |
| src = os.path.join(self.worktree, "foo.txt") |
| self.touch(src) |
| platform_utils.symlink("dest", os.path.join(self.topdir, "sym")) |
| cf = self.CopyFile("foo.txt", "sym") |
| self.assertRaises(error.ManifestInvalidPathError, cf._Copy) |
| |
| def test_dest_block_symlink_traversal(self): |
| """Do not allow writing through a symlink dir.""" |
| src = os.path.join(self.worktree, "foo.txt") |
| self.touch(src) |
| platform_utils.symlink( |
| tempfile.gettempdir(), os.path.join(self.topdir, "sym") |
| ) |
| cf = self.CopyFile("foo.txt", "sym/foo.txt") |
| self.assertRaises(error.ManifestInvalidPathError, cf._Copy) |
| |
| def test_src_block_copy_to_dir(self): |
| """Do not allow copying to a directory.""" |
| src = os.path.join(self.worktree, "foo.txt") |
| self.touch(src) |
| os.makedirs(os.path.join(self.topdir, "dir")) |
| cf = self.CopyFile("foo.txt", "dir") |
| self.assertRaises(error.ManifestInvalidPathError, cf._Copy) |
| |
| |
| class LinkFile(CopyLinkTestCase): |
| """Check _LinkFile handling.""" |
| |
| def LinkFile(self, src, dest): |
| return project._LinkFile(self.worktree, src, self.topdir, dest) |
| |
| def test_basic(self): |
| """Basic test of linking a file from a project into the toplevel.""" |
| src = os.path.join(self.worktree, "foo.txt") |
| self.touch(src) |
| lf = self.LinkFile("foo.txt", "foo") |
| lf._Link() |
| dest = os.path.join(self.topdir, "foo") |
| self.assertExists(dest) |
| self.assertTrue(os.path.islink(dest)) |
| self.assertEqual( |
| os.path.join("git-project", "foo.txt"), os.readlink(dest) |
| ) |
| |
| def test_src_subdir(self): |
| """Link to a file in a subdir of a project.""" |
| src = os.path.join(self.worktree, "bar", "foo.txt") |
| os.makedirs(os.path.dirname(src)) |
| self.touch(src) |
| lf = self.LinkFile("bar/foo.txt", "foo") |
| lf._Link() |
| self.assertExists(os.path.join(self.topdir, "foo")) |
| |
| def test_src_self(self): |
| """Link to the project itself.""" |
| dest = os.path.join(self.topdir, "foo", "bar") |
| lf = self.LinkFile(".", "foo/bar") |
| lf._Link() |
| self.assertExists(dest) |
| self.assertEqual(os.path.join("..", "git-project"), os.readlink(dest)) |
| |
| def test_dest_subdir(self): |
| """Link a file to a subdir of a checkout.""" |
| src = os.path.join(self.worktree, "foo.txt") |
| self.touch(src) |
| lf = self.LinkFile("foo.txt", "sub/dir/foo/bar") |
| self.assertFalse(os.path.exists(os.path.join(self.topdir, "sub"))) |
| lf._Link() |
| self.assertExists(os.path.join(self.topdir, "sub", "dir", "foo", "bar")) |
| |
| def test_src_block_relative(self): |
| """Do not allow relative symlinks.""" |
| BAD_SOURCES = ( |
| "./", |
| "..", |
| "../", |
| "foo/.", |
| "foo/./bar", |
| "foo/..", |
| "foo/../foo", |
| ) |
| for src in BAD_SOURCES: |
| lf = self.LinkFile(src, "foo") |
| self.assertRaises(error.ManifestInvalidPathError, lf._Link) |
| |
| def test_update(self): |
| """Make sure changed targets get updated.""" |
| dest = os.path.join(self.topdir, "sym") |
| |
| src = os.path.join(self.worktree, "foo.txt") |
| self.touch(src) |
| lf = self.LinkFile("foo.txt", "sym") |
| lf._Link() |
| self.assertEqual( |
| os.path.join("git-project", "foo.txt"), os.readlink(dest) |
| ) |
| |
| # Point the symlink somewhere else. |
| os.unlink(dest) |
| platform_utils.symlink(self.tempdir, dest) |
| lf._Link() |
| self.assertEqual( |
| os.path.join("git-project", "foo.txt"), os.readlink(dest) |
| ) |
| |
| |
| class MigrateWorkTreeTests(unittest.TestCase): |
| """Check _MigrateOldWorkTreeGitDir handling.""" |
| |
| _SYMLINKS = { |
| "config", |
| "description", |
| "hooks", |
| "info", |
| "logs", |
| "objects", |
| "packed-refs", |
| "refs", |
| "rr-cache", |
| "shallow", |
| "svn", |
| } |
| _FILES = { |
| "COMMIT_EDITMSG", |
| "FETCH_HEAD", |
| "HEAD", |
| "index", |
| "ORIG_HEAD", |
| "unknown-file-should-be-migrated", |
| } |
| _CLEAN_FILES = { |
| "a-vim-temp-file~", |
| "#an-emacs-temp-file#", |
| } |
| |
| @classmethod |
| @contextlib.contextmanager |
| def _simple_layout(cls): |
| """Create a simple repo client checkout to test against.""" |
| with tempfile.TemporaryDirectory() as tempdir: |
| tempdir = Path(tempdir) |
| |
| gitdir = tempdir / ".repo/projects/src/test.git" |
| gitdir.mkdir(parents=True) |
| cmd = ["git", "init", "--bare", str(gitdir)] |
| subprocess.check_call(cmd) |
| |
| dotgit = tempdir / "src/test/.git" |
| dotgit.mkdir(parents=True) |
| for name in cls._SYMLINKS: |
| (dotgit / name).symlink_to( |
| f"../../../.repo/projects/src/test.git/{name}" |
| ) |
| for name in cls._FILES | cls._CLEAN_FILES: |
| (dotgit / name).write_text(name) |
| |
| yield tempdir |
| |
| def test_standard(self): |
| """Migrate a standard checkout that we expect.""" |
| with self._simple_layout() as tempdir: |
| dotgit = tempdir / "src/test/.git" |
| project.Project._MigrateOldWorkTreeGitDir(str(dotgit)) |
| |
| # Make sure the dir was transformed into a symlink. |
| self.assertTrue(dotgit.is_symlink()) |
| self.assertEqual( |
| os.readlink(dotgit), |
| os.path.normpath("../../.repo/projects/src/test.git"), |
| ) |
| |
| # Make sure files were moved over. |
| gitdir = tempdir / ".repo/projects/src/test.git" |
| for name in self._FILES: |
| self.assertEqual(name, (gitdir / name).read_text()) |
| # Make sure files were removed. |
| for name in self._CLEAN_FILES: |
| self.assertFalse((gitdir / name).exists()) |
| |
| def test_unknown(self): |
| """A checkout with unknown files should abort.""" |
| with self._simple_layout() as tempdir: |
| dotgit = tempdir / "src/test/.git" |
| (tempdir / ".repo/projects/src/test.git/random-file").write_text( |
| "one" |
| ) |
| (dotgit / "random-file").write_text("two") |
| with self.assertRaises(error.GitError): |
| project.Project._MigrateOldWorkTreeGitDir(str(dotgit)) |
| |
| # Make sure no content was actually changed. |
| self.assertTrue(dotgit.is_dir()) |
| for name in self._FILES: |
| self.assertTrue((dotgit / name).is_file()) |
| for name in self._CLEAN_FILES: |
| self.assertTrue((dotgit / name).is_file()) |
| for name in self._SYMLINKS: |
| self.assertTrue((dotgit / name).is_symlink()) |
| |
| |
| class ManifestPropertiesFetchedCorrectly(unittest.TestCase): |
| """Ensure properties are fetched properly.""" |
| |
| def setUpManifest(self, tempdir): |
| repodir = os.path.join(tempdir, ".repo") |
| manifest_dir = os.path.join(repodir, "manifests") |
| manifest_file = os.path.join(repodir, manifest_xml.MANIFEST_FILE_NAME) |
| os.mkdir(repodir) |
| os.mkdir(manifest_dir) |
| manifest = manifest_xml.XmlManifest(repodir, manifest_file) |
| |
| return project.ManifestProject( |
| manifest, "test/manifest", os.path.join(tempdir, ".git"), tempdir |
| ) |
| |
| def test_manifest_config_properties(self): |
| """Test we are fetching the manifest config properties correctly.""" |
| |
| with TempGitTree() as tempdir: |
| fakeproj = self.setUpManifest(tempdir) |
| |
| # Set property using the expected Set method, then ensure |
| # the porperty functions are using the correct Get methods. |
| fakeproj.config.SetString( |
| "manifest.standalone", "https://chicken/manifest.git" |
| ) |
| self.assertEqual( |
| fakeproj.standalone_manifest_url, "https://chicken/manifest.git" |
| ) |
| |
| fakeproj.config.SetString( |
| "manifest.groups", "test-group, admin-group" |
| ) |
| self.assertEqual( |
| fakeproj.manifest_groups, "test-group, admin-group" |
| ) |
| |
| fakeproj.config.SetString("repo.reference", "mirror/ref") |
| self.assertEqual(fakeproj.reference, "mirror/ref") |
| |
| fakeproj.config.SetBoolean("repo.dissociate", False) |
| self.assertFalse(fakeproj.dissociate) |
| |
| fakeproj.config.SetBoolean("repo.archive", False) |
| self.assertFalse(fakeproj.archive) |
| |
| fakeproj.config.SetBoolean("repo.mirror", False) |
| self.assertFalse(fakeproj.mirror) |
| |
| fakeproj.config.SetBoolean("repo.worktree", False) |
| self.assertFalse(fakeproj.use_worktree) |
| |
| fakeproj.config.SetBoolean("repo.clonebundle", False) |
| self.assertFalse(fakeproj.clone_bundle) |
| |
| fakeproj.config.SetBoolean("repo.submodules", False) |
| self.assertFalse(fakeproj.submodules) |
| |
| fakeproj.config.SetBoolean("repo.git-lfs", False) |
| self.assertFalse(fakeproj.git_lfs) |
| |
| fakeproj.config.SetBoolean("repo.superproject", False) |
| self.assertFalse(fakeproj.use_superproject) |
| |
| fakeproj.config.SetBoolean("repo.partialclone", False) |
| self.assertFalse(fakeproj.partial_clone) |
| |
| fakeproj.config.SetString("repo.depth", "48") |
| self.assertEqual(fakeproj.depth, 48) |
| |
| fakeproj.config.SetString("repo.depth", "invalid_depth") |
| self.assertEqual(fakeproj.depth, None) |
| |
| fakeproj.config.SetString("repo.clonefilter", "blob:limit=10M") |
| self.assertEqual(fakeproj.clone_filter, "blob:limit=10M") |
| |
| fakeproj.config.SetString( |
| "repo.partialcloneexclude", "third_party/big_repo" |
| ) |
| self.assertEqual( |
| fakeproj.partial_clone_exclude, "third_party/big_repo" |
| ) |
| |
| fakeproj.config.SetString("manifest.platform", "auto") |
| self.assertEqual(fakeproj.manifest_platform, "auto") |