Diffstat (limited to 'gitdb/test')
-rw-r--r--   gitdb/test/__init__.py                 |    9
-rw-r--r--   gitdb/test/db/test_base.py             |   17
-rw-r--r--   gitdb/test/lib.py                      |  199
-rw-r--r--   gitdb/test/object/test_blob.py         |   21
-rw-r--r--   gitdb/test/object/test_commit.py       |  274
-rw-r--r--   gitdb/test/object/test_tree.py         |  143
-rw-r--r--   gitdb/test/performance/lib.py          |   48
-rw-r--r--   gitdb/test/performance/test_pack.py    |   89
-rw-r--r--   gitdb/test/performance/test_stream.py  |  189
-rw-r--r--   gitdb/test/test_base.py                |   91
-rw-r--r--   gitdb/test/test_config.py              |  101
-rw-r--r--   gitdb/test/test_example.py             |   63
-rw-r--r--   gitdb/test/test_pack.py                |  246
-rw-r--r--   gitdb/test/test_refs.py                |   39
-rw-r--r--   gitdb/test/test_util.py                |  125
15 files changed, 20 insertions(+), 1634 deletions(-)
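
The per-file counts above are combined totals (insertions plus deletions per file), so they should add up to the 20 + 1634 = 1654 changed lines reported in the summary. A minimal sketch checking that, using only the numbers from the table:

# Combined per-file change counts from the diffstat table above.
counts = [9, 17, 199, 21, 274, 143, 48, 89, 189, 91, 101, 63, 246, 39, 125]

insertions, deletions = 20, 1634
assert sum(counts) == insertions + deletions == 1654
print("%i files changed, %i lines touched" % (len(counts), sum(counts)))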
diff --git a/gitdb/test/__init__.py b/gitdb/test/__init__.py index 760f531..a19fc87 100644 --- a/gitdb/test/__init__.py +++ b/gitdb/test/__init__.py @@ -3,14 +3,5 @@ # This module is part of GitDB and is released under # the New BSD License: http://www.opensource.org/licenses/bsd-license.php -import gitdb.util -#{ Initialization -def _init_pool(): - """Assure the pool is actually threaded""" - size = 2 - print "Setting ThreadPool to %i" % size - gitdb.util.pool.set_size(size) - -#} END initialization diff --git a/gitdb/test/db/test_base.py b/gitdb/test/db/test_base.py index 0a381be..8b13789 100644 --- a/gitdb/test/db/test_base.py +++ b/gitdb/test/db/test_base.py @@ -1,18 +1 @@ -# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors -# -# This module is part of GitDB and is released under -# the New BSD License: http://www.opensource.org/licenses/bsd-license.php -from lib import * -from gitdb.db import RefSpec -class TestBase(TestDBBase): - - @with_rw_directory - def test_basics(self, path): - self.failUnlessRaises(ValueError, RefSpec, None, None) - rs = RefSpec(None, "something") - assert rs.force == False - assert rs.delete_destination() - assert rs.source is None - assert rs.destination == "something" - diff --git a/gitdb/test/lib.py b/gitdb/test/lib.py index 9224f5f..8b13789 100644 --- a/gitdb/test/lib.py +++ b/gitdb/test/lib.py @@ -1,200 +1 @@ -# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors -# -# This module is part of PureGitDB and is released under -# the New BSD License: http://www.opensource.org/licenses/bsd-license.php -"""Utilities used in ODB testing""" -from gitdb import OStream -from gitdb.db.py import PureGitDB -from gitdb.stream import ( - Sha1Writer, - ZippedStoreShaWriter - ) - -from gitdb.util import zlib - -import sys -import random -from array import array -from cStringIO import StringIO - -import glob -import unittest -import tempfile -import shutil -import os -import gc - - -#{ Decorators - -def with_rw_directory(func): - """Create a temporary directory which can be written to, remove it if the - test suceeds, but leave it otherwise to aid additional debugging""" - def wrapper(self): - path = tempfile.mktemp(prefix=func.__name__) - os.mkdir(path) - keep = False - try: - try: - return func(self, path) - except Exception: - print >> sys.stderr, "Test %s.%s failed, output is at %r" % (type(self).__name__, func.__name__, path) - keep = True - raise - finally: - # Need to collect here to be sure all handles have been closed. It appears - # a windows-only issue. In fact things should be deleted, as well as - # memory maps closed, once objects go out of scope. For some reason - # though this is not the case here unless we collect explicitly. - if not keep: - gc.collect() - shutil.rmtree(path) - # END handle exception - # END wrapper - - wrapper.__name__ = func.__name__ - return wrapper - - -def with_rw_repo(func): - """Create a copy of our repository and put it into a writable location. It will - be removed if the test doesn't result in an error. 
- As we can currently only copy the fully working tree, tests must not rely on - being on a certain branch or on anything really except for the default tags - that should exist - Wrapped function obtains a git repository """ - def wrapper(self, path): - src_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) - assert(os.path.isdir(path)) - os.rmdir(path) # created by wrapper, but must not exist for copy operation - shutil.copytree(src_dir, path) - target_gitdir = os.path.join(path, '.git') - assert os.path.isdir(target_gitdir) - return func(self, PureGitDB(target_gitdir)) - #END wrapper - wrapper.__name__ = func.__name__ - return with_rw_directory(wrapper) - - - -def with_packs_rw(func): - """Function that provides a path into which the packs for testing should be - copied. Will pass on the path to the actual function afterwards - - :note: needs with_rw_directory wrapped around it""" - def wrapper(self, path): - src_pack_glob = fixture_path('packs/*') - copy_files_globbed(src_pack_glob, path, hard_link_ok=True) - return func(self, path) - # END wrapper - - wrapper.__name__ = func.__name__ - return with_rw_directory(wrapper) - -#} END decorators - -#{ Routines - -def repo_dir(): - """:return: path to our own repository, being our own .git directory. - :note: doesn't work in bare repositories""" - base = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), '.git') - assert os.path.isdir(base) - return base - - -def maketemp(*args): - """Wrapper around default tempfile.mktemp to fix an osx issue""" - tdir = tempfile.mktemp(*args) - if sys.platform == 'darwin': - tdir = '/private' + tdir - return tdir - -def fixture_path(relapath=''): - """:return: absolute path into the fixture directory - :param relapath: relative path into the fixtures directory, or '' - to obtain the fixture directory itself""" - return os.path.join(os.path.dirname(__file__), 'fixtures', relapath) - -def copy_files_globbed(source_glob, target_dir, hard_link_ok=False): - """Copy all files found according to the given source glob into the target directory - :param hard_link_ok: if True, hard links will be created if possible. 
Otherwise - the files will be copied""" - for src_file in glob.glob(source_glob): - if hard_link_ok and hasattr(os, 'link'): - target = os.path.join(target_dir, os.path.basename(src_file)) - try: - os.link(src_file, target) - except OSError: - shutil.copy(src_file, target_dir) - # END handle cross device links ( and resulting failure ) - else: - shutil.copy(src_file, target_dir) - # END try hard link - # END for each file to copy - - -def make_bytes(size_in_bytes, randomize=False): - """:return: string with given size in bytes - :param randomize: try to produce a very random stream""" - actual_size = size_in_bytes / 4 - producer = xrange(actual_size) - if randomize: - producer = list(producer) - random.shuffle(producer) - # END randomize - a = array('i', producer) - return a.tostring() - -def make_object(type, data): - """:return: bytes resembling an uncompressed object""" - odata = "blob %i\0" % len(data) - return odata + data - -def make_memory_file(size_in_bytes, randomize=False): - """:return: tuple(size_of_stream, stream) - :param randomize: try to produce a very random stream""" - d = make_bytes(size_in_bytes, randomize) - return len(d), StringIO(d) - -#} END routines - -#{ Stream Utilities - -class DummyStream(object): - def __init__(self): - self.was_read = False - self.bytes = 0 - self.closed = False - - def read(self, size): - self.was_read = True - self.bytes = size - - def close(self): - self.closed = True - - def _assert(self): - assert self.was_read - - -class DeriveTest(OStream): - def __init__(self, sha, type, size, stream, *args, **kwargs): - self.myarg = kwargs.pop('myarg') - self.args = args - - def _assert(self): - assert self.args - assert self.myarg - -#} END stream utilitiess - -#{ Bases - -class TestBase(unittest.TestCase): - """Base class for all tests""" - # The non-database specific tests just provides a default pure git database - rorepo = PureGitDB(repo_dir()) - -#} END bases diff --git a/gitdb/test/object/test_blob.py b/gitdb/test/object/test_blob.py index 661c050..139597f 100644 --- a/gitdb/test/object/test_blob.py +++ b/gitdb/test/object/test_blob.py @@ -1,23 +1,2 @@ -# test_blob.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php -from git.test.lib import * -from git import * -from gitdb.util import hex_to_bin -class TestBlob(TestBase): - - def test_mime_type_should_return_mime_type_for_known_types(self): - blob = Blob(self.rorepo, **{'binsha': Blob.NULL_BIN_SHA, 'path': 'foo.png'}) - assert_equal("image/png", blob.mime_type) - - def test_mime_type_should_return_text_plain_for_unknown_types(self): - blob = Blob(self.rorepo, **{'binsha': Blob.NULL_BIN_SHA,'path': 'something'}) - assert_equal("text/plain", blob.mime_type) - - def test_nodict(self): - self.failUnlessRaises(AttributeError, setattr, self.rorepo.tree()['AUTHORS'], 'someattr', 2) - diff --git a/gitdb/test/object/test_commit.py b/gitdb/test/object/test_commit.py index 4a8d8b8..8b13789 100644 --- a/gitdb/test/object/test_commit.py +++ b/gitdb/test/object/test_commit.py @@ -1,275 +1 @@ -# -*- coding: utf-8 -*- -# test_commit.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php -from git.test.lib import * -from git import * -from gitdb import IStream -from gitdb.util import hex_to_bin - 
-from cStringIO import StringIO -import time -import sys - - -def assert_commit_serialization(rwrepo, commit_id, print_performance_info=False): - """traverse all commits in the history of commit identified by commit_id and check - if the serialization works. - :param print_performance_info: if True, we will show how fast we are""" - ns = 0 # num serializations - nds = 0 # num deserializations - - st = time.time() - for cm in rwrepo.commit(commit_id).traverse(): - nds += 1 - - # assert that we deserialize commits correctly, hence we get the same - # sha on serialization - stream = StringIO() - cm._serialize(stream) - ns += 1 - streamlen = stream.tell() - stream.seek(0) - - istream = rwrepo.odb.store(IStream(Commit.type, streamlen, stream)) - assert istream.hexsha == cm.hexsha - - nc = Commit(rwrepo, Commit.NULL_BIN_SHA, cm.tree, - cm.author, cm.authored_date, cm.author_tz_offset, - cm.committer, cm.committed_date, cm.committer_tz_offset, - cm.message, cm.parents, cm.encoding) - - assert nc.parents == cm.parents - stream = StringIO() - nc._serialize(stream) - ns += 1 - streamlen = stream.tell() - stream.seek(0) - - # reuse istream - istream.size = streamlen - istream.stream = stream - istream.binsha = None - nc.binsha = rwrepo.odb.store(istream).binsha - - # if it worked, we have exactly the same contents ! - assert nc.hexsha == cm.hexsha - # END check commits - elapsed = time.time() - st - - if print_performance_info: - print >> sys.stderr, "Serialized %i and deserialized %i commits in %f s ( (%f, %f) commits / s" % (ns, nds, elapsed, ns/elapsed, nds/elapsed) - # END handle performance info - - -class TestCommit(TestBase): - - def test_bake(self): - - commit = self.rorepo.commit('2454ae89983a4496a445ce347d7a41c0bb0ea7ae') - # commits have no dict - self.failUnlessRaises(AttributeError, setattr, commit, 'someattr', 1) - commit.author # bake - - assert_equal("Sebastian Thiel", commit.author.name) - assert_equal("byronimo@gmail.com", commit.author.email) - assert commit.author == commit.committer - assert isinstance(commit.authored_date, int) and isinstance(commit.committed_date, int) - assert isinstance(commit.author_tz_offset, int) and isinstance(commit.committer_tz_offset, int) - assert commit.message == "Added missing information to docstrings of commit and stats module\n" - - - def test_stats(self): - commit = self.rorepo.commit('33ebe7acec14b25c5f84f35a664803fcab2f7781') - stats = commit.stats - - def check_entries(d): - assert isinstance(d, dict) - for key in ("insertions", "deletions", "lines"): - assert key in d - # END assertion helper - assert stats.files - assert stats.total - - check_entries(stats.total) - assert "files" in stats.total - - for filepath, d in stats.files.items(): - check_entries(d) - # END for each stated file - - # assure data is parsed properly - michael = Actor._from_string("Michael Trier <mtrier@gmail.com>") - assert commit.author == michael - assert commit.committer == michael - assert commit.authored_date == 1210193388 - assert commit.committed_date == 1210193388 - assert commit.author_tz_offset == 14400, commit.author_tz_offset - assert commit.committer_tz_offset == 14400, commit.committer_tz_offset - assert commit.message == "initial project\n" - - def test_unicode_actor(self): - # assure we can parse unicode actors correctly - name = "Üäöß ÄußÉ".decode("utf-8") - assert len(name) == 9 - special = Actor._from_string(u"%s <something@this.com>" % name) - assert special.name == name - assert isinstance(special.name, unicode) - - def test_traversal(self): - 
start = self.rorepo.commit("a4d06724202afccd2b5c54f81bcf2bf26dea7fff") - first = self.rorepo.commit("33ebe7acec14b25c5f84f35a664803fcab2f7781") - p0 = start.parents[0] - p1 = start.parents[1] - p00 = p0.parents[0] - p10 = p1.parents[0] - - # basic branch first, depth first - dfirst = start.traverse(branch_first=False) - bfirst = start.traverse(branch_first=True) - assert dfirst.next() == p0 - assert dfirst.next() == p00 - - assert bfirst.next() == p0 - assert bfirst.next() == p1 - assert bfirst.next() == p00 - assert bfirst.next() == p10 - - # at some point, both iterations should stop - assert list(bfirst)[-1] == first - stoptraverse = self.rorepo.commit("254d04aa3180eb8b8daf7b7ff25f010cd69b4e7d").traverse(as_edge=True) - l = list(stoptraverse) - assert len(l[0]) == 2 - - # ignore self - assert start.traverse(ignore_self=False).next() == start - - # depth - assert len(list(start.traverse(ignore_self=False, depth=0))) == 1 - - # prune - assert start.traverse(branch_first=1, prune=lambda i,d: i==p0).next() == p1 - - # predicate - assert start.traverse(branch_first=1, predicate=lambda i,d: i==p1).next() == p1 - - # traversal should stop when the beginning is reached - self.failUnlessRaises(StopIteration, first.traverse().next) - - # parents of the first commit should be empty ( as the only parent has a null - # sha ) - assert len(first.parents) == 0 - - def test_iteration(self): - # we can iterate commits - all_commits = Commit.list_items(self.rorepo, self.rorepo.head) - assert all_commits - assert all_commits == list(self.rorepo.iter_commits()) - - # this includes merge commits - mcomit = self.rorepo.commit('d884adc80c80300b4cc05321494713904ef1df2d') - assert mcomit in all_commits - - # we can limit the result to paths - ltd_commits = list(self.rorepo.iter_commits(paths='CHANGES')) - assert ltd_commits and len(ltd_commits) < len(all_commits) - - # show commits of multiple paths, resulting in a union of commits - less_ltd_commits = list(Commit.iter_items(self.rorepo, 'master', paths=('CHANGES', 'AUTHORS'))) - assert len(ltd_commits) < len(less_ltd_commits) - - def test_iter_items(self): - # pretty not allowed - self.failUnlessRaises(ValueError, Commit.iter_items, self.rorepo, 'master', pretty="raw") - - def test_rev_list_bisect_all(self): - """ - 'git rev-list --bisect-all' returns additional information - in the commit header. This test ensures that we properly parse it. 
- """ - revs = self.rorepo.git.rev_list('933d23bf95a5bd1624fbcdf328d904e1fa173474', - first_parent=True, - bisect_all=True) - - commits = Commit._iter_from_process_or_stream(self.rorepo, StringProcessAdapter(revs)) - expected_ids = ( - '7156cece3c49544abb6bf7a0c218eb36646fad6d', - '1f66cfbbce58b4b552b041707a12d437cc5f400a', - '33ebe7acec14b25c5f84f35a664803fcab2f7781', - '933d23bf95a5bd1624fbcdf328d904e1fa173474' - ) - for sha1, commit in zip(expected_ids, commits): - assert_equal(sha1, commit.hexsha) - - def test_count(self): - assert self.rorepo.tag('refs/tags/0.1.5').commit.count( ) == 143 - - def test_list(self): - assert isinstance(Commit.list_items(self.rorepo, '0.1.5', max_count=5)[hex_to_bin('5117c9c8a4d3af19a9958677e45cda9269de1541')], Commit) - - def test_str(self): - commit = Commit(self.rorepo, Commit.NULL_BIN_SHA) - assert_equal(Commit.NULL_HEX_SHA, str(commit)) - - def test_repr(self): - commit = Commit(self.rorepo, Commit.NULL_BIN_SHA) - assert_equal('<git.Commit "%s">' % Commit.NULL_HEX_SHA, repr(commit)) - - def test_equality(self): - commit1 = Commit(self.rorepo, Commit.NULL_BIN_SHA) - commit2 = Commit(self.rorepo, Commit.NULL_BIN_SHA) - commit3 = Commit(self.rorepo, "\1"*20) - assert_equal(commit1, commit2) - assert_not_equal(commit2, commit3) - - def test_iter_parents(self): - # should return all but ourselves, even if skip is defined - c = self.rorepo.commit('0.1.5') - for skip in (0, 1): - piter = c.iter_parents(skip=skip) - first_parent = piter.next() - assert first_parent != c - assert first_parent == c.parents[0] - # END for each - - def test_base(self): - name_rev = self.rorepo.head.commit.name_rev - assert isinstance(name_rev, basestring) - - @with_rw_repo('HEAD', bare=True) - def test_serialization(self, rwrepo): - # create all commits of our repo - assert_commit_serialization(rwrepo, '0.1.6') - - def test_serialization_unicode_support(self): - assert Commit.default_encoding.lower() == 'utf-8' - - # create a commit with unicode in the message, and the author's name - # Verify its serialization and deserialization - cmt = self.rorepo.commit('0.1.6') - assert isinstance(cmt.message, unicode) # it automatically decodes it as such - assert isinstance(cmt.author.name, unicode) # same here - - cmt.message = "üäêèß".decode("utf-8") - assert len(cmt.message) == 5 - - cmt.author.name = "äüß".decode("utf-8") - assert len(cmt.author.name) == 3 - - cstream = StringIO() - cmt._serialize(cstream) - cstream.seek(0) - assert len(cstream.getvalue()) - - ncmt = Commit(self.rorepo, cmt.binsha) - ncmt._deserialize(cstream) - - assert cmt.author.name == ncmt.author.name - assert cmt.message == ncmt.message - # actually, it can't be printed in a shell as repr wants to have ascii only - # it appears - cmt.author.__repr__() - diff --git a/gitdb/test/object/test_tree.py b/gitdb/test/object/test_tree.py index ec10e96..8b13789 100644 --- a/gitdb/test/object/test_tree.py +++ b/gitdb/test/object/test_tree.py @@ -1,144 +1 @@ -# test_tree.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors -# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php -import os -from git.test.lib import * -from git import * -from git.objects.fun import ( - traverse_tree_recursive, - traverse_trees_recursive - ) -from cStringIO import StringIO - -class TestTree(TestBase): - - def test_serializable(self): - # tree at the given commit contains a submodule as well - roottree = 
self.rorepo.tree('6c1faef799095f3990e9970bc2cb10aa0221cf9c') - for item in roottree.traverse(ignore_self=False): - if item.type != Tree.type: - continue - # END skip non-trees - tree = item - # trees have no dict - self.failUnlessRaises(AttributeError, setattr, tree, 'someattr', 1) - - orig_data = tree.data_stream.read() - orig_cache = tree._cache - - stream = StringIO() - tree._serialize(stream) - assert stream.getvalue() == orig_data - - stream.seek(0) - testtree = Tree(self.rorepo, Tree.NULL_BIN_SHA, 0, '') - testtree._deserialize(stream) - assert testtree._cache == orig_cache - - - # TEST CACHE MUTATOR - mod = testtree.cache - self.failUnlessRaises(ValueError, mod.add, "invalid sha", 0, "name") - self.failUnlessRaises(ValueError, mod.add, Tree.NULL_HEX_SHA, 0, "invalid mode") - self.failUnlessRaises(ValueError, mod.add, Tree.NULL_HEX_SHA, tree.mode, "invalid/name") - - # add new item - name = "fake_dir" - mod.add(testtree.NULL_HEX_SHA, tree.mode, name) - assert name in testtree - - # its available in the tree immediately - assert isinstance(testtree[name], Tree) - - # adding it again will not cause multiple of them to be presents - cur_count = len(testtree) - mod.add(testtree.NULL_HEX_SHA, tree.mode, name) - assert len(testtree) == cur_count - - # fails with a different sha - name exists - hexsha = "1"*40 - self.failUnlessRaises(ValueError, mod.add, hexsha, tree.mode, name) - - # force it - replace existing one - mod.add(hexsha, tree.mode, name, force=True) - assert testtree[name].hexsha == hexsha - assert len(testtree) == cur_count - - # unchecked addition always works, even with invalid items - invalid_name = "hi/there" - mod.add_unchecked(hexsha, 0, invalid_name) - assert len(testtree) == cur_count + 1 - - del(mod[invalid_name]) - assert len(testtree) == cur_count - # del again, its fine - del(mod[invalid_name]) - - # have added one item, we are done - mod.set_done() - mod.set_done() # multiple times are okay - - # serialize, its different now - stream = StringIO() - testtree._serialize(stream) - stream.seek(0) - assert stream.getvalue() != orig_data - - # replaces cache, but we make sure of it - del(testtree._cache) - testtree._deserialize(stream) - assert name in testtree - assert invalid_name not in testtree - # END for each item in tree - - def test_traverse(self): - root = self.rorepo.tree('0.1.6') - num_recursive = 0 - all_items = list() - for obj in root.traverse(): - if "/" in obj.path: - num_recursive += 1 - - assert isinstance(obj, (Blob, Tree)) - all_items.append(obj) - # END for each object - assert all_items == root.list_traverse() - - # limit recursion level to 0 - should be same as default iteration - assert all_items - assert 'CHANGES' in root - assert len(list(root)) == len(list(root.traverse(depth=1))) - - # only choose trees - trees_only = lambda i,d: i.type == "tree" - trees = list(root.traverse(predicate = trees_only)) - assert len(trees) == len(list( i for i in root.traverse() if trees_only(i,0) )) - - # test prune - lib_folder = lambda t,d: t.path == "lib" - pruned_trees = list(root.traverse(predicate = trees_only,prune = lib_folder)) - assert len(pruned_trees) < len(trees) - - # trees and blobs - assert len(set(trees)|set(root.trees)) == len(trees) - assert len(set(b for b in root if isinstance(b, Blob)) | set(root.blobs)) == len( root.blobs ) - subitem = trees[0][0] - assert "/" in subitem.path - assert subitem.name == os.path.basename(subitem.path) - - # assure that at some point the traversed paths have a slash in them - found_slash = False - for item in 
root.traverse(): - assert os.path.isabs(item.abspath) - if '/' in item.path: - found_slash = True - # END check for slash - - # slashes in paths are supported as well - assert root[item.path] == item == root/item.path - # END for each item - assert found_slash - diff --git a/gitdb/test/performance/lib.py b/gitdb/test/performance/lib.py index 761113d..cb6303f 100644 --- a/gitdb/test/performance/lib.py +++ b/gitdb/test/performance/lib.py @@ -3,52 +3,4 @@ # This module is part of GitDB and is released under # the New BSD License: http://www.opensource.org/licenses/bsd-license.php """Contains library functions""" -import os -from gitdb.test.lib import * -import shutil -import tempfile - -#{ Invvariants -k_env_git_repo = "GITDB_TEST_GIT_REPO_BASE" -#} END invariants - - -#{ Utilities -def resolve_or_fail(env_var): - """:return: resolved environment variable or raise EnvironmentError""" - try: - return os.environ[env_var] - except KeyError: - raise EnvironmentError("Please set the %r envrionment variable and retry" % env_var) - # END exception handling - -#} END utilities - - -#{ Base Classes - -class TestBigRepoR(TestBase): - """TestCase providing access to readonly 'big' repositories using the following - member variables: - - * gitrepopath - - * read-only base path of the git source repository, i.e. .../git/.git""" - - #{ Invariants - head_sha_2k = '235d521da60e4699e5bd59ac658b5b48bd76ddca' - head_sha_50 = '32347c375250fd470973a5d76185cac718955fd5' - #} END invariants - - @classmethod - def setUpAll(cls): - try: - super(TestBigRepoR, cls).setUpAll() - except AttributeError: - pass - cls.gitrepopath = resolve_or_fail(k_env_git_repo) - assert cls.gitrepopath.endswith('.git') - - -#} END base classes diff --git a/gitdb/test/performance/test_pack.py b/gitdb/test/performance/test_pack.py index da952b1..8b13789 100644 --- a/gitdb/test/performance/test_pack.py +++ b/gitdb/test/performance/test_pack.py @@ -1,90 +1 @@ -# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors -# -# This module is part of GitDB and is released under -# the New BSD License: http://www.opensource.org/licenses/bsd-license.php -"""Performance tests for object store""" -from lib import ( - TestBigRepoR - ) -from gitdb.exc import UnsupportedOperation -from gitdb.db.pack import PackedDB - -import sys -import os -from time import time -import random - -class TestPackedDBPerformance(TestBigRepoR): - - def _test_pack_random_access(self): - pdb = PackedDB(os.path.join(self.gitrepopath, "objects/pack")) - - # sha lookup - st = time() - sha_list = list(pdb.sha_iter()) - elapsed = time() - st - ns = len(sha_list) - print >> sys.stderr, "PDB: looked up %i shas by index in %f s ( %f shas/s )" % (ns, elapsed, ns / elapsed) - - # sha lookup: best-case and worst case access - pdb_pack_info = pdb._pack_info - # END shuffle shas - st = time() - for sha in sha_list: - pdb_pack_info(sha) - # END for each sha to look up - elapsed = time() - st - - # discard cache - del(pdb._entities) - pdb.entities() - print >> sys.stderr, "PDB: looked up %i sha in %i packs in %f s ( %f shas/s )" % (ns, len(pdb.entities()), elapsed, ns / elapsed) - # END for each random mode - - # query info and streams only - max_items = 10000 # can wait longer when testing memory - for pdb_fun in (pdb.info, pdb.stream): - st = time() - for sha in sha_list[:max_items]: - pdb_fun(sha) - elapsed = time() - st - print >> sys.stderr, "PDB: Obtained %i object %s by sha in %f s ( %f items/s )" % (max_items, pdb_fun.__name__.upper(), elapsed, max_items / 
elapsed) - # END for each function - - # retrieve stream and read all - max_items = 5000 - pdb_stream = pdb.stream - total_size = 0 - st = time() - for sha in sha_list[:max_items]: - stream = pdb_stream(sha) - stream.read() - total_size += stream.size - elapsed = time() - st - total_kib = total_size / 1000 - print >> sys.stderr, "PDB: Obtained %i streams by sha and read all bytes totallying %i KiB ( %f KiB / s ) in %f s ( %f streams/s )" % (max_items, total_kib, total_kib/elapsed , elapsed, max_items / elapsed) - - def test_correctness(self): - pdb = PackedDB(os.path.join(self.gitrepopath, "objects/pack")) - # disabled for now as it used to work perfectly, checking big repositories takes a long time - print >> sys.stderr, "Endurance run: verify streaming of objects (crc and sha)" - for crc in range(2): - count = 0 - st = time() - for entity in pdb.entities(): - pack_verify = entity.is_valid_stream - sha_by_index = entity.index().sha - for index in xrange(entity.index().size()): - try: - assert pack_verify(sha_by_index(index), use_crc=crc) - count += 1 - except UnsupportedOperation: - pass - # END ignore old indices - # END for each index - # END for each entity - elapsed = time() - st - print >> sys.stderr, "PDB: verified %i objects (crc=%i) in %f s ( %f objects/s )" % (count, crc, elapsed, count / elapsed) - # END for each verify mode - diff --git a/gitdb/test/performance/test_stream.py b/gitdb/test/performance/test_stream.py index 0e47484..d64346e 100644 --- a/gitdb/test/performance/test_stream.py +++ b/gitdb/test/performance/test_stream.py @@ -3,193 +3,4 @@ # This module is part of GitDB and is released under # the New BSD License: http://www.opensource.org/licenses/bsd-license.php """Performance data streaming performance""" -from lib import TestBigRepoR -from gitdb.db.py import * -from gitdb.base import * -from gitdb.stream import * -from gitdb.util import ( - pool, - bin_to_hex - ) -from gitdb.typ import str_blob_type -from gitdb.fun import chunk_size -from async import ( - IteratorReader, - ChannelThreadTask, - ) - -from cStringIO import StringIO -from time import time -import os -import sys -import stat -import subprocess - - -from lib import ( - TestBigRepoR, - make_memory_file, - with_rw_directory - ) - - -#{ Utilities -def read_chunked_stream(stream): - total = 0 - while True: - chunk = stream.read(chunk_size) - total += len(chunk) - if len(chunk) < chunk_size: - break - # END read stream loop - assert total == stream.size - return stream - - -class TestStreamReader(ChannelThreadTask): - """Expects input streams and reads them in chunks. It will read one at a time, - requireing a queue chunk of size 1""" - def __init__(self, *args): - super(TestStreamReader, self).__init__(*args) - self.fun = read_chunked_stream - self.max_chunksize = 1 - - -#} END utilities - -class TestObjDBPerformance(TestBigRepoR): - - large_data_size_bytes = 1000*1000*50 # some MiB should do it - moderate_data_size_bytes = 1000*1000*1 # just 1 MiB - - @with_rw_directory - def test_large_data_streaming(self, path): - ldb = PureLooseObjectODB(path) - string_ios = list() # list of streams we previously created - - # serial mode - for randomize in range(2): - desc = (randomize and 'random ') or '' - print >> sys.stderr, "Creating %s data ..." 
% desc - st = time() - size, stream = make_memory_file(self.large_data_size_bytes, randomize) - elapsed = time() - st - print >> sys.stderr, "Done (in %f s)" % elapsed - string_ios.append(stream) - - # writing - due to the compression it will seem faster than it is - st = time() - sha = ldb.store(IStream('blob', size, stream)).binsha - elapsed_add = time() - st - assert ldb.has_object(sha) - db_file = ldb.readable_db_object_path(bin_to_hex(sha)) - fsize_kib = os.path.getsize(db_file) / 1000 - - - size_kib = size / 1000 - print >> sys.stderr, "Added %i KiB (filesize = %i KiB) of %s data to loose odb in %f s ( %f Write KiB / s)" % (size_kib, fsize_kib, desc, elapsed_add, size_kib / elapsed_add) - - # reading all at once - st = time() - ostream = ldb.stream(sha) - shadata = ostream.read() - elapsed_readall = time() - st - - stream.seek(0) - assert shadata == stream.getvalue() - print >> sys.stderr, "Read %i KiB of %s data at once from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, elapsed_readall, size_kib / elapsed_readall) - - - # reading in chunks of 1 MiB - cs = 512*1000 - chunks = list() - st = time() - ostream = ldb.stream(sha) - while True: - data = ostream.read(cs) - chunks.append(data) - if len(data) < cs: - break - # END read in chunks - elapsed_readchunks = time() - st - - stream.seek(0) - assert ''.join(chunks) == stream.getvalue() - - cs_kib = cs / 1000 - print >> sys.stderr, "Read %i KiB of %s data in %i KiB chunks from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, cs_kib, elapsed_readchunks, size_kib / elapsed_readchunks) - - # del db file so we keep something to do - os.remove(db_file) - # END for each randomization factor - - - # multi-threaded mode - # want two, should be supported by most of todays cpus - pool.set_size(2) - total_kib = 0 - nsios = len(string_ios) - for stream in string_ios: - stream.seek(0) - total_kib += len(stream.getvalue()) / 1000 - # END rewind - - def istream_iter(): - for stream in string_ios: - stream.seek(0) - yield IStream(str_blob_type, len(stream.getvalue()), stream) - # END for each stream - # END util - - # write multiple objects at once, involving concurrent compression - reader = IteratorReader(istream_iter()) - istream_reader = ldb.store_async(reader) - istream_reader.task().max_chunksize = 1 - - st = time() - istreams = istream_reader.read(nsios) - assert len(istreams) == nsios - elapsed = time() - st - - print >> sys.stderr, "Threads(%i): Compressed %i KiB of data in loose odb in %f s ( %f Write KiB / s)" % (pool.size(), total_kib, elapsed, total_kib / elapsed) - - # decompress multiple at once, by reading them - # chunk size is not important as the stream will not really be decompressed - - # until its read - istream_reader = IteratorReader(iter([ i.binsha for i in istreams ])) - ostream_reader = ldb.stream_async(istream_reader) - - chunk_task = TestStreamReader(ostream_reader, "chunker", None) - output_reader = pool.add_task(chunk_task) - output_reader.task().max_chunksize = 1 - - st = time() - assert len(output_reader.read(nsios)) == nsios - elapsed = time() - st - - print >> sys.stderr, "Threads(%i): Decompressed %i KiB of data in loose odb in %f s ( %f Read KiB / s)" % (pool.size(), total_kib, elapsed, total_kib / elapsed) - - # store the files, and read them back. For the reading, we use a task - # as well which is chunked into one item per task. 
Reading all will - # very quickly result in two threads handling two bytestreams of - # chained compression/decompression streams - reader = IteratorReader(istream_iter()) - istream_reader = ldb.store_async(reader) - istream_reader.task().max_chunksize = 1 - - istream_to_sha = lambda items: [ i.binsha for i in items ] - istream_reader.set_post_cb(istream_to_sha) - - ostream_reader = ldb.stream_async(istream_reader) - - chunk_task = TestStreamReader(ostream_reader, "chunker", None) - output_reader = pool.add_task(chunk_task) - output_reader.max_chunksize = 1 - - st = time() - assert len(output_reader.read(nsios)) == nsios - elapsed = time() - st - - print >> sys.stderr, "Threads(%i): Compressed and decompressed and read %i KiB of data in loose odb in %f s ( %f Combined KiB / s)" % (pool.size(), total_kib, elapsed, total_kib / elapsed) diff --git a/gitdb/test/test_base.py b/gitdb/test/test_base.py index 1b20faf..fbe0135 100644 --- a/gitdb/test/test_base.py +++ b/gitdb/test/test_base.py @@ -3,96 +3,5 @@ # This module is part of GitDB and is released under # the New BSD License: http://www.opensource.org/licenses/bsd-license.php """Test for object db""" -from lib import ( - TestBase, - DummyStream, - DeriveTest, - ) -from gitdb import * -from gitdb.util import ( - NULL_BIN_SHA - ) -from gitdb.typ import ( - str_blob_type - ) - - -class TestBaseTypes(TestBase): - - def test_streams(self): - # test info - sha = NULL_BIN_SHA - s = 20 - blob_id = 3 - - info = OInfo(sha, str_blob_type, s) - assert info.binsha == sha - assert info.type == str_blob_type - assert info.type_id == blob_id - assert info.size == s - - # test pack info - # provides type_id - pinfo = OPackInfo(0, blob_id, s) - assert pinfo.type == str_blob_type - assert pinfo.type_id == blob_id - assert pinfo.pack_offset == 0 - - dpinfo = ODeltaPackInfo(0, blob_id, s, sha) - assert dpinfo.type == str_blob_type - assert dpinfo.type_id == blob_id - assert dpinfo.delta_info == sha - assert dpinfo.pack_offset == 0 - - - # test ostream - stream = DummyStream() - ostream = OStream(*(info + (stream, ))) - assert ostream.stream is stream - ostream.read(15) - stream._assert() - assert stream.bytes == 15 - ostream.read(20) - assert stream.bytes == 20 - - # test packstream - postream = OPackStream(*(pinfo + (stream, ))) - assert postream.stream is stream - postream.read(10) - stream._assert() - assert stream.bytes == 10 - - # test deltapackstream - dpostream = ODeltaPackStream(*(dpinfo + (stream, ))) - dpostream.stream is stream - dpostream.read(5) - stream._assert() - assert stream.bytes == 5 - - # derive with own args - DeriveTest(sha, str_blob_type, s, stream, 'mine',myarg = 3)._assert() - - # test istream - istream = IStream(str_blob_type, s, stream) - assert istream.binsha == None - istream.binsha = sha - assert istream.binsha == sha - - assert len(istream.binsha) == 20 - assert len(istream.hexsha) == 40 - - assert istream.size == s - istream.size = s * 2 - istream.size == s * 2 - assert istream.type == str_blob_type - istream.type = "something" - assert istream.type == "something" - assert istream.stream is stream - istream.stream = None - assert istream.stream is None - - assert istream.error is None - istream.error = Exception() - assert isinstance(istream.error, Exception) diff --git a/gitdb/test/test_config.py b/gitdb/test/test_config.py index 205f3c2..8b13789 100644 --- a/gitdb/test/test_config.py +++ b/gitdb/test/test_config.py @@ -1,102 +1 @@ -# test_config.py -# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors 
-# -# This module is part of GitPython and is released under -# the BSD License: http://www.opensource.org/licenses/bsd-license.php -from gitdb.test.lib import * -from gitdb.config import * -import StringIO -from copy import copy -from ConfigParser import NoSectionError - -class TestBase(TestBase): - - def _to_memcache(self, file_path): - fp = open(file_path, "r") - sio = StringIO.StringIO(fp.read()) - sio.name = file_path - return sio - - def _parsers_equal_or_raise(self, lhs, rhs): - pass - - def test_read_write(self): - # writer must create the exact same file as the one read before - for filename in ("git_config", "git_config_global"): - file_obj = self._to_memcache(fixture_path(filename)) - file_obj_orig = copy(file_obj) - w_config = GitConfigParser(file_obj, read_only = False) - w_config.read() # enforce reading - assert w_config._sections - w_config.write() # enforce writing - assert file_obj.getvalue() == file_obj_orig.getvalue() - - # creating an additional config writer must fail due to exclusive access - self.failUnlessRaises(IOError, GitConfigParser, file_obj, read_only = False) - - # should still have a lock and be able to make changes - assert w_config._lock._has_lock() - - # changes should be written right away - sname = "my_section" - oname = "mykey" - val = "myvalue" - w_config.add_section(sname) - assert w_config.has_section(sname) - w_config.set(sname, oname, val) - assert w_config.has_option(sname,oname) - assert w_config.get(sname, oname) == val - - sname_new = "new_section" - oname_new = "new_key" - ival = 10 - w_config.set_value(sname_new, oname_new, ival) - assert w_config.get_value(sname_new, oname_new) == ival - - file_obj.seek(0) - r_config = GitConfigParser(file_obj, read_only=True) - assert r_config.has_section(sname) - assert r_config.has_option(sname, oname) - assert r_config.get(sname, oname) == val - - # END for each filename - - def test_base(self): - path_repo = fixture_path("git_config") - path_global = fixture_path("git_config_global") - r_config = GitConfigParser([path_repo, path_global], read_only=True) - assert r_config.read_only - num_sections = 0 - num_options = 0 - - # test reader methods - assert r_config._is_initialized == False - for section in r_config.sections(): - num_sections += 1 - for option in r_config.options(section): - num_options += 1 - val = r_config.get(section, option) - val_typed = r_config.get_value(section, option) - assert isinstance(val_typed, (bool, long, float, basestring)) - assert val - assert "\n" not in option - assert "\n" not in val - - # writing must fail - self.failUnlessRaises(IOError, r_config.set, section, option, None) - self.failUnlessRaises(IOError, r_config.remove_option, section, option ) - # END for each option - self.failUnlessRaises(IOError, r_config.remove_section, section) - # END for each section - assert num_sections and num_options - assert r_config._is_initialized == True - - # get value which doesnt exist, with default - default = "my default value" - assert r_config.get_value("doesnt", "exist", default) == default - - # it raises if there is no default though - self.failUnlessRaises(NoSectionError, r_config.get_value, "doesnt", "exist") - - diff --git a/gitdb/test/test_example.py b/gitdb/test/test_example.py index c2e7840..8b13789 100644 --- a/gitdb/test/test_example.py +++ b/gitdb/test/test_example.py @@ -1,64 +1 @@ -# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors -# -# This module is part of GitDB and is released under -# the New BSD License: 
http://www.opensource.org/licenses/bsd-license.php -"""Module with examples from the tutorial section of the docs""" -from lib import * -from gitdb import IStream -from gitdb.db.py import PureLooseObjectODB -from gitdb.util import pool - -from cStringIO import StringIO -from async import IteratorReader - -class TestExamples(TestBase): - - def test_base(self): - ldb = PureLooseObjectODB(fixture_path("../../../.git/objects")) - - for sha1 in ldb.sha_iter(): - oinfo = ldb.info(sha1) - ostream = ldb.stream(sha1) - assert oinfo[:3] == ostream[:3] - - assert len(ostream.read()) == ostream.size - assert ldb.has_object(oinfo.binsha) - # END for each sha in database - # assure we close all files - try: - del(ostream) - del(oinfo) - except UnboundLocalError: - pass - # END ignore exception if there are no loose objects - - data = "my data" - istream = IStream("blob", len(data), StringIO(data)) - - # the object does not yet have a sha - assert istream.binsha is None - ldb.store(istream) - # now the sha is set - assert len(istream.binsha) == 20 - assert ldb.has_object(istream.binsha) - - - # async operation - # Create a reader from an iterator - reader = IteratorReader(ldb.sha_iter()) - - # get reader for object streams - info_reader = ldb.stream_async(reader) - - # read one - info = info_reader.read(1)[0] - - # read all the rest until depletion - ostreams = info_reader.read() - - # set the pool to use two threads - pool.set_size(2) - - # synchronize the mode of operation - pool.set_size(0) diff --git a/gitdb/test/test_pack.py b/gitdb/test/test_pack.py index 4a7f1ca..8b13789 100644 --- a/gitdb/test/test_pack.py +++ b/gitdb/test/test_pack.py @@ -1,247 +1 @@ -# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors -# -# This module is part of GitDB and is released under -# the New BSD License: http://www.opensource.org/licenses/bsd-license.php -"""Test everything about packs reading and writing""" -from lib import ( - TestBase, - with_rw_directory, - with_packs_rw, - fixture_path - ) -from gitdb.stream import DeltaApplyReader -from gitdb.pack import ( - PackEntity, - PackIndexFile, - PackFile - ) - -from gitdb.base import ( - OInfo, - OStream, - ) - -from gitdb.fun import delta_types -from gitdb.exc import UnsupportedOperation -from gitdb.util import to_bin_sha -from itertools import izip, chain -from nose import SkipTest - -import os -import sys -import tempfile - - -#{ Utilities -def bin_sha_from_filename(filename): - return to_bin_sha(os.path.splitext(os.path.basename(filename))[0][5:]) -#} END utilities - -class TestPack(TestBase): - - packindexfile_v1 = (fixture_path('packs/pack-c0438c19fb16422b6bbcce24387b3264416d485b.idx'), 1, 67) - packindexfile_v2 = (fixture_path('packs/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.idx'), 2, 30) - packindexfile_v2_3_ascii = (fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.idx'), 2, 42) - packfile_v2_1 = (fixture_path('packs/pack-c0438c19fb16422b6bbcce24387b3264416d485b.pack'), 2, packindexfile_v1[2]) - packfile_v2_2 = (fixture_path('packs/pack-11fdfa9e156ab73caae3b6da867192221f2089c2.pack'), 2, packindexfile_v2[2]) - packfile_v2_3_ascii = (fixture_path('packs/pack-a2bf8e71d8c18879e499335762dd95119d93d9f1.pack'), 2, packindexfile_v2_3_ascii[2]) - - - def _assert_index_file(self, index, version, size): - assert index.packfile_checksum() != index.indexfile_checksum() - assert len(index.packfile_checksum()) == 20 - assert len(index.indexfile_checksum()) == 20 - assert index.version() == version - assert index.size() == 
size - assert len(index.offsets()) == size - - # get all data of all objects - for oidx in xrange(index.size()): - sha = index.sha(oidx) - assert oidx == index.sha_to_index(sha) - - entry = index.entry(oidx) - assert len(entry) == 3 - - assert entry[0] == index.offset(oidx) - assert entry[1] == sha - assert entry[2] == index.crc(oidx) - - # verify partial sha - for l in (4,8,11,17,20): - assert index.partial_sha_to_index(sha[:l], l*2) == oidx - - # END for each object index in indexfile - self.failUnlessRaises(ValueError, index.partial_sha_to_index, "\0", 2) - - - def _assert_pack_file(self, pack, version, size): - assert pack.version() == 2 - assert pack.size() == size - assert len(pack.checksum()) == 20 - - num_obj = 0 - for obj in pack.stream_iter(): - num_obj += 1 - info = pack.info(obj.pack_offset) - stream = pack.stream(obj.pack_offset) - - assert info.pack_offset == stream.pack_offset - assert info.type_id == stream.type_id - assert hasattr(stream, 'read') - - # it should be possible to read from both streams - assert obj.read() == stream.read() - - streams = pack.collect_streams(obj.pack_offset) - assert streams - - # read the stream - try: - dstream = DeltaApplyReader.new(streams) - except ValueError: - # ignore these, old git versions use only ref deltas, - # which we havent resolved ( as we are without an index ) - # Also ignore non-delta streams - continue - # END get deltastream - - # read all - data = dstream.read() - assert len(data) == dstream.size - - # test seek - dstream.seek(0) - assert dstream.read() == data - - - # read chunks - # NOTE: the current implementation is safe, it basically transfers - # all calls to the underlying memory map - - # END for each object - assert num_obj == size - - - def test_pack_index(self): - # check version 1 and 2 - for indexfile, version, size in (self.packindexfile_v1, self.packindexfile_v2): - index = PackIndexFile(indexfile) - self._assert_index_file(index, version, size) - # END run tests - - def test_pack(self): - # there is this special version 3, but apparently its like 2 ... 
- for packfile, version, size in (self.packfile_v2_3_ascii, self.packfile_v2_1, self.packfile_v2_2): - pack = PackFile(packfile) - self._assert_pack_file(pack, version, size) - # END for each pack to test - - @with_rw_directory - def test_pack_entity(self, rw_dir): - pack_objs = list() - for packinfo, indexinfo in ( (self.packfile_v2_1, self.packindexfile_v1), - (self.packfile_v2_2, self.packindexfile_v2), - (self.packfile_v2_3_ascii, self.packindexfile_v2_3_ascii)): - packfile, version, size = packinfo - indexfile, version, size = indexinfo - entity = PackEntity(packfile) - assert entity.pack().path() == packfile - assert entity.index().path() == indexfile - pack_objs.extend(entity.stream_iter()) - - count = 0 - for info, stream in izip(entity.info_iter(), entity.stream_iter()): - count += 1 - assert info.binsha == stream.binsha - assert len(info.binsha) == 20 - assert info.type_id == stream.type_id - assert info.size == stream.size - - # we return fully resolved items, which is implied by the sha centric access - assert not info.type_id in delta_types - - # try all calls - assert len(entity.collect_streams(info.binsha)) - oinfo = entity.info(info.binsha) - assert isinstance(oinfo, OInfo) - assert oinfo.binsha is not None - ostream = entity.stream(info.binsha) - assert isinstance(ostream, OStream) - assert ostream.binsha is not None - - # verify the stream - try: - assert entity.is_valid_stream(info.binsha, use_crc=True) - except UnsupportedOperation: - pass - # END ignore version issues - assert entity.is_valid_stream(info.binsha, use_crc=False) - # END for each info, stream tuple - assert count == size - - # END for each entity - - # pack writing - write all packs into one - # index path can be None - pack_path = tempfile.mktemp('', "pack", rw_dir) - index_path = tempfile.mktemp('', 'index', rw_dir) - iteration = 0 - def rewind_streams(): - for obj in pack_objs: - obj.stream.seek(0) - #END utility - for ppath, ipath, num_obj in zip((pack_path, )*2, (index_path, None), (len(pack_objs), None)): - pfile = open(ppath, 'wb') - iwrite = None - if ipath: - ifile = open(ipath, 'wb') - iwrite = ifile.write - #END handle ip - - # make sure we rewind the streams ... 
we work on the same objects over and over again - if iteration > 0: - rewind_streams() - #END rewind streams - iteration += 1 - - pack_sha, index_sha = PackEntity.write_pack(pack_objs, pfile.write, iwrite, object_count=num_obj) - pfile.close() - assert os.path.getsize(ppath) > 100 - - # verify pack - pf = PackFile(ppath) - assert pf.size() == len(pack_objs) - assert pf.version() == PackFile.pack_version_default - assert pf.checksum() == pack_sha - - # verify index - if ipath is not None: - ifile.close() - assert os.path.getsize(ipath) > 100 - idx = PackIndexFile(ipath) - assert idx.version() == PackIndexFile.index_version_default - assert idx.packfile_checksum() == pack_sha - assert idx.indexfile_checksum() == index_sha - assert idx.size() == len(pack_objs) - #END verify files exist - #END for each packpath, indexpath pair - - # verify the packs throughly - rewind_streams() - entity = PackEntity.create(pack_objs, rw_dir) - count = 0 - for info in entity.info_iter(): - count += 1 - for use_crc in range(2): - assert entity.is_valid_stream(info.binsha, use_crc) - # END for each crc mode - #END for each info - assert count == len(pack_objs) - - - def test_pack_64(self): - # TODO: hex-edit a pack helping us to verify that we can handle 64 byte offsets - # of course without really needing such a huge pack - raise SkipTest() diff --git a/gitdb/test/test_refs.py b/gitdb/test/test_refs.py index 2c571b7..649542f 100644 --- a/gitdb/test/test_refs.py +++ b/gitdb/test/test_refs.py @@ -158,26 +158,25 @@ class TestRefs(TestBase): cur_head = HEAD(rw_repo) old_head_commit = cur_head.commit new_head_commit = cur_head.ref.commit.parents[0] - if False: #TODO get reset checking back into the game - cur_head.reset(new_head_commit, index=True) # index only - assert cur_head.reference.commit == new_head_commit - - self.failUnlessRaises(ValueError, cur_head.reset, new_head_commit, index=False, working_tree=True) - new_head_commit = new_head_commit.parents[0] - cur_head.reset(new_head_commit, index=True, working_tree=True) # index + wt - assert cur_head.reference.commit == new_head_commit - - # paths - make sure we have something to do - rw_repo.index.reset(old_head_commit.parents[0]) - cur_head.reset(cur_head, paths = "test") - cur_head.reset(new_head_commit, paths = "lib") - # hard resets with paths don't work, its all or nothing - self.failUnlessRaises(GitCommandError, cur_head.reset, new_head_commit, working_tree=True, paths = "lib") - - # we can do a mixed reset, and then checkout from the index though - cur_head.reset(new_head_commit) - rw_repo.index.checkout(["lib"], force=True)# - #END ignore block + + cur_head.reset(new_head_commit, index=True) # index only + assert cur_head.reference.commit == new_head_commit + + self.failUnlessRaises(ValueError, cur_head.reset, new_head_commit, index=False, working_tree=True) + new_head_commit = new_head_commit.parents[0] + cur_head.reset(new_head_commit, index=True, working_tree=True) # index + wt + assert cur_head.reference.commit == new_head_commit + + # paths - make sure we have something to do + rw_repo.index.reset(old_head_commit.parents[0]) + cur_head.reset(cur_head, paths = "test") + cur_head.reset(new_head_commit, paths = "lib") + # hard resets with paths don't work, its all or nothing + self.failUnlessRaises(GitCommandError, cur_head.reset, new_head_commit, working_tree=True, paths = "lib") + + # we can do a mixed reset, and then checkout from the index though + cur_head.reset(new_head_commit) + rw_repo.index.checkout(["lib"], force=True)# # now that we have a 
write write repo, change the HEAD reference - its # like git-reset --soft diff --git a/gitdb/test/test_util.py b/gitdb/test/test_util.py index 0847ab5..53e2569 100644 --- a/gitdb/test/test_util.py +++ b/gitdb/test/test_util.py @@ -3,131 +3,8 @@ # This module is part of GitDB and is released under # the New BSD License: http://www.opensource.org/licenses/bsd-license.php """Test for object db""" -import tempfile -import os -from lib import TestBase -from gitdb.util import ( - to_hex_sha, - to_bin_sha, - NULL_HEX_SHA, - LockedFD, - Actor - ) class TestUtils(TestBase): - def test_basics(self): - assert to_hex_sha(NULL_HEX_SHA) == NULL_HEX_SHA - assert len(to_bin_sha(NULL_HEX_SHA)) == 20 - assert to_hex_sha(to_bin_sha(NULL_HEX_SHA)) == NULL_HEX_SHA - - def _cmp_contents(self, file_path, data): - # raise if data from file at file_path - # does not match data string - fp = open(file_path, "rb") - try: - assert fp.read() == data - finally: - fp.close() - - def test_lockedfd(self): - my_file = tempfile.mktemp() - orig_data = "hello" - new_data = "world" - my_file_fp = open(my_file, "wb") - my_file_fp.write(orig_data) - my_file_fp.close() - - try: - lfd = LockedFD(my_file) - lockfilepath = lfd._lockfilepath() - - # cannot end before it was started - self.failUnlessRaises(AssertionError, lfd.rollback) - self.failUnlessRaises(AssertionError, lfd.commit) - - # open for writing - assert not os.path.isfile(lockfilepath) - wfd = lfd.open(write=True) - assert lfd._fd is wfd - assert os.path.isfile(lockfilepath) - - # write data and fail - os.write(wfd, new_data) - lfd.rollback() - assert lfd._fd is None - self._cmp_contents(my_file, orig_data) - assert not os.path.isfile(lockfilepath) - - # additional call doesnt fail - lfd.commit() - lfd.rollback() - - # test reading - lfd = LockedFD(my_file) - rfd = lfd.open(write=False) - assert os.read(rfd, len(orig_data)) == orig_data - - assert os.path.isfile(lockfilepath) - # deletion rolls back - del(lfd) - assert not os.path.isfile(lockfilepath) - - - # write data - concurrently - lfd = LockedFD(my_file) - olfd = LockedFD(my_file) - assert not os.path.isfile(lockfilepath) - wfdstream = lfd.open(write=True, stream=True) # this time as stream - assert os.path.isfile(lockfilepath) - # another one fails - self.failUnlessRaises(IOError, olfd.open) - - wfdstream.write(new_data) - lfd.commit() - assert not os.path.isfile(lockfilepath) - self._cmp_contents(my_file, new_data) - - # could test automatic _end_writing on destruction - finally: - os.remove(my_file) - # END final cleanup - - # try non-existing file for reading - lfd = LockedFD(tempfile.mktemp()) - try: - lfd.open(write=False) - except OSError: - assert not os.path.exists(lfd._lockfilepath()) - else: - self.fail("expected OSError") - # END handle exceptions - - -class TestActor(TestBase): - def test_from_string_should_separate_name_and_email(self): - a = Actor._from_string("Michael Trier <mtrier@example.com>") - assert "Michael Trier" == a.name - assert "mtrier@example.com" == a.email - - # base type capabilities - assert a == a - assert not ( a != a ) - m = set() - m.add(a) - m.add(a) - assert len(m) == 1 - - def test_from_string_should_handle_just_name(self): - a = Actor._from_string("Michael Trier") - assert "Michael Trier" == a.name - assert None == a.email - - def test_should_display_representation(self): - a = Actor._from_string("Michael Trier <mtrier@example.com>") - assert '<git.Actor "Michael Trier <mtrier@example.com>">' == repr(a) - - def test_str_should_alias_name(self): - a = 
Actor._from_string("Michael Trier <mtrier@example.com>") - assert a.name == str(a) +
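
For reference, the most widely used helper removed by this change is lib.py's with_rw_directory decorator, which hands a test a writable scratch directory and deliberately keeps it around on failure for debugging. Below is a minimal Python 3 sketch of the same pattern, not the project's code: it assumes the stated Windows/gc caveat still applies, and substitutes tempfile.mkdtemp for the original mktemp/mkdir pair.

import functools
import gc
import shutil
import sys
import tempfile

def with_rw_directory(func):
    """Pass a writable temporary directory to the test; remove it on
    success, keep it on failure to aid debugging (as in the removed
    gitdb/test/lib.py)."""
    @functools.wraps(func)
    def wrapper(self):
        path = tempfile.mkdtemp(prefix=func.__name__)
        keep = False
        try:
            return func(self, path)
        except Exception:
            print("Test %s.%s failed, output is at %r"
                  % (type(self).__name__, func.__name__, path), file=sys.stderr)
            keep = True
            raise
        finally:
            if not keep:
                # Collect first so lingering handles and memory maps are
                # closed before removal -- per the original comment this
                # is only strictly needed on Windows.
                gc.collect()
                shutil.rmtree(path)
    return wrapper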