diff options
author | Sebastian Thiel <byronimo@gmail.com> | 2014-02-09 21:23:51 +0100 |
---|---|---|
committer | Sebastian Thiel <byronimo@gmail.com> | 2014-02-09 21:23:51 +0100 |
commit | 15dd52cd578691930cea194e003fa80dd02f40eb (patch) | |
tree | ef4c9c5f705dd1ca743b7ceefe5b91b11ad15010 /git/test/lib/base.py | |
parent | 660bdca125aa9dcca7a7730535bec433edb8ba02 (diff) | |
download | gitpython-15dd52cd578691930cea194e003fa80dd02f40eb.tar.gz |
tabs to 4 spaces - overall state of this branch is desolate, but fixable. Needs plenty of work
Diffstat (limited to 'git/test/lib/base.py')
-rw-r--r-- | git/test/lib/base.py | 272 |
1 files changed, 136 insertions, 136 deletions
diff --git a/git/test/lib/base.py b/git/test/lib/base.py index bc160783..298e8e05 100644 --- a/git/test/lib/base.py +++ b/git/test/lib/base.py @@ -5,14 +5,14 @@ """Utilities used in ODB testing""" from git.base import OStream from git.stream import ( - Sha1Writer, - ZippedStoreShaWriter - ) + Sha1Writer, + ZippedStoreShaWriter + ) from git.util import ( - zlib, - dirname - ) + zlib, + dirname + ) import sys import random @@ -30,171 +30,171 @@ import gc #{ Decorators def with_rw_directory(func): - """Create a temporary directory which can be written to, remove it if the - test suceeds, but leave it otherwise to aid additional debugging""" - def wrapper(self): - path = maketemp(prefix=func.__name__) - os.mkdir(path) - keep = False - try: - try: - return func(self, path) - except Exception: - print >> sys.stderr, "Test %s.%s failed, output is at %r" % (type(self).__name__, func.__name__, path) - keep = True - raise - finally: - # Need to collect here to be sure all handles have been closed. It appears - # a windows-only issue. In fact things should be deleted, as well as - # memory maps closed, once objects go out of scope. For some reason - # though this is not the case here unless we collect explicitly. - if not keep: - gc.collect() - shutil.rmtree(path) - # END handle exception - # END wrapper - - wrapper.__name__ = func.__name__ - return wrapper + """Create a temporary directory which can be written to, remove it if the + test suceeds, but leave it otherwise to aid additional debugging""" + def wrapper(self): + path = maketemp(prefix=func.__name__) + os.mkdir(path) + keep = False + try: + try: + return func(self, path) + except Exception: + print >> sys.stderr, "Test %s.%s failed, output is at %r" % (type(self).__name__, func.__name__, path) + keep = True + raise + finally: + # Need to collect here to be sure all handles have been closed. It appears + # a windows-only issue. In fact things should be deleted, as well as + # memory maps closed, once objects go out of scope. For some reason + # though this is not the case here unless we collect explicitly. + if not keep: + gc.collect() + shutil.rmtree(path) + # END handle exception + # END wrapper + + wrapper.__name__ = func.__name__ + return wrapper def with_rw_repo(func): - """Create a copy of our repository and put it into a writable location. It will - be removed if the test doesn't result in an error. - As we can currently only copy the fully working tree, tests must not rely on - being on a certain branch or on anything really except for the default tags - that should exist - Wrapped function obtains a git repository """ - def wrapper(self, path): - src_dir = dirname(dirname(dirname(__file__))) - assert(os.path.isdir(path)) - os.rmdir(path) # created by wrapper, but must not exist for copy operation - shutil.copytree(src_dir, path) - target_gitdir = os.path.join(path, '.git') - assert os.path.isdir(target_gitdir) - return func(self, self.RepoCls(target_gitdir)) - #END wrapper - wrapper.__name__ = func.__name__ - return with_rw_directory(wrapper) - + """Create a copy of our repository and put it into a writable location. It will + be removed if the test doesn't result in an error. + As we can currently only copy the fully working tree, tests must not rely on + being on a certain branch or on anything really except for the default tags + that should exist + Wrapped function obtains a git repository """ + def wrapper(self, path): + src_dir = dirname(dirname(dirname(__file__))) + assert(os.path.isdir(path)) + os.rmdir(path) # created by wrapper, but must not exist for copy operation + shutil.copytree(src_dir, path) + target_gitdir = os.path.join(path, '.git') + assert os.path.isdir(target_gitdir) + return func(self, self.RepoCls(target_gitdir)) + #END wrapper + wrapper.__name__ = func.__name__ + return with_rw_directory(wrapper) + def with_packs_rw(func): - """Function that provides a path into which the packs for testing should be - copied. Will pass on the path to the actual function afterwards - - :note: needs with_rw_directory wrapped around it""" - def wrapper(self, path): - src_pack_glob = fixture_path('packs/*') - print src_pack_glob - copy_files_globbed(src_pack_glob, path, hard_link_ok=True) - return func(self, path) - # END wrapper - - wrapper.__name__ = func.__name__ - return with_rw_directory(wrapper) + """Function that provides a path into which the packs for testing should be + copied. Will pass on the path to the actual function afterwards + + :note: needs with_rw_directory wrapped around it""" + def wrapper(self, path): + src_pack_glob = fixture_path('packs/*') + print src_pack_glob + copy_files_globbed(src_pack_glob, path, hard_link_ok=True) + return func(self, path) + # END wrapper + + wrapper.__name__ = func.__name__ + return with_rw_directory(wrapper) #} END decorators #{ Routines def rorepo_dir(): - """:return: path to our own repository, being our own .git directory. - :note: doesn't work in bare repositories""" - base = os.path.join(dirname(dirname(dirname(dirname(__file__)))), '.git') - assert os.path.isdir(base) - return base + """:return: path to our own repository, being our own .git directory. + :note: doesn't work in bare repositories""" + base = os.path.join(dirname(dirname(dirname(dirname(__file__)))), '.git') + assert os.path.isdir(base) + return base def maketemp(*args, **kwargs): - """Wrapper around default tempfile.mktemp to fix an osx issue""" - tdir = tempfile.mktemp(*args, **kwargs) - if sys.platform == 'darwin': - tdir = '/private' + tdir - return tdir + """Wrapper around default tempfile.mktemp to fix an osx issue""" + tdir = tempfile.mktemp(*args, **kwargs) + if sys.platform == 'darwin': + tdir = '/private' + tdir + return tdir def fixture_path(relapath=''): - """:return: absolute path into the fixture directory - :param relapath: relative path into the fixtures directory, or '' - to obtain the fixture directory itself""" - test_dir = os.path.dirname(os.path.dirname(__file__)) - return os.path.join(test_dir, "fixtures", relapath) - + """:return: absolute path into the fixture directory + :param relapath: relative path into the fixtures directory, or '' + to obtain the fixture directory itself""" + test_dir = os.path.dirname(os.path.dirname(__file__)) + return os.path.join(test_dir, "fixtures", relapath) + def fixture(name): - return open(fixture_path(name), 'rb').read() + return open(fixture_path(name), 'rb').read() def absolute_project_path(): - return os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) + return os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) def copy_files_globbed(source_glob, target_dir, hard_link_ok=False): - """Copy all files found according to the given source glob into the target directory - :param hard_link_ok: if True, hard links will be created if possible. Otherwise - the files will be copied""" - for src_file in glob.glob(source_glob): - if hard_link_ok and hasattr(os, 'link'): - target = os.path.join(target_dir, os.path.basename(src_file)) - try: - os.link(src_file, target) - except OSError: - shutil.copy(src_file, target_dir) - # END handle cross device links ( and resulting failure ) - else: - shutil.copy(src_file, target_dir) - # END try hard link - # END for each file to copy - + """Copy all files found according to the given source glob into the target directory + :param hard_link_ok: if True, hard links will be created if possible. Otherwise + the files will be copied""" + for src_file in glob.glob(source_glob): + if hard_link_ok and hasattr(os, 'link'): + target = os.path.join(target_dir, os.path.basename(src_file)) + try: + os.link(src_file, target) + except OSError: + shutil.copy(src_file, target_dir) + # END handle cross device links ( and resulting failure ) + else: + shutil.copy(src_file, target_dir) + # END try hard link + # END for each file to copy + def make_bytes(size_in_bytes, randomize=False): - """:return: string with given size in bytes - :param randomize: try to produce a very random stream""" - actual_size = size_in_bytes / 4 - producer = xrange(actual_size) - if randomize: - producer = list(producer) - random.shuffle(producer) - # END randomize - a = array('i', producer) - return a.tostring() + """:return: string with given size in bytes + :param randomize: try to produce a very random stream""" + actual_size = size_in_bytes / 4 + producer = xrange(actual_size) + if randomize: + producer = list(producer) + random.shuffle(producer) + # END randomize + a = array('i', producer) + return a.tostring() def make_object(type, data): - """:return: bytes resembling an uncompressed object""" - odata = "blob %i\0" % len(data) - return odata + data - + """:return: bytes resembling an uncompressed object""" + odata = "blob %i\0" % len(data) + return odata + data + def make_memory_file(size_in_bytes, randomize=False): - """:return: tuple(size_of_stream, stream) - :param randomize: try to produce a very random stream""" - d = make_bytes(size_in_bytes, randomize) - return len(d), StringIO(d) + """:return: tuple(size_of_stream, stream) + :param randomize: try to produce a very random stream""" + d = make_bytes(size_in_bytes, randomize) + return len(d), StringIO(d) #} END routines #{ Stream Utilities class DummyStream(object): - def __init__(self): - self.was_read = False - self.bytes = 0 - self.closed = False - - def read(self, size): - self.was_read = True - self.bytes = size - - def close(self): - self.closed = True - - def _assert(self): - assert self.was_read + def __init__(self): + self.was_read = False + self.bytes = 0 + self.closed = False + + def read(self, size): + self.was_read = True + self.bytes = size + + def close(self): + self.closed = True + + def _assert(self): + assert self.was_read class DeriveTest(OStream): - def __init__(self, sha, type, size, stream, *args, **kwargs): - self.myarg = kwargs.pop('myarg') - self.args = args - - def _assert(self): - assert self.args - assert self.myarg + def __init__(self, sha, type, size, stream, *args, **kwargs): + self.myarg = kwargs.pop('myarg') + self.args = args + + def _assert(self): + assert self.args + assert self.myarg #} END stream utilitiess |