summaryrefslogtreecommitdiff
path: root/test/test_repo.py
diff options
context:
space:
mode:
authorKonrad Weihmann <kweihmann@outlook.com>2020-07-10 18:41:02 +0200
committerKonrad Weihmann <kweihmann@outlook.com>2020-07-12 15:01:21 +0200
commit0374d7cf84ecd8182b74a639fcfdb9eafddcfd15 (patch)
treef462fb9fc434f197b39213b53f33f4e09fc0329e /test/test_repo.py
parent9cb7ae8d9721e1269f5bacd6dbc33ecdec4659c0 (diff)
downloadgitpython-0374d7cf84ecd8182b74a639fcfdb9eafddcfd15.tar.gz
tests: move to root dir
This should ensure that tests are NOT packaged into release package by setuptools, as tests are development only + fixtures after moving Signed-off-by: Konrad Weihmann <kweihmann@outlook.com>
Diffstat (limited to 'test/test_repo.py')
-rw-r--r--test/test_repo.py1031
1 files changed, 1031 insertions, 0 deletions
diff --git a/test/test_repo.py b/test/test_repo.py
new file mode 100644
index 00000000..0809175f
--- /dev/null
+++ b/test/test_repo.py
@@ -0,0 +1,1031 @@
+# -*- coding: utf-8 -*-
+# test_repo.py
+# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
+#
+# This module is part of GitPython and is released under
+# the BSD License: http://www.opensource.org/licenses/bsd-license.php
+import glob
+import io
+from io import BytesIO
+import itertools
+import os
+import pathlib
+import pickle
+import tempfile
+from unittest import mock, skipIf, SkipTest
+
+from git import (
+ InvalidGitRepositoryError,
+ Repo,
+ NoSuchPathError,
+ Head,
+ Commit,
+ Object,
+ Tree,
+ IndexFile,
+ Git,
+ Reference,
+ GitDB,
+ Submodule,
+ GitCmdObjectDB,
+ Remote,
+ BadName,
+ GitCommandError
+)
+from git.exc import (
+ BadObject,
+)
+from git.repo.fun import touch
+from test.lib import (
+ TestBase,
+ with_rw_repo,
+ fixture
+)
+from git.util import HIDE_WINDOWS_KNOWN_ERRORS, cygpath
+from test.lib import with_rw_directory
+from git.util import join_path_native, rmtree, rmfile, bin_to_hex
+
+import os.path as osp
+
+
+def iter_flatten(lol):
+ for items in lol:
+ for item in items:
+ yield item
+
+
+def flatten(lol):
+ return list(iter_flatten(lol))
+
+
+_tc_lock_fpaths = osp.join(osp.dirname(__file__), '../../.git/*.lock')
+
+
+def _rm_lock_files():
+ for lfp in glob.glob(_tc_lock_fpaths):
+ rmfile(lfp)
+
+
+class TestRepo(TestBase):
+
+ def setUp(self):
+ _rm_lock_files()
+
+ def tearDown(self):
+ for lfp in glob.glob(_tc_lock_fpaths):
+ if osp.isfile(lfp):
+ raise AssertionError('Previous TC left hanging git-lock file: {}'.format(lfp))
+ import gc
+ gc.collect()
+
+ def test_new_should_raise_on_invalid_repo_location(self):
+ self.assertRaises(InvalidGitRepositoryError, Repo, tempfile.gettempdir())
+
+ def test_new_should_raise_on_non_existent_path(self):
+ self.assertRaises(NoSuchPathError, Repo, "repos/foobar")
+
+ @with_rw_repo('0.3.2.1')
+ def test_repo_creation_from_different_paths(self, rw_repo):
+ r_from_gitdir = Repo(rw_repo.git_dir)
+ self.assertEqual(r_from_gitdir.git_dir, rw_repo.git_dir)
+ assert r_from_gitdir.git_dir.endswith('.git')
+ assert not rw_repo.git.working_dir.endswith('.git')
+ self.assertEqual(r_from_gitdir.git.working_dir, rw_repo.git.working_dir)
+
+ @with_rw_repo('0.3.2.1')
+ def test_repo_creation_pathlib(self, rw_repo):
+ r_from_gitdir = Repo(pathlib.Path(rw_repo.git_dir))
+ self.assertEqual(r_from_gitdir.git_dir, rw_repo.git_dir)
+
+ def test_description(self):
+ txt = "Test repository"
+ self.rorepo.description = txt
+ self.assertEqual(self.rorepo.description, txt)
+
+ def test_heads_should_return_array_of_head_objects(self):
+ for head in self.rorepo.heads:
+ self.assertEqual(Head, head.__class__)
+
+ def test_heads_should_populate_head_data(self):
+ for head in self.rorepo.heads:
+ assert head.name
+ self.assertIsInstance(head.commit, Commit)
+ # END for each head
+
+ self.assertIsInstance(self.rorepo.heads.master, Head)
+ self.assertIsInstance(self.rorepo.heads['master'], Head)
+
+ def test_tree_from_revision(self):
+ tree = self.rorepo.tree('0.1.6')
+ self.assertEqual(len(tree.hexsha), 40)
+ self.assertEqual(tree.type, "tree")
+ self.assertEqual(self.rorepo.tree(tree), tree)
+
+ # try from invalid revision that does not exist
+ self.assertRaises(BadName, self.rorepo.tree, 'hello world')
+
+ def test_pickleable(self):
+ pickle.loads(pickle.dumps(self.rorepo))
+
+ def test_commit_from_revision(self):
+ commit = self.rorepo.commit('0.1.4')
+ self.assertEqual(commit.type, 'commit')
+ self.assertEqual(self.rorepo.commit(commit), commit)
+
+ def test_commits(self):
+ mc = 10
+ commits = list(self.rorepo.iter_commits('0.1.6', max_count=mc))
+ self.assertEqual(len(commits), mc)
+
+ c = commits[0]
+ self.assertEqual('9a4b1d4d11eee3c5362a4152216376e634bd14cf', c.hexsha)
+ self.assertEqual(["c76852d0bff115720af3f27acdb084c59361e5f6"], [p.hexsha for p in c.parents])
+ self.assertEqual("ce41fc29549042f1aa09cc03174896cf23f112e3", c.tree.hexsha)
+ self.assertEqual("Michael Trier", c.author.name)
+ self.assertEqual("mtrier@gmail.com", c.author.email)
+ self.assertEqual(1232829715, c.authored_date)
+ self.assertEqual(5 * 3600, c.author_tz_offset)
+ self.assertEqual("Michael Trier", c.committer.name)
+ self.assertEqual("mtrier@gmail.com", c.committer.email)
+ self.assertEqual(1232829715, c.committed_date)
+ self.assertEqual(5 * 3600, c.committer_tz_offset)
+ self.assertEqual("Bumped version 0.1.6\n", c.message)
+
+ c = commits[1]
+ self.assertIsInstance(c.parents, tuple)
+
+ def test_trees(self):
+ mc = 30
+ num_trees = 0
+ for tree in self.rorepo.iter_trees('0.1.5', max_count=mc):
+ num_trees += 1
+ self.assertIsInstance(tree, Tree)
+ # END for each tree
+ self.assertEqual(num_trees, mc)
+
+ def _assert_empty_repo(self, repo):
+ # test all kinds of things with an empty, freshly initialized repo.
+ # It should throw good errors
+
+ # entries should be empty
+ self.assertEqual(len(repo.index.entries), 0)
+
+ # head is accessible
+ assert repo.head
+ assert repo.head.ref
+ assert not repo.head.is_valid()
+
+ # we can change the head to some other ref
+ head_ref = Head.from_path(repo, Head.to_full_path('some_head'))
+ assert not head_ref.is_valid()
+ repo.head.ref = head_ref
+
+ # is_dirty can handle all kwargs
+ for args in ((1, 0, 0), (0, 1, 0), (0, 0, 1)):
+ assert not repo.is_dirty(*args)
+ # END for each arg
+
+ # we can add a file to the index ( if we are not bare )
+ if not repo.bare:
+ pass
+ # END test repos with working tree
+
+ @with_rw_directory
+ def test_clone_from_keeps_env(self, rw_dir):
+ original_repo = Repo.init(osp.join(rw_dir, "repo"))
+ environment = {"entry1": "value", "another_entry": "10"}
+
+ cloned = Repo.clone_from(original_repo.git_dir, osp.join(rw_dir, "clone"), env=environment)
+
+ self.assertEqual(environment, cloned.git.environment())
+
+ @with_rw_directory
+ def test_date_format(self, rw_dir):
+ repo = Repo.init(osp.join(rw_dir, "repo"))
+ # @-timestamp is the format used by git commit hooks
+ repo.index.commit("Commit messages", commit_date="@1400000000 +0000")
+
+ @with_rw_directory
+ def test_clone_from_pathlib(self, rw_dir):
+ original_repo = Repo.init(osp.join(rw_dir, "repo"))
+
+ Repo.clone_from(original_repo.git_dir, pathlib.Path(rw_dir) / "clone_pathlib")
+
+ @with_rw_directory
+ def test_clone_from_pathlib_withConfig(self, rw_dir):
+ original_repo = Repo.init(osp.join(rw_dir, "repo"))
+
+ cloned = Repo.clone_from(original_repo.git_dir, pathlib.Path(rw_dir) / "clone_pathlib_withConfig",
+ multi_options=["--recurse-submodules=repo",
+ "--config core.filemode=false",
+ "--config submodule.repo.update=checkout"])
+
+ self.assertEqual(cloned.config_reader().get_value('submodule', 'active'), 'repo')
+ self.assertEqual(cloned.config_reader().get_value('core', 'filemode'), False)
+ self.assertEqual(cloned.config_reader().get_value('submodule "repo"', 'update'), 'checkout')
+
+ def test_clone_from_with_path_contains_unicode(self):
+ with tempfile.TemporaryDirectory() as tmpdir:
+ unicode_dir_name = '\u0394'
+ path_with_unicode = os.path.join(tmpdir, unicode_dir_name)
+ os.makedirs(path_with_unicode)
+
+ try:
+ Repo.clone_from(
+ url=self._small_repo_url(),
+ to_path=path_with_unicode,
+ )
+ except UnicodeEncodeError:
+ self.fail('Raised UnicodeEncodeError')
+
+ @with_rw_repo('HEAD')
+ def test_max_chunk_size(self, repo):
+ class TestOutputStream(TestBase):
+ def __init__(self, max_chunk_size):
+ self.max_chunk_size = max_chunk_size
+
+ def write(self, b):
+ self.assertTrue(len(b) <= self.max_chunk_size)
+
+ for chunk_size in [16, 128, 1024]:
+ repo.git.status(output_stream=TestOutputStream(chunk_size), max_chunk_size=chunk_size)
+
+ repo.git.log(n=100, output_stream=TestOutputStream(io.DEFAULT_BUFFER_SIZE), max_chunk_size=None)
+ repo.git.log(n=100, output_stream=TestOutputStream(io.DEFAULT_BUFFER_SIZE), max_chunk_size=-10)
+ repo.git.log(n=100, output_stream=TestOutputStream(io.DEFAULT_BUFFER_SIZE))
+
+ def test_init(self):
+ prev_cwd = os.getcwd()
+ os.chdir(tempfile.gettempdir())
+ git_dir_rela = "repos/foo/bar.git"
+ del_dir_abs = osp.abspath("repos")
+ git_dir_abs = osp.abspath(git_dir_rela)
+ try:
+ # with specific path
+ for path in (git_dir_rela, git_dir_abs):
+ r = Repo.init(path=path, bare=True)
+ self.assertIsInstance(r, Repo)
+ assert r.bare is True
+ assert not r.has_separate_working_tree()
+ assert osp.isdir(r.git_dir)
+
+ self._assert_empty_repo(r)
+
+ # test clone
+ clone_path = path + "_clone"
+ rc = r.clone(clone_path)
+ self._assert_empty_repo(rc)
+
+ try:
+ rmtree(clone_path)
+ except OSError:
+ # when relative paths are used, the clone may actually be inside
+ # of the parent directory
+ pass
+ # END exception handling
+
+ # try again, this time with the absolute version
+ rc = Repo.clone_from(r.git_dir, clone_path)
+ self._assert_empty_repo(rc)
+
+ rmtree(git_dir_abs)
+ try:
+ rmtree(clone_path)
+ except OSError:
+ # when relative paths are used, the clone may actually be inside
+ # of the parent directory
+ pass
+ # END exception handling
+
+ # END for each path
+
+ os.makedirs(git_dir_rela)
+ os.chdir(git_dir_rela)
+ r = Repo.init(bare=False)
+ assert r.bare is False
+ assert not r.has_separate_working_tree()
+
+ self._assert_empty_repo(r)
+ finally:
+ try:
+ rmtree(del_dir_abs)
+ except OSError:
+ pass
+ os.chdir(prev_cwd)
+ # END restore previous state
+
+ def test_bare_property(self):
+ self.rorepo.bare
+
+ def test_daemon_export(self):
+ orig_val = self.rorepo.daemon_export
+ self.rorepo.daemon_export = not orig_val
+ self.assertEqual(self.rorepo.daemon_export, (not orig_val))
+ self.rorepo.daemon_export = orig_val
+ self.assertEqual(self.rorepo.daemon_export, orig_val)
+
+ def test_alternates(self):
+ cur_alternates = self.rorepo.alternates
+ # empty alternates
+ self.rorepo.alternates = []
+ self.assertEqual(self.rorepo.alternates, [])
+ alts = ["other/location", "this/location"]
+ self.rorepo.alternates = alts
+ self.assertEqual(alts, self.rorepo.alternates)
+ self.rorepo.alternates = cur_alternates
+
+ def test_repr(self):
+ assert repr(self.rorepo).startswith('<git.repo.base.Repo ')
+
+ def test_is_dirty_with_bare_repository(self):
+ orig_value = self.rorepo._bare
+ self.rorepo._bare = True
+ self.assertFalse(self.rorepo.is_dirty())
+ self.rorepo._bare = orig_value
+
+ def test_is_dirty(self):
+ self.rorepo._bare = False
+ for index in (0, 1):
+ for working_tree in (0, 1):
+ for untracked_files in (0, 1):
+ assert self.rorepo.is_dirty(index, working_tree, untracked_files) in (True, False)
+ # END untracked files
+ # END working tree
+ # END index
+ orig_val = self.rorepo._bare
+ self.rorepo._bare = True
+ assert self.rorepo.is_dirty() is False
+ self.rorepo._bare = orig_val
+
+ @with_rw_repo('HEAD')
+ def test_is_dirty_with_path(self, rwrepo):
+ assert rwrepo.is_dirty(path="git") is False
+
+ with open(osp.join(rwrepo.working_dir, "git", "util.py"), "at") as f:
+ f.write("junk")
+ assert rwrepo.is_dirty(path="git") is True
+ assert rwrepo.is_dirty(path="doc") is False
+
+ rwrepo.git.add(Git.polish_url(osp.join("git", "util.py")))
+ assert rwrepo.is_dirty(index=False, path="git") is False
+ assert rwrepo.is_dirty(path="git") is True
+
+ with open(osp.join(rwrepo.working_dir, "doc", "no-such-file.txt"), "wt") as f:
+ f.write("junk")
+ assert rwrepo.is_dirty(path="doc") is False
+ assert rwrepo.is_dirty(untracked_files=True, path="doc") is True
+
+ def test_head(self):
+ self.assertEqual(self.rorepo.head.reference.object, self.rorepo.active_branch.object)
+
+ def test_index(self):
+ index = self.rorepo.index
+ self.assertIsInstance(index, IndexFile)
+
+ def test_tag(self):
+ assert self.rorepo.tag('refs/tags/0.1.5').commit
+
+ def test_archive(self):
+ tmpfile = tempfile.mktemp(suffix='archive-test')
+ with open(tmpfile, 'wb') as stream:
+ self.rorepo.archive(stream, '0.1.6', path='doc')
+ assert stream.tell()
+ os.remove(tmpfile)
+
+ @mock.patch.object(Git, '_call_process')
+ def test_should_display_blame_information(self, git):
+ git.return_value = fixture('blame')
+ b = self.rorepo.blame('master', 'lib/git.py')
+ self.assertEqual(13, len(b))
+ self.assertEqual(2, len(b[0]))
+ # self.assertEqual(25, reduce(lambda acc, x: acc + len(x[-1]), b))
+ self.assertEqual(hash(b[0][0]), hash(b[9][0]))
+ c = b[0][0]
+ self.assertTrue(git.called)
+
+ self.assertEqual('634396b2f541a9f2d58b00be1a07f0c358b999b3', c.hexsha)
+ self.assertEqual('Tom Preston-Werner', c.author.name)
+ self.assertEqual('tom@mojombo.com', c.author.email)
+ self.assertEqual(1191997100, c.authored_date)
+ self.assertEqual('Tom Preston-Werner', c.committer.name)
+ self.assertEqual('tom@mojombo.com', c.committer.email)
+ self.assertEqual(1191997100, c.committed_date)
+ self.assertRaisesRegex(ValueError, "634396b2f541a9f2d58b00be1a07f0c358b999b3 missing", lambda: c.message)
+
+ # test the 'lines per commit' entries
+ tlist = b[0][1]
+ self.assertTrue(tlist)
+ self.assertTrue(isinstance(tlist[0], str))
+ self.assertTrue(len(tlist) < sum(len(t) for t in tlist)) # test for single-char bug
+
+ # BINARY BLAME
+ git.return_value = fixture('blame_binary')
+ blames = self.rorepo.blame('master', 'rps')
+ self.assertEqual(len(blames), 2)
+
+ def test_blame_real(self):
+ c = 0
+ nml = 0 # amount of multi-lines per blame
+ for item in self.rorepo.head.commit.tree.traverse(
+ predicate=lambda i, d: i.type == 'blob' and i.path.endswith('.py')):
+ c += 1
+
+ for b in self.rorepo.blame(self.rorepo.head, item.path):
+ nml += int(len(b[1]) > 1)
+ # END for each item to traverse
+ assert c, "Should have executed at least one blame command"
+ assert nml, "There should at least be one blame commit that contains multiple lines"
+
+ @mock.patch.object(Git, '_call_process')
+ def test_blame_incremental(self, git):
+ # loop over two fixtures, create a test fixture for 2.11.1+ syntax
+ for git_fixture in ('blame_incremental', 'blame_incremental_2.11.1_plus'):
+ git.return_value = fixture(git_fixture)
+ blame_output = self.rorepo.blame_incremental('9debf6b0aafb6f7781ea9d1383c86939a1aacde3', 'AUTHORS')
+ blame_output = list(blame_output)
+ self.assertEqual(len(blame_output), 5)
+
+ # Check all outputted line numbers
+ ranges = flatten([entry.linenos for entry in blame_output])
+ self.assertEqual(ranges, flatten([range(2, 3), range(14, 15), range(1, 2), range(3, 14), range(15, 17)]))
+
+ commits = [entry.commit.hexsha[:7] for entry in blame_output]
+ self.assertEqual(commits, ['82b8902', '82b8902', 'c76852d', 'c76852d', 'c76852d'])
+
+ # Original filenames
+ self.assertSequenceEqual([entry.orig_path for entry in blame_output], ['AUTHORS'] * len(blame_output))
+
+ # Original line numbers
+ orig_ranges = flatten([entry.orig_linenos for entry in blame_output])
+ self.assertEqual(orig_ranges, flatten([range(2, 3), range(14, 15), range(1, 2), range(2, 13), range(13, 15)])) # noqa E501
+
+ @mock.patch.object(Git, '_call_process')
+ def test_blame_complex_revision(self, git):
+ git.return_value = fixture('blame_complex_revision')
+ res = self.rorepo.blame("HEAD~10..HEAD", "README.md")
+ self.assertEqual(len(res), 1)
+ self.assertEqual(len(res[0][1]), 83, "Unexpected amount of parsed blame lines")
+
+ @skipIf(HIDE_WINDOWS_KNOWN_ERRORS and Git.is_cygwin(),
+ """FIXME: File "C:\\projects\\gitpython\\git\\cmd.py", line 671, in execute
+ raise GitCommandError(command, status, stderr_value, stdout_value)
+ GitCommandError: Cmd('git') failed due to: exit code(128)
+ cmdline: git add 1__��ava verb��ten 1_test _myfile 1_test_other_file
+ 1_��ava-----verb��ten
+ stderr: 'fatal: pathspec '"1__çava verböten"' did not match any files'
+ """)
+ @with_rw_repo('HEAD', bare=False)
+ def test_untracked_files(self, rwrepo):
+ for run, repo_add in enumerate((rwrepo.index.add, rwrepo.git.add)):
+ base = rwrepo.working_tree_dir
+ files = (join_path_native(base, "%i_test _myfile" % run),
+ join_path_native(base, "%i_test_other_file" % run),
+ join_path_native(base, "%i__çava verböten" % run),
+ join_path_native(base, "%i_çava-----verböten" % run))
+
+ num_recently_untracked = 0
+ for fpath in files:
+ with open(fpath, "wb"):
+ pass
+ untracked_files = rwrepo.untracked_files
+ num_recently_untracked = len(untracked_files)
+
+ # assure we have all names - they are relative to the git-dir
+ num_test_untracked = 0
+ for utfile in untracked_files:
+ num_test_untracked += join_path_native(base, utfile) in files
+ self.assertEqual(len(files), num_test_untracked)
+
+ repo_add(untracked_files)
+ self.assertEqual(len(rwrepo.untracked_files), (num_recently_untracked - len(files)))
+ # end for each run
+
+ def test_config_reader(self):
+ reader = self.rorepo.config_reader() # all config files
+ assert reader.read_only
+ reader = self.rorepo.config_reader("repository") # single config file
+ assert reader.read_only
+
+ def test_config_writer(self):
+ for config_level in self.rorepo.config_level:
+ try:
+ with self.rorepo.config_writer(config_level) as writer:
+ self.assertFalse(writer.read_only)
+ except IOError:
+ # its okay not to get a writer for some configuration files if we
+ # have no permissions
+ pass
+
+ def test_config_level_paths(self):
+ for config_level in self.rorepo.config_level:
+ assert self.rorepo._get_config_path(config_level)
+
+ def test_creation_deletion(self):
+ # just a very quick test to assure it generally works. There are
+ # specialized cases in the test_refs module
+ head = self.rorepo.create_head("new_head", "HEAD~1")
+ self.rorepo.delete_head(head)
+
+ try:
+ tag = self.rorepo.create_tag("new_tag", "HEAD~2")
+ finally:
+ self.rorepo.delete_tag(tag)
+ with self.rorepo.config_writer():
+ pass
+ try:
+ remote = self.rorepo.create_remote("new_remote", "git@server:repo.git")
+ finally:
+ self.rorepo.delete_remote(remote)
+
+ def test_comparison_and_hash(self):
+ # this is only a preliminary test, more testing done in test_index
+ self.assertEqual(self.rorepo, self.rorepo)
+ self.assertFalse(self.rorepo != self.rorepo)
+ self.assertEqual(len({self.rorepo, self.rorepo}), 1)
+
+ @with_rw_directory
+ def test_tilde_and_env_vars_in_repo_path(self, rw_dir):
+ ph = os.environ.get('HOME')
+ try:
+ os.environ['HOME'] = rw_dir
+ Repo.init(osp.join('~', 'test.git'), bare=True)
+
+ os.environ['FOO'] = rw_dir
+ Repo.init(osp.join('$FOO', 'test.git'), bare=True)
+ finally:
+ if ph:
+ os.environ['HOME'] = ph
+ del os.environ['FOO']
+ # end assure HOME gets reset to what it was
+
+ def test_git_cmd(self):
+ # test CatFileContentStream, just to be very sure we have no fencepost errors
+ # last \n is the terminating newline that it expects
+ l1 = b"0123456789\n"
+ l2 = b"abcdefghijklmnopqrstxy\n"
+ l3 = b"z\n"
+ d = l1 + l2 + l3 + b"\n"
+
+ l1p = l1[:5]
+
+ # full size
+ # size is without terminating newline
+ def mkfull():
+ return Git.CatFileContentStream(len(d) - 1, BytesIO(d))
+
+ ts = 5
+
+ def mktiny():
+ return Git.CatFileContentStream(ts, BytesIO(d))
+
+ # readlines no limit
+ s = mkfull()
+ lines = s.readlines()
+ self.assertEqual(len(lines), 3)
+ self.assertTrue(lines[-1].endswith(b'\n'), lines[-1])
+ self.assertEqual(s._stream.tell(), len(d)) # must have scrubbed to the end
+
+ # realines line limit
+ s = mkfull()
+ lines = s.readlines(5)
+ self.assertEqual(len(lines), 1)
+
+ # readlines on tiny sections
+ s = mktiny()
+ lines = s.readlines()
+ self.assertEqual(len(lines), 1)
+ self.assertEqual(lines[0], l1p)
+ self.assertEqual(s._stream.tell(), ts + 1)
+
+ # readline no limit
+ s = mkfull()
+ self.assertEqual(s.readline(), l1)
+ self.assertEqual(s.readline(), l2)
+ self.assertEqual(s.readline(), l3)
+ self.assertEqual(s.readline(), b'')
+ self.assertEqual(s._stream.tell(), len(d))
+
+ # readline limit
+ s = mkfull()
+ self.assertEqual(s.readline(5), l1p)
+ self.assertEqual(s.readline(), l1[5:])
+
+ # readline on tiny section
+ s = mktiny()
+ self.assertEqual(s.readline(), l1p)
+ self.assertEqual(s.readline(), b'')
+ self.assertEqual(s._stream.tell(), ts + 1)
+
+ # read no limit
+ s = mkfull()
+ self.assertEqual(s.read(), d[:-1])
+ self.assertEqual(s.read(), b'')
+ self.assertEqual(s._stream.tell(), len(d))
+
+ # read limit
+ s = mkfull()
+ self.assertEqual(s.read(5), l1p)
+ self.assertEqual(s.read(6), l1[5:])
+ self.assertEqual(s._stream.tell(), 5 + 6) # its not yet done
+
+ # read tiny
+ s = mktiny()
+ self.assertEqual(s.read(2), l1[:2])
+ self.assertEqual(s._stream.tell(), 2)
+ self.assertEqual(s.read(), l1[2:ts])
+ self.assertEqual(s._stream.tell(), ts + 1)
+
+ def _assert_rev_parse_types(self, name, rev_obj):
+ rev_parse = self.rorepo.rev_parse
+
+ if rev_obj.type == 'tag':
+ rev_obj = rev_obj.object
+
+ # tree and blob type
+ obj = rev_parse(name + '^{tree}')
+ self.assertEqual(obj, rev_obj.tree)
+
+ obj = rev_parse(name + ':CHANGES')
+ self.assertEqual(obj.type, 'blob')
+ self.assertEqual(obj.path, 'CHANGES')
+ self.assertEqual(rev_obj.tree['CHANGES'], obj)
+
+ def _assert_rev_parse(self, name):
+ """tries multiple different rev-parse syntaxes with the given name
+ :return: parsed object"""
+ rev_parse = self.rorepo.rev_parse
+ orig_obj = rev_parse(name)
+ if orig_obj.type == 'tag':
+ obj = orig_obj.object
+ else:
+ obj = orig_obj
+ # END deref tags by default
+
+ # try history
+ rev = name + "~"
+ obj2 = rev_parse(rev)
+ self.assertEqual(obj2, obj.parents[0])
+ self._assert_rev_parse_types(rev, obj2)
+
+ # history with number
+ ni = 11
+ history = [obj.parents[0]]
+ for pn in range(ni):
+ history.append(history[-1].parents[0])
+ # END get given amount of commits
+
+ for pn in range(11):
+ rev = name + "~%i" % (pn + 1)
+ obj2 = rev_parse(rev)
+ self.assertEqual(obj2, history[pn])
+ self._assert_rev_parse_types(rev, obj2)
+ # END history check
+
+ # parent ( default )
+ rev = name + "^"
+ obj2 = rev_parse(rev)
+ self.assertEqual(obj2, obj.parents[0])
+ self._assert_rev_parse_types(rev, obj2)
+
+ # parent with number
+ for pn, parent in enumerate(obj.parents):
+ rev = name + "^%i" % (pn + 1)
+ self.assertEqual(rev_parse(rev), parent)
+ self._assert_rev_parse_types(rev, parent)
+ # END for each parent
+
+ return orig_obj
+
+ @with_rw_repo('HEAD', bare=False)
+ def test_rw_rev_parse(self, rwrepo):
+ # verify it does not confuse branches with hexsha ids
+ ahead = rwrepo.create_head('aaaaaaaa')
+ assert(rwrepo.rev_parse(str(ahead)) == ahead.commit)
+
+ def test_rev_parse(self):
+ rev_parse = self.rorepo.rev_parse
+
+ # try special case: This one failed at some point, make sure its fixed
+ self.assertEqual(rev_parse("33ebe").hexsha, "33ebe7acec14b25c5f84f35a664803fcab2f7781")
+
+ # start from reference
+ num_resolved = 0
+
+ for ref_no, ref in enumerate(Reference.iter_items(self.rorepo)):
+ path_tokens = ref.path.split("/")
+ for pt in range(len(path_tokens)):
+ path_section = '/'.join(path_tokens[-(pt + 1):])
+ try:
+ obj = self._assert_rev_parse(path_section)
+ self.assertEqual(obj.type, ref.object.type)
+ num_resolved += 1
+ except (BadName, BadObject):
+ print("failed on %s" % path_section)
+ # is fine, in case we have something like 112, which belongs to remotes/rname/merge-requests/112
+ # END exception handling
+ # END for each token
+ if ref_no == 3 - 1:
+ break
+ # END for each reference
+ assert num_resolved
+
+ # it works with tags !
+ tag = self._assert_rev_parse('0.1.4')
+ self.assertEqual(tag.type, 'tag')
+
+ # try full sha directly ( including type conversion )
+ self.assertEqual(tag.object, rev_parse(tag.object.hexsha))
+ self._assert_rev_parse_types(tag.object.hexsha, tag.object)
+
+ # multiple tree types result in the same tree: HEAD^{tree}^{tree}:CHANGES
+ rev = '0.1.4^{tree}^{tree}'
+ self.assertEqual(rev_parse(rev), tag.object.tree)
+ self.assertEqual(rev_parse(rev + ':CHANGES'), tag.object.tree['CHANGES'])
+
+ # try to get parents from first revision - it should fail as no such revision
+ # exists
+ first_rev = "33ebe7acec14b25c5f84f35a664803fcab2f7781"
+ commit = rev_parse(first_rev)
+ self.assertEqual(len(commit.parents), 0)
+ self.assertEqual(commit.hexsha, first_rev)
+ self.assertRaises(BadName, rev_parse, first_rev + "~")
+ self.assertRaises(BadName, rev_parse, first_rev + "^")
+
+ # short SHA1
+ commit2 = rev_parse(first_rev[:20])
+ self.assertEqual(commit2, commit)
+ commit2 = rev_parse(first_rev[:5])
+ self.assertEqual(commit2, commit)
+
+ # todo: dereference tag into a blob 0.1.7^{blob} - quite a special one
+ # needs a tag which points to a blob
+
+ # ref^0 returns commit being pointed to, same with ref~0, and ^{}
+ tag = rev_parse('0.1.4')
+ for token in (('~0', '^0', '^{}')):
+ self.assertEqual(tag.object, rev_parse('0.1.4%s' % token))
+ # END handle multiple tokens
+
+ # try partial parsing
+ max_items = 40
+ for i, binsha in enumerate(self.rorepo.odb.sha_iter()):
+ self.assertEqual(rev_parse(bin_to_hex(binsha)[:8 - (i % 2)].decode('ascii')).binsha, binsha)
+ if i > max_items:
+ # this is rather slow currently, as rev_parse returns an object
+ # which requires accessing packs, it has some additional overhead
+ break
+ # END for each binsha in repo
+
+ # missing closing brace commit^{tree
+ self.assertRaises(ValueError, rev_parse, '0.1.4^{tree')
+
+ # missing starting brace
+ self.assertRaises(ValueError, rev_parse, '0.1.4^tree}')
+
+ # REVLOG
+ #######
+ head = self.rorepo.head
+
+ # need to specify a ref when using the @ syntax
+ self.assertRaises(BadObject, rev_parse, "%s@{0}" % head.commit.hexsha)
+
+ # uses HEAD.ref by default
+ self.assertEqual(rev_parse('@{0}'), head.commit)
+ if not head.is_detached:
+ refspec = '%s@{0}' % head.ref.name
+ self.assertEqual(rev_parse(refspec), head.ref.commit)
+ # all additional specs work as well
+ self.assertEqual(rev_parse(refspec + "^{tree}"), head.commit.tree)
+ self.assertEqual(rev_parse(refspec + ":CHANGES").type, 'blob')
+ # END operate on non-detached head
+
+ # position doesn't exist
+ self.assertRaises(IndexError, rev_parse, '@{10000}')
+
+ # currently, nothing more is supported
+ self.assertRaises(NotImplementedError, rev_parse, "@{1 week ago}")
+
+ # the last position
+ assert rev_parse('@{1}') != head.commit
+
+ def test_repo_odbtype(self):
+ target_type = GitCmdObjectDB
+ self.assertIsInstance(self.rorepo.odb, target_type)
+
+ def test_submodules(self):
+ self.assertEqual(len(self.rorepo.submodules), 1) # non-recursive
+ self.assertGreaterEqual(len(list(self.rorepo.iter_submodules())), 2)
+
+ self.assertIsInstance(self.rorepo.submodule("gitdb"), Submodule)
+ self.assertRaises(ValueError, self.rorepo.submodule, "doesn't exist")
+
+ @with_rw_repo('HEAD', bare=False)
+ def test_submodule_update(self, rwrepo):
+ # fails in bare mode
+ rwrepo._bare = True
+ self.assertRaises(InvalidGitRepositoryError, rwrepo.submodule_update)
+ rwrepo._bare = False
+
+ # test create submodule
+ sm = rwrepo.submodules[0]
+ sm = rwrepo.create_submodule("my_new_sub", "some_path", join_path_native(self.rorepo.working_tree_dir, sm.path))
+ self.assertIsInstance(sm, Submodule)
+
+ # note: the rest of this functionality is tested in test_submodule
+
+ @with_rw_repo('HEAD')
+ def test_git_file(self, rwrepo):
+ # Move the .git directory to another location and create the .git file.
+ real_path_abs = osp.abspath(join_path_native(rwrepo.working_tree_dir, '.real'))
+ os.rename(rwrepo.git_dir, real_path_abs)
+ git_file_path = join_path_native(rwrepo.working_tree_dir, '.git')
+ with open(git_file_path, 'wb') as fp:
+ fp.write(fixture('git_file'))
+
+ # Create a repo and make sure it's pointing to the relocated .git directory.
+ git_file_repo = Repo(rwrepo.working_tree_dir)
+ self.assertEqual(osp.abspath(git_file_repo.git_dir), real_path_abs)
+
+ # Test using an absolute gitdir path in the .git file.
+ with open(git_file_path, 'wb') as fp:
+ fp.write(('gitdir: %s\n' % real_path_abs).encode('ascii'))
+ git_file_repo = Repo(rwrepo.working_tree_dir)
+ self.assertEqual(osp.abspath(git_file_repo.git_dir), real_path_abs)
+
+ def test_file_handle_leaks(self):
+ def last_commit(repo, rev, path):
+ commit = next(repo.iter_commits(rev, path, max_count=1))
+ commit.tree[path]
+
+ # This is based on this comment
+ # https://github.com/gitpython-developers/GitPython/issues/60#issuecomment-23558741
+ # And we expect to set max handles to a low value, like 64
+ # You should set ulimit -n X, see .travis.yml
+ # The loops below would easily create 500 handles if these would leak (4 pipes + multiple mapped files)
+ for _ in range(64):
+ for repo_type in (GitCmdObjectDB, GitDB):
+ repo = Repo(self.rorepo.working_tree_dir, odbt=repo_type)
+ last_commit(repo, 'master', 'test/test_base.py')
+ # end for each repository type
+ # end for each iteration
+
+ def test_remote_method(self):
+ self.assertRaises(ValueError, self.rorepo.remote, 'foo-blue')
+ self.assertIsInstance(self.rorepo.remote(name='origin'), Remote)
+
+ @with_rw_directory
+ def test_empty_repo(self, rw_dir):
+ """Assure we can handle empty repositories"""
+ r = Repo.init(rw_dir, mkdir=False)
+ # It's ok not to be able to iterate a commit, as there is none
+ self.assertRaises(ValueError, r.iter_commits)
+ self.assertEqual(r.active_branch.name, 'master')
+ assert not r.active_branch.is_valid(), "Branch is yet to be born"
+
+ # actually, when trying to create a new branch without a commit, git itself fails
+ # We should, however, not fail ungracefully
+ self.assertRaises(BadName, r.create_head, 'foo')
+ self.assertRaises(BadName, r.create_head, 'master')
+ # It's expected to not be able to access a tree
+ self.assertRaises(ValueError, r.tree)
+
+ new_file_path = osp.join(rw_dir, "new_file.ext")
+ touch(new_file_path)
+ r.index.add([new_file_path])
+ r.index.commit("initial commit\nBAD MESSAGE 1\n")
+
+ # Now a branch should be creatable
+ nb = r.create_head('foo')
+ assert nb.is_valid()
+
+ with open(new_file_path, 'w') as f:
+ f.write('Line 1\n')
+
+ r.index.add([new_file_path])
+ r.index.commit("add line 1\nBAD MESSAGE 2\n")
+
+ with open('%s/.git/logs/refs/heads/master' % (rw_dir,), 'r') as f:
+ contents = f.read()
+
+ assert 'BAD MESSAGE' not in contents, 'log is corrupt'
+
+ def test_merge_base(self):
+ repo = self.rorepo
+ c1 = 'f6aa8d1'
+ c2 = repo.commit('d46e3fe')
+ c3 = '763ef75'
+ self.assertRaises(ValueError, repo.merge_base)
+ self.assertRaises(ValueError, repo.merge_base, 'foo')
+
+ # two commit merge-base
+ res = repo.merge_base(c1, c2)
+ self.assertIsInstance(res, list)
+ self.assertEqual(len(res), 1)
+ self.assertIsInstance(res[0], Commit)
+ self.assertTrue(res[0].hexsha.startswith('3936084'))
+
+ for kw in ('a', 'all'):
+ res = repo.merge_base(c1, c2, c3, **{kw: True})
+ self.assertIsInstance(res, list)
+ self.assertEqual(len(res), 1)
+ # end for each keyword signalling all merge-bases to be returned
+
+ # Test for no merge base - can't do as we have
+ self.assertRaises(GitCommandError, repo.merge_base, c1, 'ffffff')
+
+ def test_is_ancestor(self):
+ git = self.rorepo.git
+ if git.version_info[:3] < (1, 8, 0):
+ raise SkipTest("git merge-base --is-ancestor feature unsupported")
+
+ repo = self.rorepo
+ c1 = 'f6aa8d1'
+ c2 = '763ef75'
+ self.assertTrue(repo.is_ancestor(c1, c1))
+ self.assertTrue(repo.is_ancestor("master", "master"))
+ self.assertTrue(repo.is_ancestor(c1, c2))
+ self.assertTrue(repo.is_ancestor(c1, "master"))
+ self.assertFalse(repo.is_ancestor(c2, c1))
+ self.assertFalse(repo.is_ancestor("master", c1))
+ for i, j in itertools.permutations([c1, 'ffffff', ''], r=2):
+ self.assertRaises(GitCommandError, repo.is_ancestor, i, j)
+
+ @with_rw_directory
+ def test_git_work_tree_dotgit(self, rw_dir):
+ """Check that we find .git as a worktree file and find the worktree
+ based on it."""
+ git = Git(rw_dir)
+ if git.version_info[:3] < (2, 5, 1):
+ raise SkipTest("worktree feature unsupported")
+
+ rw_master = self.rorepo.clone(join_path_native(rw_dir, 'master_repo'))
+ branch = rw_master.create_head('aaaaaaaa')
+ worktree_path = join_path_native(rw_dir, 'worktree_repo')
+ if Git.is_cygwin():
+ worktree_path = cygpath(worktree_path)
+ rw_master.git.worktree('add', worktree_path, branch.name)
+
+ # this ensures that we can read the repo's gitdir correctly
+ repo = Repo(worktree_path)
+ self.assertIsInstance(repo, Repo)
+
+ # this ensures we're able to actually read the refs in the tree, which
+ # means we can read commondir correctly.
+ commit = repo.head.commit
+ self.assertIsInstance(commit, Object)
+
+ # this ensures we can read the remotes, which confirms we're reading
+ # the config correctly.
+ origin = repo.remotes.origin
+ self.assertIsInstance(origin, Remote)
+
+ self.assertIsInstance(repo.heads['aaaaaaaa'], Head)
+
+ @with_rw_directory
+ def test_git_work_tree_env(self, rw_dir):
+ """Check that we yield to GIT_WORK_TREE"""
+ # clone a repo
+ # move .git directory to a subdirectory
+ # set GIT_DIR and GIT_WORK_TREE appropriately
+ # check that repo.working_tree_dir == rw_dir
+ self.rorepo.clone(join_path_native(rw_dir, 'master_repo'))
+
+ repo_dir = join_path_native(rw_dir, 'master_repo')
+ old_git_dir = join_path_native(repo_dir, '.git')
+ new_subdir = join_path_native(repo_dir, 'gitdir')
+ new_git_dir = join_path_native(new_subdir, 'git')
+ os.mkdir(new_subdir)
+ os.rename(old_git_dir, new_git_dir)
+
+ oldenv = os.environ.copy()
+ os.environ['GIT_DIR'] = new_git_dir
+ os.environ['GIT_WORK_TREE'] = repo_dir
+
+ try:
+ r = Repo()
+ self.assertEqual(r.working_tree_dir, repo_dir)
+ self.assertEqual(r.working_dir, repo_dir)
+ finally:
+ os.environ = oldenv
+
+ @with_rw_directory
+ def test_rebasing(self, rw_dir):
+ r = Repo.init(rw_dir)
+ fp = osp.join(rw_dir, 'hello.txt')
+ r.git.commit("--allow-empty", message="init",)
+ with open(fp, 'w') as fs:
+ fs.write("hello world")
+ r.git.add(Git.polish_url(fp))
+ r.git.commit(message="English")
+ self.assertEqual(r.currently_rebasing_on(), None)
+ r.git.checkout("HEAD^1")
+ with open(fp, 'w') as fs:
+ fs.write("Hola Mundo")
+ r.git.add(Git.polish_url(fp))
+ r.git.commit(message="Spanish")
+ commitSpanish = r.commit()
+ try:
+ r.git.rebase("master")
+ except GitCommandError:
+ pass
+ self.assertEqual(r.currently_rebasing_on(), commitSpanish)