summaryrefslogtreecommitdiff
path: root/git/test/lib/base.py
diff options
context:
space:
mode:
authorSebastian Thiel <byronimo@gmail.com>2014-02-09 21:23:51 +0100
committerSebastian Thiel <byronimo@gmail.com>2014-02-09 21:23:51 +0100
commit15dd52cd578691930cea194e003fa80dd02f40eb (patch)
treeef4c9c5f705dd1ca743b7ceefe5b91b11ad15010 /git/test/lib/base.py
parent660bdca125aa9dcca7a7730535bec433edb8ba02 (diff)
downloadgitpython-15dd52cd578691930cea194e003fa80dd02f40eb.tar.gz
tabs to 4 spaces - overall state of this branch is desolate, but fixable. Needs plenty of work
Diffstat (limited to 'git/test/lib/base.py')
-rw-r--r--git/test/lib/base.py272
1 files changed, 136 insertions, 136 deletions
diff --git a/git/test/lib/base.py b/git/test/lib/base.py
index bc160783..298e8e05 100644
--- a/git/test/lib/base.py
+++ b/git/test/lib/base.py
@@ -5,14 +5,14 @@
"""Utilities used in ODB testing"""
from git.base import OStream
from git.stream import (
- Sha1Writer,
- ZippedStoreShaWriter
- )
+ Sha1Writer,
+ ZippedStoreShaWriter
+ )
from git.util import (
- zlib,
- dirname
- )
+ zlib,
+ dirname
+ )
import sys
import random
@@ -30,171 +30,171 @@ import gc
#{ Decorators
def with_rw_directory(func):
- """Create a temporary directory which can be written to, remove it if the
- test suceeds, but leave it otherwise to aid additional debugging"""
- def wrapper(self):
- path = maketemp(prefix=func.__name__)
- os.mkdir(path)
- keep = False
- try:
- try:
- return func(self, path)
- except Exception:
- print >> sys.stderr, "Test %s.%s failed, output is at %r" % (type(self).__name__, func.__name__, path)
- keep = True
- raise
- finally:
- # Need to collect here to be sure all handles have been closed. It appears
- # a windows-only issue. In fact things should be deleted, as well as
- # memory maps closed, once objects go out of scope. For some reason
- # though this is not the case here unless we collect explicitly.
- if not keep:
- gc.collect()
- shutil.rmtree(path)
- # END handle exception
- # END wrapper
-
- wrapper.__name__ = func.__name__
- return wrapper
+ """Create a temporary directory which can be written to, remove it if the
+ test suceeds, but leave it otherwise to aid additional debugging"""
+ def wrapper(self):
+ path = maketemp(prefix=func.__name__)
+ os.mkdir(path)
+ keep = False
+ try:
+ try:
+ return func(self, path)
+ except Exception:
+ print >> sys.stderr, "Test %s.%s failed, output is at %r" % (type(self).__name__, func.__name__, path)
+ keep = True
+ raise
+ finally:
+ # Need to collect here to be sure all handles have been closed. It appears
+ # a windows-only issue. In fact things should be deleted, as well as
+ # memory maps closed, once objects go out of scope. For some reason
+ # though this is not the case here unless we collect explicitly.
+ if not keep:
+ gc.collect()
+ shutil.rmtree(path)
+ # END handle exception
+ # END wrapper
+
+ wrapper.__name__ = func.__name__
+ return wrapper
def with_rw_repo(func):
- """Create a copy of our repository and put it into a writable location. It will
- be removed if the test doesn't result in an error.
- As we can currently only copy the fully working tree, tests must not rely on
- being on a certain branch or on anything really except for the default tags
- that should exist
- Wrapped function obtains a git repository """
- def wrapper(self, path):
- src_dir = dirname(dirname(dirname(__file__)))
- assert(os.path.isdir(path))
- os.rmdir(path) # created by wrapper, but must not exist for copy operation
- shutil.copytree(src_dir, path)
- target_gitdir = os.path.join(path, '.git')
- assert os.path.isdir(target_gitdir)
- return func(self, self.RepoCls(target_gitdir))
- #END wrapper
- wrapper.__name__ = func.__name__
- return with_rw_directory(wrapper)
-
+ """Create a copy of our repository and put it into a writable location. It will
+ be removed if the test doesn't result in an error.
+ As we can currently only copy the fully working tree, tests must not rely on
+ being on a certain branch or on anything really except for the default tags
+ that should exist
+ Wrapped function obtains a git repository """
+ def wrapper(self, path):
+ src_dir = dirname(dirname(dirname(__file__)))
+ assert(os.path.isdir(path))
+ os.rmdir(path) # created by wrapper, but must not exist for copy operation
+ shutil.copytree(src_dir, path)
+ target_gitdir = os.path.join(path, '.git')
+ assert os.path.isdir(target_gitdir)
+ return func(self, self.RepoCls(target_gitdir))
+ #END wrapper
+ wrapper.__name__ = func.__name__
+ return with_rw_directory(wrapper)
+
def with_packs_rw(func):
- """Function that provides a path into which the packs for testing should be
- copied. Will pass on the path to the actual function afterwards
-
- :note: needs with_rw_directory wrapped around it"""
- def wrapper(self, path):
- src_pack_glob = fixture_path('packs/*')
- print src_pack_glob
- copy_files_globbed(src_pack_glob, path, hard_link_ok=True)
- return func(self, path)
- # END wrapper
-
- wrapper.__name__ = func.__name__
- return with_rw_directory(wrapper)
+ """Function that provides a path into which the packs for testing should be
+ copied. Will pass on the path to the actual function afterwards
+
+ :note: needs with_rw_directory wrapped around it"""
+ def wrapper(self, path):
+ src_pack_glob = fixture_path('packs/*')
+ print src_pack_glob
+ copy_files_globbed(src_pack_glob, path, hard_link_ok=True)
+ return func(self, path)
+ # END wrapper
+
+ wrapper.__name__ = func.__name__
+ return with_rw_directory(wrapper)
#} END decorators
#{ Routines
def rorepo_dir():
- """:return: path to our own repository, being our own .git directory.
- :note: doesn't work in bare repositories"""
- base = os.path.join(dirname(dirname(dirname(dirname(__file__)))), '.git')
- assert os.path.isdir(base)
- return base
+ """:return: path to our own repository, being our own .git directory.
+ :note: doesn't work in bare repositories"""
+ base = os.path.join(dirname(dirname(dirname(dirname(__file__)))), '.git')
+ assert os.path.isdir(base)
+ return base
def maketemp(*args, **kwargs):
- """Wrapper around default tempfile.mktemp to fix an osx issue"""
- tdir = tempfile.mktemp(*args, **kwargs)
- if sys.platform == 'darwin':
- tdir = '/private' + tdir
- return tdir
+ """Wrapper around default tempfile.mktemp to fix an osx issue"""
+ tdir = tempfile.mktemp(*args, **kwargs)
+ if sys.platform == 'darwin':
+ tdir = '/private' + tdir
+ return tdir
def fixture_path(relapath=''):
- """:return: absolute path into the fixture directory
- :param relapath: relative path into the fixtures directory, or ''
- to obtain the fixture directory itself"""
- test_dir = os.path.dirname(os.path.dirname(__file__))
- return os.path.join(test_dir, "fixtures", relapath)
-
+ """:return: absolute path into the fixture directory
+ :param relapath: relative path into the fixtures directory, or ''
+ to obtain the fixture directory itself"""
+ test_dir = os.path.dirname(os.path.dirname(__file__))
+ return os.path.join(test_dir, "fixtures", relapath)
+
def fixture(name):
- return open(fixture_path(name), 'rb').read()
+ return open(fixture_path(name), 'rb').read()
def absolute_project_path():
- return os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
+ return os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
def copy_files_globbed(source_glob, target_dir, hard_link_ok=False):
- """Copy all files found according to the given source glob into the target directory
- :param hard_link_ok: if True, hard links will be created if possible. Otherwise
- the files will be copied"""
- for src_file in glob.glob(source_glob):
- if hard_link_ok and hasattr(os, 'link'):
- target = os.path.join(target_dir, os.path.basename(src_file))
- try:
- os.link(src_file, target)
- except OSError:
- shutil.copy(src_file, target_dir)
- # END handle cross device links ( and resulting failure )
- else:
- shutil.copy(src_file, target_dir)
- # END try hard link
- # END for each file to copy
-
+ """Copy all files found according to the given source glob into the target directory
+ :param hard_link_ok: if True, hard links will be created if possible. Otherwise
+ the files will be copied"""
+ for src_file in glob.glob(source_glob):
+ if hard_link_ok and hasattr(os, 'link'):
+ target = os.path.join(target_dir, os.path.basename(src_file))
+ try:
+ os.link(src_file, target)
+ except OSError:
+ shutil.copy(src_file, target_dir)
+ # END handle cross device links ( and resulting failure )
+ else:
+ shutil.copy(src_file, target_dir)
+ # END try hard link
+ # END for each file to copy
+
def make_bytes(size_in_bytes, randomize=False):
- """:return: string with given size in bytes
- :param randomize: try to produce a very random stream"""
- actual_size = size_in_bytes / 4
- producer = xrange(actual_size)
- if randomize:
- producer = list(producer)
- random.shuffle(producer)
- # END randomize
- a = array('i', producer)
- return a.tostring()
+ """:return: string with given size in bytes
+ :param randomize: try to produce a very random stream"""
+ actual_size = size_in_bytes / 4
+ producer = xrange(actual_size)
+ if randomize:
+ producer = list(producer)
+ random.shuffle(producer)
+ # END randomize
+ a = array('i', producer)
+ return a.tostring()
def make_object(type, data):
- """:return: bytes resembling an uncompressed object"""
- odata = "blob %i\0" % len(data)
- return odata + data
-
+ """:return: bytes resembling an uncompressed object"""
+ odata = "blob %i\0" % len(data)
+ return odata + data
+
def make_memory_file(size_in_bytes, randomize=False):
- """:return: tuple(size_of_stream, stream)
- :param randomize: try to produce a very random stream"""
- d = make_bytes(size_in_bytes, randomize)
- return len(d), StringIO(d)
+ """:return: tuple(size_of_stream, stream)
+ :param randomize: try to produce a very random stream"""
+ d = make_bytes(size_in_bytes, randomize)
+ return len(d), StringIO(d)
#} END routines
#{ Stream Utilities
class DummyStream(object):
- def __init__(self):
- self.was_read = False
- self.bytes = 0
- self.closed = False
-
- def read(self, size):
- self.was_read = True
- self.bytes = size
-
- def close(self):
- self.closed = True
-
- def _assert(self):
- assert self.was_read
+ def __init__(self):
+ self.was_read = False
+ self.bytes = 0
+ self.closed = False
+
+ def read(self, size):
+ self.was_read = True
+ self.bytes = size
+
+ def close(self):
+ self.closed = True
+
+ def _assert(self):
+ assert self.was_read
class DeriveTest(OStream):
- def __init__(self, sha, type, size, stream, *args, **kwargs):
- self.myarg = kwargs.pop('myarg')
- self.args = args
-
- def _assert(self):
- assert self.args
- assert self.myarg
+ def __init__(self, sha, type, size, stream, *args, **kwargs):
+ self.myarg = kwargs.pop('myarg')
+ self.args = args
+
+ def _assert(self):
+ assert self.args
+ assert self.myarg
#} END stream utilitiess