diff options
Diffstat (limited to 'git/test')
-rw-r--r-- | git/test/lib/__init__.py | 4 | ||||
-rw-r--r-- | git/test/lib/asserts.py | 18 | ||||
-rw-r--r-- | git/test/lib/helper.py | 113 | ||||
-rw-r--r-- | git/test/performance/lib.py | 28 | ||||
-rw-r--r-- | git/test/performance/test_commit.py | 39 | ||||
-rw-r--r-- | git/test/performance/test_odb.py | 18 | ||||
-rw-r--r-- | git/test/performance/test_streams.py | 56 | ||||
-rw-r--r-- | git/test/performance/test_utils.py | 64 | ||||
-rw-r--r-- | git/test/test_actor.py | 8 | ||||
-rw-r--r-- | git/test/test_base.py | 49 | ||||
-rw-r--r-- | git/test/test_blob.py | 10 | ||||
-rw-r--r-- | git/test/test_commit.py | 117 | ||||
-rw-r--r-- | git/test/test_config.py | 41 | ||||
-rw-r--r-- | git/test/test_db.py | 9 | ||||
-rw-r--r-- | git/test/test_diff.py | 45 | ||||
-rw-r--r-- | git/test/test_fun.py | 98 | ||||
-rw-r--r-- | git/test/test_git.py | 25 | ||||
-rw-r--r-- | git/test/test_index.py | 300 | ||||
-rw-r--r-- | git/test/test_reflog.py | 46 | ||||
-rw-r--r-- | git/test/test_refs.py | 227 | ||||
-rw-r--r-- | git/test/test_remote.py | 243 | ||||
-rw-r--r-- | git/test/test_repo.py | 267 | ||||
-rw-r--r-- | git/test/test_stats.py | 9 | ||||
-rw-r--r-- | git/test/test_submodule.py | 234 | ||||
-rw-r--r-- | git/test/test_tree.py | 69 | ||||
-rw-r--r-- | git/test/test_util.py | 64 |
26 files changed, 1098 insertions, 1103 deletions
diff --git a/git/test/lib/__init__.py b/git/test/lib/__init__.py index 77512794..e13e227d 100644 --- a/git/test/lib/__init__.py +++ b/git/test/lib/__init__.py @@ -9,5 +9,5 @@ from mock import * from asserts import * from helper import * -__all__ = [ name for name, obj in locals().items() - if not (name.startswith('_') or inspect.ismodule(obj)) ] +__all__ = [name for name, obj in locals().items() + if not (name.startswith('_') or inspect.ismodule(obj))] diff --git a/git/test/lib/asserts.py b/git/test/lib/asserts.py index fa754b92..351901dc 100644 --- a/git/test/lib/asserts.py +++ b/git/test/lib/asserts.py @@ -10,41 +10,49 @@ from nose import tools from nose.tools import * import stat -__all__ = ['assert_instance_of', 'assert_not_instance_of', +__all__ = ['assert_instance_of', 'assert_not_instance_of', 'assert_none', 'assert_not_none', 'assert_match', 'assert_not_match', 'assert_mode_644', 'assert_mode_755'] + tools.__all__ + def assert_instance_of(expected, actual, msg=None): """Verify that object is an instance of expected """ assert isinstance(actual, expected), msg + def assert_not_instance_of(expected, actual, msg=None): """Verify that object is not an instance of expected """ assert not isinstance(actual, expected, msg) - + + def assert_none(actual, msg=None): """verify that item is None""" assert actual is None, msg + def assert_not_none(actual, msg=None): """verify that item is None""" assert actual is not None, msg + def assert_match(pattern, string, msg=None): """verify that the pattern matches the string""" assert_not_none(re.search(pattern, string), msg) + def assert_not_match(pattern, string, msg=None): """verify that the pattern does not match the string""" assert_none(re.search(pattern, string), msg) - + + def assert_mode_644(mode): """Verify given mode is 644""" - assert (mode & stat.S_IROTH) and (mode & stat.S_IRGRP) + assert (mode & stat.S_IROTH) and (mode & stat.S_IRGRP) assert (mode & stat.S_IWUSR) and (mode & stat.S_IRUSR) and not (mode & stat.S_IXUSR) + def assert_mode_755(mode): """Verify given mode is 755""" assert (mode & stat.S_IROTH) and (mode & stat.S_IRGRP) and (mode & stat.S_IXOTH) and (mode & stat.S_IXGRP) - assert (mode & stat.S_IWUSR) and (mode & stat.S_IRUSR) and (mode & stat.S_IXUSR)
\ No newline at end of file + assert (mode & stat.S_IWUSR) and (mode & stat.S_IRUSR) and (mode & stat.S_IXUSR) diff --git a/git/test/lib/helper.py b/git/test/lib/helper.py index a76f1a15..913cf3b6 100644 --- a/git/test/lib/helper.py +++ b/git/test/lib/helper.py @@ -21,36 +21,42 @@ __all__ = ( #{ Routines + def fixture_path(name): test_dir = os.path.dirname(os.path.dirname(__file__)) return os.path.join(test_dir, "fixtures", name) + def fixture(name): return open(fixture_path(name), 'rb').read() + def absolute_project_path(): return os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) #} END routines - -#{ Adapters - + +#{ Adapters + + class StringProcessAdapter(object): + """Allows to use strings as Process object as returned by SubProcess.Popen. Its tailored to work with the test system only""" - + def __init__(self, input_string): self.stdout = cStringIO.StringIO(input_string) self.stderr = cStringIO.StringIO() - + def wait(self): return 0 - + poll = wait - + #} END adapters -#{ Decorators +#{ Decorators + def _mktemp(*args): """Wrapper around default tempfile.mktemp to fix an osx issue""" @@ -59,28 +65,31 @@ def _mktemp(*args): tdir = '/private' + tdir return tdir + def _rmtree_onerror(osremove, fullpath, exec_info): """ - Handle the case on windows that read-only files cannot be deleted by + Handle the case on windows that read-only files cannot be deleted by os.remove by setting it to mode 777, then retry deletion. """ if os.name != 'nt' or osremove is not os.remove: raise - + os.chmod(fullpath, 0777) os.remove(fullpath) + def with_rw_repo(working_tree_ref, bare=False): """ - Same as with_bare_repo, but clones the rorepo as non-bare repository, checking + Same as with_bare_repo, but clones the rorepo as non-bare repository, checking out the working tree at the given working_tree_ref. - + This repository type is more costly due to the working copy checkout. - - To make working with relative paths easier, the cwd will be set to the working + + To make working with relative paths easier, the cwd will be set to the working dir of the repository. """ assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout" + def argument_passer(func): def repo_creator(self): prefix = 'non_' @@ -89,12 +98,12 @@ def with_rw_repo(working_tree_ref, bare=False): #END handle prefix repo_dir = _mktemp("%sbare_%s" % (prefix, func.__name__)) rw_repo = self.rorepo.clone(repo_dir, shared=True, bare=bare, n=True) - + rw_repo.head.commit = rw_repo.commit(working_tree_ref) if not bare: rw_repo.head.reference.checkout() # END handle checkout - + prev_cwd = os.getcwd() os.chdir(rw_repo.working_dir) try: @@ -116,44 +125,46 @@ def with_rw_repo(working_tree_ref, bare=False): return repo_creator # END argument passer return argument_passer - + + def with_rw_and_rw_remote_repo(working_tree_ref): """ Same as with_rw_repo, but also provides a writable remote repository from which the - rw_repo has been forked as well as a handle for a git-daemon that may be started to + rw_repo has been forked as well as a handle for a git-daemon that may be started to run the remote_repo. - The remote repository was cloned as bare repository from the rorepo, wheras + The remote repository was cloned as bare repository from the rorepo, wheras the rw repo has a working tree and was cloned from the remote repository. - - remote_repo has two remotes: origin and daemon_origin. One uses a local url, - the other uses a server url. The daemon setup must be done on system level - and should be an inetd service that serves tempdir.gettempdir() and all + + remote_repo has two remotes: origin and daemon_origin. One uses a local url, + the other uses a server url. The daemon setup must be done on system level + and should be an inetd service that serves tempdir.gettempdir() and all directories in it. - + The following scetch demonstrates this:: rorepo ---<bare clone>---> rw_remote_repo ---<clone>---> rw_repo - + The test case needs to support the following signature:: def case(self, rw_repo, rw_remote_repo) - + This setup allows you to test push and pull scenarios and hooks nicely. - + See working dir info in with_rw_repo """ assert isinstance(working_tree_ref, basestring), "Decorator requires ref name for working tree checkout" + def argument_passer(func): def remote_repo_creator(self): remote_repo_dir = _mktemp("remote_repo_%s" % func.__name__) repo_dir = _mktemp("remote_clone_non_bare_repo") - + rw_remote_repo = self.rorepo.clone(remote_repo_dir, shared=True, bare=True) rw_repo = rw_remote_repo.clone(repo_dir, shared=True, bare=False, n=True) # recursive alternates info ? rw_repo.head.commit = working_tree_ref rw_repo.head.reference.checkout() - + # prepare for git-daemon rw_remote_repo.daemon_export = True - + # this thing is just annoying ! crw = rw_remote_repo.config_writer() section = "daemon" @@ -164,28 +175,28 @@ def with_rw_and_rw_remote_repo(working_tree_ref): crw.set(section, "receivepack", True) # release lock del(crw) - - # initialize the remote - first do it as local remote and pull, then + + # initialize the remote - first do it as local remote and pull, then # we change the url to point to the daemon. The daemon should be started # by the user, not by us d_remote = Remote.create(rw_repo, "daemon_origin", remote_repo_dir) d_remote.fetch() remote_repo_url = "git://localhost%s" % remote_repo_dir - + d_remote.config_writer.set('url', remote_repo_url) - + # try to list remotes to diagnoes whether the server is up try: rw_repo.git.ls_remote(d_remote) - except GitCommandError,e: + except GitCommandError, e: print str(e) if os.name == 'nt': - raise AssertionError('git-daemon needs to run this test, but windows does not have one. Otherwise, run: git-daemon "%s"' % os.path.dirname(_mktemp())) + raise AssertionError('git-daemon needs to run this test, but windows does not have one. Otherwise, run: git-daemon "%s"' % os.path.dirname(_mktemp())) else: raise AssertionError('Please start a git-daemon to run this test, execute: git-daemon "%s"' % os.path.dirname(_mktemp())) # END make assertion #END catch ls remote error - + # adjust working dir prev_cwd = os.getcwd() os.chdir(rw_repo.working_dir) @@ -203,37 +214,39 @@ def with_rw_and_rw_remote_repo(working_tree_ref): return remote_repo_creator # END remote repo creator # END argument parsser - + return argument_passer - + #} END decorators - + + class TestBase(TestCase): + """ Base Class providing default functionality to all tests such as: - + - Utility functions provided by the TestCase base of the unittest method such as:: self.fail("todo") self.failUnlessRaises(...) - - - Class level repository which is considered read-only as it is shared among + + - Class level repository which is considered read-only as it is shared among all test cases in your type. - Access it using:: + Access it using:: self.rorepo # 'ro' stands for read-only - - The rorepo is in fact your current project's git repo. If you refer to specific - shas for your objects, be sure you choose some that are part of the immutable portion + + The rorepo is in fact your current project's git repo. If you refer to specific + shas for your objects, be sure you choose some that are part of the immutable portion of the project history ( to assure tests don't fail for others ). """ - + @classmethod def setUpClass(cls): """ - Dynamically add a read-only repository to our actual type. This way + Dynamically add a read-only repository to our actual type. This way each test type has its own repository """ cls.rorepo = Repo(GIT_REPO) - + def _make_file(self, rela_path, data, repo=None): """ Create a file at the given path relative to our repository, filled diff --git a/git/test/performance/lib.py b/git/test/performance/lib.py index d8313dac..00d41b76 100644 --- a/git/test/performance/lib.py +++ b/git/test/performance/lib.py @@ -30,26 +30,27 @@ def resolve_or_fail(env_var): #} END utilities -#{ Base Classes +#{ Base Classes class TestBigRepoR(TestBase): - """TestCase providing access to readonly 'big' repositories using the following + + """TestCase providing access to readonly 'big' repositories using the following member variables: - + * gitrorepo - + * Read-Only git repository - actually the repo of git itself - + * puregitrorepo - + * As gitrepo, but uses pure python implementation """ - + #{ Invariants head_sha_2k = '235d521da60e4699e5bd59ac658b5b48bd76ddca' head_sha_50 = '32347c375250fd470973a5d76185cac718955fd5' - #} END invariants - + #} END invariants + @classmethod def setUp(cls): super(TestBigRepoR, cls).setUp() @@ -59,10 +60,11 @@ class TestBigRepoR(TestBase): class TestBigRepoRW(TestBigRepoR): + """As above, but provides a big repository that we can write to. - + Provides ``self.gitrwrepo`` and ``self.puregitrwrepo``""" - + @classmethod def setUp(cls): super(TestBigRepoRW, cls).setUp() @@ -70,9 +72,9 @@ class TestBigRepoRW(TestBigRepoR): os.mkdir(dirname) cls.gitrwrepo = cls.gitrorepo.clone(dirname, shared=True, bare=True, odbt=GitCmdObjectDB) cls.puregitrwrepo = Repo(dirname, odbt=GitDB) - + @classmethod def tearDownAll(cls): shutil.rmtree(cls.gitrwrepo.working_dir) - + #} END base classes diff --git a/git/test/performance/test_commit.py b/git/test/performance/test_commit.py index 1bdfcfa2..009b3d82 100644 --- a/git/test/performance/test_commit.py +++ b/git/test/performance/test_commit.py @@ -12,6 +12,7 @@ from cStringIO import StringIO from time import time import sys + class TestPerformance(TestBigRepoRW): # ref with about 100 commits in its history @@ -26,15 +27,15 @@ class TestPerformance(TestBigRepoRW): c.committer_tz_offset c.message c.parents - + def test_iteration(self): no = 0 nc = 0 - - # find the first commit containing the given path - always do a full - # iteration ( restricted to the path in question ), but in fact it should + + # find the first commit containing the given path - always do a full + # iteration ( restricted to the path in question ), but in fact it should # return quite a lot of commits, we just take one and hence abort the operation - + st = time() for c in self.rorepo.iter_commits(self.ref_100): nc += 1 @@ -45,8 +46,8 @@ class TestPerformance(TestBigRepoRW): # END for each object # END for each commit elapsed_time = time() - st - print >> sys.stderr, "Traversed %i Trees and a total of %i unchached objects in %s [s] ( %f objs/s )" % (nc, no, elapsed_time, no/elapsed_time) - + print >> sys.stderr, "Traversed %i Trees and a total of %i unchached objects in %s [s] ( %f objs/s )" % (nc, no, elapsed_time, no / elapsed_time) + def test_commit_traversal(self): # bound to cat-file parsing performance nc = 0 @@ -56,8 +57,8 @@ class TestPerformance(TestBigRepoRW): self._query_commit_info(c) # END for each traversed commit elapsed_time = time() - st - print >> sys.stderr, "Traversed %i Commits in %s [s] ( %f commits/s )" % (nc, elapsed_time, nc/elapsed_time) - + print >> sys.stderr, "Traversed %i Commits in %s [s] ( %f commits/s )" % (nc, elapsed_time, nc / elapsed_time) + def test_commit_iteration(self): # bound to stream parsing performance nc = 0 @@ -67,33 +68,33 @@ class TestPerformance(TestBigRepoRW): self._query_commit_info(c) # END for each traversed commit elapsed_time = time() - st - print >> sys.stderr, "Iterated %i Commits in %s [s] ( %f commits/s )" % (nc, elapsed_time, nc/elapsed_time) - + print >> sys.stderr, "Iterated %i Commits in %s [s] ( %f commits/s )" % (nc, elapsed_time, nc / elapsed_time) + def test_commit_serialization(self): assert_commit_serialization(self.gitrwrepo, self.head_sha_2k, True) - + rwrepo = self.gitrwrepo make_object = rwrepo.odb.store # direct serialization - deserialization can be tested afterwards # serialization is probably limited on IO hc = rwrepo.commit(self.head_sha_2k) - + commits = list() nc = 5000 st = time() for i in xrange(nc): - cm = Commit( rwrepo, Commit.NULL_BIN_SHA, hc.tree, - hc.author, hc.authored_date, hc.author_tz_offset, - hc.committer, hc.committed_date, hc.committer_tz_offset, + cm = Commit(rwrepo, Commit.NULL_BIN_SHA, hc.tree, + hc.author, hc.authored_date, hc.author_tz_offset, + hc.committer, hc.committed_date, hc.committer_tz_offset, str(i), parents=hc.parents, encoding=hc.encoding) - + stream = StringIO() cm._serialize(stream) slen = stream.tell() stream.seek(0) - + cm.binsha = make_object(IStream(Commit.type, slen, stream)).binsha # END commit creation elapsed = time() - st - + print >> sys.stderr, "Serialized %i commits to loose objects in %f s ( %f commits / s )" % (nc, elapsed, nc / elapsed) diff --git a/git/test/performance/test_odb.py b/git/test/performance/test_odb.py index ccc13eb4..5ddbbd53 100644 --- a/git/test/performance/test_odb.py +++ b/git/test/performance/test_odb.py @@ -10,9 +10,9 @@ from lib import ( class TestObjDBPerformance(TestBigRepoR): - + def test_random_access(self): - results = [ ["Iterate Commits"], ["Iterate Blobs"], ["Retrieve Blob Data"] ] + results = [["Iterate Commits"], ["Iterate Blobs"], ["Retrieve Blob Data"]] for repo in (self.gitrorepo, self.puregitrorepo): # GET COMMITS st = time() @@ -20,10 +20,10 @@ class TestObjDBPerformance(TestBigRepoR): commits = list(root_commit.traverse()) nc = len(commits) elapsed = time() - st - + print >> sys.stderr, "%s: Retrieved %i commits from ObjectStore in %g s ( %f commits / s )" % (type(repo.odb), nc, elapsed, nc / elapsed) results[0].append(elapsed) - + # GET TREES # walk all trees of all commits st = time() @@ -41,10 +41,10 @@ class TestObjDBPerformance(TestBigRepoR): blobs_per_commit.append(blobs) # END for each commit elapsed = time() - st - + print >> sys.stderr, "%s: Retrieved %i objects from %i commits in %g s ( %f objects / s )" % (type(repo.odb), nt, len(commits), elapsed, nt / elapsed) results[1].append(elapsed) - + # GET BLOBS st = time() nb = 0 @@ -59,11 +59,11 @@ class TestObjDBPerformance(TestBigRepoR): break # END for each bloblist elapsed = time() - st - - print >> sys.stderr, "%s: Retrieved %i blob (%i KiB) and their data in %g s ( %f blobs / s, %f KiB / s )" % (type(repo.odb), nb, data_bytes/1000, elapsed, nb / elapsed, (data_bytes / 1000) / elapsed) + + print >> sys.stderr, "%s: Retrieved %i blob (%i KiB) and their data in %g s ( %f blobs / s, %f KiB / s )" % (type(repo.odb), nb, data_bytes / 1000, elapsed, nb / elapsed, (data_bytes / 1000) / elapsed) results[2].append(elapsed) # END for each repo type - + # final results for test_name, a, b in results: print >> sys.stderr, "%s: %f s vs %f s, pure is %f times slower" % (test_name, a, b, b / a) diff --git a/git/test/performance/test_streams.py b/git/test/performance/test_streams.py index 93e88841..e42867a3 100644 --- a/git/test/performance/test_streams.py +++ b/git/test/performance/test_streams.py @@ -18,16 +18,16 @@ from lib import ( class TestObjDBPerformance(TestBigRepoR): - - large_data_size_bytes = 1000*1000*10 # some MiB should do it - moderate_data_size_bytes = 1000*1000*1 # just 1 MiB - + + large_data_size_bytes = 1000 * 1000 * 10 # some MiB should do it + moderate_data_size_bytes = 1000 * 1000 * 1 # just 1 MiB + @with_rw_repo('HEAD', bare=True) def test_large_data_streaming(self, rwrepo): # TODO: This part overlaps with the same file in gitdb.test.performance.test_stream # It should be shared if possible ldb = LooseObjectDB(os.path.join(rwrepo.git_dir, 'objects')) - + for randomize in range(2): desc = (randomize and 'random ') or '' print >> sys.stderr, "Creating %s data ..." % desc @@ -35,32 +35,30 @@ class TestObjDBPerformance(TestBigRepoR): size, stream = make_memory_file(self.large_data_size_bytes, randomize) elapsed = time() - st print >> sys.stderr, "Done (in %f s)" % elapsed - - # writing - due to the compression it will seem faster than it is + + # writing - due to the compression it will seem faster than it is st = time() binsha = ldb.store(IStream('blob', size, stream)).binsha elapsed_add = time() - st assert ldb.has_object(binsha) db_file = ldb.readable_db_object_path(bin_to_hex(binsha)) fsize_kib = os.path.getsize(db_file) / 1000 - - + size_kib = size / 1000 print >> sys.stderr, "Added %i KiB (filesize = %i KiB) of %s data to loose odb in %f s ( %f Write KiB / s)" % (size_kib, fsize_kib, desc, elapsed_add, size_kib / elapsed_add) - + # reading all at once st = time() ostream = ldb.stream(binsha) shadata = ostream.read() elapsed_readall = time() - st - + stream.seek(0) assert shadata == stream.getvalue() print >> sys.stderr, "Read %i KiB of %s data at once from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, elapsed_readall, size_kib / elapsed_readall) - - + # reading in chunks of 1 MiB - cs = 512*1000 + cs = 512 * 1000 chunks = list() st = time() ostream = ldb.stream(binsha) @@ -71,21 +69,21 @@ class TestObjDBPerformance(TestBigRepoR): break # END read in chunks elapsed_readchunks = time() - st - + stream.seek(0) assert ''.join(chunks) == stream.getvalue() - + cs_kib = cs / 1000 print >> sys.stderr, "Read %i KiB of %s data in %i KiB chunks from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, cs_kib, elapsed_readchunks, size_kib / elapsed_readchunks) - + # del db file so git has something to do os.remove(db_file) - - # VS. CGIT + + # VS. CGIT ########## # CGIT ! Can using the cgit programs be faster ? proc = rwrepo.git.hash_object('-w', '--stdin', as_process=True, istream=subprocess.PIPE) - + # write file - pump everything in at once to be a fast as possible data = stream.getvalue() # cache it st = time() @@ -96,25 +94,23 @@ class TestObjDBPerformance(TestBigRepoR): gelapsed_add = time() - st del(data) assert gitsha == bin_to_hex(binsha) # we do it the same way, right ? - + # as its the same sha, we reuse our path fsize_kib = os.path.getsize(db_file) / 1000 print >> sys.stderr, "Added %i KiB (filesize = %i KiB) of %s data to using git-hash-object in %f s ( %f Write KiB / s)" % (size_kib, fsize_kib, desc, gelapsed_add, size_kib / gelapsed_add) - - # compare ... + + # compare ... print >> sys.stderr, "Git-Python is %f %% faster than git when adding big %s files" % (100.0 - (elapsed_add / gelapsed_add) * 100, desc) - - + # read all st = time() s, t, size, data = rwrepo.git.get_object_data(gitsha) gelapsed_readall = time() - st print >> sys.stderr, "Read %i KiB of %s data at once using git-cat-file in %f s ( %f Read KiB / s)" % (size_kib, desc, gelapsed_readall, size_kib / gelapsed_readall) - # compare + # compare print >> sys.stderr, "Git-Python is %f %% faster than git when reading big %sfiles" % (100.0 - (elapsed_readall / gelapsed_readall) * 100, desc) - - + # read chunks st = time() s, t, size, stream = rwrepo.git.stream_object_data(gitsha) @@ -125,7 +121,7 @@ class TestObjDBPerformance(TestBigRepoR): # END read stream gelapsed_readchunks = time() - st print >> sys.stderr, "Read %i KiB of %s data in %i KiB chunks from git-cat-file in %f s ( %f Read KiB / s)" % (size_kib, desc, cs_kib, gelapsed_readchunks, size_kib / gelapsed_readchunks) - - # compare + + # compare print >> sys.stderr, "Git-Python is %f %% faster than git when reading big %s files in chunks" % (100.0 - (elapsed_readchunks / gelapsed_readchunks) * 100, desc) # END for each randomization factor diff --git a/git/test/performance/test_utils.py b/git/test/performance/test_utils.py index 8637af48..c8d397fb 100644 --- a/git/test/performance/test_utils.py +++ b/git/test/performance/test_utils.py @@ -9,29 +9,33 @@ from lib import ( class TestUtilPerformance(TestBigRepoR): - + def test_access(self): # compare dict vs. slot access class Slotty(object): __slots__ = "attr" + def __init__(self): self.attr = 1 - + class Dicty(object): + def __init__(self): self.attr = 1 - + class BigSlotty(object): __slots__ = ('attr', ) + tuple('abcdefghijk') + def __init__(self): for attr in self.__slots__: setattr(self, attr, 1) - + class BigDicty(object): + def __init__(self): for attr in BigSlotty.__slots__: setattr(self, attr, 1) - + ni = 1000000 for cls in (Slotty, Dicty, BigSlotty, BigDicty): cli = cls() @@ -42,7 +46,7 @@ class TestUtilPerformance(TestBigRepoR): elapsed = time() - st print >> sys.stderr, "Accessed %s.attr %i times in %s s ( %f acc / s)" % (cls.__name__, ni, elapsed, ni / elapsed) # END for each class type - + # check num of sequence-acceses for cls in (list, tuple): x = 10 @@ -56,12 +60,12 @@ class TestUtilPerformance(TestBigRepoR): elapsed = time() - st na = ni * 3 print >> sys.stderr, "Accessed %s[x] %i times in %s s ( %f acc / s)" % (cls.__name__, na, elapsed, na / elapsed) - # END for each sequence - + # END for each sequence + def test_instantiation(self): ni = 100000 max_num_items = 4 - for mni in range(max_num_items+1): + for mni in range(max_num_items + 1): for cls in (tuple, list): st = time() for i in xrange(ni): @@ -70,11 +74,11 @@ class TestUtilPerformance(TestBigRepoR): elif mni == 1: cls((1,)) elif mni == 2: - cls((1,2)) + cls((1, 2)) elif mni == 3: - cls((1,2,3)) + cls((1, 2, 3)) elif mni == 4: - cls((1,2,3,4)) + cls((1, 2, 3, 4)) else: cls(x for x in xrange(mni)) # END handle empty cls @@ -83,27 +87,27 @@ class TestUtilPerformance(TestBigRepoR): print >> sys.stderr, "Created %i %ss of size %i in %f s ( %f inst / s)" % (ni, cls.__name__, mni, elapsed, ni / elapsed) # END for each type # END for each item count - + # tuple and tuple direct st = time() for i in xrange(ni): - t = (1,2,3,4) + t = (1, 2, 3, 4) # END for each item elapsed = time() - st print >> sys.stderr, "Created %i tuples (1,2,3,4) in %f s ( %f tuples / s)" % (ni, elapsed, ni / elapsed) - + st = time() for i in xrange(ni): - t = tuple((1,2,3,4)) + t = tuple((1, 2, 3, 4)) # END for each item elapsed = time() - st print >> sys.stderr, "Created %i tuples tuple((1,2,3,4)) in %f s ( %f tuples / s)" % (ni, elapsed, ni / elapsed) - + def test_unpacking_vs_indexing(self): ni = 1000000 - list_items = [1,2,3,4] - tuple_items = (1,2,3,4) - + list_items = [1, 2, 3, 4] + tuple_items = (1, 2, 3, 4) + for sequence in (list_items, tuple_items): st = time() for i in xrange(ni): @@ -111,14 +115,14 @@ class TestUtilPerformance(TestBigRepoR): # END for eac iteration elapsed = time() - st print >> sys.stderr, "Unpacked %i %ss of size %i in %f s ( %f acc / s)" % (ni, type(sequence).__name__, len(sequence), elapsed, ni / elapsed) - + st = time() for i in xrange(ni): one, two, three, four = sequence[0], sequence[1], sequence[2], sequence[3] # END for eac iteration elapsed = time() - st print >> sys.stderr, "Unpacked %i %ss of size %i individually in %f s ( %f acc / s)" % (ni, type(sequence).__name__, len(sequence), elapsed, ni / elapsed) - + st = time() for i in xrange(ni): one, two = sequence[0], sequence[1] @@ -126,15 +130,15 @@ class TestUtilPerformance(TestBigRepoR): elapsed = time() - st print >> sys.stderr, "Unpacked %i %ss of size %i individually (2 of 4) in %f s ( %f acc / s)" % (ni, type(sequence).__name__, len(sequence), elapsed, ni / elapsed) # END for each sequence - + def test_large_list_vs_iteration(self): # what costs more: alloc/realloc of lists, or the cpu strain of iterators ? def slow_iter(ni): for i in xrange(ni): yield i # END slow iter - be closer to the real world - - # alloc doesn't play a role here it seems + + # alloc doesn't play a role here it seems for ni in (500, 1000, 10000, 20000, 40000): st = time() for i in list(xrange(ni)): @@ -142,7 +146,7 @@ class TestUtilPerformance(TestBigRepoR): # END for each item elapsed = time() - st print >> sys.stderr, "Iterated %i items from list in %f s ( %f acc / s)" % (ni, elapsed, ni / elapsed) - + st = time() for i in slow_iter(ni): i @@ -150,14 +154,14 @@ class TestUtilPerformance(TestBigRepoR): elapsed = time() - st print >> sys.stderr, "Iterated %i items from iterator in %f s ( %f acc / s)" % (ni, elapsed, ni / elapsed) # END for each number of iterations - + def test_type_vs_inst_class(self): class NewType(object): pass - + # lets see which way is faster inst = NewType() - + ni = 1000000 st = time() for i in xrange(ni): @@ -165,7 +169,7 @@ class TestUtilPerformance(TestBigRepoR): # END for each item elapsed = time() - st print >> sys.stderr, "Created %i items using inst.__class__ in %f s ( %f items / s)" % (ni, elapsed, ni / elapsed) - + st = time() for i in xrange(ni): type(inst)() diff --git a/git/test/test_actor.py b/git/test/test_actor.py index b8e5ba3b..5ccf1d2e 100644 --- a/git/test/test_actor.py +++ b/git/test/test_actor.py @@ -8,15 +8,17 @@ import os from git.test.lib import * from git import * + class TestActor(object): + def test_from_string_should_separate_name_and_email(self): a = Actor._from_string("Michael Trier <mtrier@example.com>") assert_equal("Michael Trier", a.name) assert_equal("mtrier@example.com", a.email) - + # base type capabilities assert a == a - assert not ( a != a ) + assert not (a != a) m = set() m.add(a) m.add(a) @@ -33,4 +35,4 @@ class TestActor(object): def test_str_should_alias_name(self): a = Actor._from_string("Michael Trier <mtrier@example.com>") - assert_equal(a.name, str(a))
\ No newline at end of file + assert_equal(a.name, str(a)) diff --git a/git/test/test_base.py b/git/test/test_base.py index 5edc9c52..81e785ab 100644 --- a/git/test/test_base.py +++ b/git/test/test_base.py @@ -15,18 +15,19 @@ from git.objects.util import get_object_type_by_name from gitdb.util import hex_to_bin import tempfile + class TestBase(TestBase): - - type_tuples = ( ("blob", "8741fc1d09d61f02ffd8cded15ff603eff1ec070", "blob.py"), + + type_tuples = (("blob", "8741fc1d09d61f02ffd8cded15ff603eff1ec070", "blob.py"), ("tree", "3a6a5e3eeed3723c09f1ef0399f81ed6b8d82e79", "directory"), ("commit", "4251bd59fb8e11e40c40548cba38180a9536118c", None), - ("tag", "e56a60e8e9cd333cfba0140a77cd12b0d9398f10", None) ) - - def test_base_object(self): + ("tag", "e56a60e8e9cd333cfba0140a77cd12b0d9398f10", None)) + + def test_base_object(self): # test interface of base object classes types = (Blob, Tree, Commit, TagObject) assert len(types) == len(self.type_tuples) - + s = set() num_objs = 0 num_index_objs = 0 @@ -34,9 +35,9 @@ class TestBase(TestBase): binsha = hex_to_bin(hexsha) item = None if path is None: - item = obj_type(self.rorepo,binsha) + item = obj_type(self.rorepo, binsha) else: - item = obj_type(self.rorepo,binsha, 0, path) + item = obj_type(self.rorepo, binsha, 0, path) # END handle index objects num_objs += 1 assert item.hexsha == hexsha @@ -47,54 +48,54 @@ class TestBase(TestBase): assert str(item) == item.hexsha assert repr(item) s.add(item) - + if isinstance(item, base.IndexObject): num_index_objs += 1 - if hasattr(item,'path'): # never runs here + if hasattr(item, 'path'): # never runs here assert not item.path.startswith("/") # must be relative assert isinstance(item.mode, int) # END index object check - + # read from stream data_stream = item.data_stream data = data_stream.read() assert data - + tmpfile = os.tmpfile() assert item == item.stream_data(tmpfile) tmpfile.seek(0) assert tmpfile.read() == data # END stream to file directly # END for each object type to create - + # each has a unique sha assert len(s) == num_objs - assert len(s|s) == num_objs + assert len(s | s) == num_objs assert num_index_objs == 2 - + def test_get_object_type_by_name(self): for tname in base.Object.TYPES: assert base.Object in get_object_type_by_name(tname).mro() - # END for each known type - - assert_raises( ValueError, get_object_type_by_name, "doesntexist" ) + # END for each known type + + assert_raises(ValueError, get_object_type_by_name, "doesntexist") def test_object_resolution(self): # objects must be resolved to shas so they compare equal assert self.rorepo.head.reference.object == self.rorepo.active_branch.object - + @with_rw_repo('HEAD', bare=True) def test_with_bare_rw_repo(self, bare_rw_repo): assert bare_rw_repo.config_reader("repository").getboolean("core", "bare") - assert os.path.isfile(os.path.join(bare_rw_repo.git_dir,'HEAD')) - + assert os.path.isfile(os.path.join(bare_rw_repo.git_dir, 'HEAD')) + @with_rw_repo('0.1.6') def test_with_rw_repo(self, rw_repo): assert not rw_repo.config_reader("repository").getboolean("core", "bare") - assert os.path.isdir(os.path.join(rw_repo.working_tree_dir,'lib')) - + assert os.path.isdir(os.path.join(rw_repo.working_tree_dir, 'lib')) + @with_rw_and_rw_remote_repo('0.1.6') def test_with_rw_remote_and_rw_repo(self, rw_repo, rw_remote_repo): assert not rw_repo.config_reader("repository").getboolean("core", "bare") assert rw_remote_repo.config_reader("repository").getboolean("core", "bare") - assert os.path.isdir(os.path.join(rw_repo.working_tree_dir,'lib')) + assert os.path.isdir(os.path.join(rw_repo.working_tree_dir, 'lib')) diff --git a/git/test/test_blob.py b/git/test/test_blob.py index 6fc0287f..ddd2893f 100644 --- a/git/test/test_blob.py +++ b/git/test/test_blob.py @@ -8,16 +8,16 @@ from git.test.lib import * from git import * from gitdb.util import hex_to_bin + class TestBlob(TestBase): - + def test_mime_type_should_return_mime_type_for_known_types(self): blob = Blob(self.rorepo, **{'binsha': Blob.NULL_BIN_SHA, 'path': 'foo.png'}) assert_equal("image/png", blob.mime_type) - + def test_mime_type_should_return_text_plain_for_unknown_types(self): - blob = Blob(self.rorepo, **{'binsha': Blob.NULL_BIN_SHA,'path': 'something'}) + blob = Blob(self.rorepo, **{'binsha': Blob.NULL_BIN_SHA, 'path': 'something'}) assert_equal("text/plain", blob.mime_type) - + def test_nodict(self): self.failUnlessRaises(AttributeError, setattr, self.rorepo.tree()['AUTHORS'], 'someattr', 2) - diff --git a/git/test/test_commit.py b/git/test/test_commit.py index f536470f..6cd892f0 100644 --- a/git/test/test_commit.py +++ b/git/test/test_commit.py @@ -17,54 +17,54 @@ import re def assert_commit_serialization(rwrepo, commit_id, print_performance_info=False): - """traverse all commits in the history of commit identified by commit_id and check + """traverse all commits in the history of commit identified by commit_id and check if the serialization works. :param print_performance_info: if True, we will show how fast we are""" ns = 0 # num serializations nds = 0 # num deserializations - + st = time.time() for cm in rwrepo.commit(commit_id).traverse(): nds += 1 - - # assert that we deserialize commits correctly, hence we get the same + + # assert that we deserialize commits correctly, hence we get the same # sha on serialization stream = StringIO() cm._serialize(stream) ns += 1 streamlen = stream.tell() stream.seek(0) - + istream = rwrepo.odb.store(IStream(Commit.type, streamlen, stream)) assert istream.hexsha == cm.hexsha - + nc = Commit(rwrepo, Commit.NULL_BIN_SHA, cm.tree, - cm.author, cm.authored_date, cm.author_tz_offset, - cm.committer, cm.committed_date, cm.committer_tz_offset, + cm.author, cm.authored_date, cm.author_tz_offset, + cm.committer, cm.committed_date, cm.committer_tz_offset, cm.message, cm.parents, cm.encoding) - + assert nc.parents == cm.parents stream = StringIO() nc._serialize(stream) ns += 1 streamlen = stream.tell() stream.seek(0) - + # reuse istream istream.size = streamlen istream.stream = stream istream.binsha = None nc.binsha = rwrepo.odb.store(istream).binsha - + # if it worked, we have exactly the same contents ! assert nc.hexsha == cm.hexsha # END check commits elapsed = time.time() - st - + if print_performance_info: - print >> sys.stderr, "Serialized %i and deserialized %i commits in %f s ( (%f, %f) commits / s" % (ns, nds, elapsed, ns/elapsed, nds/elapsed) + print >> sys.stderr, "Serialized %i and deserialized %i commits in %f s ( (%f, %f) commits / s" % (ns, nds, elapsed, ns / elapsed, nds / elapsed) # END handle performance info - + class TestCommit(TestBase): @@ -73,7 +73,7 @@ class TestCommit(TestBase): commit = self.rorepo.commit('2454ae89983a4496a445ce347d7a41c0bb0ea7ae') # commits have no dict self.failUnlessRaises(AttributeError, setattr, commit, 'someattr', 1) - commit.author # bake + commit.author # bake assert_equal("Sebastian Thiel", commit.author.name) assert_equal("byronimo@gmail.com", commit.author.email) @@ -82,26 +82,25 @@ class TestCommit(TestBase): assert isinstance(commit.author_tz_offset, int) and isinstance(commit.committer_tz_offset, int) assert commit.message == "Added missing information to docstrings of commit and stats module\n" - def test_stats(self): commit = self.rorepo.commit('33ebe7acec14b25c5f84f35a664803fcab2f7781') stats = commit.stats - + def check_entries(d): assert isinstance(d, dict) for key in ("insertions", "deletions", "lines"): assert key in d - # END assertion helper - assert stats.files + # END assertion helper + assert stats.files assert stats.total - - check_entries(stats.total) + + check_entries(stats.total) assert "files" in stats.total - + for filepath, d in stats.files.items(): check_entries(d) # END for each stated file - + # assure data is parsed properly michael = Actor._from_string("Michael Trier <mtrier@gmail.com>") assert commit.author == michael @@ -111,7 +110,7 @@ class TestCommit(TestBase): assert commit.author_tz_offset == 14400, commit.author_tz_offset assert commit.committer_tz_offset == 14400, commit.committer_tz_offset assert commit.message == "initial project\n" - + def test_unicode_actor(self): # assure we can parse unicode actors correctly name = "Üäöß ÄußÉ".decode("utf-8") @@ -119,7 +118,7 @@ class TestCommit(TestBase): special = Actor._from_string(u"%s <something@this.com>" % name) assert special.name == name assert isinstance(special.name, unicode) - + def test_traversal(self): start = self.rorepo.commit("a4d06724202afccd2b5c54f81bcf2bf26dea7fff") first = self.rorepo.commit("33ebe7acec14b25c5f84f35a664803fcab2f7781") @@ -127,65 +126,65 @@ class TestCommit(TestBase): p1 = start.parents[1] p00 = p0.parents[0] p10 = p1.parents[0] - + # basic branch first, depth first dfirst = start.traverse(branch_first=False) bfirst = start.traverse(branch_first=True) assert dfirst.next() == p0 assert dfirst.next() == p00 - + assert bfirst.next() == p0 assert bfirst.next() == p1 assert bfirst.next() == p00 assert bfirst.next() == p10 - + # at some point, both iterations should stop assert list(bfirst)[-1] == first stoptraverse = self.rorepo.commit("254d04aa3180eb8b8daf7b7ff25f010cd69b4e7d").traverse(as_edge=True) l = list(stoptraverse) assert len(l[0]) == 2 - + # ignore self assert start.traverse(ignore_self=False).next() == start - - # depth + + # depth assert len(list(start.traverse(ignore_self=False, depth=0))) == 1 - + # prune - assert start.traverse(branch_first=1, prune=lambda i,d: i==p0).next() == p1 - + assert start.traverse(branch_first=1, prune=lambda i, d: i == p0).next() == p1 + # predicate - assert start.traverse(branch_first=1, predicate=lambda i,d: i==p1).next() == p1 - + assert start.traverse(branch_first=1, predicate=lambda i, d: i == p1).next() == p1 + # traversal should stop when the beginning is reached self.failUnlessRaises(StopIteration, first.traverse().next) - - # parents of the first commit should be empty ( as the only parent has a null + + # parents of the first commit should be empty ( as the only parent has a null # sha ) assert len(first.parents) == 0 - + def test_iteration(self): # we can iterate commits all_commits = Commit.list_items(self.rorepo, self.rorepo.head) assert all_commits assert all_commits == list(self.rorepo.iter_commits()) - + # this includes merge commits mcomit = self.rorepo.commit('d884adc80c80300b4cc05321494713904ef1df2d') assert mcomit in all_commits - + # we can limit the result to paths ltd_commits = list(self.rorepo.iter_commits(paths='CHANGES')) assert ltd_commits and len(ltd_commits) < len(all_commits) - + # show commits of multiple paths, resulting in a union of commits less_ltd_commits = list(Commit.iter_items(self.rorepo, 'master', paths=('CHANGES', 'AUTHORS'))) assert len(ltd_commits) < len(less_ltd_commits) - + def test_iter_items(self): # pretty not allowed self.failUnlessRaises(ValueError, Commit.iter_items, self.rorepo, 'master', pretty="raw") - + def test_rev_list_bisect_all(self): """ 'git rev-list --bisect-all' returns additional information @@ -206,8 +205,8 @@ class TestCommit(TestBase): assert_equal(sha1, commit.hexsha) def test_count(self): - assert self.rorepo.tag('refs/tags/0.1.5').commit.count( ) == 143 - + assert self.rorepo.tag('refs/tags/0.1.5').commit.count() == 143 + def test_list(self): assert isinstance(Commit.list_items(self.rorepo, '0.1.5', max_count=5)[hex_to_bin('5117c9c8a4d3af19a9958677e45cda9269de1541')], Commit) @@ -222,10 +221,10 @@ class TestCommit(TestBase): def test_equality(self): commit1 = Commit(self.rorepo, Commit.NULL_BIN_SHA) commit2 = Commit(self.rorepo, Commit.NULL_BIN_SHA) - commit3 = Commit(self.rorepo, "\1"*20) + commit3 = Commit(self.rorepo, "\1" * 20) assert_equal(commit1, commit2) assert_not_equal(commit2, commit3) - + def test_iter_parents(self): # should return all but ourselves, even if skip is defined c = self.rorepo.commit('0.1.5') @@ -234,40 +233,40 @@ class TestCommit(TestBase): first_parent = piter.next() assert first_parent != c assert first_parent == c.parents[0] - # END for each - + # END for each + def test_base(self): name_rev = self.rorepo.head.commit.name_rev assert isinstance(name_rev, basestring) - + @with_rw_repo('HEAD', bare=True) def test_serialization(self, rwrepo): # create all commits of our repo assert_commit_serialization(rwrepo, '0.1.6') - + def test_serialization_unicode_support(self): assert Commit.default_encoding.lower() == 'utf-8' - + # create a commit with unicode in the message, and the author's name # Verify its serialization and deserialization cmt = self.rorepo.commit('0.1.6') assert isinstance(cmt.message, unicode) # it automatically decodes it as such - assert isinstance(cmt.author.name, unicode) # same here - + assert isinstance(cmt.author.name, unicode) # same here + cmt.message = "üäêèß".decode("utf-8") assert len(cmt.message) == 5 - + cmt.author.name = "äüß".decode("utf-8") assert len(cmt.author.name) == 3 - + cstream = StringIO() cmt._serialize(cstream) cstream.seek(0) assert len(cstream.getvalue()) - + ncmt = Commit(self.rorepo, cmt.binsha) ncmt._deserialize(cstream) - + assert cmt.author.name == ncmt.author.name assert cmt.message == ncmt.message # actually, it can't be printed in a shell as repr wants to have ascii only diff --git a/git/test/test_config.py b/git/test/test_config.py index b397b193..b6888023 100644 --- a/git/test/test_config.py +++ b/git/test/test_config.py @@ -10,36 +10,37 @@ import StringIO from copy import copy from ConfigParser import NoSectionError + class TestBase(TestCase): - + def _to_memcache(self, file_path): fp = open(file_path, "r") sio = StringIO.StringIO(fp.read()) sio.name = file_path return sio - + def _parsers_equal_or_raise(self, lhs, rhs): pass - + def test_read_write(self): # writer must create the exact same file as the one read before for filename in ("git_config", "git_config_global"): file_obj = self._to_memcache(fixture_path(filename)) file_obj_orig = copy(file_obj) - w_config = GitConfigParser(file_obj, read_only = False) + w_config = GitConfigParser(file_obj, read_only=False) w_config.read() # enforce reading assert w_config._sections w_config.write() # enforce writing - + # we stripped lines when reading, so the results differ assert file_obj.getvalue() != file_obj_orig.getvalue() - + # creating an additional config writer must fail due to exclusive access - self.failUnlessRaises(IOError, GitConfigParser, file_obj, read_only = False) - + self.failUnlessRaises(IOError, GitConfigParser, file_obj, read_only=False) + # should still have a lock and be able to make changes assert w_config._lock._has_lock() - + # changes should be written right away sname = "my_section" oname = "mykey" @@ -47,15 +48,15 @@ class TestBase(TestCase): w_config.add_section(sname) assert w_config.has_section(sname) w_config.set(sname, oname, val) - assert w_config.has_option(sname,oname) + assert w_config.has_option(sname, oname) assert w_config.get(sname, oname) == val - + sname_new = "new_section" oname_new = "new_key" ival = 10 w_config.set_value(sname_new, oname_new, ival) assert w_config.get_value(sname_new, oname_new) == ival - + file_obj.seek(0) r_config = GitConfigParser(file_obj, read_only=True) #print file_obj.getvalue() @@ -63,7 +64,7 @@ class TestBase(TestCase): assert r_config.has_option(sname, oname) assert r_config.get(sname, oname) == val # END for each filename - + def test_base(self): path_repo = fixture_path("git_config") path_global = fixture_path("git_config_global") @@ -71,7 +72,7 @@ class TestBase(TestCase): assert r_config.read_only num_sections = 0 num_options = 0 - + # test reader methods assert r_config._is_initialized == False for section in r_config.sections(): @@ -84,21 +85,19 @@ class TestBase(TestCase): assert val assert "\n" not in option assert "\n" not in val - + # writing must fail self.failUnlessRaises(IOError, r_config.set, section, option, None) - self.failUnlessRaises(IOError, r_config.remove_option, section, option ) + self.failUnlessRaises(IOError, r_config.remove_option, section, option) # END for each option self.failUnlessRaises(IOError, r_config.remove_section, section) - # END for each section + # END for each section assert num_sections and num_options assert r_config._is_initialized == True - + # get value which doesnt exist, with default default = "my default value" assert r_config.get_value("doesnt", "exist", default) == default - + # it raises if there is no default though self.failUnlessRaises(NoSectionError, r_config.get_value, "doesnt", "exist") - - diff --git a/git/test/test_db.py b/git/test/test_db.py index dc8190a7..b53c4209 100644 --- a/git/test/test_db.py +++ b/git/test/test_db.py @@ -9,17 +9,18 @@ from gitdb.util import bin_to_hex from git.exc import BadObject import os + class TestDB(TestBase): - + def test_base(self): gdb = GitCmdObjectDB(os.path.join(self.rorepo.git_dir, 'objects'), self.rorepo.git) - + # partial to complete - works with everything hexsha = bin_to_hex(gdb.partial_to_complete_sha_hex("0.1.6")) assert len(hexsha) == 40 - + assert bin_to_hex(gdb.partial_to_complete_sha_hex(hexsha[:20])) == hexsha - + # fails with BadObject for invalid_rev in ("0000", "bad/ref", "super bad"): self.failUnlessRaises(BadObject, gdb.partial_to_complete_sha_hex, invalid_rev) diff --git a/git/test/test_diff.py b/git/test/test_diff.py index 80652c8e..151a3d14 100644 --- a/git/test/test_diff.py +++ b/git/test/test_diff.py @@ -7,8 +7,9 @@ from git.test.lib import * from git import * + class TestDiff(TestBase): - + def _assert_diff_format(self, diffs): # verify that the format of the diff is sane for diff in diffs: @@ -16,19 +17,19 @@ class TestDiff(TestBase): assert isinstance(diff.a_mode, int) if diff.b_mode: assert isinstance(diff.b_mode, int) - + if diff.a_blob: assert not diff.a_blob.path.endswith('\n') if diff.b_blob: assert not diff.b_blob.path.endswith('\n') # END for each diff return diffs - + def test_list_from_string_new_mode(self): output = StringProcessAdapter(fixture('diff_new_mode')) diffs = Diff._index_from_patch_format(self.rorepo, output.stdout) self._assert_diff_format(diffs) - + assert_equal(1, len(diffs)) assert_equal(10, len(diffs[0].diff.splitlines())) @@ -36,7 +37,7 @@ class TestDiff(TestBase): output = StringProcessAdapter(fixture('diff_rename')) diffs = Diff._index_from_patch_format(self.rorepo, output.stdout) self._assert_diff_format(diffs) - + assert_equal(1, len(diffs)) diff = diffs[0] @@ -47,10 +48,10 @@ class TestDiff(TestBase): def test_diff_patch_format(self): # test all of the 'old' format diffs for completness - it should at least # be able to deal with it - fixtures = ("diff_2", "diff_2f", "diff_f", "diff_i", "diff_mode_only", - "diff_new_mode", "diff_numstat", "diff_p", "diff_rename", - "diff_tree_numstat_root" ) - + fixtures = ("diff_2", "diff_2f", "diff_f", "diff_i", "diff_mode_only", + "diff_new_mode", "diff_numstat", "diff_p", "diff_rename", + "diff_tree_numstat_root") + for fixture_name in fixtures: diff_proc = StringProcessAdapter(fixture(fixture_name)) diffs = Diff._index_from_patch_format(self.rorepo, diff_proc.stdout) @@ -61,24 +62,24 @@ class TestDiff(TestBase): assertion_map = dict() for i, commit in enumerate(self.rorepo.iter_commits('0.1.6', max_count=2)): diff_item = commit - if i%2 == 0: + if i % 2 == 0: diff_item = commit.tree # END use tree every second item - + for other in (None, commit.Index, commit.parents[0]): for paths in (None, "CHANGES", ("CHANGES", "lib")): for create_patch in range(2): diff_index = diff_item.diff(other, paths, create_patch) assert isinstance(diff_index, DiffIndex) - + if diff_index: self._assert_diff_format(diff_index) for ct in DiffIndex.change_type: - key = 'ct_%s'%ct + key = 'ct_%s' % ct assertion_map.setdefault(key, 0) - assertion_map[key] = assertion_map[key]+len(list(diff_index.iter_change_type(ct))) + assertion_map[key] = assertion_map[key] + len(list(diff_index.iter_change_type(ct))) # END for each changetype - + # check entries diff_set = set() diff_set.add(diff_index[0]) @@ -86,23 +87,21 @@ class TestDiff(TestBase): assert len(diff_set) == 1 assert diff_index[0] == diff_index[0] assert not (diff_index[0] != diff_index[0]) - # END diff index checking + # END diff index checking # END for each patch option # END for each path option # END for each other side # END for each commit - - # assert we could always find at least one instance of the members we + + # assert we could always find at least one instance of the members we # can iterate in the diff index - if not this indicates its not working correctly # or our test does not span the whole range of possibilities - for key,value in assertion_map.items(): + for key, value in assertion_map.items(): assert value, "Did not find diff for %s" % key - # END for each iteration type - + # END for each iteration type + # test path not existing in the index - should be ignored c = self.rorepo.head.commit cp = c.parents[0] diff_index = c.diff(cp, ["does/not/exist"]) assert len(diff_index) == 0 - - diff --git a/git/test/test_fun.py b/git/test/test_fun.py index 096cd368..4672901c 100644 --- a/git/test/test_fun.py +++ b/git/test/test_fun.py @@ -1,7 +1,7 @@ from git.test.lib import * from git.objects.fun import ( traverse_tree_recursive, - traverse_trees_recursive, + traverse_trees_recursive, tree_to_stream, tree_entries_from_data ) @@ -15,7 +15,7 @@ from gitdb.base import IStream from gitdb.typ import str_tree_type from stat import ( - S_IFDIR, + S_IFDIR, S_IFREG, S_IFLNK ) @@ -23,8 +23,9 @@ from stat import ( from git.index import IndexFile from cStringIO import StringIO + class TestFun(TestBase): - + def _assert_index_entries(self, entries, trees): index = IndexFile.from_tree(self.rorepo, *[self.rorepo.tree(bin_to_hex(t)) for t in trees]) assert entries @@ -32,22 +33,22 @@ class TestFun(TestBase): for entry in entries: assert (entry.path, entry.stage) in index.entries # END assert entry matches fully - + def test_aggressive_tree_merge(self): # head tree with additions, removals and modification compared to its predecessor odb = self.rorepo.odb - HC = self.rorepo.commit("6c1faef799095f3990e9970bc2cb10aa0221cf9c") + HC = self.rorepo.commit("6c1faef799095f3990e9970bc2cb10aa0221cf9c") H = HC.tree B = HC.parents[0].tree - + # entries from single tree trees = [H.binsha] self._assert_index_entries(aggressive_tree_merge(odb, trees), trees) - + # from multiple trees trees = [B.binsha, H.binsha] self._assert_index_entries(aggressive_tree_merge(odb, trees), trees) - + # three way, no conflict tree = self.rorepo.tree B = tree("35a09c0534e89b2d43ec4101a5fb54576b577905") @@ -55,16 +56,16 @@ class TestFun(TestBase): M = tree("1f2b19de3301e76ab3a6187a49c9c93ff78bafbd") trees = [B.binsha, H.binsha, M.binsha] self._assert_index_entries(aggressive_tree_merge(odb, trees), trees) - + # three-way, conflict in at least one file, both modified B = tree("a7a4388eeaa4b6b94192dce67257a34c4a6cbd26") H = tree("f9cec00938d9059882bb8eabdaf2f775943e00e5") M = tree("44a601a068f4f543f73fd9c49e264c931b1e1652") trees = [B.binsha, H.binsha, M.binsha] self._assert_index_entries(aggressive_tree_merge(odb, trees), trees) - + # too many trees - self.failUnlessRaises(ValueError, aggressive_tree_merge, odb, trees*2) + self.failUnlessRaises(ValueError, aggressive_tree_merge, odb, trees * 2) def mktree(self, odb, entries): """create a tree from the given tree entries and safe it to the database""" @@ -73,122 +74,123 @@ class TestFun(TestBase): sio.seek(0) istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio)) return istream.binsha - + @with_rw_repo('0.1.6') def test_three_way_merge(self, rwrepo): def mkfile(name, sha, executable=0): - return (sha, S_IFREG | 0644 | executable*0111, name) + return (sha, S_IFREG | 0644 | executable * 0111, name) + def mkcommit(name, sha): return (sha, S_IFDIR | S_IFLNK, name) + def assert_entries(entries, num_entries, has_conflict=False): assert len(entries) == num_entries assert has_conflict == (len([e for e in entries if e.stage != 0]) > 0) mktree = self.mktree - - shaa = "\1"*20 - shab = "\2"*20 - shac = "\3"*20 - + + shaa = "\1" * 20 + shab = "\2" * 20 + shac = "\3" * 20 + odb = rwrepo.odb - + # base tree bfn = 'basefile' fbase = mkfile(bfn, shaa) tb = mktree(odb, [fbase]) - + # non-conflicting new files, same data fa = mkfile('1', shab) th = mktree(odb, [fbase, fa]) fb = mkfile('2', shac) tm = mktree(odb, [fbase, fb]) - + # two new files, same base file trees = [tb, th, tm] assert_entries(aggressive_tree_merge(odb, trees), 3) - + # both delete same file, add own one fa = mkfile('1', shab) th = mktree(odb, [fa]) fb = mkfile('2', shac) tm = mktree(odb, [fb]) - + # two new files trees = [tb, th, tm] assert_entries(aggressive_tree_merge(odb, trees), 2) - + # same file added in both, differently fa = mkfile('1', shab) th = mktree(odb, [fa]) fb = mkfile('1', shac) tm = mktree(odb, [fb]) - + # expect conflict trees = [tb, th, tm] assert_entries(aggressive_tree_merge(odb, trees), 2, True) - + # same file added, different mode fa = mkfile('1', shab) th = mktree(odb, [fa]) fb = mkcommit('1', shab) tm = mktree(odb, [fb]) - + # expect conflict trees = [tb, th, tm] assert_entries(aggressive_tree_merge(odb, trees), 2, True) - + # same file added in both fa = mkfile('1', shab) th = mktree(odb, [fa]) fb = mkfile('1', shab) tm = mktree(odb, [fb]) - + # expect conflict trees = [tb, th, tm] assert_entries(aggressive_tree_merge(odb, trees), 1) - + # modify same base file, differently fa = mkfile(bfn, shab) th = mktree(odb, [fa]) fb = mkfile(bfn, shac) tm = mktree(odb, [fb]) - + # conflict, 3 versions on 3 stages trees = [tb, th, tm] assert_entries(aggressive_tree_merge(odb, trees), 3, True) - - + # change mode on same base file, by making one a commit, the other executable # no content change ( this is totally unlikely to happen in the real world ) fa = mkcommit(bfn, shaa) th = mktree(odb, [fa]) fb = mkfile(bfn, shaa, executable=1) tm = mktree(odb, [fb]) - + # conflict, 3 versions on 3 stages, because of different mode trees = [tb, th, tm] assert_entries(aggressive_tree_merge(odb, trees), 3, True) - + for is_them in range(2): # only we/they change contents fa = mkfile(bfn, shab) th = mktree(odb, [fa]) - + trees = [tb, th, tb] if is_them: trees = [tb, tb, th] entries = aggressive_tree_merge(odb, trees) assert len(entries) == 1 and entries[0].binsha == shab - + # only we/they change the mode fa = mkcommit(bfn, shaa) th = mktree(odb, [fa]) - + trees = [tb, th, tb] if is_them: trees = [tb, tb, th] entries = aggressive_tree_merge(odb, trees) assert len(entries) == 1 and entries[0].binsha == shaa and entries[0].mode == fa[1] - + # one side deletes, the other changes = conflict fa = mkfile(bfn, shab) th = mktree(odb, [fa]) @@ -199,16 +201,16 @@ class TestFun(TestBase): # as one is deleted, there are only 2 entries assert_entries(aggressive_tree_merge(odb, trees), 2, True) # END handle ours, theirs - + def _assert_tree_entries(self, entries, num_trees): for entry in entries: assert len(entry) == num_trees paths = set(e[2] for e in entry if e) - + # only one path per set of entries assert len(paths) == 1 # END verify entry - + def test_tree_traversal(self): # low level tree tarversal odb = self.rorepo.odb @@ -216,29 +218,29 @@ class TestFun(TestBase): M = self.rorepo.tree('e14e3f143e7260de9581aee27e5a9b2645db72de') # merge tree B = self.rorepo.tree('f606937a7a21237c866efafcad33675e6539c103') # base tree B_old = self.rorepo.tree('1f66cfbbce58b4b552b041707a12d437cc5f400a') # old base tree - + # two very different trees entries = traverse_trees_recursive(odb, [B_old.binsha, H.binsha], '') self._assert_tree_entries(entries, 2) - + oentries = traverse_trees_recursive(odb, [H.binsha, B_old.binsha], '') assert len(oentries) == len(entries) self._assert_tree_entries(oentries, 2) - + # single tree is_no_tree = lambda i, d: i.type != 'tree' entries = traverse_trees_recursive(odb, [B.binsha], '') assert len(entries) == len(list(B.traverse(predicate=is_no_tree))) self._assert_tree_entries(entries, 1) - + # two trees entries = traverse_trees_recursive(odb, [B.binsha, H.binsha], '') self._assert_tree_entries(entries, 2) - + # tree trees entries = traverse_trees_recursive(odb, [B.binsha, H.binsha, M.binsha], '') self._assert_tree_entries(entries, 3) - + def test_tree_traversal_single(self): max_count = 50 count = 0 diff --git a/git/test/test_git.py b/git/test/test_git.py index 7132aa83..49c256ca 100644 --- a/git/test/test_git.py +++ b/git/test/test_git.py @@ -5,15 +5,16 @@ # the BSD License: http://www.opensource.org/licenses/bsd-license.php import os -from git.test.lib import ( TestBase, - patch, +from git.test.lib import (TestBase, + patch, raises, assert_equal, assert_true, assert_match, - fixture_path ) -from git import ( Git, - GitCommandError ) + fixture_path) +from git import (Git, + GitCommandError) + class TestGit(TestBase): @@ -41,7 +42,6 @@ class TestGit(TestBase): def test_it_raises_errors(self): self.git.this_does_not_exist() - def test_it_transforms_kwargs_into_git_command_arguments(self): assert_equal(["-s"], self.git.transform_kwargs(**{'s': True})) assert_equal(["-s5"], self.git.transform_kwargs(**{'s': 5})) @@ -52,7 +52,7 @@ class TestGit(TestBase): assert_equal(["-s", "-t"], self.git.transform_kwargs(**{'s': True, 't': True})) def test_it_executes_git_to_shell_and_returns_result(self): - assert_match('^git version [\d\.]{2}.*$', self.git.execute(["git","version"])) + assert_match('^git version [\d\.]{2}.*$', self.git.execute(["git", "version"])) def test_it_accepts_stdin(self): filename = fixture_path("cat_file_blob") @@ -71,13 +71,13 @@ class TestGit(TestBase): # read header only import subprocess as sp hexsha = "b2339455342180c7cc1e9bba3e9f181f7baa5167" - g = self.git.cat_file(batch_check=True, istream=sp.PIPE,as_process=True) + g = self.git.cat_file(batch_check=True, istream=sp.PIPE, as_process=True) g.stdin.write("b2339455342180c7cc1e9bba3e9f181f7baa5167\n") g.stdin.flush() obj_info = g.stdout.readline() # read header + data - g = self.git.cat_file(batch=True, istream=sp.PIPE,as_process=True) + g = self.git.cat_file(batch=True, istream=sp.PIPE, as_process=True) g.stdin.write("b2339455342180c7cc1e9bba3e9f181f7baa5167\n") g.stdin.flush() obj_info_two = g.stdout.readline() @@ -87,15 +87,14 @@ class TestGit(TestBase): size = int(obj_info.split()[2]) data = g.stdout.read(size) g.stdout.read(1) - + # now we should be able to read a new object g.stdin.write("b2339455342180c7cc1e9bba3e9f181f7baa5167\n") g.stdin.flush() assert g.stdout.readline() == obj_info - # same can be achived using the respective command functions - hexsha, typename, size = self.git.get_object_header(hexsha) + hexsha, typename, size = self.git.get_object_header(hexsha) hexsha, typename_two, size_two, data = self.git.get_object_data(hexsha) assert typename == typename_two and size == size_two @@ -123,7 +122,7 @@ class TestGit(TestBase): self.assertEquals(git_version, git_command_version) def test_single_char_git_options_are_passed_to_git(self): - input_value='TestValue' + input_value = 'TestValue' output_value = self.git(c='user.name={}'.format(input_value)).config('--get', 'user.name') self.assertEquals(input_value, output_value) diff --git a/git/test/test_index.py b/git/test/test_index.py index d532a3b4..f1f718cd 100644 --- a/git/test/test_index.py +++ b/git/test/test_index.py @@ -14,12 +14,13 @@ import glob import shutil from stat import * + class TestIndex(TestBase): - + def __init__(self, *args): super(TestIndex, self).__init__(*args) self._reset_progress() - + def _assert_fprogress(self, entries): assert len(entries) == len(self._fprogress_map) for path, call_count in self._fprogress_map.iteritems(): @@ -35,48 +36,48 @@ class TestIndex(TestBase): if curval == 1: assert done self._fprogress_map[path] = curval + 1 - + def _fprogress_add(self, path, done, item): - """Called as progress func - we keep track of the proper + """Called as progress func - we keep track of the proper call order""" assert item is not None self._fprogress(path, done, item) - + def _reset_progress(self): # maps paths to the count of calls self._fprogress_map = dict() - + def _assert_entries(self, entries): for entry in entries: assert isinstance(entry, BaseIndexEntry) assert not os.path.isabs(entry.path) assert not "\\" in entry.path # END for each entry - + def test_index_file_base(self): # read from file index = IndexFile(self.rorepo, fixture_path("index")) assert index.entries assert index.version > 0 - + # test entry last_val = None entry = index.entries.itervalues().next() - for attr in ("path","ctime","mtime","dev","inode","mode","uid", - "gid","size","binsha", "hexsha", "stage"): + for attr in ("path", "ctime", "mtime", "dev", "inode", "mode", "uid", + "gid", "size", "binsha", "hexsha", "stage"): val = getattr(entry, attr) # END for each method - + # test update entries = index.entries assert isinstance(index.update(), IndexFile) assert entries is not index.entries - + # test stage index_merge = IndexFile(self.rorepo, fixture_path("index_merge")) assert len(index_merge.entries) == 106 - assert len(list(e for e in index_merge.entries.itervalues() if e.stage != 0 )) - + assert len(list(e for e in index_merge.entries.itervalues() if e.stage != 0)) + # write the data - it must match the original tmpfile = tempfile.mktemp() index_merge.write(tmpfile) @@ -84,82 +85,80 @@ class TestIndex(TestBase): assert fp.read() == fixture("index_merge") fp.close() os.remove(tmpfile) - + def _cmp_tree_index(self, tree, index): # fail unless both objects contain the same paths and blobs if isinstance(tree, str): tree = self.rorepo.commit(tree).tree - + num_blobs = 0 blist = list() - for blob in tree.traverse(predicate = lambda e,d: e.type == "blob", branch_first=False): - assert (blob.path,0) in index.entries + for blob in tree.traverse(predicate=lambda e, d: e.type == "blob", branch_first=False): + assert (blob.path, 0) in index.entries blist.append(blob) # END for each blob in tree if len(blist) != len(index.entries): iset = set(k[0] for k in index.entries.keys()) bset = set(b.path for b in blist) - raise AssertionError( "CMP Failed: Missing entries in index: %s, missing in tree: %s" % (bset-iset, iset-bset) ) + raise AssertionError("CMP Failed: Missing entries in index: %s, missing in tree: %s" % (bset - iset, iset - bset)) # END assertion message - + @with_rw_repo('0.1.6') def test_index_file_from_tree(self, rw_repo): common_ancestor_sha = "5117c9c8a4d3af19a9958677e45cda9269de1541" cur_sha = "4b43ca7ff72d5f535134241e7c797ddc9c7a3573" other_sha = "39f85c4358b7346fee22169da9cad93901ea9eb9" - - # simple index from tree + + # simple index from tree base_index = IndexFile.from_tree(rw_repo, common_ancestor_sha) assert base_index.entries self._cmp_tree_index(common_ancestor_sha, base_index) - + # merge two trees - its like a fast-forward two_way_index = IndexFile.from_tree(rw_repo, common_ancestor_sha, cur_sha) assert two_way_index.entries self._cmp_tree_index(cur_sha, two_way_index) - + # merge three trees - here we have a merge conflict three_way_index = IndexFile.from_tree(rw_repo, common_ancestor_sha, cur_sha, other_sha) assert len(list(e for e in three_way_index.entries.values() if e.stage != 0)) - - + # ITERATE BLOBS merge_required = lambda t: t[0] != 0 merge_blobs = list(three_way_index.iter_blobs(merge_required)) assert merge_blobs - assert merge_blobs[0][0] in (1,2,3) + assert merge_blobs[0][0] in (1, 2, 3) assert isinstance(merge_blobs[0][1], Blob) - + # test BlobFilter prefix = 'lib/git' for stage, blob in base_index.iter_blobs(BlobFilter([prefix])): - assert blob.path.startswith(prefix) - - + assert blob.path.startswith(prefix) + # writing a tree should fail with an unmerged index self.failUnlessRaises(UnmergedEntriesError, three_way_index.write_tree) - + # removed unmerged entries unmerged_blob_map = three_way_index.unmerged_blobs() assert unmerged_blob_map - + # pick the first blob at the first stage we find and use it as resolved version - three_way_index.resolve_blobs( l[0][1] for l in unmerged_blob_map.itervalues() ) + three_way_index.resolve_blobs(l[0][1] for l in unmerged_blob_map.itervalues()) tree = three_way_index.write_tree() assert isinstance(tree, Tree) num_blobs = 0 - for blob in tree.traverse(predicate=lambda item,d: item.type == "blob"): - assert (blob.path,0) in three_way_index.entries + for blob in tree.traverse(predicate=lambda item, d: item.type == "blob"): + assert (blob.path, 0) in three_way_index.entries num_blobs += 1 # END for each blob assert num_blobs == len(three_way_index.entries) - + @with_rw_repo('0.1.6') def test_index_merge_tree(self, rw_repo): - # A bit out of place, but we need a different repo for this: + # A bit out of place, but we need a different repo for this: assert self.rorepo != rw_repo and not (self.rorepo == rw_repo) assert len(set((self.rorepo, self.rorepo, rw_repo, rw_repo))) == 2 - + # SINGLE TREE MERGE # current index is at the (virtual) cur_commit next_commit = "4c39f9da792792d4e73fc3a5effde66576ae128c" @@ -169,107 +168,106 @@ class TestIndex(TestBase): rw_repo.index.merge_tree(next_commit) # only one change should be recorded assert manifest_entry.binsha != rw_repo.index.entries[manifest_key].binsha - + rw_repo.index.reset(rw_repo.head) assert rw_repo.index.entries[manifest_key].binsha == manifest_entry.binsha - + # FAKE MERGE ############# - # Add a change with a NULL sha that should conflict with next_commit. We - # pretend there was a change, but we do not even bother adding a proper + # Add a change with a NULL sha that should conflict with next_commit. We + # pretend there was a change, but we do not even bother adding a proper # sha for it ( which makes things faster of course ) - manifest_fake_entry = BaseIndexEntry((manifest_entry[0], "\0"*20, 0, manifest_entry[3])) + manifest_fake_entry = BaseIndexEntry((manifest_entry[0], "\0" * 20, 0, manifest_entry[3])) # try write flag self._assert_entries(rw_repo.index.add([manifest_fake_entry], write=False)) - # add actually resolves the null-hex-sha for us as a feature, but we can + # add actually resolves the null-hex-sha for us as a feature, but we can # edit the index manually assert rw_repo.index.entries[manifest_key].binsha != Object.NULL_BIN_SHA - # must operate on the same index for this ! Its a bit problematic as + # must operate on the same index for this ! Its a bit problematic as # it might confuse people - index = rw_repo.index + index = rw_repo.index index.entries[manifest_key] = IndexEntry.from_base(manifest_fake_entry) index.write() assert rw_repo.index.entries[manifest_key].hexsha == Diff.NULL_HEX_SHA - + # write an unchanged index ( just for the fun of it ) rw_repo.index.write() - - # a three way merge would result in a conflict and fails as the command will - # not overwrite any entries in our index and hence leave them unmerged. This is + + # a three way merge would result in a conflict and fails as the command will + # not overwrite any entries in our index and hence leave them unmerged. This is # mainly a protection feature as the current index is not yet in a tree self.failUnlessRaises(GitCommandError, index.merge_tree, next_commit, base=parent_commit) - - # the only way to get the merged entries is to safe the current index away into a tree, + + # the only way to get the merged entries is to safe the current index away into a tree, # which is like a temporary commit for us. This fails as well as the NULL sha deos not # have a corresponding object # NOTE: missing_ok is not a kwarg anymore, missing_ok is always true # self.failUnlessRaises(GitCommandError, index.write_tree) - + # if missing objects are okay, this would work though ( they are always okay now ) tree = index.write_tree() - + # now make a proper three way merge with unmerged entries unmerged_tree = IndexFile.from_tree(rw_repo, parent_commit, tree, next_commit) unmerged_blobs = unmerged_tree.unmerged_blobs() assert len(unmerged_blobs) == 1 and unmerged_blobs.keys()[0] == manifest_key[0] - - + @with_rw_repo('0.1.6') def test_index_file_diffing(self, rw_repo): # default Index instance points to our index index = IndexFile(rw_repo) assert index.path is not None assert len(index.entries) - + # write the file back index.write() - + # could sha it, or check stats - + # test diff - # resetting the head will leave the index in a different state, and the + # resetting the head will leave the index in a different state, and the # diff will yield a few changes cur_head_commit = rw_repo.head.reference.commit ref = rw_repo.head.reset('HEAD~6', index=True, working_tree=False) - + # diff against same index is 0 diff = index.diff() assert len(diff) == 0 - + # against HEAD as string, must be the same as it matches index diff = index.diff('HEAD') assert len(diff) == 0 - + # against previous head, there must be a difference diff = index.diff(cur_head_commit) assert len(diff) - + # we reverse the result adiff = index.diff(str(cur_head_commit), R=True) odiff = index.diff(cur_head_commit, R=False) # now its not reversed anymore assert adiff != odiff assert odiff == diff # both unreversed diffs against HEAD - + # against working copy - its still at cur_commit wdiff = index.diff(None) assert wdiff != adiff assert wdiff != odiff - + # against something unusual self.failUnlessRaises(ValueError, index.diff, int) - + # adjust the index to match an old revision cur_branch = rw_repo.active_branch cur_commit = cur_branch.commit rev_head_parent = 'HEAD~1' assert index.reset(rev_head_parent) is index - + assert cur_branch == rw_repo.active_branch assert cur_commit == rw_repo.head.commit - + # there must be differences towards the working tree which is in the 'future' assert index.diff(None) - + # reset the working copy as well to current head,to pull 'back' as well new_data = "will be reverted" file_path = os.path.join(rw_repo.working_tree_dir, "CHANGES") @@ -280,12 +278,12 @@ class TestIndex(TestBase): assert not index.diff(None) assert cur_branch == rw_repo.active_branch assert cur_commit == rw_repo.head.commit - fp = open(file_path,'rb') + fp = open(file_path, 'rb') try: assert fp.read() != new_data finally: fp.close() - + # test full checkout test_file = os.path.join(rw_repo.working_tree_dir, "CHANGES") open(test_file, 'ab').write("some data") @@ -293,24 +291,24 @@ class TestIndex(TestBase): assert 'CHANGES' in list(rval) self._assert_fprogress([None]) assert os.path.isfile(test_file) - + os.remove(test_file) rval = index.checkout(None, force=False, fprogress=self._fprogress) assert 'CHANGES' in list(rval) self._assert_fprogress([None]) assert os.path.isfile(test_file) - + # individual file os.remove(test_file) rval = index.checkout(test_file, fprogress=self._fprogress) assert list(rval)[0] == 'CHANGES' self._assert_fprogress([test_file]) assert os.path.exists(test_file) - + # checking out non-existing file throws self.failUnlessRaises(CheckoutError, index.checkout, "doesnt_exist_ever.txt.that") self.failUnlessRaises(CheckoutError, index.checkout, paths=["doesnt/exist"]) - + # checkout file with modifications append_data = "hello" fp = open(test_file, "ab") @@ -325,16 +323,16 @@ class TestIndex(TestBase): assert open(test_file).read().endswith(append_data) else: raise AssertionError("Exception CheckoutError not thrown") - + # if we force it it should work index.checkout(test_file, force=True) assert not open(test_file).read().endswith(append_data) - + # checkout directory shutil.rmtree(os.path.join(rw_repo.working_tree_dir, "lib")) rval = index.checkout('lib') assert len(list(rval)) > 1 - + def _count_existing(self, repo, files): """ Returns count of files that actually exist in the repository directory. @@ -346,24 +344,24 @@ class TestIndex(TestBase): # END for each deleted file return existing # END num existing helper - + @with_rw_repo('0.1.6') def test_index_mutation(self, rw_repo): index = rw_repo.index num_entries = len(index.entries) cur_head = rw_repo.head - + uname = "Some Developer" umail = "sd@company.com" rw_repo.config_writer().set_value("user", "name", uname) - rw_repo.config_writer().set_value("user", "email", umail) - - # remove all of the files, provide a wild mix of paths, BaseIndexEntries, + rw_repo.config_writer().set_value("user", "email", umail) + + # remove all of the files, provide a wild mix of paths, BaseIndexEntries, # IndexEntries def mixed_iterator(): count = 0 for entry in index.entries.itervalues(): - type_id = count % 4 + type_id = count % 4 if type_id == 0: # path yield entry.path elif type_id == 1: # blob @@ -375,39 +373,39 @@ class TestIndex(TestBase): else: raise AssertionError("Invalid Type") count += 1 - # END for each entry + # END for each entry # END mixed iterator deleted_files = index.remove(mixed_iterator(), working_tree=False) assert deleted_files assert self._count_existing(rw_repo, deleted_files) == len(deleted_files) assert len(index.entries) == 0 - + # reset the index to undo our changes index.reset() assert len(index.entries) == num_entries - + # remove with working copy deleted_files = index.remove(mixed_iterator(), working_tree=True) assert deleted_files assert self._count_existing(rw_repo, deleted_files) == 0 - + # reset everything index.reset(working_tree=True) assert self._count_existing(rw_repo, deleted_files) == len(deleted_files) - + # invalid type self.failUnlessRaises(TypeError, index.remove, [1]) - + # absolute path - deleted_files = index.remove([os.path.join(rw_repo.working_tree_dir,"lib")], r=True) + deleted_files = index.remove([os.path.join(rw_repo.working_tree_dir, "lib")], r=True) assert len(deleted_files) > 1 self.failUnlessRaises(ValueError, index.remove, ["/doesnt/exists"]) - + # TEST COMMITTING # commit changed index cur_commit = cur_head.commit commit_message = "commit default head" - + new_commit = index.commit(commit_message, head=False) assert cur_commit != new_commit assert new_commit.author.name == uname @@ -418,66 +416,66 @@ class TestIndex(TestBase): assert new_commit.parents[0] == cur_commit assert len(new_commit.parents) == 1 assert cur_head.commit == cur_commit - + # same index, no parents commit_message = "index without parents" commit_no_parents = index.commit(commit_message, parent_commits=list(), head=True) assert commit_no_parents.message == commit_message assert len(commit_no_parents.parents) == 0 assert cur_head.commit == commit_no_parents - + # same index, multiple parents commit_message = "Index with multiple parents\n commit with another line" - commit_multi_parent = index.commit(commit_message,parent_commits=(commit_no_parents, new_commit)) + commit_multi_parent = index.commit(commit_message, parent_commits=(commit_no_parents, new_commit)) assert commit_multi_parent.message == commit_message assert len(commit_multi_parent.parents) == 2 assert commit_multi_parent.parents[0] == commit_no_parents assert commit_multi_parent.parents[1] == new_commit assert cur_head.commit == commit_multi_parent - + # re-add all files in lib # get the lib folder back on disk, but get an index without it index.reset(new_commit.parents[0], working_tree=True).reset(new_commit, working_tree=False) lib_file_path = os.path.join("lib", "git", "__init__.py") assert (lib_file_path, 0) not in index.entries assert os.path.isfile(os.path.join(rw_repo.working_tree_dir, lib_file_path)) - + # directory entries = index.add(['lib'], fprogress=self._fprogress_add) self._assert_entries(entries) self._assert_fprogress(entries) - assert len(entries)>1 - - # glob + assert len(entries) > 1 + + # glob entries = index.reset(new_commit).add([os.path.join('lib', 'git', '*.py')], fprogress=self._fprogress_add) self._assert_entries(entries) self._assert_fprogress(entries) assert len(entries) == 14 - - # same file - entries = index.reset(new_commit).add([os.path.abspath(os.path.join('lib', 'git', 'head.py'))]*2, fprogress=self._fprogress_add) + + # same file + entries = index.reset(new_commit).add([os.path.abspath(os.path.join('lib', 'git', 'head.py'))] * 2, fprogress=self._fprogress_add) self._assert_entries(entries) assert entries[0].mode & 0644 == 0644 # would fail, test is too primitive to handle this case # self._assert_fprogress(entries) self._reset_progress() assert len(entries) == 2 - + # missing path self.failUnlessRaises(OSError, index.reset(new_commit).add, ['doesnt/exist/must/raise']) - + # blob from older revision overrides current index revision old_blob = new_commit.parents[0].tree.blobs[0] entries = index.reset(new_commit).add([old_blob], fprogress=self._fprogress_add) self._assert_entries(entries) self._assert_fprogress(entries) - assert index.entries[(old_blob.path,0)].hexsha == old_blob.hexsha and len(entries) == 1 - + assert index.entries[(old_blob.path, 0)].hexsha == old_blob.hexsha and len(entries) == 1 + # mode 0 not allowed null_hex_sha = Diff.NULL_HEX_SHA null_bin_sha = "\0" * 20 - self.failUnlessRaises(ValueError, index.reset(new_commit).add, [BaseIndexEntry((0, null_bin_sha,0,"doesntmatter"))]) - + self.failUnlessRaises(ValueError, index.reset(new_commit).add, [BaseIndexEntry((0, null_bin_sha, 0, "doesntmatter"))]) + # add new file new_file_relapath = "my_new_file" new_file_path = self._make_file(new_file_relapath, "hello world", rw_repo) @@ -485,7 +483,7 @@ class TestIndex(TestBase): self._assert_entries(entries) self._assert_fprogress(entries) assert len(entries) == 1 and entries[0].hexsha != null_hex_sha - + # add symlink if sys.platform != "win32": basename = "my_real_symlink" @@ -497,11 +495,11 @@ class TestIndex(TestBase): self._assert_fprogress(entries) assert len(entries) == 1 and S_ISLNK(entries[0].mode) assert S_ISLNK(index.entries[index.entry_key("my_real_symlink", 0)].mode) - + # we expect only the target to be written assert index.repo.odb.stream(entries[0].binsha).read() == target - # END real symlink test - + # END real symlink test + # add fake symlink and assure it checks-our as symlink fake_symlink_relapath = "my_fake_symlink" link_target = "/etc/that" @@ -512,83 +510,83 @@ class TestIndex(TestBase): self._assert_fprogress(entries) assert entries[0].hexsha != null_hex_sha assert len(entries) == 1 and S_ISLNK(entries[0].mode) - + # assure this also works with an alternate method full_index_entry = IndexEntry.from_base(BaseIndexEntry((0120000, entries[0].binsha, 0, entries[0].path))) entry_key = index.entry_key(full_index_entry) index.reset(new_commit) - + assert entry_key not in index.entries index.entries[entry_key] = full_index_entry index.write() index.update() # force reread of entries new_entry = index.entries[entry_key] assert S_ISLNK(new_entry.mode) - + # a tree created from this should contain the symlink tree = index.write_tree() assert fake_symlink_relapath in tree index.write() # flush our changes for the checkout - + # checkout the fakelink, should be a link then assert not S_ISLNK(os.stat(fake_symlink_path)[ST_MODE]) os.remove(fake_symlink_path) index.checkout(fake_symlink_path) - + # on windows we will never get symlinks if os.name == 'nt': - # simlinks should contain the link as text ( which is what a + # simlinks should contain the link as text ( which is what a # symlink actually is ) - open(fake_symlink_path,'rb').read() == link_target + open(fake_symlink_path, 'rb').read() == link_target else: assert S_ISLNK(os.lstat(fake_symlink_path)[ST_MODE]) - + # TEST RENAMING def assert_mv_rval(rval): for source, dest in rval: assert not os.path.exists(source) and os.path.exists(dest) # END for each renamed item # END move assertion utility - + self.failUnlessRaises(ValueError, index.move, ['just_one_path']) # file onto existing file files = ['AUTHORS', 'LICENSE'] self.failUnlessRaises(GitCommandError, index.move, files) - - # again, with force + + # again, with force assert_mv_rval(index.move(files, f=True)) - + # files into directory - dry run paths = ['LICENSE', 'VERSION', 'doc'] rval = index.move(paths, dry_run=True) assert len(rval) == 2 assert os.path.exists(paths[0]) - + # again, no dry run rval = index.move(paths) assert_mv_rval(rval) - + # dir into dir rval = index.move(['doc', 'test']) assert_mv_rval(rval) - - + # TEST PATH REWRITING ###################### count = [0] + def rewriter(entry): rval = str(count[0]) count[0] += 1 return rval # END rewriter - + def make_paths(): # two existing ones, one new one yield 'CHANGES' yield 'ez_setup.py' yield index.entries[index.entry_key('README', 0)] yield index.entries[index.entry_key('.gitignore', 0)] - + for fid in range(3): fname = 'newfile%i' % fid open(fname, 'wb').write("abcd") @@ -597,11 +595,10 @@ class TestIndex(TestBase): # END path producer paths = list(make_paths()) self._assert_entries(index.add(paths, path_rewriter=rewriter)) - + for filenum in range(len(paths)): assert index.entry_key(str(filenum), 0) in index.entries - - + # TEST RESET ON PATHS ###################### arela = "aa" @@ -613,34 +610,33 @@ class TestIndex(TestBase): keys = (akey, bkey) absfiles = (afile, bfile) files = (arela, brela) - + for fkey in keys: assert not fkey in index.entries - + index.add(files, write=True) nc = index.commit("2 files committed", head=False) - + for fkey in keys: assert fkey in index.entries - + # just the index index.reset(paths=(arela, afile)) assert not akey in index.entries assert bkey in index.entries - + # now with working tree - files on disk as well as entries must be recreated rw_repo.head.commit = nc for absfile in absfiles: os.remove(absfile) - + index.reset(working_tree=True, paths=files) - - for fkey in keys: + + for fkey in keys: assert fkey in index.entries for absfile in absfiles: assert os.path.isfile(absfile) - - + @with_rw_repo('HEAD') def test_compare_write_tree(self, rw_repo): # write all trees and compare them @@ -654,16 +650,14 @@ class TestIndex(TestBase): index = rw_repo.index.reset(commit) orig_tree = commit.tree assert index.write_tree() == orig_tree - # END for each commit - + # END for each commit + def test_index_new(self): B = self.rorepo.tree("6d9b1f4f9fa8c9f030e3207e7deacc5d5f8bba4e") H = self.rorepo.tree("25dca42bac17d511b7e2ebdd9d1d679e7626db5f") M = self.rorepo.tree("e746f96bcc29238b79118123028ca170adc4ff0f") - - for args in ((B,), (B,H), (B,H,M)): + + for args in ((B,), (B, H), (B, H, M)): index = IndexFile.new(self.rorepo, *args) assert isinstance(index, IndexFile) # END for each arg tuple - - diff --git a/git/test/test_reflog.py b/git/test/test_reflog.py index fca9e1cd..fec50095 100644 --- a/git/test/test_reflog.py +++ b/git/test/test_reflog.py @@ -7,6 +7,7 @@ import tempfile import shutil import os + class TestRefLog(TestBase): def test_reflogentry(self): @@ -14,51 +15,51 @@ class TestRefLog(TestBase): hexsha = 'F' * 40 actor = Actor('name', 'email') msg = "message" - + self.failUnlessRaises(ValueError, RefLogEntry.new, nullhexsha, hexsha, 'noactor', 0, 0, "") e = RefLogEntry.new(nullhexsha, hexsha, actor, 0, 1, msg) - + assert e.oldhexsha == nullhexsha assert e.newhexsha == hexsha assert e.actor == actor assert e.time[0] == 0 assert e.time[1] == 1 assert e.message == msg - + # check representation (roughly) assert repr(e).startswith(nullhexsha) - + def test_base(self): rlp_head = fixture_path('reflog_HEAD') rlp_master = fixture_path('reflog_master') tdir = tempfile.mktemp(suffix="test_reflogs") os.mkdir(tdir) - - rlp_master_ro = RefLog.path(self.rorepo.head) + + rlp_master_ro = RefLog.path(self.rorepo.head) assert os.path.isfile(rlp_master_ro) - + # simple read reflog = RefLog.from_file(rlp_master_ro) assert reflog._path is not None assert isinstance(reflog, RefLog) assert len(reflog) - + # iter_entries works with path and with stream assert len(list(RefLog.iter_entries(open(rlp_master)))) assert len(list(RefLog.iter_entries(rlp_master))) - + # raise on invalid revlog # TODO: Try multiple corrupted ones ! pp = 'reflog_invalid_' for suffix in ('oldsha', 'newsha', 'email', 'date', 'sep'): - self.failUnlessRaises(ValueError, RefLog.from_file, fixture_path(pp+suffix)) + self.failUnlessRaises(ValueError, RefLog.from_file, fixture_path(pp + suffix)) #END for each invalid file - + # cannot write an uninitialized reflog self.failUnlessRaises(ValueError, RefLog().write) - + # test serialize and deserialize - results must match exactly - binsha = chr(255)*20 + binsha = chr(255) * 20 msg = "my reflog message" cr = self.rorepo.config_reader() for rlp in (rlp_head, rlp_master): @@ -66,35 +67,34 @@ class TestRefLog(TestBase): tfile = os.path.join(tdir, os.path.basename(rlp)) reflog.to_file(tfile) assert reflog.write() is reflog - + # parsed result must match ... treflog = RefLog.from_file(tfile) assert treflog == reflog - + # ... as well as each bytes of the written stream assert open(tfile).read() == open(rlp).read() - + # append an entry entry = RefLog.append_entry(cr, tfile, IndexObject.NULL_BIN_SHA, binsha, msg) assert entry.oldhexsha == IndexObject.NULL_HEX_SHA - assert entry.newhexsha == 'f'*40 + assert entry.newhexsha == 'f' * 40 assert entry.message == msg assert RefLog.from_file(tfile)[-1] == entry - + # index entry # raises on invalid index self.failUnlessRaises(IndexError, RefLog.entry_at, rlp, 10000) - + # indices can be positive ... assert isinstance(RefLog.entry_at(rlp, 0), RefLogEntry) RefLog.entry_at(rlp, 23) - + # ... and negative for idx in (-1, -24): RefLog.entry_at(rlp, idx) #END for each index to read - # END for each reflog - - + # END for each reflog + # finally remove our temporary data shutil.rmtree(tdir) diff --git a/git/test/test_refs.py b/git/test/test_refs.py index cf08d7ec..ee9d8074 100644 --- a/git/test/test_refs.py +++ b/git/test/test_refs.py @@ -13,39 +13,40 @@ from git.objects.tag import TagObject from itertools import chain import os + class TestRefs(TestBase): def test_from_path(self): # should be able to create any reference directly - for ref_type in ( Reference, Head, TagReference, RemoteReference ): + for ref_type in (Reference, Head, TagReference, RemoteReference): for name in ('rela_name', 'path/rela_name'): full_path = ref_type.to_full_path(name) instance = ref_type.from_path(self.rorepo, full_path) assert isinstance(instance, ref_type) - # END for each name + # END for each name # END for each type - + # invalid path self.failUnlessRaises(ValueError, TagReference, self.rorepo, "refs/invalid/tag") # works without path check TagReference(self.rorepo, "refs/invalid/tag", check_path=False) - + def test_tag_base(self): tag_object_refs = list() for tag in self.rorepo.tags: assert "refs/tags" in tag.path assert tag.name - assert isinstance( tag.commit, Commit ) + assert isinstance(tag.commit, Commit) if tag.tag is not None: - tag_object_refs.append( tag ) + tag_object_refs.append(tag) tagobj = tag.tag # have no dict self.failUnlessRaises(AttributeError, setattr, tagobj, 'someattr', 1) - assert isinstance( tagobj, TagObject ) + assert isinstance(tagobj, TagObject) assert tagobj.tag == tag.name - assert isinstance( tagobj.tagger, Actor ) - assert isinstance( tagobj.tagged_date, int ) - assert isinstance( tagobj.tagger_tz_offset, int ) + assert isinstance(tagobj.tagger, Actor) + assert isinstance(tagobj.tagged_date, int) + assert isinstance(tagobj.tagger_tz_offset, int) assert tagobj.message assert tag.object == tagobj # can't assign the object @@ -55,16 +56,13 @@ class TestRefs(TestBase): assert tag_object_refs assert isinstance(self.rorepo.tags['0.1.5'], TagReference) - def test_tags_author(self): tag = self.rorepo.tags[0] tagobj = tag.tag - assert isinstance( tagobj.tagger, Actor ) + assert isinstance(tagobj.tagger, Actor) tagger_name = tagobj.tagger.name assert tagger_name == 'Michael Trier' - - def test_tags(self): # tag refs can point to tag objects or to commits s = set() @@ -79,8 +77,8 @@ class TestRefs(TestBase): s.add(ref) # END for each ref assert len(s) == ref_count - assert len(s|s) == ref_count - + assert len(s | s) == ref_count + @with_rw_repo('HEAD', bare=False) def test_heads(self, rwrepo): for head in rwrepo.heads: @@ -91,7 +89,7 @@ class TestRefs(TestBase): cur_object = head.object assert prev_object == cur_object # represent the same git object assert prev_object is not cur_object # but are different instances - + writer = head.config_writer() tv = "testopt" writer.set_value(tv, 1) @@ -99,7 +97,7 @@ class TestRefs(TestBase): del(writer) assert head.config_reader().get_value(tv) == 1 head.config_writer().remove_option(tv) - + # after the clone, we might still have a tracking branch setup head.set_tracking_branch(None) assert head.tracking_branch() is None @@ -109,7 +107,7 @@ class TestRefs(TestBase): head.set_tracking_branch(None) assert head.tracking_branch() is None # END for each head - + # verify REFLOG gets altered head = rwrepo.head cur_head = head.ref @@ -123,76 +121,73 @@ class TestRefs(TestBase): assert len(thlog) == hlog_len + 1 assert thlog[-1].oldhexsha == cur_commit.hexsha assert thlog[-1].newhexsha == pcommit.hexsha - + # the ref didn't change though assert len(cur_head.log()) == blog_len - + # head changes once again, cur_head doesn't change head.set_reference(cur_head, 'reattach head') - assert len(head.log()) == hlog_len+2 + assert len(head.log()) == hlog_len + 2 assert len(cur_head.log()) == blog_len - + # adjusting the head-ref also adjust the head, so both reflogs are # altered cur_head.set_commit(pcommit, 'changing commit') - assert len(cur_head.log()) == blog_len+1 - assert len(head.log()) == hlog_len+3 - - + assert len(cur_head.log()) == blog_len + 1 + assert len(head.log()) == hlog_len + 3 + # with automatic dereferencing assert head.set_commit(cur_commit, 'change commit once again') is head - assert len(head.log()) == hlog_len+4 - assert len(cur_head.log()) == blog_len+2 - + assert len(head.log()) == hlog_len + 4 + assert len(cur_head.log()) == blog_len + 2 + # a new branch has just a single entry other_head = Head.create(rwrepo, 'mynewhead', pcommit, logmsg='new head created') log = other_head.log() assert len(log) == 1 assert log[0].oldhexsha == pcommit.NULL_HEX_SHA assert log[0].newhexsha == pcommit.hexsha - - + def test_refs(self): types_found = set() for ref in self.rorepo.refs: types_found.add(type(ref)) - assert len(types_found) >= 3 - + assert len(types_found) >= 3 + def test_is_valid(self): assert Reference(self.rorepo, 'refs/doesnt/exist').is_valid() == False assert self.rorepo.head.is_valid() assert self.rorepo.head.reference.is_valid() assert SymbolicReference(self.rorepo, 'hellothere').is_valid() == False - + def test_orig_head(self): assert type(self.rorepo.head.orig_head()) == SymbolicReference - + @with_rw_repo('0.1.6') def test_head_reset(self, rw_repo): cur_head = rw_repo.head old_head_commit = cur_head.commit new_head_commit = cur_head.ref.commit.parents[0] - cur_head.reset(new_head_commit, index=True) # index only + cur_head.reset(new_head_commit, index=True) # index only assert cur_head.reference.commit == new_head_commit - + self.failUnlessRaises(ValueError, cur_head.reset, new_head_commit, index=False, working_tree=True) new_head_commit = new_head_commit.parents[0] cur_head.reset(new_head_commit, index=True, working_tree=True) # index + wt assert cur_head.reference.commit == new_head_commit - + # paths - make sure we have something to do rw_repo.index.reset(old_head_commit.parents[0]) - cur_head.reset(cur_head, paths = "test") - cur_head.reset(new_head_commit, paths = "lib") + cur_head.reset(cur_head, paths="test") + cur_head.reset(new_head_commit, paths="lib") # hard resets with paths don't work, its all or nothing - self.failUnlessRaises(GitCommandError, cur_head.reset, new_head_commit, working_tree=True, paths = "lib") - + self.failUnlessRaises(GitCommandError, cur_head.reset, new_head_commit, working_tree=True, paths="lib") + # we can do a mixed reset, and then checkout from the index though cur_head.reset(new_head_commit) - rw_repo.index.checkout(["lib"], force=True)# - - - # now that we have a write write repo, change the HEAD reference - its + rw_repo.index.checkout(["lib"], force=True) + + # now that we have a write write repo, change the HEAD reference - its # like git-reset --soft heads = rw_repo.heads assert heads @@ -203,7 +198,7 @@ class TestRefs(TestBase): assert cur_head.commit == head.commit assert not cur_head.is_detached # END for each head - + # detach active_head = heads[0] curhead_commit = active_head.commit @@ -211,50 +206,50 @@ class TestRefs(TestBase): assert cur_head.commit == curhead_commit assert cur_head.is_detached self.failUnlessRaises(TypeError, getattr, cur_head, "reference") - + # tags are references, hence we can point to them some_tag = rw_repo.tags[0] cur_head.reference = some_tag assert not cur_head.is_detached assert cur_head.commit == some_tag.commit - assert isinstance(cur_head.reference, TagReference) - + assert isinstance(cur_head.reference, TagReference) + # put HEAD back to a real head, otherwise everything else fails cur_head.reference = active_head - + # type check self.failUnlessRaises(ValueError, setattr, cur_head, "reference", "that") - - # head handling + + # head handling commit = 'HEAD' prev_head_commit = cur_head.commit for count, new_name in enumerate(("my_new_head", "feature/feature1")): - actual_commit = commit+"^"*count + actual_commit = commit + "^" * count new_head = Head.create(rw_repo, new_name, actual_commit) assert new_head.is_detached assert cur_head.commit == prev_head_commit assert isinstance(new_head, Head) # already exists, but has the same value, so its fine Head.create(rw_repo, new_name, new_head.commit) - + # its not fine with a different value self.failUnlessRaises(OSError, Head.create, rw_repo, new_name, new_head.commit.parents[0]) - + # force it new_head = Head.create(rw_repo, new_name, actual_commit, force=True) old_path = new_head.path old_name = new_head.name - + assert new_head.rename("hello").name == "hello" assert new_head.rename("hello/world").name == "hello/world" assert new_head.rename(old_name).name == old_name and new_head.path == old_path - + # rename with force tmp_head = Head.create(rw_repo, "tmphead") self.failUnlessRaises(GitCommandError, tmp_head.rename, new_head) tmp_head.rename(new_head, force=True) assert tmp_head == new_head and tmp_head.object == new_head.object - + logfile = RefLog.path(tmp_head) assert os.path.isfile(logfile) Head.delete(rw_repo, tmp_head) @@ -265,17 +260,17 @@ class TestRefs(TestBase): # force on deletion testing would be missing here, code looks okay though ;) # END for each new head name self.failUnlessRaises(TypeError, RemoteReference.create, rw_repo, "some_name") - + # tag ref tag_name = "1.0.2" light_tag = TagReference.create(rw_repo, tag_name) self.failUnlessRaises(GitCommandError, TagReference.create, rw_repo, tag_name) - light_tag = TagReference.create(rw_repo, tag_name, "HEAD~1", force = True) + light_tag = TagReference.create(rw_repo, tag_name, "HEAD~1", force=True) assert isinstance(light_tag, TagReference) assert light_tag.name == tag_name assert light_tag.commit == cur_head.commit.parents[0] assert light_tag.tag is None - + # tag with tag object other_tag_name = "releases/1.0.2RC" msg = "my mighty tag\nsecond line" @@ -284,18 +279,18 @@ class TestRefs(TestBase): assert obj_tag.name == other_tag_name assert obj_tag.commit == cur_head.commit assert obj_tag.tag is not None - + TagReference.delete(rw_repo, light_tag, obj_tag) tags = rw_repo.tags assert light_tag not in tags and obj_tag not in tags - + # remote deletion remote_refs_so_far = 0 - remotes = rw_repo.remotes + remotes = rw_repo.remotes assert remotes for remote in remotes: refs = remote.refs - + # If a HEAD exists, it must be deleted first. Otherwise it might # end up pointing to an invalid ref it the ref was deleted before. remote_head_name = "HEAD" @@ -303,30 +298,30 @@ class TestRefs(TestBase): RemoteReference.delete(rw_repo, refs[remote_head_name]) del(refs[remote_head_name]) #END handle HEAD deletion - + RemoteReference.delete(rw_repo, *refs) remote_refs_so_far += len(refs) for ref in refs: assert ref.remote_name == remote.name # END for each ref to delete assert remote_refs_so_far - + for remote in remotes: # remotes without references throw self.failUnlessRaises(AssertionError, getattr, remote, 'refs') # END for each remote - + # change where the active head points to if cur_head.is_detached: cur_head.reference = rw_repo.heads[0] - + head = cur_head.reference old_commit = head.commit head.commit = old_commit.parents[0] assert head.commit == old_commit.parents[0] assert head.commit == cur_head.commit head.commit = old_commit - + # setting a non-commit as commit fails, but succeeds as object head_tree = head.commit.tree self.failUnlessRaises(ValueError, setattr, head, 'commit', head_tree) @@ -335,8 +330,8 @@ class TestRefs(TestBase): head.object = head_tree assert head.object == head_tree # cannot query tree as commit - self.failUnlessRaises(TypeError, getattr, head, 'commit') - + self.failUnlessRaises(TypeError, getattr, head, 'commit') + # set the commit directly using the head. This would never detach the head assert not cur_head.is_detached head.object = old_commit @@ -346,58 +341,58 @@ class TestRefs(TestBase): assert cur_head.is_detached cur_head.commit = parent_commit assert cur_head.is_detached and cur_head.commit == parent_commit - + cur_head.reference = head assert not cur_head.is_detached cur_head.commit = parent_commit assert not cur_head.is_detached assert head.commit == parent_commit - + # test checkout active_branch = rw_repo.active_branch for head in rw_repo.heads: checked_out_head = head.checkout() assert checked_out_head == head # END for each head to checkout - + # checkout with branch creation new_head = active_branch.checkout(b="new_head") assert active_branch != rw_repo.active_branch assert new_head == rw_repo.active_branch - + # checkout with force as we have a changed a file # clear file - open(new_head.commit.tree.blobs[-1].abspath,'w').close() + open(new_head.commit.tree.blobs[-1].abspath, 'w').close() assert len(new_head.commit.diff(None)) - + # create a new branch that is likely to touch the file we changed - far_away_head = rw_repo.create_head("far_head",'HEAD~100') + far_away_head = rw_repo.create_head("far_head", 'HEAD~100') self.failUnlessRaises(GitCommandError, far_away_head.checkout) assert active_branch == active_branch.checkout(force=True) assert rw_repo.head.reference != far_away_head - + # test reference creation partial_ref = 'sub/ref' full_ref = 'refs/%s' % partial_ref ref = Reference.create(rw_repo, partial_ref) assert ref.path == full_ref assert ref.object == rw_repo.head.commit - + self.failUnlessRaises(OSError, Reference.create, rw_repo, full_ref, 'HEAD~20') # it works if it is at the same spot though and points to the same reference assert Reference.create(rw_repo, full_ref, 'HEAD').path == full_ref Reference.delete(rw_repo, full_ref) - + # recreate the reference using a full_ref ref = Reference.create(rw_repo, full_ref) assert ref.path == full_ref assert ref.object == rw_repo.head.commit - + # recreate using force ref = Reference.create(rw_repo, partial_ref, 'HEAD~1', force=True) assert ref.path == full_ref assert ref.object == rw_repo.head.commit.parents[0] - + # rename it orig_obj = ref.object for name in ('refs/absname', 'rela_name', 'feature/rela_name'): @@ -407,10 +402,10 @@ class TestRefs(TestBase): assert ref_new_name.object == orig_obj assert ref_new_name == ref # END for each name type - + # References that don't exist trigger an error if we want to access them self.failUnlessRaises(ValueError, getattr, Reference(rw_repo, "refs/doesntexist"), 'commit') - + # exists, fail unless we force ex_ref_path = far_away_head.path self.failUnlessRaises(OSError, ref.rename, ex_ref_path) @@ -418,35 +413,35 @@ class TestRefs(TestBase): far_away_head.commit = ref.commit ref.rename(ex_ref_path) assert ref.path == ex_ref_path and ref.object == orig_obj - assert ref.rename(ref.path).path == ex_ref_path # rename to same name - + assert ref.rename(ref.path).path == ex_ref_path # rename to same name + # create symbolic refs symref_path = "symrefs/sym" symref = SymbolicReference.create(rw_repo, symref_path, cur_head.reference) assert symref.path == symref_path assert symref.reference == cur_head.reference - + self.failUnlessRaises(OSError, SymbolicReference.create, rw_repo, symref_path, cur_head.reference.commit) - # it works if the new ref points to the same reference + # it works if the new ref points to the same reference SymbolicReference.create(rw_repo, symref.path, symref.reference).path == symref.path SymbolicReference.delete(rw_repo, symref) # would raise if the symref wouldn't have been deletedpbl symref = SymbolicReference.create(rw_repo, symref_path, cur_head.reference) - + # test symbolic references which are not at default locations like HEAD # or FETCH_HEAD - they may also be at spots in refs of course symbol_ref_path = "refs/symbol_ref" symref = SymbolicReference(rw_repo, symbol_ref_path) assert symref.path == symbol_ref_path symbol_ref_abspath = os.path.join(rw_repo.git_dir, symref.path) - + # set it symref.reference = new_head assert symref.reference == new_head assert os.path.isfile(symbol_ref_abspath) assert symref.commit == new_head.commit - - for name in ('absname','folder/rela_name'): + + for name in ('absname', 'folder/rela_name'): symref_new_name = symref.rename(name) assert isinstance(symref_new_name, SymbolicReference) assert name in symref_new_name.path @@ -454,10 +449,10 @@ class TestRefs(TestBase): assert symref_new_name == symref assert not symref.is_detached # END for each ref - + # create a new non-head ref just to be sure we handle it even if packed Reference.create(rw_repo, full_ref) - + # test ref listing - assure we have packed refs rw_repo.git.pack_refs(all=True, prune=True) heads = rw_repo.heads @@ -465,14 +460,14 @@ class TestRefs(TestBase): assert new_head in heads assert active_branch in heads assert rw_repo.tags - + # we should be able to iterate all symbolic refs as well - in that case # we should expect only symbolic references to be returned for symref in SymbolicReference.iter_items(rw_repo): assert not symref.is_detached - + # when iterating references, we can get references and symrefs - # when deleting all refs, I'd expect them to be gone ! Even from + # when deleting all refs, I'd expect them to be gone ! Even from # the packed ones # For this to work, we must not be on any branch rw_repo.head.reference = rw_repo.head.commit @@ -484,62 +479,60 @@ class TestRefs(TestBase): # END delete ref # END for each ref to iterate and to delete assert deleted_refs - + for ref in Reference.iter_items(rw_repo): if ref.is_detached: assert ref not in deleted_refs # END for each ref - - # reattach head - head will not be returned if it is not a symbolic + + # reattach head - head will not be returned if it is not a symbolic # ref rw_repo.head.reference = Head.create(rw_repo, "master") - + # At least the head should still exist assert os.path.isfile(os.path.join(rw_repo.git_dir, 'HEAD')) refs = list(SymbolicReference.iter_items(rw_repo)) assert len(refs) == 1 - - + # test creation of new refs from scratch for path in ("basename", "dir/somename", "dir2/subdir/basename"): - # REFERENCES + # REFERENCES ############ fpath = Reference.to_full_path(path) ref_fp = Reference.from_path(rw_repo, fpath) assert not ref_fp.is_valid() ref = Reference(rw_repo, fpath) assert ref == ref_fp - + # can be created by assigning a commit ref.commit = rw_repo.head.commit assert ref.is_valid() - + # if the assignment raises, the ref doesn't exist Reference.delete(ref.repo, ref.path) assert not ref.is_valid() self.failUnlessRaises(ValueError, setattr, ref, 'commit', "nonsense") assert not ref.is_valid() - + # I am sure I had my reason to make it a class method at first, but # now it doesn't make so much sense anymore, want an instance method as well # See http://byronimo.lighthouseapp.com/projects/51787-gitpython/tickets/27 Reference.delete(ref.repo, ref.path) assert not ref.is_valid() - + ref.object = rw_repo.head.commit assert ref.is_valid() - + Reference.delete(ref.repo, ref.path) assert not ref.is_valid() self.failUnlessRaises(ValueError, setattr, ref, 'object', "nonsense") assert not ref.is_valid() - + # END for each path - + def test_dereference_recursive(self): # for now, just test the HEAD assert SymbolicReference.dereference_recursive(self.rorepo, 'HEAD') - + def test_reflog(self): assert isinstance(self.rorepo.heads.master.log(), RefLog) - diff --git a/git/test/test_remote.py b/git/test/test_remote.py index b1248096..a5a73ce1 100644 --- a/git/test/test_remote.py +++ b/git/test/test_remote.py @@ -12,17 +12,19 @@ import shutil import os import random -# assure we have repeatable results +# assure we have repeatable results random.seed(0) + class TestRemoteProgress(RemoteProgress): - __slots__ = ( "_seen_lines", "_stages_per_op", '_num_progress_messages' ) + __slots__ = ("_seen_lines", "_stages_per_op", '_num_progress_messages') + def __init__(self): super(TestRemoteProgress, self).__init__() self._seen_lines = list() self._stages_per_op = dict() self._num_progress_messages = 0 - + def _parse_progress_line(self, line): # we may remove the line later if it is dropped # Keep it for debugging @@ -30,37 +32,36 @@ class TestRemoteProgress(RemoteProgress): rval = super(TestRemoteProgress, self)._parse_progress_line(line) assert len(line) > 1, "line %r too short" % line return rval - + def line_dropped(self, line): try: self._seen_lines.remove(line) except ValueError: pass - + def update(self, op_code, cur_count, max_count=None, message=''): # check each stage only comes once op_id = op_code & self.OP_MASK assert op_id in (self.COUNTING, self.COMPRESSING, self.WRITING) - + self._stages_per_op.setdefault(op_id, 0) - self._stages_per_op[ op_id ] = self._stages_per_op[ op_id ] | (op_code & self.STAGE_MASK) - - if op_code & (self.WRITING|self.END) == (self.WRITING|self.END): + self._stages_per_op[op_id] = self._stages_per_op[op_id] | (op_code & self.STAGE_MASK) + + if op_code & (self.WRITING | self.END) == (self.WRITING | self.END): assert message # END check we get message - + self._num_progress_messages += 1 - - + def make_assertion(self): # we don't always receive messages if not self._seen_lines: return - + # sometimes objects are not compressed which is okay - assert len(self._seen_ops) in (2,3) + assert len(self._seen_ops) in (2, 3) assert self._stages_per_op - + # must have seen all stages for op, stages in self._stages_per_op.items(): assert stages & self.STAGE_MASK == self.STAGE_MASK @@ -68,15 +69,14 @@ class TestRemoteProgress(RemoteProgress): def assert_received_message(self): assert self._num_progress_messages - + class TestRemote(TestBase): - + def _print_fetchhead(self, repo): fp = open(os.path.join(repo.git_dir, "FETCH_HEAD")) fp.close() - - + def _do_test_fetch_result(self, results, remote): # self._print_fetchhead(remote.repo) assert len(results) > 0 and isinstance(results[0], FetchInfo) @@ -84,15 +84,15 @@ class TestRemote(TestBase): assert isinstance(info.note, basestring) if isinstance(info.ref, Reference): assert info.flags != 0 - # END reference type flags handling + # END reference type flags handling assert isinstance(info.ref, (SymbolicReference, Reference)) - if info.flags & (info.FORCED_UPDATE|info.FAST_FORWARD): + if info.flags & (info.FORCED_UPDATE | info.FAST_FORWARD): assert isinstance(info.old_commit, Commit) else: assert info.old_commit is None - # END forced update checking + # END forced update checking # END for each info - + def _do_test_push_result(self, results, remote): assert len(results) > 0 and isinstance(results[0], PushInfo) for info in results: @@ -108,32 +108,31 @@ class TestRemote(TestBase): assert has_one else: # there must be a remote commit - if info.flags & info.DELETED == 0: + if info.flags & info.DELETED == 0: assert isinstance(info.local_ref, Reference) else: assert info.local_ref is None assert type(info.remote_ref) in (TagReference, RemoteReference) # END error checking - # END for each info - - + # END for each info + def _do_test_fetch_info(self, repo): self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "nonsense", '') self.failUnlessRaises(ValueError, FetchInfo._from_line, repo, "? [up to date] 0.1.7RC -> origin/0.1.7RC", '') - + def _commit_random_file(self, repo): #Create a file with a random name and random data and commit it to repo. # Return the commited absolute file path index = repo.index - new_file = self._make_file(os.path.basename(tempfile.mktemp()),str(random.random()), repo) + new_file = self._make_file(os.path.basename(tempfile.mktemp()), str(random.random()), repo) index.add([new_file]) index.commit("Committing %s" % new_file) return new_file - - def _do_test_fetch(self,remote, rw_repo, remote_repo): + + def _do_test_fetch(self, remote, rw_repo, remote_repo): # specialized fetch testing to de-clutter the main test self._do_test_fetch_info(rw_repo) - + def fetch_and_test(remote, **kwargs): progress = TestRemoteProgress() kwargs['progress'] = progress @@ -142,60 +141,60 @@ class TestRemote(TestBase): self._do_test_fetch_result(res, remote) return res # END fetch and check - + def get_info(res, remote, name): - return res["%s/%s"%(remote,name)] - + return res["%s/%s" % (remote, name)] + # put remote head to master as it is garantueed to exist remote_repo.head.reference = remote_repo.heads.master - + res = fetch_and_test(remote) # all uptodate for info in res: assert info.flags & info.HEAD_UPTODATE - + # rewind remote head to trigger rejection # index must be false as remote is a bare repo rhead = remote_repo.head remote_commit = rhead.commit rhead.reset("HEAD~2", index=False) res = fetch_and_test(remote) - mkey = "%s/%s"%(remote,'master') + mkey = "%s/%s" % (remote, 'master') master_info = res[mkey] assert master_info.flags & FetchInfo.FORCED_UPDATE and master_info.note is not None - + # normal fast forward - set head back to previous one rhead.commit = remote_commit res = fetch_and_test(remote) assert res[mkey].flags & FetchInfo.FAST_FORWARD - + # new remote branch new_remote_branch = Head.create(remote_repo, "new_branch") res = fetch_and_test(remote) new_branch_info = get_info(res, remote, new_remote_branch) assert new_branch_info.flags & FetchInfo.NEW_HEAD - + # remote branch rename ( causes creation of a new one locally ) new_remote_branch.rename("other_branch_name") res = fetch_and_test(remote) other_branch_info = get_info(res, remote, new_remote_branch) assert other_branch_info.ref.commit == new_branch_info.ref.commit - + # remove new branch Head.delete(new_remote_branch.repo, new_remote_branch) res = fetch_and_test(remote) # deleted remote will not be fetched self.failUnlessRaises(IndexError, get_info, res, remote, new_remote_branch) - + # prune stale tracking branches stale_refs = remote.stale_refs assert len(stale_refs) == 2 and isinstance(stale_refs[0], RemoteReference) RemoteReference.delete(rw_repo, *stale_refs) - + # test single branch fetch with refspec including target remote - res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master"%remote) + res = fetch_and_test(remote, refspec="master:refs/remotes/%s/master" % remote) assert len(res) == 1 and get_info(res, remote, 'master') - + # ... with respec and no target res = fetch_and_test(remote, refspec='master') assert len(res) == 1 @@ -203,27 +202,27 @@ class TestRemote(TestBase): # ... multiple refspecs res = fetch_and_test(remote, refspec=['master', 'fred']) assert len(res) == 1 - + # add new tag reference rtag = TagReference.create(remote_repo, "1.0-RV_hello.there") res = fetch_and_test(remote, tags=True) tinfo = res[str(rtag)] assert isinstance(tinfo.ref, TagReference) and tinfo.ref.commit == rtag.commit assert tinfo.flags & tinfo.NEW_TAG - + # adjust tag commit Reference.set_object(rtag, rhead.commit.parents[0].parents[0]) res = fetch_and_test(remote, tags=True) tinfo = res[str(rtag)] assert tinfo.commit == rtag.commit assert tinfo.flags & tinfo.TAG_UPDATE - + # delete remote tag - local one will stay TagReference.delete(remote_repo, rtag) res = fetch_and_test(remote, tags=True) self.failUnlessRaises(IndexError, get_info, res, remote, str(rtag)) - - # provoke to receive actual objects to see what kind of output we have to + + # provoke to receive actual objects to see what kind of output we have to # expect. For that we need a remote transport protocol # Create a new UN-shared repo and fetch into it after we pushed a change # to the shared repo @@ -231,31 +230,31 @@ class TestRemote(TestBase): # must clone with a local path for the repo implementation not to freak out # as it wants local paths only ( which I can understand ) other_repo = remote_repo.clone(other_repo_dir, shared=False) - remote_repo_url = "git://localhost%s"%remote_repo.git_dir - + remote_repo_url = "git://localhost%s" % remote_repo.git_dir + # put origin to git-url - other_origin = other_repo.remotes.origin + other_origin = other_repo.remotes.origin other_origin.config_writer.set("url", remote_repo_url) # it automatically creates alternates as remote_repo is shared as well. # It will use the transport though and ignore alternates when fetching # assert not other_repo.alternates # this would fail - + # assure we are in the right state rw_repo.head.reset(remote.refs.master, working_tree=True) try: self._commit_random_file(rw_repo) remote.push(rw_repo.head.reference) - - # here I would expect to see remote-information about packing - # objects and so on. Unfortunately, this does not happen + + # here I would expect to see remote-information about packing + # objects and so on. Unfortunately, this does not happen # if we are redirecting the output - git explicitly checks for this # and only provides progress information to ttys res = fetch_and_test(other_origin) finally: shutil.rmtree(other_repo_dir) # END test and cleanup - - def _assert_push_and_pull(self,remote, rw_repo, remote_repo): + + def _assert_push_and_pull(self, remote, rw_repo, remote_repo): # push our changes lhead = rw_repo.head lindex = rw_repo.index @@ -263,16 +262,16 @@ class TestRemote(TestBase): try: lhead.reference = rw_repo.heads.master except AttributeError: - # if the author is on a non-master branch, the clones might not have + # if the author is on a non-master branch, the clones might not have # a local master yet. We simply create it lhead.reference = rw_repo.create_head('master') - # END master handling + # END master handling lhead.reset(remote.refs.master, working_tree=True) - + # push without spec should fail ( without further configuration ) # well, works nicely # self.failUnlessRaises(GitCommandError, remote.push) - + # simple file push self._commit_random_file(rw_repo) progress = TestRemoteProgress() @@ -280,25 +279,25 @@ class TestRemote(TestBase): assert isinstance(res, IterableList) self._do_test_push_result(res, remote) progress.make_assertion() - + # rejected - undo last commit lhead.reset("HEAD~1") res = remote.push(lhead.reference) - assert res[0].flags & PushInfo.ERROR + assert res[0].flags & PushInfo.ERROR assert res[0].flags & PushInfo.REJECTED self._do_test_push_result(res, remote) - + # force rejected pull res = remote.push('+%s' % lhead.reference) - assert res[0].flags & PushInfo.ERROR == 0 + assert res[0].flags & PushInfo.ERROR == 0 assert res[0].flags & PushInfo.FORCED_UPDATE self._do_test_push_result(res, remote) - + # invalid refspec res = remote.push("hellothere") assert len(res) == 0 - - # push new tags + + # push new tags progress = TestRemoteProgress() to_be_updated = "my_tag.1.0RV" new_tag = TagReference.create(rw_repo, to_be_updated) @@ -307,26 +306,26 @@ class TestRemote(TestBase): assert res[-1].flags & PushInfo.NEW_TAG progress.make_assertion() self._do_test_push_result(res, remote) - + # update push new tags # Rejection is default new_tag = TagReference.create(rw_repo, to_be_updated, ref='HEAD~1', force=True) res = remote.push(tags=True) self._do_test_push_result(res, remote) assert res[-1].flags & PushInfo.REJECTED and res[-1].flags & PushInfo.ERROR - + # push force this tag res = remote.push("+%s" % new_tag.path) assert res[-1].flags & PushInfo.ERROR == 0 and res[-1].flags & PushInfo.FORCED_UPDATE - + # delete tag - have to do it using refspec res = remote.push(":%s" % new_tag.path) self._do_test_push_result(res, remote) assert res[0].flags & PushInfo.DELETED - # Currently progress is not properly transferred, especially not using + # Currently progress is not properly transferred, especially not using # the git daemon # progress.assert_received_message() - + # push new branch new_head = Head.create(rw_repo, "my_new_branch") progress = TestRemoteProgress() @@ -334,45 +333,45 @@ class TestRemote(TestBase): assert res[0].flags & PushInfo.NEW_HEAD progress.make_assertion() self._do_test_push_result(res, remote) - + # delete new branch on the remote end and locally res = remote.push(":%s" % new_head.path) self._do_test_push_result(res, remote) Head.delete(rw_repo, new_head) assert res[-1].flags & PushInfo.DELETED - + # --all res = remote.push(all=True) self._do_test_push_result(res, remote) - + remote.pull('master') - - # cleanup - delete created tags and branches as we are in an innerloop on + + # cleanup - delete created tags and branches as we are in an innerloop on # the same repository TagReference.delete(rw_repo, new_tag, other_tag) remote.push(":%s" % other_tag.path) - + @with_rw_and_rw_remote_repo('0.1.6') def test_base(self, rw_repo, remote_repo): num_remotes = 0 remote_set = set() ran_fetch_test = False - + for remote in rw_repo.remotes: num_remotes += 1 assert remote == remote assert str(remote) != repr(remote) remote_set.add(remote) remote_set.add(remote) # should already exist - - # REFS + + # REFS refs = remote.refs assert refs for ref in refs: assert ref.remote_name == remote.name assert ref.remote_head # END for each ref - + # OPTIONS # cannot use 'fetch' key anymore as it is now a method for opt in ("url", ): @@ -380,10 +379,10 @@ class TestRemote(TestBase): reader = remote.config_reader assert reader.get(opt) == val assert reader.get_value(opt, None) == val - + # unable to write with a reader self.failUnlessRaises(IOError, reader.set, opt, "test") - + # change value writer = remote.config_writer new_val = "myval" @@ -393,9 +392,9 @@ class TestRemote(TestBase): assert writer.get(opt) == val del(writer) assert getattr(remote, opt) == val - # END for each default option key - - # RENAME + # END for each default option key + + # RENAME other_name = "totally_other_name" prev_name = remote.name assert remote.rename(other_name) == remote @@ -404,98 +403,96 @@ class TestRemote(TestBase): for time in range(2): assert remote.rename(prev_name).name == prev_name # END for each rename ( back to prev_name ) - + # PUSH/PULL TESTING self._assert_push_and_pull(remote, rw_repo, remote_repo) - + # FETCH TESTING - # Only for remotes - local cases are the same or less complicated + # Only for remotes - local cases are the same or less complicated # as additional progress information will never be emitted if remote.name == "daemon_origin": self._do_test_fetch(remote, rw_repo, remote_repo) ran_fetch_test = True - # END fetch test - + # END fetch test + remote.update() # END for each remote - + assert ran_fetch_test assert num_remotes assert num_remotes == len(remote_set) - + origin = rw_repo.remote('origin') assert origin == rw_repo.remotes.origin - + @with_rw_repo('HEAD', bare=True) def test_creation_and_removal(self, bare_rw_repo): new_name = "test_new_one" arg_list = (new_name, "git@server:hello.git") - remote = Remote.create(bare_rw_repo, *arg_list ) + remote = Remote.create(bare_rw_repo, *arg_list) assert remote.name == "test_new_one" assert remote in bare_rw_repo.remotes - + # create same one again self.failUnlessRaises(GitCommandError, Remote.create, bare_rw_repo, *arg_list) - + Remote.remove(bare_rw_repo, new_name) - + for remote in bare_rw_repo.remotes: if remote.name == new_name: raise AssertionError("Remote removal failed") # END if deleted remote matches existing remote's name # END for each remote - + def test_fetch_info(self): # assure we can handle remote-tracking branches fetch_info_line_fmt = "c437ee5deb8d00cf02f03720693e4c802e99f390 not-for-merge %s '0.3' of git://github.com/gitpython-developers/GitPython" remote_info_line_fmt = "* [new branch] nomatter -> %s" fi = FetchInfo._from_line(self.rorepo, - remote_info_line_fmt % "local/master", + remote_info_line_fmt % "local/master", fetch_info_line_fmt % 'remote-tracking branch') assert fi.ref.is_valid() assert fi.ref.commit - + # handles non-default refspecs: One can specify a different path in refs/remotes # or a special path just in refs/something for instance - + fi = FetchInfo._from_line(self.rorepo, - remote_info_line_fmt % "subdir/tagname", + remote_info_line_fmt % "subdir/tagname", fetch_info_line_fmt % 'tag') - + assert isinstance(fi.ref, TagReference) assert fi.ref.path.startswith('refs/tags') - + # it could be in a remote direcftory though fi = FetchInfo._from_line(self.rorepo, - remote_info_line_fmt % "remotename/tags/tagname", + remote_info_line_fmt % "remotename/tags/tagname", fetch_info_line_fmt % 'tag') - + assert isinstance(fi.ref, TagReference) assert fi.ref.path.startswith('refs/remotes/') - + # it can also be anywhere ! tag_path = "refs/something/remotename/tags/tagname" fi = FetchInfo._from_line(self.rorepo, - remote_info_line_fmt % tag_path, + remote_info_line_fmt % tag_path, fetch_info_line_fmt % 'tag') - + assert isinstance(fi.ref, TagReference) assert fi.ref.path == tag_path - + # branches default to refs/remotes fi = FetchInfo._from_line(self.rorepo, - remote_info_line_fmt % "remotename/branch", + remote_info_line_fmt % "remotename/branch", fetch_info_line_fmt % 'branch') - + assert isinstance(fi.ref, RemoteReference) assert fi.ref.remote_name == 'remotename' - + # but you can force it anywhere, in which case we only have a references fi = FetchInfo._from_line(self.rorepo, - remote_info_line_fmt % "refs/something/branch", + remote_info_line_fmt % "refs/something/branch", fetch_info_line_fmt % 'branch') - + assert type(fi.ref) is Reference assert fi.ref.path == "refs/something/branch" - - diff --git a/git/test/test_repo.py b/git/test/test_repo.py index 9770d97c..d6568d0b 100644 --- a/git/test/test_repo.py +++ b/git/test/test_repo.py @@ -24,7 +24,7 @@ from cStringIO import StringIO class TestRepo(TestBase): - + @raises(InvalidGitRepositoryError) def test_new_should_raise_on_invalid_repo_location(self): Repo(tempfile.gettempdir()) @@ -52,21 +52,21 @@ class TestRepo(TestBase): def test_heads_should_populate_head_data(self): for head in self.rorepo.heads: assert head.name - assert isinstance(head.commit,Commit) - # END for each head - + assert isinstance(head.commit, Commit) + # END for each head + assert isinstance(self.rorepo.heads.master, Head) assert isinstance(self.rorepo.heads['master'], Head) - + def test_tree_from_revision(self): tree = self.rorepo.tree('0.1.6') - assert len(tree.hexsha) == 40 + assert len(tree.hexsha) == 40 assert tree.type == "tree" assert self.rorepo.tree(tree) == tree - + # try from invalid revision that does not exist self.failUnlessRaises(BadObject, self.rorepo.tree, 'hello world') - + def test_commit_from_revision(self): commit = self.rorepo.commit('0.1.4') assert commit.type == 'commit' @@ -76,7 +76,7 @@ class TestRepo(TestBase): mc = 10 commits = list(self.rorepo.iter_commits('0.1.6', max_count=mc)) assert len(commits) == mc - + c = commits[0] assert_equal('9a4b1d4d11eee3c5362a4152216376e634bd14cf', c.hexsha) assert_equal(["c76852d0bff115720af3f27acdb084c59361e5f6"], [p.hexsha for p in c.parents]) @@ -84,11 +84,11 @@ class TestRepo(TestBase): assert_equal("Michael Trier", c.author.name) assert_equal("mtrier@gmail.com", c.author.email) assert_equal(1232829715, c.authored_date) - assert_equal(5*3600, c.author_tz_offset) + assert_equal(5 * 3600, c.author_tz_offset) assert_equal("Michael Trier", c.committer.name) assert_equal("mtrier@gmail.com", c.committer.email) assert_equal(1232829715, c.committed_date) - assert_equal(5*3600, c.committer_tz_offset) + assert_equal(5 * 3600, c.committer_tz_offset) assert_equal("Bumped version 0.1.6\n", c.message) c = commits[1] @@ -103,34 +103,32 @@ class TestRepo(TestBase): # END for each tree assert num_trees == mc - def _assert_empty_repo(self, repo): - # test all kinds of things with an empty, freshly initialized repo. + # test all kinds of things with an empty, freshly initialized repo. # It should throw good errors - + # entries should be empty assert len(repo.index.entries) == 0 - + # head is accessible assert repo.head assert repo.head.ref assert not repo.head.is_valid() - + # we can change the head to some other ref head_ref = Head.from_path(repo, Head.to_full_path('some_head')) assert not head_ref.is_valid() repo.head.ref = head_ref - + # is_dirty can handle all kwargs for args in ((1, 0, 0), (0, 1, 0), (0, 0, 1)): assert not repo.is_dirty(*args) - # END for each arg - + # END for each arg + # we can add a file to the index ( if we are not bare ) if not repo.bare: pass # END test repos with working tree - def test_init(self): prev_cwd = os.getcwd() @@ -145,15 +143,14 @@ class TestRepo(TestBase): assert isinstance(r, Repo) assert r.bare == True assert os.path.isdir(r.git_dir) - + self._assert_empty_repo(r) - + # test clone clone_path = path + "_clone" rc = r.clone(clone_path) self._assert_empty_repo(rc) - - + try: shutil.rmtree(clone_path) except OSError: @@ -161,11 +158,11 @@ class TestRepo(TestBase): # of the parent directory pass # END exception handling - + # try again, this time with the absolute version rc = Repo.clone_from(r.git_dir, clone_path) self._assert_empty_repo(rc) - + shutil.rmtree(git_dir_abs) try: shutil.rmtree(clone_path) @@ -174,14 +171,14 @@ class TestRepo(TestBase): # of the parent directory pass # END exception handling - + # END for each path - + os.makedirs(git_dir_rela) os.chdir(git_dir_rela) r = Repo.init(bare=False) r.bare == False - + self._assert_empty_repo(r) finally: try: @@ -190,23 +187,23 @@ class TestRepo(TestBase): pass os.chdir(prev_cwd) # END restore previous state - + def test_bare_property(self): self.rorepo.bare def test_daemon_export(self): orig_val = self.rorepo.daemon_export self.rorepo.daemon_export = not orig_val - assert self.rorepo.daemon_export == ( not orig_val ) + assert self.rorepo.daemon_export == (not orig_val) self.rorepo.daemon_export = orig_val assert self.rorepo.daemon_export == orig_val - + def test_alternates(self): cur_alternates = self.rorepo.alternates # empty alternates self.rorepo.alternates = [] assert self.rorepo.alternates == [] - alts = [ "other/location", "this/location" ] + alts = ["other/location", "this/location"] self.rorepo.alternates = alts assert alts == self.rorepo.alternates self.rorepo.alternates = cur_alternates @@ -219,13 +216,13 @@ class TestRepo(TestBase): orig_value = self.rorepo._bare self.rorepo._bare = True assert_false(self.rorepo.is_dirty()) - self.rorepo._bare = orig_value + self.rorepo._bare = orig_value def test_is_dirty(self): self.rorepo._bare = False - for index in (0,1): - for working_tree in (0,1): - for untracked_files in (0,1): + for index in (0, 1): + for working_tree in (0, 1): + for untracked_files in (0, 1): assert self.rorepo.is_dirty(index, working_tree, untracked_files) in (True, False) # END untracked files # END working tree @@ -241,27 +238,27 @@ class TestRepo(TestBase): def test_index(self): index = self.rorepo.index assert isinstance(index, IndexFile) - + def test_tag(self): assert self.rorepo.tag('refs/tags/0.1.5').commit - + def test_archive(self): tmpfile = os.tmpfile() self.rorepo.archive(tmpfile, '0.1.5') assert tmpfile.tell() - + @patch.object(Git, '_call_process') def test_should_display_blame_information(self, git): git.return_value = fixture('blame') - b = self.rorepo.blame( 'master', 'lib/git.py') + b = self.rorepo.blame('master', 'lib/git.py') assert_equal(13, len(b)) - assert_equal( 2, len(b[0]) ) + assert_equal(2, len(b[0])) # assert_equal(25, reduce(lambda acc, x: acc + len(x[-1]), b)) assert_equal(hash(b[0][0]), hash(b[9][0])) c = b[0][0] assert_true(git.called) assert_equal(git.call_args, (('blame', 'master', '--', 'lib/git.py'), {'p': True})) - + assert_equal('634396b2f541a9f2d58b00be1a07f0c358b999b3', c.hexsha) assert_equal('Tom Preston-Werner', c.author.name) assert_equal('tom@mojombo.com', c.author.email) @@ -270,13 +267,13 @@ class TestRepo(TestBase): assert_equal('tom@mojombo.com', c.committer.email) assert_equal(1191997100, c.committed_date) assert_equal('initial grit setup', c.message) - + # test the 'lines per commit' entries tlist = b[0][1] - assert_true( tlist ) - assert_true( isinstance( tlist[0], basestring ) ) - assert_true( len( tlist ) < sum( len(t) for t in tlist ) ) # test for single-char bug - + assert_true(tlist) + assert_true(isinstance(tlist[0], basestring)) + assert_true(len(tlist) < sum(len(t) for t in tlist)) # test for single-char bug + def test_blame_real(self): c = 0 for item in self.rorepo.head.commit.tree.traverse( @@ -285,20 +282,20 @@ class TestRepo(TestBase): b = self.rorepo.blame(self.rorepo.head, item.path) #END for each item to traverse assert c - + def test_untracked_files(self): base = self.rorepo.working_tree_dir - files = ( join_path_native(base, "__test_myfile"), - join_path_native(base, "__test_other_file") ) + files = (join_path_native(base, "__test_myfile"), + join_path_native(base, "__test_other_file")) num_recently_untracked = 0 try: for fpath in files: - fd = open(fpath,"wb") + fd = open(fpath, "wb") fd.close() # END for each filename untracked_files = self.rorepo.untracked_files num_recently_untracked = len(untracked_files) - + # assure we have all names - they are relative to the git-dir num_test_untracked = 0 for utfile in untracked_files: @@ -308,80 +305,81 @@ class TestRepo(TestBase): for fpath in files: if os.path.isfile(fpath): os.remove(fpath) - # END handle files - + # END handle files + assert len(self.rorepo.untracked_files) == (num_recently_untracked - len(files)) - + def test_config_reader(self): - reader = self.rorepo.config_reader() # all config files + reader = self.rorepo.config_reader() # all config files assert reader.read_only reader = self.rorepo.config_reader("repository") # single config file assert reader.read_only - + def test_config_writer(self): for config_level in self.rorepo.config_level: try: writer = self.rorepo.config_writer(config_level) assert not writer.read_only except IOError: - # its okay not to get a writer for some configuration files if we + # its okay not to get a writer for some configuration files if we # have no permissions - pass - # END for each config level - + pass + # END for each config level + def test_creation_deletion(self): - # just a very quick test to assure it generally works. There are + # just a very quick test to assure it generally works. There are # specialized cases in the test_refs module head = self.rorepo.create_head("new_head", "HEAD~1") self.rorepo.delete_head(head) - + tag = self.rorepo.create_tag("new_tag", "HEAD~2") self.rorepo.delete_tag(tag) self.rorepo.config_writer() remote = self.rorepo.create_remote("new_remote", "git@server:repo.git") self.rorepo.delete_remote(remote) - + def test_comparison_and_hash(self): # this is only a preliminary test, more testing done in test_index assert self.rorepo == self.rorepo and not (self.rorepo != self.rorepo) assert len(set((self.rorepo, self.rorepo))) == 1 - + def test_git_cmd(self): # test CatFileContentStream, just to be very sure we have no fencepost errors # last \n is the terminating newline that it expects l1 = "0123456789\n" l2 = "abcdefghijklmnopqrstxy\n" - l3 = "z\n" + l3 = "z\n" d = "%s%s%s\n" % (l1, l2, l3) - + l1p = l1[:5] - + # full size # size is without terminating newline def mkfull(): - return Git.CatFileContentStream(len(d)-1, StringIO(d)) - + return Git.CatFileContentStream(len(d) - 1, StringIO(d)) + ts = 5 + def mktiny(): return Git.CatFileContentStream(ts, StringIO(d)) - + # readlines no limit s = mkfull() lines = s.readlines() assert len(lines) == 3 and lines[-1].endswith('\n') assert s._stream.tell() == len(d) # must have scrubbed to the end - + # realines line limit s = mkfull() lines = s.readlines(5) assert len(lines) == 1 - + # readlines on tiny sections s = mktiny() lines = s.readlines() assert len(lines) == 1 and lines[0] == l1p - assert s._stream.tell() == ts+1 - + assert s._stream.tell() == ts + 1 + # readline no limit s = mkfull() assert s.readline() == l1 @@ -389,52 +387,51 @@ class TestRepo(TestBase): assert s.readline() == l3 assert s.readline() == '' assert s._stream.tell() == len(d) - + # readline limit s = mkfull() assert s.readline(5) == l1p assert s.readline() == l1[5:] - + # readline on tiny section s = mktiny() assert s.readline() == l1p assert s.readline() == '' - assert s._stream.tell() == ts+1 - + assert s._stream.tell() == ts + 1 + # read no limit s = mkfull() assert s.read() == d[:-1] assert s.read() == '' assert s._stream.tell() == len(d) - + # read limit s = mkfull() assert s.read(5) == l1p assert s.read(6) == l1[5:] assert s._stream.tell() == 5 + 6 # its not yet done - + # read tiny s = mktiny() assert s.read(2) == l1[:2] assert s._stream.tell() == 2 assert s.read() == l1[2:ts] - assert s._stream.tell() == ts+1 - + assert s._stream.tell() == ts + 1 + def _assert_rev_parse_types(self, name, rev_obj): rev_parse = self.rorepo.rev_parse - + if rev_obj.type == 'tag': rev_obj = rev_obj.object - + # tree and blob type obj = rev_parse(name + '^{tree}') assert obj == rev_obj.tree - + obj = rev_parse(name + ':CHANGES') assert obj.type == 'blob' and obj.path == 'CHANGES' assert rev_obj.tree['CHANGES'] == obj - - + def _assert_rev_parse(self, name): """tries multiple different rev-parse syntaxes with the given name :return: parsed object""" @@ -444,62 +441,62 @@ class TestRepo(TestBase): obj = orig_obj.object else: obj = orig_obj - # END deref tags by default - + # END deref tags by default + # try history rev = name + "~" obj2 = rev_parse(rev) assert obj2 == obj.parents[0] self._assert_rev_parse_types(rev, obj2) - + # history with number ni = 11 history = [obj.parents[0]] for pn in range(ni): history.append(history[-1].parents[0]) # END get given amount of commits - + for pn in range(11): - rev = name + "~%i" % (pn+1) + rev = name + "~%i" % (pn + 1) obj2 = rev_parse(rev) assert obj2 == history[pn] self._assert_rev_parse_types(rev, obj2) # END history check - + # parent ( default ) rev = name + "^" obj2 = rev_parse(rev) assert obj2 == obj.parents[0] self._assert_rev_parse_types(rev, obj2) - + # parent with number for pn, parent in enumerate(obj.parents): - rev = name + "^%i" % (pn+1) + rev = name + "^%i" % (pn + 1) assert rev_parse(rev) == parent self._assert_rev_parse_types(rev, parent) # END for each parent - + return orig_obj - + @with_rw_repo('HEAD', bare=False) def test_rw_rev_parse(self, rwrepo): # verify it does not confuse branches with hexsha ids ahead = rwrepo.create_head('aaaaaaaa') assert(rwrepo.rev_parse(str(ahead)) == ahead.commit) - + def test_rev_parse(self): rev_parse = self.rorepo.rev_parse - + # try special case: This one failed at some point, make sure its fixed assert rev_parse("33ebe").hexsha == "33ebe7acec14b25c5f84f35a664803fcab2f7781" - + # start from reference num_resolved = 0 - + for ref in Reference.iter_items(self.rorepo): path_tokens = ref.path.split("/") for pt in range(len(path_tokens)): - path_section = '/'.join(path_tokens[-(pt+1):]) + path_section = '/'.join(path_tokens[-(pt + 1):]) try: obj = self._assert_rev_parse(path_section) assert obj.type == ref.object.type @@ -512,115 +509,109 @@ class TestRepo(TestBase): # END for each token # END for each reference assert num_resolved - + # it works with tags ! tag = self._assert_rev_parse('0.1.4') assert tag.type == 'tag' - + # try full sha directly ( including type conversion ) assert tag.object == rev_parse(tag.object.hexsha) self._assert_rev_parse_types(tag.object.hexsha, tag.object) - - + # multiple tree types result in the same tree: HEAD^{tree}^{tree}:CHANGES rev = '0.1.4^{tree}^{tree}' assert rev_parse(rev) == tag.object.tree - assert rev_parse(rev+':CHANGES') == tag.object.tree['CHANGES'] - - + assert rev_parse(rev + ':CHANGES') == tag.object.tree['CHANGES'] + # try to get parents from first revision - it should fail as no such revision # exists first_rev = "33ebe7acec14b25c5f84f35a664803fcab2f7781" commit = rev_parse(first_rev) assert len(commit.parents) == 0 assert commit.hexsha == first_rev - self.failUnlessRaises(BadObject, rev_parse, first_rev+"~") - self.failUnlessRaises(BadObject, rev_parse, first_rev+"^") - + self.failUnlessRaises(BadObject, rev_parse, first_rev + "~") + self.failUnlessRaises(BadObject, rev_parse, first_rev + "^") + # short SHA1 commit2 = rev_parse(first_rev[:20]) assert commit2 == commit commit2 = rev_parse(first_rev[:5]) assert commit2 == commit - - + # todo: dereference tag into a blob 0.1.7^{blob} - quite a special one # needs a tag which points to a blob - - + # ref^0 returns commit being pointed to, same with ref~0, and ^{} tag = rev_parse('0.1.4') for token in (('~0', '^0', '^{}')): assert tag.object == rev_parse('0.1.4%s' % token) # END handle multiple tokens - + # try partial parsing max_items = 40 for i, binsha in enumerate(self.rorepo.odb.sha_iter()): - assert rev_parse(bin_to_hex(binsha)[:8-(i%2)]).binsha == binsha + assert rev_parse(bin_to_hex(binsha)[:8 - (i % 2)]).binsha == binsha if i > max_items: # this is rather slow currently, as rev_parse returns an object # which requires accessing packs, it has some additional overhead break # END for each binsha in repo - + # missing closing brace commit^{tree self.failUnlessRaises(ValueError, rev_parse, '0.1.4^{tree') - + # missing starting brace self.failUnlessRaises(ValueError, rev_parse, '0.1.4^tree}') - + # REVLOG ####### head = self.rorepo.head - + # need to specify a ref when using the @ syntax self.failUnlessRaises(BadObject, rev_parse, "%s@{0}" % head.commit.hexsha) - + # uses HEAD.ref by default assert rev_parse('@{0}') == head.commit if not head.is_detached: refspec = '%s@{0}' % head.ref.name assert rev_parse(refspec) == head.ref.commit # all additional specs work as well - assert rev_parse(refspec+"^{tree}") == head.commit.tree - assert rev_parse(refspec+":CHANGES").type == 'blob' + assert rev_parse(refspec + "^{tree}") == head.commit.tree + assert rev_parse(refspec + ":CHANGES").type == 'blob' #END operate on non-detached head - + # the last position assert rev_parse('@{1}') != head.commit - + # position doesn't exist self.failUnlessRaises(IndexError, rev_parse, '@{10000}') - + # currently, nothing more is supported self.failUnlessRaises(NotImplementedError, rev_parse, "@{1 week ago}") - + def test_repo_odbtype(self): target_type = GitDB if sys.version_info[1] < 5: target_type = GitCmdObjectDB assert isinstance(self.rorepo.odb, target_type) - + def test_submodules(self): assert len(self.rorepo.submodules) == 1 # non-recursive assert len(list(self.rorepo.iter_submodules())) >= 2 - + assert isinstance(self.rorepo.submodule("gitdb"), Submodule) self.failUnlessRaises(ValueError, self.rorepo.submodule, "doesn't exist") - + @with_rw_repo('HEAD', bare=False) def test_submodule_update(self, rwrepo): # fails in bare mode rwrepo._bare = True self.failUnlessRaises(InvalidGitRepositoryError, rwrepo.submodule_update) rwrepo._bare = False - + # test create submodule sm = rwrepo.submodules[0] sm = rwrepo.create_submodule("my_new_sub", "some_path", join_path_native(self.rorepo.working_tree_dir, sm.path)) assert isinstance(sm, Submodule) - + # note: the rest of this functionality is tested in test_submodule - - diff --git a/git/test/test_stats.py b/git/test/test_stats.py index 2bdb0a89..d827c680 100644 --- a/git/test/test_stats.py +++ b/git/test/test_stats.py @@ -7,19 +7,20 @@ from git.test.lib import * from git import * + class TestStats(TestBase): - + def test__list_from_string(self): output = fixture('diff_numstat') stats = Stats._list_from_string(self.rorepo, output) - + assert_equal(2, stats.total['files']) assert_equal(52, stats.total['lines']) assert_equal(29, stats.total['insertions']) assert_equal(23, stats.total['deletions']) - + assert_equal(29, stats.files["a.txt"]['insertions']) assert_equal(18, stats.files["a.txt"]['deletions']) - + assert_equal(0, stats.files["b.txt"]['insertions']) assert_equal(5, stats.files["b.txt"]['deletions']) diff --git a/git/test/test_submodule.py b/git/test/test_submodule.py index 37bf9f13..0ecb5c1f 100644 --- a/git/test/test_submodule.py +++ b/git/test/test_submodule.py @@ -25,35 +25,36 @@ if sys.platform == 'win32': class TestRootProgress(RootUpdateProgress): + """Just prints messages, for now without checking the correctness of the states""" - + def update(self, op, index, max_count, message=''): print message - + prog = TestRootProgress() + class TestSubmodule(TestBase): k_subm_current = "468cad66ff1f80ddaeee4123c24e4d53a032c00d" k_subm_changed = "394ed7006ee5dc8bddfd132b64001d5dfc0ffdd3" k_no_subm_tag = "0.1.6" - def _do_base_tests(self, rwrepo): """Perform all tests in the given repository, it may be bare or nonbare""" # manual instantiation - smm = Submodule(rwrepo, "\0"*20) + smm = Submodule(rwrepo, "\0" * 20) # name needs to be set in advance - self.failUnlessRaises(AttributeError, getattr, smm, 'name') - + self.failUnlessRaises(AttributeError, getattr, smm, 'name') + # iterate - 1 submodule sms = Submodule.list_items(rwrepo, self.k_subm_current) assert len(sms) == 1 sm = sms[0] - + # at a different time, there is None assert len(Submodule.list_items(rwrepo, self.k_no_subm_tag)) == 0 - + assert sm.path == 'git/ext/gitdb' assert sm.path != sm.name # in our case, we have ids there, which don't equal the path assert sm.url == 'git://github.com/gitpython-developers/gitdb.git' @@ -64,26 +65,26 @@ class TestSubmodule(TestBase): assert sm.size == 0 # the module is not checked-out yet self.failUnlessRaises(InvalidGitRepositoryError, sm.module) - + # which is why we can't get the branch either - it points into the module() repository self.failUnlessRaises(InvalidGitRepositoryError, getattr, sm, 'branch') - + # branch_path works, as its just a string assert isinstance(sm.branch_path, basestring) - + # some commits earlier we still have a submodule, but its at a different commit smold = Submodule.iter_items(rwrepo, self.k_subm_changed).next() assert smold.binsha != sm.binsha assert smold != sm # the name changed - + # force it to reread its information del(smold._url) smold.url == sm.url - + # test config_reader/writer methods sm.config_reader() - new_smclone_path = None # keep custom paths for later - new_csmclone_path = None # + new_smclone_path = None # keep custom paths for later + new_csmclone_path = None # if rwrepo.bare: self.failUnlessRaises(InvalidGitRepositoryError, sm.config_writer) else: @@ -96,25 +97,25 @@ class TestSubmodule(TestBase): assert sm.url == new_smclone_path # END handle bare repo smold.config_reader() - + # cannot get a writer on historical submodules if not rwrepo.bare: self.failUnlessRaises(ValueError, smold.config_writer) # END handle bare repo - + # make the old into a new - this doesn't work as the name changed prev_parent_commit = smold.parent_commit self.failUnlessRaises(ValueError, smold.set_parent_commit, self.k_subm_current) # the sha is properly updated - smold.set_parent_commit(self.k_subm_changed+"~1") + smold.set_parent_commit(self.k_subm_changed + "~1") assert smold.binsha != sm.binsha - - # raises if the sm didn't exist in new parent - it keeps its + + # raises if the sm didn't exist in new parent - it keeps its # parent_commit unchanged self.failUnlessRaises(ValueError, smold.set_parent_commit, self.k_no_subm_tag) - + # TEST TODO: if a path in the gitmodules file, but not in the index, it raises - + # TEST UPDATE ############## # module retrieval is not always possible @@ -126,107 +127,105 @@ class TestSubmodule(TestBase): # its not checked out in our case self.failUnlessRaises(InvalidGitRepositoryError, sm.module) assert not sm.module_exists() - + # currently there is only one submodule assert len(list(rwrepo.iter_submodules())) == 1 - assert sm.binsha != "\0"*20 - + assert sm.binsha != "\0" * 20 + # TEST ADD ########### # preliminary tests # adding existing returns exactly the existing sma = Submodule.add(rwrepo, sm.name, sm.path) assert sma.path == sm.path - + # no url and no module at path fails self.failUnlessRaises(ValueError, Submodule.add, rwrepo, "newsubm", "pathtorepo", url=None) - + # CONTINUE UPDATE ################# - + # lets update it - its a recursive one too newdir = os.path.join(sm.abspath, 'dir') os.makedirs(newdir) - + # update fails if the path already exists non-empty self.failUnlessRaises(OSError, sm.update) os.rmdir(newdir) - + # dry-run does nothing sm.update(dry_run=True, progress=prog) assert not sm.module_exists() - + assert sm.update() is sm sm_repopath = sm.path # cache for later assert sm.module_exists() assert isinstance(sm.module(), git.Repo) assert sm.module().working_tree_dir == sm.abspath - + # INTERLEAVE ADD TEST ##################### # url must match the one in the existing repository ( if submodule name suggests a new one ) # or we raise self.failUnlessRaises(ValueError, Submodule.add, rwrepo, "newsubm", sm.path, "git://someurl/repo.git") - - + # CONTINUE UPDATE ################# # we should have setup a tracking branch, which is also active assert sm.module().head.ref.tracking_branch() is not None - + # delete the whole directory and re-initialize shutil.rmtree(sm.abspath) assert len(sm.children()) == 0 # dry-run does nothing sm.update(dry_run=True, recursive=False, progress=prog) assert len(sm.children()) == 0 - + sm.update(recursive=False) assert len(list(rwrepo.iter_submodules())) == 2 assert len(sm.children()) == 1 # its not checked out yet csm = sm.children()[0] assert not csm.module_exists() csm_repopath = csm.path - + # adjust the path of the submodules module to point to the local destination new_csmclone_path = to_native_path_linux(join_path_native(self.rorepo.working_tree_dir, sm.path, csm.path)) csm.config_writer().set_value('url', new_csmclone_path) assert csm.url == new_csmclone_path - + # dry-run does nothing assert not csm.module_exists() sm.update(recursive=True, dry_run=True, progress=prog) assert not csm.module_exists() - + # update recursively again sm.update(recursive=True) assert csm.module_exists() - + # tracking branch once again csm.module().head.ref.tracking_branch() is not None - + # this flushed in a sub-submodule assert len(list(rwrepo.iter_submodules())) == 2 - - + # reset both heads to the previous version, verify that to_latest_revision works smods = (sm.module(), csm.module()) for repo in smods: repo.head.reset('HEAD~2', working_tree=1) # END for each repo to reset - - # dry run does nothing + + # dry run does nothing sm.update(recursive=True, dry_run=True, progress=prog) for repo in smods: assert repo.head.commit != repo.head.ref.tracking_branch().commit # END for each repo to check - + sm.update(recursive=True, to_latest_revision=True) for repo in smods: assert repo.head.commit == repo.head.ref.tracking_branch().commit # END for each repo to check del(smods) - + # if the head is detached, it still works ( but warns ) smref = sm.module().head.ref sm.module().head.ref = 'HEAD~1' @@ -234,15 +233,15 @@ class TestSubmodule(TestBase): csm_tracking_branch = csm.module().head.ref.tracking_branch() csm.module().head.ref.set_tracking_branch(None) sm.update(recursive=True, to_latest_revision=True) - + # to_latest_revision changes the child submodule's commit, it needs an # update now csm.set_parent_commit(csm.repo.head.commit) - + # undo the changes sm.module().head.ref = smref csm.module().head.ref.set_tracking_branch(csm_tracking_branch) - + # REMOVAL OF REPOSITOTRY ######################## # must delete something @@ -260,21 +259,21 @@ class TestSubmodule(TestBase): # still, we have the file modified self.failUnlessRaises(InvalidGitRepositoryError, sm.remove, dry_run=True) sm.module().index.reset(working_tree=True) - + # enforce the submodule to be checked out at the right spot as well. csm.update() - + # this would work assert sm.remove(dry_run=True) is sm assert sm.module_exists() sm.remove(force=True, dry_run=True) assert sm.module_exists() - + # but ... we have untracked files in the child submodule fn = join_path_native(csm.module().working_tree_dir, "newfile") open(fn, 'w').write("hi") self.failUnlessRaises(InvalidGitRepositoryError, sm.remove) - + # forcibly delete the child repository prev_count = len(sm.children()) assert csm.remove(force=True) is csm @@ -284,62 +283,62 @@ class TestSubmodule(TestBase): # now we have a changed index, as configuration was altered. # fix this sm.module().index.reset(working_tree=True) - + # now delete only the module of the main submodule assert sm.module_exists() sm.remove(configuration=False) assert sm.exists() assert not sm.module_exists() assert sm.config_reader().get_value('url') - + # delete the rest sm.remove() assert not sm.exists() assert not sm.module_exists() - + assert len(rwrepo.submodules) == 0 - + # ADD NEW SUBMODULE ################### # add a simple remote repo - trailing slashes are no problem smid = "newsub" osmid = "othersub" - nsm = Submodule.add(rwrepo, smid, sm_repopath, new_smclone_path+"/", None, no_checkout=True) + nsm = Submodule.add(rwrepo, smid, sm_repopath, new_smclone_path + "/", None, no_checkout=True) assert nsm.name == smid assert nsm.module_exists() assert nsm.exists() # its not checked out assert not os.path.isfile(join_path_native(nsm.module().working_tree_dir, Submodule.k_modules_file)) assert len(rwrepo.submodules) == 1 - + # add another submodule, but into the root, not as submodule osm = Submodule.add(rwrepo, osmid, csm_repopath, new_csmclone_path, Submodule.k_head_default) assert osm != nsm assert osm.module_exists() assert osm.exists() assert os.path.isfile(join_path_native(osm.module().working_tree_dir, 'setup.py')) - + assert len(rwrepo.submodules) == 2 - + # commit the changes, just to finalize the operation rwrepo.index.commit("my submod commit") assert len(rwrepo.submodules) == 2 - - # needs update as the head changed, it thinks its in the history + + # needs update as the head changed, it thinks its in the history # of the repo otherwise nsm.set_parent_commit(rwrepo.head.commit) osm.set_parent_commit(rwrepo.head.commit) - + # MOVE MODULE ############# # invalid inptu self.failUnlessRaises(ValueError, nsm.move, 'doesntmatter', module=False, configuration=False) - + # renaming to the same path does nothing assert nsm.move(sm.path) is nsm - + # rename a module - nmp = join_path_native("new", "module", "dir") + "/" # new module path + nmp = join_path_native("new", "module", "dir") + "/" # new module path pmp = nsm.path abspmp = nsm.abspath assert nsm.move(nmp) is nsm @@ -347,49 +346,49 @@ class TestSubmodule(TestBase): nmpl = to_native_path_linux(nmp) assert nsm.path == nmpl assert rwrepo.submodules[0].path == nmpl - + mpath = 'newsubmodule' absmpath = join_path_native(rwrepo.working_tree_dir, mpath) open(absmpath, 'w').write('') self.failUnlessRaises(ValueError, nsm.move, mpath) os.remove(absmpath) - + # now it works, as we just move it back nsm.move(pmp) assert nsm.path == pmp assert rwrepo.submodules[0].path == pmp - + # TODO lowprio: test remaining exceptions ... for now its okay, the code looks right - + # REMOVE 'EM ALL ################ # if a submodule's repo has no remotes, it can't be added without an explicit url osmod = osm.module() - + osm.remove(module=False) for remote in osmod.remotes: remote.remove(osmod, remote.name) assert not osm.exists() - self.failUnlessRaises(ValueError, Submodule.add, rwrepo, osmid, csm_repopath, url=None) + self.failUnlessRaises(ValueError, Submodule.add, rwrepo, osmid, csm_repopath, url=None) # END handle bare mode - + # Error if there is no submodule file here self.failUnlessRaises(IOError, Submodule._config_parser, rwrepo, rwrepo.commit(self.k_no_subm_tag), True) - + @with_rw_repo(k_subm_current) def test_base_rw(self, rwrepo): self._do_base_tests(rwrepo) - + @with_rw_repo(k_subm_current, bare=True) def test_base_bare(self, rwrepo): self._do_base_tests(rwrepo) - + @with_rw_repo(k_subm_current, bare=False) def test_root_module(self, rwrepo): # Can query everything without problems rm = RootModule(self.rorepo) assert rm.module() is self.rorepo - + # try attributes rm.binsha rm.mode @@ -398,24 +397,24 @@ class TestSubmodule(TestBase): assert rm.parent_commit == self.rorepo.head.commit rm.url rm.branch - + assert len(rm.list_items(rm.module())) == 1 rm.config_reader() rm.config_writer() - + # deep traversal gitdb / async rsmsp = [sm.path for sm in rm.traverse()] assert len(rsmsp) >= 2 # gitdb and async [and smmap], async being a child of gitdb - + # cannot set the parent commit as root module's path didn't exist self.failUnlessRaises(ValueError, rm.set_parent_commit, 'HEAD') - + # TEST UPDATE ############# # setup commit which remove existing, add new and modify existing submodules rm = RootModule(rwrepo) assert len(rm.children()) == 1 - + # modify path without modifying the index entry # ( which is what the move method would do properly ) #================================================== @@ -424,37 +423,37 @@ class TestSubmodule(TestBase): fp = join_path_native(pp, sm.path) prep = sm.path assert not sm.module_exists() # was never updated after rwrepo's clone - - # assure we clone from a local source + + # assure we clone from a local source sm.config_writer().set_value('url', to_native_path_linux(join_path_native(self.rorepo.working_tree_dir, sm.path))) - + # dry-run does nothing sm.update(recursive=False, dry_run=True, progress=prog) assert not sm.module_exists() - + sm.update(recursive=False) assert sm.module_exists() sm.config_writer().set_value('path', fp) # change path to something with prefix AFTER url change - + # update fails as list_items in such a situations cannot work, as it cannot # find the entry at the changed path self.failUnlessRaises(InvalidGitRepositoryError, rm.update, recursive=False) - + # move it properly - doesn't work as it its path currently points to an indexentry # which doesn't exist ( move it to some path, it doesn't matter here ) self.failUnlessRaises(InvalidGitRepositoryError, sm.move, pp) # reset the path(cache) to where it was, now it works sm.path = prep sm.move(fp, module=False) # leave it at the old location - + assert not sm.module_exists() - cpathchange = rwrepo.index.commit("changed sm path") # finally we can commit - + cpathchange = rwrepo.index.commit("changed sm path") # finally we can commit + # update puts the module into place rm.update(recursive=False, progress=prog) sm.set_parent_commit(cpathchange) assert sm.module_exists() - + # add submodule #================ nsmn = "newsubmodule" @@ -468,17 +467,14 @@ class TestSubmodule(TestBase): # repo and a new submodule comes into life nsm.remove(configuration=False, module=True) assert not nsm.module_exists() and nsm.exists() - - + # dry-run does nothing rm.update(recursive=False, dry_run=True, progress=prog) - + # otherwise it will work rm.update(recursive=False, progress=prog) assert nsm.module_exists() - - - + # remove submodule - the previous one #==================================== sm.set_parent_commit(csmadded) @@ -486,45 +482,44 @@ class TestSubmodule(TestBase): assert not sm.remove(module=False).exists() assert os.path.isdir(smp) # module still exists csmremoved = rwrepo.index.commit("Removed submodule") - + # an update will remove the module # not in dry_run rm.update(recursive=False, dry_run=True) assert os.path.isdir(smp) - + rm.update(recursive=False) assert not os.path.isdir(smp) - - - # change url + + # change url #============= - # to the first repository, this way we have a fast checkout, and a completely different + # to the first repository, this way we have a fast checkout, and a completely different # repository at the different url nsm.set_parent_commit(csmremoved) nsmurl = to_native_path_linux(join_path_native(self.rorepo.working_tree_dir, rsmsp[0])) nsm.config_writer().set_value('url', nsmurl) csmpathchange = rwrepo.index.commit("changed url") nsm.set_parent_commit(csmpathchange) - + prev_commit = nsm.module().head.commit # dry-run does nothing rm.update(recursive=False, dry_run=True, progress=prog) assert nsm.module().remotes.origin.url != nsmurl - + rm.update(recursive=False, progress=prog) assert nsm.module().remotes.origin.url == nsmurl # head changed, as the remote url and its commit changed assert prev_commit != nsm.module().head.commit - + # add the submodule's changed commit to the index, which is what the # user would do # beforehand, update our instance's binsha with the new one nsm.binsha = nsm.module().head.commit.binsha rwrepo.index.add([nsm]) - + # change branch #================= - # we only have one branch, so we switch to a virtual one, and back + # we only have one branch, so we switch to a virtual one, and back # to the current one to trigger the difference cur_branch = nsm.branch nsmm = nsm.module() @@ -534,34 +529,33 @@ class TestSubmodule(TestBase): csmbranchchange = rwrepo.index.commit("changed branch to %s" % branch) nsm.set_parent_commit(csmbranchchange) # END for each branch to change - + # Lets remove our tracking branch to simulate some changes nsmmh = nsmm.head assert nsmmh.ref.tracking_branch() is None # never set it up until now assert not nsmmh.is_detached - + #dry run does nothing rm.update(recursive=False, dry_run=True, progress=prog) assert nsmmh.ref.tracking_branch() is None - + # the real thing does rm.update(recursive=False, progress=prog) - + assert nsmmh.ref.tracking_branch() is not None assert not nsmmh.is_detached - + # recursive update # ================= # finally we recursively update a module, just to run the code at least once # remove the module so that it has more work - assert len(nsm.children()) >= 1 # could include smmap + assert len(nsm.children()) >= 1 # could include smmap assert nsm.exists() and nsm.module_exists() and len(nsm.children()) >= 1 # assure we pull locally only - nsmc = nsm.children()[0] + nsmc = nsm.children()[0] nsmc.config_writer().set_value('url', async_url) rm.update(recursive=True, progress=prog, dry_run=True) # just to run the code rm.update(recursive=True, progress=prog) - + # gitdb: has either 1 or 2 submodules depending on the version assert len(nsm.children()) >= 1 and nsmc.module_exists() - diff --git a/git/test/test_tree.py b/git/test/test_tree.py index 16d5be59..0f1fb7c3 100644 --- a/git/test/test_tree.py +++ b/git/test/test_tree.py @@ -13,8 +13,9 @@ from git.objects.fun import ( ) from cStringIO import StringIO + class TestTree(TestBase): - + def test_serializable(self): # tree at the given commit contains a submodule as well roottree = self.rorepo.tree('6c1faef799095f3990e9970bc2cb10aa0221cf9c') @@ -25,75 +26,74 @@ class TestTree(TestBase): tree = item # trees have no dict self.failUnlessRaises(AttributeError, setattr, tree, 'someattr', 1) - + orig_data = tree.data_stream.read() orig_cache = tree._cache - + stream = StringIO() tree._serialize(stream) assert stream.getvalue() == orig_data - + stream.seek(0) testtree = Tree(self.rorepo, Tree.NULL_BIN_SHA, 0, '') testtree._deserialize(stream) assert testtree._cache == orig_cache - - + # TEST CACHE MUTATOR mod = testtree.cache self.failUnlessRaises(ValueError, mod.add, "invalid sha", 0, "name") self.failUnlessRaises(ValueError, mod.add, Tree.NULL_HEX_SHA, 0, "invalid mode") self.failUnlessRaises(ValueError, mod.add, Tree.NULL_HEX_SHA, tree.mode, "invalid/name") - + # add new item name = "fake_dir" mod.add(testtree.NULL_HEX_SHA, tree.mode, name) assert name in testtree - + # its available in the tree immediately assert isinstance(testtree[name], Tree) - + # adding it again will not cause multiple of them to be presents cur_count = len(testtree) mod.add(testtree.NULL_HEX_SHA, tree.mode, name) assert len(testtree) == cur_count - + # fails with a different sha - name exists - hexsha = "1"*40 + hexsha = "1" * 40 self.failUnlessRaises(ValueError, mod.add, hexsha, tree.mode, name) - + # force it - replace existing one mod.add(hexsha, tree.mode, name, force=True) assert testtree[name].hexsha == hexsha assert len(testtree) == cur_count - + # unchecked addition always works, even with invalid items invalid_name = "hi/there" mod.add_unchecked(hexsha, 0, invalid_name) assert len(testtree) == cur_count + 1 - + del(mod[invalid_name]) assert len(testtree) == cur_count # del again, its fine del(mod[invalid_name]) - + # have added one item, we are done mod.set_done() mod.set_done() # multiple times are okay - + # serialize, its different now stream = StringIO() testtree._serialize(stream) stream.seek(0) assert stream.getvalue() != orig_data - + # replaces cache, but we make sure of it del(testtree._cache) testtree._deserialize(stream) assert name in testtree assert invalid_name not in testtree # END for each item in tree - + def test_traverse(self): root = self.rorepo.tree('0.1.6') num_recursive = 0 @@ -101,34 +101,34 @@ class TestTree(TestBase): for obj in root.traverse(): if "/" in obj.path: num_recursive += 1 - + assert isinstance(obj, (Blob, Tree)) all_items.append(obj) # END for each object assert all_items == root.list_traverse() - + # limit recursion level to 0 - should be same as default iteration assert all_items assert 'CHANGES' in root assert len(list(root)) == len(list(root.traverse(depth=1))) - + # only choose trees - trees_only = lambda i,d: i.type == "tree" - trees = list(root.traverse(predicate = trees_only)) - assert len(trees) == len(list( i for i in root.traverse() if trees_only(i,0) )) - + trees_only = lambda i, d: i.type == "tree" + trees = list(root.traverse(predicate=trees_only)) + assert len(trees) == len(list(i for i in root.traverse() if trees_only(i, 0))) + # test prune - lib_folder = lambda t,d: t.path == "lib" - pruned_trees = list(root.traverse(predicate = trees_only,prune = lib_folder)) + lib_folder = lambda t, d: t.path == "lib" + pruned_trees = list(root.traverse(predicate=trees_only, prune=lib_folder)) assert len(pruned_trees) < len(trees) - + # trees and blobs - assert len(set(trees)|set(root.trees)) == len(trees) - assert len(set(b for b in root if isinstance(b, Blob)) | set(root.blobs)) == len( root.blobs ) + assert len(set(trees) | set(root.trees)) == len(trees) + assert len(set(b for b in root if isinstance(b, Blob)) | set(root.blobs)) == len(root.blobs) subitem = trees[0][0] assert "/" in subitem.path assert subitem.name == os.path.basename(subitem.path) - + # assure that at some point the traversed paths have a slash in them found_slash = False for item in root.traverse(): @@ -136,9 +136,8 @@ class TestTree(TestBase): if '/' in item.path: found_slash = True # END check for slash - - # slashes in paths are supported as well - assert root[item.path] == item == root/item.path + + # slashes in paths are supported as well + assert root[item.path] == item == root / item.path # END for each item assert found_slash - diff --git a/git/test/test_util.py b/git/test/test_util.py index ea761217..63842d19 100644 --- a/git/test/test_util.py +++ b/git/test/test_util.py @@ -17,59 +17,60 @@ import time class TestIterableMember(object): + """A member of an iterable list""" __slots__ = ("name", "prefix_name") - + def __init__(self, name): self.name = name self.prefix_name = name - + class TestUtils(TestBase): + def setup(self): self.testdict = { - "string": "42", - "int": 42, - "array": [ 42 ], + "string": "42", + "int": 42, + "array": [42], } def test_it_should_dashify(self): assert_equal('this-is-my-argument', dashify('this_is_my_argument')) assert_equal('foo', dashify('foo')) - - + def test_lock_file(self): my_file = tempfile.mktemp() lock_file = LockFile(my_file) assert not lock_file._has_lock() # release lock we don't have - fine lock_file._release_lock() - + # get lock lock_file._obtain_lock_or_raise() assert lock_file._has_lock() - + # concurrent access other_lock_file = LockFile(my_file) assert not other_lock_file._has_lock() self.failUnlessRaises(IOError, other_lock_file._obtain_lock_or_raise) - + lock_file._release_lock() assert not lock_file._has_lock() - + other_lock_file._obtain_lock_or_raise() self.failUnlessRaises(IOError, lock_file._obtain_lock_or_raise) - + # auto-release on destruction del(other_lock_file) lock_file._obtain_lock_or_raise() lock_file._release_lock() - + def test_blocking_lock_file(self): my_file = tempfile.mktemp() lock_file = BlockingLockFile(my_file) lock_file._obtain_lock() - + # next one waits for the lock start = time.time() wait_time = 0.1 @@ -77,10 +78,10 @@ class TestUtils(TestBase): self.failUnlessRaises(IOError, wait_lock._obtain_lock) elapsed = time.time() - start assert elapsed <= wait_time + 0.02 # some extra time it may cost - + def test_user_id(self): assert '@' in get_user_id() - + def test_parse_date(self): # test all supported formats def assert_rval(rval, veri_time, offset=0): @@ -88,13 +89,13 @@ class TestUtils(TestBase): assert isinstance(rval[0], int) and isinstance(rval[1], int) assert rval[0] == veri_time assert rval[1] == offset - + # now that we are here, test our conversion functions as well utctz = altz_to_utctz_str(offset) assert isinstance(utctz, basestring) assert utctz_to_altz(verify_utctz(utctz)) == offset # END assert rval utility - + rfc = ("Thu, 07 Apr 2005 22:13:11 +0000", 0) iso = ("2005-04-07T22:13:11 -0200", 7200) iso2 = ("2005-04-07 22:13:11 +0400", -14400) @@ -105,52 +106,52 @@ class TestUtils(TestBase): for date, offset in (rfc, iso, iso2, iso3, alt, alt2): assert_rval(parse_date(date), veri_time, offset) # END for each date type - + # and failure self.failUnlessRaises(ValueError, parse_date, 'invalid format') self.failUnlessRaises(ValueError, parse_date, '123456789 -02000') self.failUnlessRaises(ValueError, parse_date, ' 123456789 -0200') - + def test_actor(self): for cr in (None, self.rorepo.config_reader()): assert isinstance(Actor.committer(cr), Actor) assert isinstance(Actor.author(cr), Actor) #END assure config reader is handled - + def test_iterable_list(self): for args in (('name',), ('name', 'prefix_')): l = IterableList('name') - + m1 = TestIterableMember('one') m2 = TestIterableMember('two') - + l.extend((m1, m2)) - + assert len(l) == 2 - + # contains works with name and identity assert m1.name in l assert m2.name in l assert m2 in l assert m2 in l assert 'invalid' not in l - + # with string index assert l[m1.name] is m1 assert l[m2.name] is m2 - + # with int index assert l[0] is m1 assert l[1] is m2 - + # with getattr assert l.one is m1 assert l.two is m2 - + # test exceptions self.failUnlessRaises(AttributeError, getattr, l, 'something') self.failUnlessRaises(IndexError, l.__getitem__, 'something') - + # delete by name and index self.failUnlessRaises(IndexError, l.__delitem__, 'something') del(l[m2.name]) @@ -159,8 +160,7 @@ class TestUtils(TestBase): del(l[0]) assert m1.name not in l assert len(l) == 0 - + self.failUnlessRaises(IndexError, l.__delitem__, 0) self.failUnlessRaises(IndexError, l.__delitem__, 'something') #END for each possible mode - |